capture metrics.

This commit is contained in:
Miroslav Šedivý 2022-06-15 00:23:16 +02:00
parent 0952be6a94
commit 6deaa64884
4 changed files with 113 additions and 0 deletions

View File

@ -4,6 +4,8 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -21,6 +23,9 @@ type BroacastManagerCtx struct {
url string url string
started bool started bool
// metrics
pipelinesCounter prometheus.Counter
} }
func broadcastNew(pipelineStr string) *BroacastManagerCtx { func broadcastNew(pipelineStr string) *BroacastManagerCtx {
@ -34,6 +39,14 @@ func broadcastNew(pipelineStr string) *BroacastManagerCtx {
pipelineStr: pipelineStr, pipelineStr: pipelineStr,
url: "", url: "",
started: false, started: false,
// metrics
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
Name: "pipelines_total",
Namespace: "neko",
Subsystem: "capture_broadcast",
Help: "Total number of created pipelines.",
}),
} }
} }
@ -102,6 +115,8 @@ func (manager *BroacastManagerCtx) createPipeline() error {
} }
manager.pipeline.Play() manager.pipeline.Play()
manager.pipelinesCounter.Inc()
return nil return nil
} }

View File

@ -6,6 +6,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -32,6 +34,10 @@ type ScreencastManagerCtx struct {
enabled bool enabled bool
started bool started bool
expired int32 expired int32
// metrics
imagesCounter prometheus.Counter
pipelinesCounter prometheus.Counter
} }
func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
@ -46,6 +52,20 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
tickerStop: make(chan struct{}), tickerStop: make(chan struct{}),
enabled: enabled, enabled: enabled,
started: false, started: false,
// metrics
imagesCounter: promauto.NewCounter(prometheus.CounterOpts{
Name: "images_total",
Namespace: "neko",
Subsystem: "capture_screencast",
Help: "Total number of created images.",
}),
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
Name: "pipelines_total",
Namespace: "neko",
Subsystem: "capture_screencast",
Help: "Total number of created pipelines.",
}),
} }
manager.wg.Add(1) manager.wg.Add(1)
@ -158,6 +178,7 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
manager.pipeline.AttachAppsink("appsink") manager.pipeline.AttachAppsink("appsink")
manager.pipeline.Play() manager.pipeline.Play()
manager.pipelinesCounter.Inc()
// get first image // get first image
select { select {
@ -168,6 +189,8 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
manager.imageMu.Lock() manager.imageMu.Lock()
manager.image = image manager.image = image
manager.imageMu.Unlock() manager.imageMu.Unlock()
manager.imagesCounter.Inc()
} }
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
return errors.New("timeouted while waiting for first image") return errors.New("timeouted while waiting for first image")
@ -190,6 +213,8 @@ func (manager *ScreencastManagerCtx) createPipeline() error {
manager.imageMu.Lock() manager.imageMu.Lock()
manager.image = image manager.image = image
manager.imageMu.Unlock() manager.imageMu.Unlock()
manager.imagesCounter.Inc()
} }
}() }()

View File

@ -5,6 +5,9 @@ import (
"reflect" "reflect"
"sync" "sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -27,6 +30,10 @@ type StreamSinkManagerCtx struct {
listeners map[uintptr]*func(sample types.Sample) listeners map[uintptr]*func(sample types.Sample)
listenersMu sync.Mutex listenersMu sync.Mutex
// metrics
currentListeners prometheus.Gauge
pipelinesCounter prometheus.Counter
} }
func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx {
@ -40,6 +47,28 @@ func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id str
codec: codec, codec: codec,
pipelineStr: pipelineStr, pipelineStr: pipelineStr,
listeners: map[uintptr]*func(sample types.Sample){}, listeners: map[uintptr]*func(sample types.Sample){},
// metrics
currentListeners: promauto.NewGauge(prometheus.GaugeOpts{
Name: "listeners",
Namespace: "neko",
Subsystem: "capture_streamsink",
Help: "Current number of listeners for a pipeline.",
ConstLabels: map[string]string{
"video_id": video_id,
"codec": codec.Name,
},
}),
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
Name: "pipelines_total",
Namespace: "neko",
Subsystem: "capture_streamsink",
Help: "Total number of created pipelines.",
ConstLabels: map[string]string{
"video_id": video_id,
"codec": codec.Name,
},
}),
} }
return manager return manager
@ -90,6 +119,7 @@ func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sam
manager.listenersMu.Unlock() manager.listenersMu.Unlock()
manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener") manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener")
manager.currentListeners.Set(float64(manager.ListenersCount()))
} }
func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) { func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) {
@ -100,6 +130,7 @@ func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.
manager.listenersMu.Unlock() manager.listenersMu.Unlock()
manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener")
manager.currentListeners.Set(float64(manager.ListenersCount()))
} }
func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sample)) error { func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sample)) error {
@ -240,6 +271,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error {
} }
}() }()
manager.pipelinesCounter.Inc()
return nil return nil
} }

View File

@ -4,6 +4,8 @@ import (
"errors" "errors"
"sync" "sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -21,6 +23,10 @@ type StreamSrcManagerCtx struct {
pipeline *gst.Pipeline pipeline *gst.Pipeline
pipelineMu sync.Mutex pipelineMu sync.Mutex
pipelineStr string pipelineStr string
// metrics
pushedData map[string]prometheus.Summary
pipelinesCounter map[string]prometheus.Counter
} }
func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx { func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx {
@ -29,10 +35,39 @@ func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string
Str("submodule", "stream-src"). Str("submodule", "stream-src").
Str("video_id", video_id).Logger() Str("video_id", video_id).Logger()
pushedData := map[string]prometheus.Summary{}
pipelinesCounter := map[string]prometheus.Counter{}
for codec := range codecPipeline {
pushedData[codec] = promauto.NewSummary(prometheus.SummaryOpts{
Name: "data_bytes",
Namespace: "neko",
Subsystem: "capture_streamsrc",
Help: "Data pushed to a pipeline (in bytes).",
ConstLabels: map[string]string{
"video_id": video_id,
"codec": codec,
},
})
pipelinesCounter[codec] = promauto.NewCounter(prometheus.CounterOpts{
Name: "pipelines_total",
Namespace: "neko",
Subsystem: "capture_streamsrc",
Help: "Total number of created pipelines.",
ConstLabels: map[string]string{
"video_id": video_id,
"codec": codec,
},
})
}
return &StreamSrcManagerCtx{ return &StreamSrcManagerCtx{
logger: logger, logger: logger,
enabled: enabled, enabled: enabled,
codecPipeline: codecPipeline, codecPipeline: codecPipeline,
// metrics
pushedData: pushedData,
pipelinesCounter: pipelinesCounter,
} }
} }
@ -86,6 +121,8 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
manager.pipeline.AttachAppsrc("appsrc") manager.pipeline.AttachAppsrc("appsrc")
manager.pipeline.Play() manager.pipeline.Play()
manager.pipelinesCounter[codec.Name].Inc()
return nil return nil
} }
@ -111,8 +148,12 @@ func (manager *StreamSrcManagerCtx) Push(bytes []byte) {
} }
manager.pipeline.Push(bytes) manager.pipeline.Push(bytes)
manager.pushedData[manager.codec.Name].Observe(float64(len(bytes)))
} }
func (manager *StreamSrcManagerCtx) Started() bool { func (manager *StreamSrcManagerCtx) Started() bool {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
return manager.pipeline != nil return manager.pipeline != nil
} }