add webrtc metrtics.

This commit is contained in:
Miroslav Šedivý 2022-06-25 20:12:42 +02:00
parent 555fd803bc
commit 527b1f08f8
2 changed files with 142 additions and 5 deletions

View File

@ -38,6 +38,7 @@ func New(desktop types.DesktopManager, capture types.CaptureManager, config *con
return &WebRTCManagerCtx{ return &WebRTCManagerCtx{
logger: log.With().Str("module", "webrtc").Logger(), logger: log.With().Str("module", "webrtc").Logger(),
config: config, config: config,
metrics: newMetrics(),
desktop: desktop, desktop: desktop,
capture: capture, capture: capture,
@ -49,6 +50,7 @@ func New(desktop types.DesktopManager, capture types.CaptureManager, config *con
type WebRTCManagerCtx struct { type WebRTCManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
config *config.WebRTC config *config.WebRTC
metrics *metricsCtx
peerId int32 peerId int32
desktop types.DesktopManager desktop types.DesktopManager
@ -121,6 +123,7 @@ func (manager *WebRTCManagerCtx) ICEServers() []types.ICEServer {
func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID string) (*webrtc.SessionDescription, error) { func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID string) (*webrtc.SessionDescription, error) {
id := atomic.AddInt32(&manager.peerId, 1) id := atomic.AddInt32(&manager.peerId, 1)
manager.metrics.NewConnection(session)
// add session id to logger context // add session id to logger context
logger := manager.logger.With().Str("session_id", session.ID()).Int32("peer_id", id).Logger() logger := manager.logger.With().Str("session_id", session.ID()).Int32("peer_id", id).Logger()
@ -315,6 +318,8 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
videoTrack.RemoveStream() videoTrack.RemoveStream()
audioTrack.RemoveStream() audioTrack.RemoveStream()
} }
manager.metrics.SetState(session, state)
}) })
cursorImage := func(entry *cursor.ImageEntry) { cursorImage := func(entry *cursor.ImageEntry) {
@ -392,5 +397,22 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
}) })
}) })
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if connection.ConnectionState() == webrtc.PeerConnectionStateClosed {
break
}
stats := connection.GetStats()
data, ok := stats["iceTransport"].(webrtc.TransportStats)
if ok {
manager.metrics.SetTransportStats(session, data)
}
}
}()
return offer, nil return offer, nil
} }

115
internal/webrtc/metrics.go Normal file
View File

@ -0,0 +1,115 @@
package webrtc
import (
"sync"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"gitlab.com/demodesk/neko/server/pkg/types"
)
type metrics struct {
connectionState prometheus.Gauge
connectionCount prometheus.Counter
bytesSent prometheus.Gauge
bytesReceived prometheus.Gauge
}
type metricsCtx struct {
mu sync.Mutex
sessions map[string]metrics
}
func newMetrics() *metricsCtx {
return &metricsCtx{
sessions: map[string]metrics{},
}
}
func (m *metricsCtx) getBySession(session types.Session) metrics {
m.mu.Lock()
defer m.mu.Unlock()
met, ok := m.sessions[session.ID()]
if ok {
return met
}
met = metrics{
connectionState: promauto.NewGauge(prometheus.GaugeOpts{
Name: "connection_state",
Namespace: "neko",
Subsystem: "webrtc",
Help: "Connection state of session.",
ConstLabels: map[string]string{
"session_id": session.ID(),
},
}),
connectionCount: promauto.NewCounter(prometheus.CounterOpts{
Name: "connection_count",
Namespace: "neko",
Subsystem: "webrtc",
Help: "Connection count of a session.",
ConstLabels: map[string]string{
"session_id": session.ID(),
},
}),
bytesSent: promauto.NewGauge(prometheus.GaugeOpts{
Name: "bytes_sent",
Namespace: "neko",
Subsystem: "webrtc",
Help: "Sent bytes to a session.",
ConstLabels: map[string]string{
"session_id": session.ID(),
},
}),
bytesReceived: promauto.NewGauge(prometheus.GaugeOpts{
Name: "bytes_received",
Namespace: "neko",
Subsystem: "webrtc",
Help: "Received bytes from a session.",
ConstLabels: map[string]string{
"session_id": session.ID(),
},
}),
}
m.sessions[session.ID()] = met
return met
}
func (m *metricsCtx) NewConnection(session types.Session) {
met := m.getBySession(session)
met.connectionCount.Add(1)
}
func (m *metricsCtx) SetState(session types.Session, state webrtc.PeerConnectionState) {
met := m.getBySession(session)
switch state {
case webrtc.PeerConnectionStateNew:
met.connectionState.Set(0)
case webrtc.PeerConnectionStateConnecting:
met.connectionState.Set(4)
case webrtc.PeerConnectionStateConnected:
met.connectionState.Set(5)
case webrtc.PeerConnectionStateDisconnected:
met.connectionState.Set(3)
case webrtc.PeerConnectionStateFailed:
met.connectionState.Set(2)
case webrtc.PeerConnectionStateClosed:
met.connectionState.Set(1)
default:
met.connectionState.Set(-1)
}
}
func (m *metricsCtx) SetTransportStats(session types.Session, data webrtc.TransportStats) {
met := m.getBySession(session)
met.bytesSent.Set(float64(data.BytesSent))
met.bytesReceived.Set(float64(data.BytesReceived))
}