stream implement MoveListenerTo.

This commit is contained in:
Miroslav Šedivý 2021-10-01 13:46:10 +02:00
parent 0245c73e2b
commit 27cb473ef9
3 changed files with 94 additions and 93 deletions

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"reflect" "reflect"
"sync" "sync"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -14,8 +13,6 @@ import (
"demodesk/neko/internal/types/codec" "demodesk/neko/internal/types/codec"
) )
const newListenerTimeout = 500 * time.Millisecond
type StreamManagerCtx struct { type StreamManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
@ -32,7 +29,6 @@ type StreamManagerCtx struct {
listeners map[uintptr]*func(sample types.Sample) listeners map[uintptr]*func(sample types.Sample)
listenersMu sync.Mutex listenersMu sync.Mutex
listenersCount int
} }
func streamNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamManagerCtx { func streamNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamManagerCtx {
@ -95,34 +91,27 @@ func (manager *StreamManagerCtx) Codec() codec.RTPCodec {
return manager.codec return manager.codec
} }
func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)) (dispatcher chan interface{}, err error) { func (manager *StreamManagerCtx) start() error {
if listener == nil { if len(manager.listeners) == 0 {
return dispatcher, errors.New("listener cannot be nil")
}
manager.mu.Lock()
defer manager.mu.Unlock()
manager.listenersCount++
if manager.listenersCount == 1 {
err := manager.createPipeline() err := manager.createPipeline()
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
return dispatcher, err return err
} }
manager.logger.Info().Msgf("first listener, starting") manager.logger.Info().Msgf("first listener, starting")
} }
dispatcher = make(chan interface{}, 1) return nil
go func() { }
select {
case <-time.After(newListenerTimeout):
manager.logger.Warn().Msgf("add listener channel was not called, timeouted")
break
case <-dispatcher:
break
}
func (manager *StreamManagerCtx) stop() {
if len(manager.listeners) == 0 {
manager.destroyPipeline()
manager.logger.Info().Msgf("last listener, stopping")
}
}
func (manager *StreamManagerCtx) addListener(listener *func(sample types.Sample)) {
ptr := reflect.ValueOf(listener).Pointer() ptr := reflect.ValueOf(listener).Pointer()
manager.listenersMu.Lock() manager.listenersMu.Lock()
@ -130,16 +119,9 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)
manager.listenersMu.Unlock() manager.listenersMu.Unlock()
manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener") manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener")
}()
return dispatcher, nil
} }
func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) (dispatcher chan interface{}) { func (manager *StreamManagerCtx) removeListener(listener *func(sample types.Sample)) {
if listener == nil {
return dispatcher
}
ptr := reflect.ValueOf(listener).Pointer() ptr := reflect.ValueOf(listener).Pointer()
manager.listenersMu.Lock() manager.listenersMu.Lock()
@ -147,36 +129,70 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp
manager.listenersMu.Unlock() manager.listenersMu.Unlock()
manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener") manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener")
}
manager.mu.Lock() func (manager *StreamManagerCtx) AddListener(listener *func(sample types.Sample)) error {
manager.listenersCount--
manager.mu.Unlock()
dispatcher = make(chan interface{}, 1)
go func() {
select {
case <-time.After(newListenerTimeout):
manager.logger.Warn().Msgf("remote listener channel was not called, timeouted")
break
case <-dispatcher:
break
}
manager.mu.Lock() manager.mu.Lock()
defer manager.mu.Unlock() defer manager.mu.Unlock()
if manager.listenersCount <= 0 { if listener == nil {
manager.destroyPipeline() return errors.New("listener cannot be nil")
manager.logger.Info().Msgf("last listener, stopping")
} }
if manager.listenersCount < 0 { // start if stopped
manager.listenersCount = 0 if err := manager.start(); err != nil {
manager.logger.Error().Int("listeners-count", manager.listenersCount).Msgf("listener counter is < 0, something is wrong") return err
} }
}()
return dispatcher // add listener
manager.addListener(listener)
return nil
}
func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) error {
manager.mu.Lock()
defer manager.mu.Unlock()
if listener == nil {
return errors.New("listener cannot be nil")
}
// remove listener
manager.removeListener(listener)
// stop if started
manager.stop()
return nil
}
func (manager *StreamManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamManager) error {
manager.mu.Lock()
defer manager.mu.Unlock()
targetStream, ok := stream.(*StreamManagerCtx)
if !ok {
return errors.New("stream manager does not support moving listeners")
}
if listener == nil {
return errors.New("listener cannot be nil")
}
// start if stopped
if err := targetStream.start(); err != nil {
return err
}
// swap listeners
manager.removeListener(listener)
targetStream.addListener(listener)
// stop if started
manager.stop()
return nil
} }
func (manager *StreamManagerCtx) ListenersCount() int { func (manager *StreamManagerCtx) ListenersCount() int {
@ -187,10 +203,7 @@ func (manager *StreamManagerCtx) ListenersCount() int {
} }
func (manager *StreamManagerCtx) Started() bool { func (manager *StreamManagerCtx) Started() bool {
manager.mu.Lock() return manager.ListenersCount() > 0
defer manager.mu.Unlock()
return manager.listenersCount > 0
} }
func (manager *StreamManagerCtx) createPipeline() error { func (manager *StreamManagerCtx) createPipeline() error {

View File

@ -35,11 +35,9 @@ type ScreencastManager interface {
type StreamManager interface { type StreamManager interface {
Codec() codec.RTPCodec Codec() codec.RTPCodec
// starts pipeline if was not running before AddListener(listener *func(sample Sample)) error
// and returns dispatcher channel RemoveListener(listener *func(sample Sample)) error
NewListener(listener *func(sample Sample)) (dispatcher chan interface{}, err error) MoveListenerTo(listener *func(sample Sample), targetStream StreamManager) error
// stops pipeline if it was last listener
RemoveListener(listener *func(sample Sample)) (dispatcher chan interface{})
ListenersCount() int ListenersCount() int
Started() bool Started() bool

View File

@ -33,8 +33,8 @@ func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamManager,
}, },
} }
peer.SetStream(stream) err = peer.SetStream(stream)
return peer, nil return peer, err
} }
type PeerStreamTrack struct { type PeerStreamTrack struct {
@ -50,28 +50,18 @@ func (peer *PeerStreamTrack) SetStream(stream types.StreamManager) error {
peer.streamMu.Lock() peer.streamMu.Lock()
defer peer.streamMu.Unlock() defer peer.streamMu.Unlock()
// prepare new listener var err error
addDispatcher, err := stream.NewListener(&peer.listener)
if err != nil {
return err
}
// remove previous listener (in case it existed)
var stopDispatcher chan interface{}
if peer.stream != nil { if peer.stream != nil {
stopDispatcher = peer.stream.RemoveListener(&peer.listener) err = peer.stream.MoveListenerTo(&peer.listener, stream)
} } else {
err = peer.stream.AddListener(&peer.listener)
// add new listener
close(addDispatcher)
// stop old pipeline (in case it existed)
if stopDispatcher != nil {
close(stopDispatcher)
} }
if err != nil {
peer.stream = stream peer.stream = stream
return nil }
return err
} }
func (peer *PeerStreamTrack) RemoveStream() { func (peer *PeerStreamTrack) RemoveStream() {
@ -79,8 +69,8 @@ func (peer *PeerStreamTrack) RemoveStream() {
defer peer.streamMu.Unlock() defer peer.streamMu.Unlock()
if peer.stream != nil { if peer.stream != nil {
dispatcher := peer.stream.RemoveListener(&peer.listener) peer.stream.RemoveListener(&peer.listener)
close(dispatcher) peer.stream = nil
} }
} }