diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index 7d26df65..bb6d27be 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -4,6 +4,8 @@ import ( "strings" "sync" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -21,6 +23,9 @@ type BroacastManagerCtx struct { url string started bool + + // metrics + pipelinesCounter prometheus.Counter } func broadcastNew(pipelineStr string) *BroacastManagerCtx { @@ -34,6 +39,14 @@ func broadcastNew(pipelineStr string) *BroacastManagerCtx { pipelineStr: pipelineStr, url: "", started: false, + + // metrics + pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ + Name: "pipelines_total", + Namespace: "neko", + Subsystem: "capture_broadcast", + Help: "Total number of created pipelines.", + }), } } @@ -102,6 +115,8 @@ func (manager *BroacastManagerCtx) createPipeline() error { } manager.pipeline.Play() + manager.pipelinesCounter.Inc() + return nil } diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index 82d59e18..d6b815a4 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -6,6 +6,8 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -32,6 +34,10 @@ type ScreencastManagerCtx struct { enabled bool started bool expired int32 + + // metrics + imagesCounter prometheus.Counter + pipelinesCounter prometheus.Counter } func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { @@ -46,6 +52,20 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { tickerStop: make(chan struct{}), enabled: enabled, started: false, + + // metrics + imagesCounter: promauto.NewCounter(prometheus.CounterOpts{ + Name: "images_total", + Namespace: "neko", + Subsystem: "capture_screencast", + Help: "Total number of created images.", + }), + pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ + Name: "pipelines_total", + Namespace: "neko", + Subsystem: "capture_screencast", + Help: "Total number of created pipelines.", + }), } manager.wg.Add(1) @@ -158,6 +178,7 @@ func (manager *ScreencastManagerCtx) createPipeline() error { manager.pipeline.AttachAppsink("appsink") manager.pipeline.Play() + manager.pipelinesCounter.Inc() // get first image select { @@ -168,6 +189,8 @@ func (manager *ScreencastManagerCtx) createPipeline() error { manager.imageMu.Lock() manager.image = image manager.imageMu.Unlock() + + manager.imagesCounter.Inc() } case <-time.After(1 * time.Second): return errors.New("timeouted while waiting for first image") @@ -190,6 +213,8 @@ func (manager *ScreencastManagerCtx) createPipeline() error { manager.imageMu.Lock() manager.image = image manager.imageMu.Unlock() + + manager.imagesCounter.Inc() } }() diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index 22d1f64e..8b1faad4 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -5,6 +5,9 @@ import ( "reflect" "sync" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -27,6 +30,10 @@ type StreamSinkManagerCtx struct { listeners map[uintptr]*func(sample types.Sample) listenersMu sync.Mutex + + // metrics + currentListeners prometheus.Gauge + pipelinesCounter prometheus.Counter } func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { @@ -40,6 +47,28 @@ func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id str codec: codec, pipelineStr: pipelineStr, listeners: map[uintptr]*func(sample types.Sample){}, + + // metrics + currentListeners: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "listeners", + Namespace: "neko", + Subsystem: "capture_streamsink", + Help: "Current number of listeners for a pipeline.", + ConstLabels: map[string]string{ + "video_id": video_id, + "codec": codec.Name, + }, + }), + pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ + Name: "pipelines_total", + Namespace: "neko", + Subsystem: "capture_streamsink", + Help: "Total number of created pipelines.", + ConstLabels: map[string]string{ + "video_id": video_id, + "codec": codec.Name, + }, + }), } return manager @@ -90,6 +119,7 @@ func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sam manager.listenersMu.Unlock() manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener") + manager.currentListeners.Set(float64(manager.ListenersCount())) } func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) { @@ -100,6 +130,7 @@ func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types. manager.listenersMu.Unlock() manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") + manager.currentListeners.Set(float64(manager.ListenersCount())) } func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sample)) error { @@ -240,6 +271,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { } }() + manager.pipelinesCounter.Inc() return nil } diff --git a/internal/capture/streamsrc.go b/internal/capture/streamsrc.go index 368f6e0f..c6583670 100644 --- a/internal/capture/streamsrc.go +++ b/internal/capture/streamsrc.go @@ -4,6 +4,8 @@ import ( "errors" "sync" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -21,6 +23,10 @@ type StreamSrcManagerCtx struct { pipeline *gst.Pipeline pipelineMu sync.Mutex pipelineStr string + + // metrics + pushedData map[string]prometheus.Summary + pipelinesCounter map[string]prometheus.Counter } func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx { @@ -29,10 +35,39 @@ func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string Str("submodule", "stream-src"). Str("video_id", video_id).Logger() + pushedData := map[string]prometheus.Summary{} + pipelinesCounter := map[string]prometheus.Counter{} + for codec := range codecPipeline { + pushedData[codec] = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "data_bytes", + Namespace: "neko", + Subsystem: "capture_streamsrc", + Help: "Data pushed to a pipeline (in bytes).", + ConstLabels: map[string]string{ + "video_id": video_id, + "codec": codec, + }, + }) + pipelinesCounter[codec] = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pipelines_total", + Namespace: "neko", + Subsystem: "capture_streamsrc", + Help: "Total number of created pipelines.", + ConstLabels: map[string]string{ + "video_id": video_id, + "codec": codec, + }, + }) + } + return &StreamSrcManagerCtx{ logger: logger, enabled: enabled, codecPipeline: codecPipeline, + + // metrics + pushedData: pushedData, + pipelinesCounter: pipelinesCounter, } } @@ -86,6 +121,8 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error { manager.pipeline.AttachAppsrc("appsrc") manager.pipeline.Play() + + manager.pipelinesCounter[codec.Name].Inc() return nil } @@ -111,8 +148,12 @@ func (manager *StreamSrcManagerCtx) Push(bytes []byte) { } manager.pipeline.Push(bytes) + manager.pushedData[manager.codec.Name].Observe(float64(len(bytes))) } func (manager *StreamSrcManagerCtx) Started() bool { + manager.pipelineMu.Lock() + defer manager.pipelineMu.Unlock() + return manager.pipeline != nil }