From 527b1f08f8143cd3d0b73ca0c7f770901a94e721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sat, 25 Jun 2022 20:12:42 +0200 Subject: [PATCH] add webrtc metrtics. --- internal/webrtc/manager.go | 32 +++++++++-- internal/webrtc/metrics.go | 115 +++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 internal/webrtc/metrics.go diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index fbe7c3f8..3d5c32c7 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -36,8 +36,9 @@ const rtcpPLIInterval = 3 * time.Second func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { return &WebRTCManagerCtx{ - logger: log.With().Str("module", "webrtc").Logger(), - config: config, + logger: log.With().Str("module", "webrtc").Logger(), + config: config, + metrics: newMetrics(), desktop: desktop, capture: capture, @@ -47,9 +48,10 @@ func New(desktop types.DesktopManager, capture types.CaptureManager, config *con } type WebRTCManagerCtx struct { - logger zerolog.Logger - config *config.WebRTC - peerId int32 + logger zerolog.Logger + config *config.WebRTC + metrics *metricsCtx + peerId int32 desktop types.DesktopManager capture types.CaptureManager @@ -121,6 +123,7 @@ func (manager *WebRTCManagerCtx) ICEServers() []types.ICEServer { func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID string) (*webrtc.SessionDescription, error) { id := atomic.AddInt32(&manager.peerId, 1) + manager.metrics.NewConnection(session) // add session id to logger context logger := manager.logger.With().Str("session_id", session.ID()).Int32("peer_id", id).Logger() @@ -315,6 +318,8 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin videoTrack.RemoveStream() audioTrack.RemoveStream() } + + manager.metrics.SetState(session, state) }) cursorImage := func(entry *cursor.ImageEntry) { @@ -392,5 +397,22 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin }) }) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for range ticker.C { + if connection.ConnectionState() == webrtc.PeerConnectionStateClosed { + break + } + + stats := connection.GetStats() + data, ok := stats["iceTransport"].(webrtc.TransportStats) + if ok { + manager.metrics.SetTransportStats(session, data) + } + } + }() + return offer, nil } diff --git a/internal/webrtc/metrics.go b/internal/webrtc/metrics.go new file mode 100644 index 00000000..57e719e6 --- /dev/null +++ b/internal/webrtc/metrics.go @@ -0,0 +1,115 @@ +package webrtc + +import ( + "sync" + + "github.com/pion/webrtc/v3" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "gitlab.com/demodesk/neko/server/pkg/types" +) + +type metrics struct { + connectionState prometheus.Gauge + connectionCount prometheus.Counter + + bytesSent prometheus.Gauge + bytesReceived prometheus.Gauge +} + +type metricsCtx struct { + mu sync.Mutex + + sessions map[string]metrics +} + +func newMetrics() *metricsCtx { + return &metricsCtx{ + sessions: map[string]metrics{}, + } +} + +func (m *metricsCtx) getBySession(session types.Session) metrics { + m.mu.Lock() + defer m.mu.Unlock() + + met, ok := m.sessions[session.ID()] + if ok { + return met + } + + met = metrics{ + connectionState: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "connection_state", + Namespace: "neko", + Subsystem: "webrtc", + Help: "Connection state of session.", + ConstLabels: map[string]string{ + "session_id": session.ID(), + }, + }), + connectionCount: promauto.NewCounter(prometheus.CounterOpts{ + Name: "connection_count", + Namespace: "neko", + Subsystem: "webrtc", + Help: "Connection count of a session.", + ConstLabels: map[string]string{ + "session_id": session.ID(), + }, + }), + bytesSent: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "bytes_sent", + Namespace: "neko", + Subsystem: "webrtc", + Help: "Sent bytes to a session.", + ConstLabels: map[string]string{ + "session_id": session.ID(), + }, + }), + bytesReceived: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "bytes_received", + Namespace: "neko", + Subsystem: "webrtc", + Help: "Received bytes from a session.", + ConstLabels: map[string]string{ + "session_id": session.ID(), + }, + }), + } + + m.sessions[session.ID()] = met + return met +} + +func (m *metricsCtx) NewConnection(session types.Session) { + met := m.getBySession(session) + met.connectionCount.Add(1) +} + +func (m *metricsCtx) SetState(session types.Session, state webrtc.PeerConnectionState) { + met := m.getBySession(session) + + switch state { + case webrtc.PeerConnectionStateNew: + met.connectionState.Set(0) + case webrtc.PeerConnectionStateConnecting: + met.connectionState.Set(4) + case webrtc.PeerConnectionStateConnected: + met.connectionState.Set(5) + case webrtc.PeerConnectionStateDisconnected: + met.connectionState.Set(3) + case webrtc.PeerConnectionStateFailed: + met.connectionState.Set(2) + case webrtc.PeerConnectionStateClosed: + met.connectionState.Set(1) + default: + met.connectionState.Set(-1) + } +} + +func (m *metricsCtx) SetTransportStats(session types.Session, data webrtc.TransportStats) { + met := m.getBySession(session) + + met.bytesSent.Set(float64(data.BytesSent)) + met.bytesReceived.Set(float64(data.BytesReceived)) +}