add active pipelines.

This commit is contained in:
Miroslav Šedivý 2022-06-19 01:03:16 +02:00
parent 5ab4848580
commit 555fd803bc
4 changed files with 78 additions and 11 deletions

View File

@ -26,6 +26,7 @@ type BroacastManagerCtx struct {
// metrics // metrics
pipelinesCounter prometheus.Counter pipelinesCounter prometheus.Counter
pipelinesActive prometheus.Gauge
} }
func broadcastNew(pipelineStr string) *BroacastManagerCtx { func broadcastNew(pipelineStr string) *BroacastManagerCtx {
@ -53,6 +54,18 @@ func broadcastNew(pipelineStr string) *BroacastManagerCtx {
"codec_type": "-", "codec_type": "-",
}, },
}), }),
pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
Name: "pipelines_active",
Namespace: "neko",
Subsystem: "capture",
Help: "Total number of active pipelines.",
ConstLabels: map[string]string{
"submodule": "broadcast",
"video_id": "main",
"codec_name": "-",
"codec_type": "-",
},
}),
} }
} }
@ -122,6 +135,7 @@ func (manager *BroacastManagerCtx) createPipeline() error {
manager.pipeline.Play() manager.pipeline.Play()
manager.pipelinesCounter.Inc() manager.pipelinesCounter.Inc()
manager.pipelinesActive.Set(1)
return nil return nil
} }
@ -137,4 +151,6 @@ func (manager *BroacastManagerCtx) destroyPipeline() {
manager.pipeline.Destroy() manager.pipeline.Destroy()
manager.logger.Info().Msgf("destroying pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
manager.pipelinesActive.Set(0)
} }

View File

@ -38,6 +38,7 @@ type ScreencastManagerCtx struct {
// metrics // metrics
imagesCounter prometheus.Counter imagesCounter prometheus.Counter
pipelinesCounter prometheus.Counter pipelinesCounter prometheus.Counter
pipelinesActive prometheus.Gauge
} }
func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
@ -72,6 +73,18 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
"codec_type": "-", "codec_type": "-",
}, },
}), }),
pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
Name: "pipelines_active",
Namespace: "neko",
Subsystem: "capture",
Help: "Total number of active pipelines.",
ConstLabels: map[string]string{
"submodule": "screencast",
"video_id": "main",
"codec_name": "-",
"codec_type": "-",
},
}),
} }
manager.wg.Add(1) manager.wg.Add(1)
@ -185,6 +198,7 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
manager.pipeline.AttachAppsink("appsink") manager.pipeline.AttachAppsink("appsink")
manager.pipeline.Play() manager.pipeline.Play()
manager.pipelinesCounter.Inc() manager.pipelinesCounter.Inc()
manager.pipelinesActive.Set(1)
// get first image // get first image
select { select {
@ -192,11 +206,7 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
if !ok { if !ok {
return errors.New("unable to get first image") return errors.New("unable to get first image")
} else { } else {
manager.imageMu.Lock() manager.setImage(image)
manager.image = image
manager.imageMu.Unlock()
manager.imagesCounter.Inc()
} }
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
return errors.New("timeouted while waiting for first image") return errors.New("timeouted while waiting for first image")
@ -216,15 +226,19 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
return return
} }
manager.setImage(image)
}
}()
return nil
}
func (manager *ScreencastManagerCtx) setImage(image types.Sample) {
manager.imageMu.Lock() manager.imageMu.Lock()
manager.image = image manager.image = image
manager.imageMu.Unlock() manager.imageMu.Unlock()
manager.imagesCounter.Inc() manager.imagesCounter.Inc()
}
}()
return nil
} }
func (manager *ScreencastManagerCtx) destroyPipeline() { func (manager *ScreencastManagerCtx) destroyPipeline() {
@ -238,4 +252,6 @@ func (manager *ScreencastManagerCtx) destroyPipeline() {
manager.pipeline.Destroy() manager.pipeline.Destroy()
manager.logger.Info().Msgf("destroying pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
manager.pipelinesActive.Set(0)
} }

View File

@ -34,6 +34,7 @@ type StreamSinkManagerCtx struct {
// metrics // metrics
currentListeners prometheus.Gauge currentListeners prometheus.Gauge
pipelinesCounter prometheus.Counter pipelinesCounter prometheus.Counter
pipelinesActive prometheus.Gauge
} }
func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx {
@ -72,6 +73,18 @@ func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id str
"codec_type": codec.Type.String(), "codec_type": codec.Type.String(),
}, },
}), }),
pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
Name: "pipelines_active",
Namespace: "neko",
Subsystem: "capture",
Help: "Total number of active pipelines.",
ConstLabels: map[string]string{
"submodule": "streamsink",
"video_id": video_id,
"codec_name": codec.Name,
"codec_type": codec.Type.String(),
},
}),
} }
return manager return manager
@ -275,6 +288,8 @@ func (manager *StreamSinkManagerCtx) createPipeline() error {
}() }()
manager.pipelinesCounter.Inc() manager.pipelinesCounter.Inc()
manager.pipelinesActive.Set(1)
return nil return nil
} }
@ -289,4 +304,6 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() {
manager.pipeline.Destroy() manager.pipeline.Destroy()
manager.logger.Info().Msgf("destroying pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
manager.pipelinesActive.Set(0)
} }

View File

@ -27,6 +27,7 @@ type StreamSrcManagerCtx struct {
// metrics // metrics
pushedData map[string]prometheus.Summary pushedData map[string]prometheus.Summary
pipelinesCounter map[string]prometheus.Counter pipelinesCounter map[string]prometheus.Counter
pipelinesActive map[string]prometheus.Gauge
} }
func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx { func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx {
@ -37,6 +38,7 @@ func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string
pushedData := map[string]prometheus.Summary{} pushedData := map[string]prometheus.Summary{}
pipelinesCounter := map[string]prometheus.Counter{} pipelinesCounter := map[string]prometheus.Counter{}
pipelinesActive := map[string]prometheus.Gauge{}
for codecName, pipeline := range codecPipeline { for codecName, pipeline := range codecPipeline {
codec, ok := codec.ParseStr(codecName) codec, ok := codec.ParseStr(codecName)
if !ok { if !ok {
@ -69,6 +71,18 @@ func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string
"codec_type": codec.Type.String(), "codec_type": codec.Type.String(),
}, },
}) })
pipelinesActive[codecName] = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pipelines_active",
Namespace: "neko",
Subsystem: "capture",
Help: "Total number of active pipelines.",
ConstLabels: map[string]string{
"submodule": "streamsrc",
"video_id": video_id,
"codec_name": codec.Name,
"codec_type": codec.Type.String(),
},
})
} }
return &StreamSrcManagerCtx{ return &StreamSrcManagerCtx{
@ -133,7 +147,9 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
manager.pipeline.AttachAppsrc("appsrc") manager.pipeline.AttachAppsrc("appsrc")
manager.pipeline.Play() manager.pipeline.Play()
manager.pipelinesCounter[codec.Name].Inc() manager.pipelinesCounter[manager.codec.Name].Inc()
manager.pipelinesActive[manager.codec.Name].Set(1)
return nil return nil
} }
@ -148,6 +164,8 @@ func (manager *StreamSrcManagerCtx) Stop() {
manager.pipeline.Destroy() manager.pipeline.Destroy()
manager.logger.Info().Msgf("destroying pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
manager.pipelinesActive[manager.codec.Name].Set(0)
} }
func (manager *StreamSrcManagerCtx) Push(bytes []byte) { func (manager *StreamSrcManagerCtx) Push(bytes []byte) {