RemoveListener add dispatcher.

This commit is contained in:
Miroslav Šedivý 2021-09-29 01:03:39 +02:00
parent 5bae3503d0
commit 325af8fc5a
3 changed files with 25 additions and 7 deletions

View File

@ -135,9 +135,9 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)
return dispatcher, nil
}
func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) {
func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) (dispatcher chan interface{}) {
if listener == nil {
return
return dispatcher
}
ptr := reflect.ValueOf(listener).Pointer()
@ -152,7 +152,16 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp
manager.listenersCount--
manager.mu.Unlock()
dispatcher = make(chan interface{}, 1)
go func() {
select {
case <-time.After(newListenerTimeout):
manager.logger.Warn().Msgf("remote listener channel was not called, timeouted")
break
case <-dispatcher:
break
}
manager.mu.Lock()
defer manager.mu.Unlock()
@ -166,6 +175,8 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp
manager.logger.Error().Int("listeners-count", manager.listenersCount).Msgf("listener counter is < 0, something is wrong")
}
}()
return dispatcher
}
func (manager *StreamManagerCtx) ListenersCount() int {

View File

@ -39,7 +39,7 @@ type StreamManager interface {
// and returns dispatcher channel
NewListener(listener *func(sample Sample)) (dispatcher chan interface{}, err error)
// stops pipeline if it was last listener
RemoveListener(listener *func(sample Sample))
RemoveListener(listener *func(sample Sample)) (dispatcher chan interface{})
ListenersCount() int
Started() bool

View File

@ -52,18 +52,24 @@ func (peer *PeerTrack) SetStream(stream types.StreamManager) error {
defer peer.streamMu.Unlock()
// prepare new listener
dispatcher, err := stream.NewListener(&peer.listener)
addDispatcher, err := stream.NewListener(&peer.listener)
if err != nil {
return err
}
// remove previous listener (in case it existed)
var stopDispatcher chan interface{}
if peer.stream != nil {
peer.stream.RemoveListener(&peer.listener)
stopDispatcher = peer.stream.RemoveListener(&peer.listener)
}
// add new listener
close(dispatcher)
close(addDispatcher)
// stop old pipeline (in case it existed)
if stopDispatcher != nil {
close(stopDispatcher)
}
peer.stream = stream
return nil
@ -74,7 +80,8 @@ func (peer *PeerTrack) RemoveStream() {
defer peer.streamMu.Unlock()
if peer.stream != nil {
peer.stream.RemoveListener(&peer.listener)
dispatcher := peer.stream.RemoveListener(&peer.listener)
close(dispatcher)
}
}