From c676d7a3a21b756edb9dcf60db9dd88999370832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Mon, 10 Apr 2023 22:24:16 +0200 Subject: [PATCH] move collectors to metrics. --- internal/webrtc/manager.go | 74 ++--------------------------------- internal/webrtc/metrics.go | 79 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 71 deletions(-) diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 88c4cdeb..5a5ae8ee 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -309,7 +309,6 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int, } // audio track - audioTrack, err := NewTrack(logger, audioCodec, connection) if err != nil { return nil, err @@ -593,76 +592,9 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int, }) }) - go func() { - for { - packets, ok := <-videoRtcp - if !ok { - break - } - - for _, p := range packets { - switch rtcpPacket := p.(type) { - case *rtcp.ReceiverEstimatedMaximumBitrate: // TODO: Deprecated. - metrics.SetReceiverEstimatedMaximumBitrate(rtcpPacket.Bitrate) - - case *rtcp.ReceiverReport: - l := len(rtcpPacket.Reports) - if l > 0 { - // use only last report - metrics.SetReceiverReport(rtcpPacket.Reports[l-1]) - } - } - } - } - }() - - go func() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for range ticker.C { - if connection.ConnectionState() == webrtc.PeerConnectionStateClosed { - break - } - - stats := connection.GetStats() - data, ok := stats["iceTransport"].(webrtc.TransportStats) - if ok { - metrics.SetIceTransportStats(data) - } - - data, ok = stats["sctpTransport"].(webrtc.TransportStats) - if ok { - metrics.SetSctpTransportStats(data) - } - - remoteCandidates := map[string]webrtc.ICECandidateStats{} - nominatedRemoteCandidates := map[string]struct{}{} - for _, entry := range stats { - // only remote ice candidate stats - candidate, ok := entry.(webrtc.ICECandidateStats) - if ok && candidate.Type == webrtc.StatsTypeRemoteCandidate { - metrics.NewICECandidate(candidate) - remoteCandidates[candidate.ID] = candidate - } - - // only nominated ice candidate pair stats - pair, ok := entry.(webrtc.ICECandidatePairStats) - if ok && pair.Nominated { - nominatedRemoteCandidates[pair.RemoteCandidateID] = struct{}{} - } - } - - iceCandidatesUsed := []webrtc.ICECandidateStats{} - for id := range nominatedRemoteCandidates { - if candidate, ok := remoteCandidates[id]; ok { - iceCandidatesUsed = append(iceCandidatesUsed, candidate) - } - } - - metrics.SetICECandidatesUsed(iceCandidatesUsed) - } - }() + // start metrics collectors + go metrics.rtcpReceiver(videoRtcp) + go metrics.connectionStats(connection) return offer, nil } diff --git a/internal/webrtc/metrics.go b/internal/webrtc/metrics.go index 025bb4ba..513e86a4 100644 --- a/internal/webrtc/metrics.go +++ b/internal/webrtc/metrics.go @@ -2,6 +2,7 @@ package webrtc import ( "sync" + "time" "github.com/demodesk/neko/pkg/types" "github.com/pion/rtcp" @@ -10,6 +11,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) +var connectionStatsInterval = 5 * time.Second + type metricsManager struct { mu sync.Mutex @@ -357,3 +360,79 @@ func (met *metrics) SetSctpTransportStats(data webrtc.TransportStats) { met.sctpBytesSent.Set(float64(data.BytesSent)) met.sctpBytesReceived.Set(float64(data.BytesReceived)) } + +// +// collectors +// + +func (met *metrics) rtcpReceiver(rtcpCh chan []rtcp.Packet) { + for { + packets, ok := <-rtcpCh + if !ok { + break + } + + for _, p := range packets { + switch rtcpPacket := p.(type) { + case *rtcp.ReceiverEstimatedMaximumBitrate: // TODO: Deprecated. + met.SetReceiverEstimatedMaximumBitrate(rtcpPacket.Bitrate) + + case *rtcp.ReceiverReport: + l := len(rtcpPacket.Reports) + if l > 0 { + // use only last report + met.SetReceiverReport(rtcpPacket.Reports[l-1]) + } + } + } + } +} + +func (met *metrics) connectionStats(connection *webrtc.PeerConnection) { + ticker := time.NewTicker(connectionStatsInterval) + defer ticker.Stop() + + for range ticker.C { + if connection.ConnectionState() == webrtc.PeerConnectionStateClosed { + break + } + + stats := connection.GetStats() + + data, ok := stats["iceTransport"].(webrtc.TransportStats) + if ok { + met.SetIceTransportStats(data) + } + + data, ok = stats["sctpTransport"].(webrtc.TransportStats) + if ok { + met.SetSctpTransportStats(data) + } + + remoteCandidates := map[string]webrtc.ICECandidateStats{} + nominatedRemoteCandidates := map[string]struct{}{} + for _, entry := range stats { + // only remote ice candidate stats + candidate, ok := entry.(webrtc.ICECandidateStats) + if ok && candidate.Type == webrtc.StatsTypeRemoteCandidate { + met.NewICECandidate(candidate) + remoteCandidates[candidate.ID] = candidate + } + + // only nominated ice candidate pair stats + pair, ok := entry.(webrtc.ICECandidatePairStats) + if ok && pair.Nominated { + nominatedRemoteCandidates[pair.RemoteCandidateID] = struct{}{} + } + } + + iceCandidatesUsed := []webrtc.ICECandidateStats{} + for id := range nominatedRemoteCandidates { + if candidate, ok := remoteCandidates[id]; ok { + iceCandidatesUsed = append(iceCandidatesUsed, candidate) + } + } + + met.SetICECandidatesUsed(iceCandidatesUsed) + } +}