diff --git a/internal/capture/stream.go b/internal/capture/stream.go index ddba3eda..bf93d757 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -13,6 +13,8 @@ import ( "demodesk/neko/internal/types/codec" ) +var moveListenerMu = sync.Mutex{} + type StreamManagerCtx struct { logger zerolog.Logger mu sync.Mutex @@ -167,19 +169,36 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp return nil } +// moving listeners between streams ensures, that target pipeline is running +// before listener is added, and stops source pipeline if there are 0 listeners func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamManager) error { - manager.mu.Lock() - defer manager.mu.Unlock() - - targetStream, ok := stream.(*StreamManagerCtx) - if !ok { - return errors.New("stream manager does not support moving listeners") - } - if listener == nil { return errors.New("listener cannot be nil") } + targetStream, ok := stream.(*StreamManagerCtx) + if !ok { + return errors.New("target stream manager does not support moving listeners") + } + + // we need to acquire both mutextes, from source stream and from target stream + // in order to do that safely (without possibility of deadlock) we need third + // global mutex, that ensures atomic locking + + // lock global mutex + moveListenerMu.Lock() + + // lock source stream + manager.mu.Lock() + defer manager.mu.Unlock() + + // lock target stream + targetStream.mu.Lock() + defer targetStream.mu.Unlock() + + // unlock global mutex + moveListenerMu.Unlock() + // start if stopped if err := targetStream.start(); err != nil { return err