diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index c94c48d9..adda86c2 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -4,6 +4,8 @@ import ( "errors" "reflect" "sync" + "sync/atomic" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -23,6 +25,9 @@ type StreamSinkManagerCtx struct { getBitrate func() (int, error) waitForKf bool // wait for a keyframe before sending samples + bitrate uint64 // atomic + brBuckets map[int]float64 + logger zerolog.Logger mu sync.Mutex wg sync.WaitGroup @@ -32,12 +37,13 @@ type StreamSinkManagerCtx struct { pipelineMu sync.Mutex pipelineFn func() (string, error) - listeners map[uintptr]*func(sample types.Sample) - listenersKf map[uintptr]*func(sample types.Sample) // keyframe lobby + listeners map[uintptr]types.SampleListener + listenersKf map[uintptr]types.SampleListener // keyframe lobby listenersMu sync.Mutex // metrics currentListeners prometheus.Gauge + totalBytes prometheus.Counter pipelinesCounter prometheus.Counter pipelinesActive prometheus.Gauge } @@ -54,12 +60,14 @@ func streamSinkNew(c codec.RTPCodec, pipelineFn func() (string, error), id strin // only wait for keyframes if the codec is video waitForKf: c.IsVideo(), + brBuckets: map[int]float64{}, + logger: logger, codec: c, pipelineFn: pipelineFn, - listeners: map[uintptr]*func(sample types.Sample){}, - listenersKf: map[uintptr]*func(sample types.Sample){}, + listeners: map[uintptr]types.SampleListener{}, + listenersKf: map[uintptr]types.SampleListener{}, // metrics currentListeners: promauto.NewGauge(prometheus.GaugeOpts{ @@ -73,6 +81,17 @@ func streamSinkNew(c codec.RTPCodec, pipelineFn func() (string, error), id strin "codec_type": c.Type.String(), }, }), + totalBytes: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "streamsink_bytes", + Namespace: "neko", + Subsystem: "capture", + Help: "Total number of bytes created by the pipeline.", + ConstLabels: map[string]string{ + "video_id": id, + "codec_name": c.Name, + "codec_type": c.Type.String(), + }, + }), pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ Name: "pipelines_total", Namespace: "neko", @@ -123,6 +142,14 @@ func (manager *StreamSinkManagerCtx) ID() string { } func (manager *StreamSinkManagerCtx) Bitrate() int { + // TODO: fix bitrate switching calculation + // return real bitrate if available + //realBitrate := atomic.LoadUint64(&manager.bitrate) + //if realBitrate != 0 { + // return int(realBitrate) + //} + + // if we do not have function to estimate bitrate, return 0 if manager.getBitrate == nil { return 0 } @@ -161,7 +188,7 @@ func (manager *StreamSinkManagerCtx) stop() { } } -func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sample)) { +func (manager *StreamSinkManagerCtx) addListener(listener types.SampleListener) { ptr := reflect.ValueOf(listener).Pointer() emitKeyframe := false @@ -186,7 +213,7 @@ func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sam } } -func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) { +func (manager *StreamSinkManagerCtx) removeListener(listener types.SampleListener) { ptr := reflect.ValueOf(listener).Pointer() manager.listenersMu.Lock() @@ -198,7 +225,7 @@ func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types. manager.currentListeners.Set(float64(manager.ListenersCount())) } -func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sample)) error { +func (manager *StreamSinkManagerCtx) AddListener(listener types.SampleListener) error { manager.mu.Lock() defer manager.mu.Unlock() @@ -217,7 +244,7 @@ func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sam return nil } -func (manager *StreamSinkManagerCtx) RemoveListener(listener *func(sample types.Sample)) error { +func (manager *StreamSinkManagerCtx) RemoveListener(listener types.SampleListener) error { manager.mu.Lock() defer manager.mu.Unlock() @@ -236,7 +263,7 @@ func (manager *StreamSinkManagerCtx) RemoveListener(listener *func(sample types. // moving listeners between streams ensures, that target pipeline is running // before listener is added, and stops source pipeline if there are 0 listeners -func (manager *StreamSinkManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamSinkManager) error { +func (manager *StreamSinkManagerCtx) MoveListenerTo(listener types.SampleListener, stream types.StreamSinkManager) error { if listener == nil { return errors.New("listener cannot be nil") } @@ -330,20 +357,7 @@ func (manager *StreamSinkManagerCtx) CreatePipeline() error { return } - manager.listenersMu.Lock() - // if is not delta unit -> it can be decoded independently -> it is a keyframe - if manager.waitForKf && !sample.DeltaUnit && len(manager.listenersKf) > 0 { - // if current sample is a keyframe, move listeners from - // keyframe lobby to actual listeners map and clear lobby - for k, v := range manager.listenersKf { - manager.listeners[k] = v - } - manager.listenersKf = make(map[uintptr]*func(sample types.Sample)) - } - for _, emit := range manager.listeners { - (*emit)(sample) - } - manager.listenersMu.Unlock() + manager.onSample(sample) } }() @@ -353,6 +367,51 @@ func (manager *StreamSinkManagerCtx) CreatePipeline() error { return nil } +func (manager *StreamSinkManagerCtx) saveSampleBitrate(timestamp time.Time, delta float64) { + // get unix timestamp in seconds + sec := timestamp.Unix() + // last bucket is timestamp rounded to 3 seconds - 1 second + last := int((sec - 1) % 3) + // current bucket is timestamp rounded to 3 seconds + curr := int(sec % 3) + // next bucket is timestamp rounded to 3 seconds + 1 second + next := int((sec + 1) % 3) + + if manager.brBuckets[next] != 0 { + // atomic update bitrate + atomic.StoreUint64(&manager.bitrate, uint64(manager.brBuckets[last])) + // empty next bucket + manager.brBuckets[next] = 0 + } + + // add rate to current bucket + manager.brBuckets[curr] += delta +} + +func (manager *StreamSinkManagerCtx) onSample(sample types.Sample) { + manager.listenersMu.Lock() + defer manager.listenersMu.Unlock() + + // save to metrics + length := float64(sample.Length) + manager.totalBytes.Add(length) + manager.saveSampleBitrate(sample.Timestamp, length) + + // if is not delta unit -> it can be decoded independently -> it is a keyframe + if manager.waitForKf && !sample.DeltaUnit && len(manager.listenersKf) > 0 { + // if current sample is a keyframe, move listeners from + // keyframe lobby to actual listeners map and clear lobby + for k, v := range manager.listenersKf { + manager.listeners[k] = v + } + manager.listenersKf = make(map[uintptr]types.SampleListener) + } + + for _, l := range manager.listeners { + l.WriteSample(sample) + } +} + func (manager *StreamSinkManagerCtx) DestroyPipeline() { manager.pipelineMu.Lock() defer manager.pipelineMu.Unlock() @@ -366,4 +425,7 @@ func (manager *StreamSinkManagerCtx) DestroyPipeline() { manager.pipeline = nil manager.pipelinesActive.Set(0) + + manager.brBuckets = make(map[int]float64) + atomic.StoreUint64(&manager.bitrate, 0) } diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 1dc0ef96..5b2005f1 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -321,8 +321,13 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int, videoAuto = false } + videoRtcp := make(chan []rtcp.Packet, 1) + // video track - videoTrack, err := NewTrack(logger, videoCodec, connection, WithVideoAuto(videoAuto)) + videoTrack, err := NewTrack(logger, videoCodec, connection, + WithVideoAuto(videoAuto), + WithRtcpChan(videoRtcp), + ) if err != nil { return nil, err } @@ -570,7 +575,9 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int, if err = video.RemoveReceiver(videoTrack); err != nil { logger.Err(err).Msg("failed to remove video receiver") } - audioTrack.RemoveStream() + audioTrack.Shutdown() + videoTrack.Shutdown() + close(videoRtcp) } manager.metrics.SetState(session, state) @@ -651,17 +658,24 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int, }) }) - videoTrack.OnRTCP(func(packets []rtcp.Packet) { - for _, p := range packets { - if rtcpPacket, ok := p.(*rtcp.ReceiverReport); ok { - l := len(rtcpPacket.Reports) - if l > 0 { - // use only last report - manager.metrics.SetReceiverReport(session, rtcpPacket.Reports[l-1]) + go func() { + for { + packets, ok := <-videoRtcp + if !ok { + break + } + + for _, p := range packets { + if rtcpPacket, ok := p.(*rtcp.ReceiverReport); ok { + l := len(rtcpPacket.Reports) + if l > 0 { + // use only last report + manager.metrics.SetReceiverReport(session, rtcpPacket.Reports[l-1]) + } } } } - }) + }() go func() { ticker := time.NewTicker(5 * time.Second) diff --git a/internal/webrtc/track.go b/internal/webrtc/track.go index e00d1e5e..886de03f 100644 --- a/internal/webrtc/track.go +++ b/internal/webrtc/track.go @@ -16,9 +16,11 @@ import ( ) type Track struct { - logger zerolog.Logger - track *webrtc.TrackLocalStaticSample - listener func(sample types.Sample) + logger zerolog.Logger + track *webrtc.TrackLocalStaticSample + + rtcpCh chan []rtcp.Packet + sample chan types.Sample videoAuto bool videoAutoMu sync.RWMutex @@ -27,9 +29,6 @@ type Track struct { stream types.StreamSinkManager streamMu sync.Mutex - onRtcp func([]rtcp.Packet) - onRtcpMu sync.RWMutex - bitrateChange func(int) (bool, error) videoChange func(string) (bool, error) } @@ -42,6 +41,12 @@ func WithVideoAuto(auto bool) option { } } +func WithRtcpChan(rtcp chan []rtcp.Packet) option { + return func(t *Track) { + t.rtcpCh = rtcp + } +} + func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.PeerConnection, opts ...option) (*Track, error) { id := codec.Type.String() track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream") @@ -54,49 +59,66 @@ func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.Pe t := &Track{ logger: logger, track: track, + rtcpCh: make(chan []rtcp.Packet), + sample: make(chan types.Sample), } for _, opt := range opts { opt(t) } - t.listener = func(sample types.Sample) { - err := track.WriteSample(media.Sample{ - Data: sample.Data, - Duration: sample.Duration, - }) - if err != nil && !errors.Is(err, io.ErrClosedPipe) { - logger.Warn().Err(err).Msg("failed to write sample to track") - } - } - sender, err := connection.AddTrack(t.track) if err != nil { return nil, err } go t.rtcpReader(sender) + go t.sampleReader() return t, nil } +func (t *Track) Shutdown() { + t.RemoveStream() + close(t.sample) +} + func (t *Track) rtcpReader(sender *webrtc.RTPSender) { for { packets, _, err := sender.ReadRTCP() if err != nil { - if err == io.EOF || err == io.ErrClosedPipe { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { + t.logger.Debug().Msg("track rtcp reader closed") return } - t.logger.Err(err).Msg("RTCP read error") + t.logger.Warn().Err(err).Msg("failed to read track rtcp") continue } - t.onRtcpMu.RLock() - if t.onRtcp != nil { - go t.onRtcp(packets) + if t.rtcpCh != nil { + t.rtcpCh <- packets + } + } +} + +func (t *Track) sampleReader() { + for { + sample, ok := <-t.sample + if !ok { + t.logger.Debug().Msg("track sample reader closed") + return + } + + err := t.track.WriteSample(media.Sample{ + Data: sample.Data, + Duration: sample.Duration, + Timestamp: sample.Timestamp, + }) + + if err != nil && !errors.Is(err, io.ErrClosedPipe) { + t.logger.Warn().Err(err).Msg("failed to write sample to track") } - t.onRtcpMu.RUnlock() } } @@ -117,9 +139,9 @@ func (t *Track) SetStream(stream types.StreamSinkManager) (bool, error) { var err error if t.stream != nil { - err = t.stream.MoveListenerTo(&t.listener, stream) + err = t.stream.MoveListenerTo(t, stream) } else { - err = stream.AddListener(&t.listener) + err = stream.AddListener(t) } if err != nil { return false, err @@ -138,7 +160,7 @@ func (t *Track) RemoveStream() { return } - err := t.stream.RemoveListener(&t.listener) + err := t.stream.RemoveListener(t) if err != nil { t.logger.Warn().Err(err).Msg("failed to remove listener from stream") } @@ -157,9 +179,9 @@ func (t *Track) SetPaused(paused bool) { var err error if paused { - err = t.stream.RemoveListener(&t.listener) + err = t.stream.RemoveListener(t) } else { - err = t.stream.AddListener(&t.listener) + err = t.stream.AddListener(t) } if err != nil { t.logger.Warn().Err(err).Msg("failed to change listener state") @@ -169,11 +191,8 @@ func (t *Track) SetPaused(paused bool) { t.paused = paused } -func (t *Track) OnRTCP(f func([]rtcp.Packet)) { - t.onRtcpMu.Lock() - defer t.onRtcpMu.Unlock() - - t.onRtcp = f +func (t *Track) WriteSample(sample types.Sample) { + t.sample <- sample } func (t *Track) SetBitrate(bitrate int) (bool, error) { diff --git a/pkg/gst/gst.go b/pkg/gst/gst.go index 9d4fb874..45432b4f 100644 --- a/pkg/gst/gst.go +++ b/pkg/gst/gst.go @@ -200,8 +200,8 @@ func CheckPlugins(plugins []string) error { } //export goHandlePipelineBuffer -func goHandlePipelineBuffer(pipelineID C.int, buf unsafe.Pointer, bufLen C.int, duration C.guint64, deltaUnit C.gboolean) { - defer C.free(buf) +func goHandlePipelineBuffer(pipelineID C.int, buf C.gpointer, bufLen C.int, duration C.guint64, deltaUnit C.gboolean) { + defer C.g_free(buf) pipelinesLock.Lock() pipeline, ok := pipelines[int(pipelineID)] @@ -209,7 +209,9 @@ func goHandlePipelineBuffer(pipelineID C.int, buf unsafe.Pointer, bufLen C.int, if ok { pipeline.sample <- types.Sample{ - Data: C.GoBytes(buf, bufLen), + Data: C.GoBytes(unsafe.Pointer(buf), bufLen), + Length: int(bufLen), + Timestamp: time.Now(), Duration: time.Duration(duration), DeltaUnit: deltaUnit == C.TRUE, } diff --git a/pkg/types/capture.go b/pkg/types/capture.go index 72a83cfd..485161a4 100644 --- a/pkg/types/capture.go +++ b/pkg/types/capture.go @@ -17,11 +17,20 @@ var ( ) type Sample struct { - Data []byte + // buffer with encoded media + Data []byte + Length int + // timing information + Timestamp time.Time Duration time.Duration + // metadata DeltaUnit bool // this unit cannot be decoded independently. } +type SampleListener interface { + WriteSample(Sample) +} + type Receiver interface { SetStream(stream StreamSinkManager) (changed bool, err error) RemoveStream() @@ -60,9 +69,9 @@ type StreamSinkManager interface { Codec() codec.RTPCodec Bitrate() int - AddListener(listener *func(sample Sample)) error - RemoveListener(listener *func(sample Sample)) error - MoveListenerTo(listener *func(sample Sample), targetStream StreamSinkManager) error + AddListener(listener SampleListener) error + RemoveListener(listener SampleListener) error + MoveListenerTo(listener SampleListener, targetStream StreamSinkManager) error ListenersCount() int Started() bool