MoveListenerTo ensure atomic execution.

This commit is contained in:
Miroslav Šedivý 2021-10-01 14:41:21 +02:00
parent 27cb473ef9
commit 8960fc2371

View File

@ -13,6 +13,8 @@ import (
"demodesk/neko/internal/types/codec" "demodesk/neko/internal/types/codec"
) )
var moveListenerMu = sync.Mutex{}
type StreamManagerCtx struct { type StreamManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
@ -167,19 +169,36 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp
return nil return nil
} }
// moving listeners between streams ensures, that target pipeline is running
// before listener is added, and stops source pipeline if there are 0 listeners
func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamManager) error { func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamManager) error {
manager.mu.Lock()
defer manager.mu.Unlock()
targetStream, ok := stream.(*StreamManagerCtx)
if !ok {
return errors.New("stream manager does not support moving listeners")
}
if listener == nil { if listener == nil {
return errors.New("listener cannot be nil") return errors.New("listener cannot be nil")
} }
targetStream, ok := stream.(*StreamManagerCtx)
if !ok {
return errors.New("target stream manager does not support moving listeners")
}
// we need to acquire both mutextes, from source stream and from target stream
// in order to do that safely (without possibility of deadlock) we need third
// global mutex, that ensures atomic locking
// lock global mutex
moveListenerMu.Lock()
// lock source stream
manager.mu.Lock()
defer manager.mu.Unlock()
// lock target stream
targetStream.mu.Lock()
defer targetStream.mu.Unlock()
// unlock global mutex
moveListenerMu.Unlock()
// start if stopped // start if stopped
if err := targetStream.start(); err != nil { if err := targetStream.start(); err != nil {
return err return err