diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index c084e869..c94c48d9 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -163,9 +163,12 @@ func (manager *StreamSinkManagerCtx) stop() { func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sample)) { ptr := reflect.ValueOf(listener).Pointer() + emitKeyframe := false manager.listenersMu.Lock() if manager.waitForKf { + // if this is the first listener, we need to emit a keyframe + emitKeyframe = len(manager.listenersKf) == 0 // if we're waiting for a keyframe, add it to the keyframe lobby manager.listenersKf[ptr] = listener } else { @@ -178,7 +181,7 @@ func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sam manager.currentListeners.Set(float64(manager.ListenersCount())) // if we will be waiting for a keyframe, emit one now - if manager.pipeline != nil && manager.waitForKf { + if manager.pipeline != nil && emitKeyframe { manager.pipeline.EmitVideoKeyframe() } } diff --git a/internal/webrtc/track.go b/internal/webrtc/track.go index 6e53b44c..e00d1e5e 100644 --- a/internal/webrtc/track.go +++ b/internal/webrtc/track.go @@ -16,13 +16,14 @@ import ( ) type Track struct { - logger zerolog.Logger - track *webrtc.TrackLocalStaticSample - paused bool + logger zerolog.Logger + track *webrtc.TrackLocalStaticSample + listener func(sample types.Sample) + videoAuto bool videoAutoMu sync.RWMutex - listener func(sample types.Sample) + paused bool stream types.StreamSinkManager streamMu sync.Mutex @@ -60,10 +61,6 @@ func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.Pe } t.listener = func(sample types.Sample) { - if t.paused { - return - } - err := track.WriteSample(media.Sample{ Data: sample.Data, Duration: sample.Duration, @@ -112,6 +109,12 @@ func (t *Track) SetStream(stream types.StreamSinkManager) (bool, error) { return false, nil } + // if paused, we switch the stream but don't add the listener + if t.paused { + t.stream = stream + return true, nil + } + var err error if t.stream != nil { err = t.stream.MoveListenerTo(&t.listener, stream) @@ -123,7 +126,6 @@ func (t *Track) SetStream(stream types.StreamSinkManager) (bool, error) { } t.stream = stream - return true, nil } @@ -131,13 +133,39 @@ func (t *Track) RemoveStream() { t.streamMu.Lock() defer t.streamMu.Unlock() - if t.stream != nil { - _ = t.stream.RemoveListener(&t.listener) - t.stream = nil + // if there is no stream, or paused, do nothing + if t.stream == nil || t.paused { + return } + + err := t.stream.RemoveListener(&t.listener) + if err != nil { + t.logger.Warn().Err(err).Msg("failed to remove listener from stream") + } + + t.stream = nil } func (t *Track) SetPaused(paused bool) { + t.streamMu.Lock() + defer t.streamMu.Unlock() + + // if there is no state change or no stream, do nothing + if t.paused == paused || t.stream == nil { + return + } + + var err error + if paused { + err = t.stream.RemoveListener(&t.listener) + } else { + err = t.stream.AddListener(&t.listener) + } + if err != nil { + t.logger.Warn().Err(err).Msg("failed to change listener state") + return + } + t.paused = paused }