stream listener use channel dispatcher.

This commit is contained in:
Miroslav Šedivý 2021-09-29 00:22:51 +02:00
parent 18fbdb2ff8
commit 77d2ef9aaf
3 changed files with 25 additions and 10 deletions

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"reflect" "reflect"
"sync" "sync"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -13,12 +14,14 @@ import (
"demodesk/neko/internal/types/codec" "demodesk/neko/internal/types/codec"
) )
const newListenerTimeout = 500 * time.Millisecond
type StreamManagerCtx struct { type StreamManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
codec codec.RTPCodec
codec codec.RTPCodec
pipeline *gst.Pipeline pipeline *gst.Pipeline
pipelineMu sync.Mutex pipelineMu sync.Mutex
pipelineStr func() string pipelineStr func() string
@ -92,9 +95,9 @@ func (manager *StreamManagerCtx) Codec() codec.RTPCodec {
return manager.codec return manager.codec
} }
func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)) (addListener func(), err error) { func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)) (dispatcher chan interface{}, err error) {
if listener == nil { if listener == nil {
return addListener, errors.New("listener cannot be nil") return dispatcher, errors.New("listener cannot be nil")
} }
manager.mu.Lock() manager.mu.Lock()
@ -104,13 +107,22 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)
if manager.listenersCount == 1 { if manager.listenersCount == 1 {
err := manager.createPipeline() err := manager.createPipeline()
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
return addListener, err return dispatcher, err
} }
manager.logger.Info().Msgf("first listener, starting") manager.logger.Info().Msgf("first listener, starting")
} }
return func() { dispatcher = make(chan interface{}, 1)
go func() {
select {
case <-time.After(newListenerTimeout):
manager.logger.Warn().Msgf("add listener channel was not called, timeouted")
break
case <-dispatcher:
break
}
ptr := reflect.ValueOf(listener).Pointer() ptr := reflect.ValueOf(listener).Pointer()
manager.listenersMu.Lock() manager.listenersMu.Lock()
@ -118,7 +130,9 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)
manager.listenersMu.Unlock() manager.listenersMu.Unlock()
manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener") manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener")
}, nil }()
return dispatcher, nil
} }
func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) { func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) {

View File

@ -35,8 +35,9 @@ type ScreencastManager interface {
type StreamManager interface { type StreamManager interface {
Codec() codec.RTPCodec Codec() codec.RTPCodec
// starts pipeline if was not running before and returns register function // starts pipeline if was not running before
NewListener(listener *func(sample Sample)) (addListener func(), err error) // and returns dispatcher channel
NewListener(listener *func(sample Sample)) (dispatcher chan interface{}, err error)
// stops pipeline if it was last listener // stops pipeline if it was last listener
RemoveListener(listener *func(sample Sample)) RemoveListener(listener *func(sample Sample))

View File

@ -52,7 +52,7 @@ func (peer *PeerTrack) SetStream(stream types.StreamManager) error {
defer peer.streamMu.Unlock() defer peer.streamMu.Unlock()
// prepare new listener // prepare new listener
addListener, err := stream.NewListener(&peer.listener) dispatcher, err := stream.NewListener(&peer.listener)
if err != nil { if err != nil {
return err return err
} }
@ -63,7 +63,7 @@ func (peer *PeerTrack) SetStream(stream types.StreamManager) error {
} }
// add new listener // add new listener
addListener() close(dispatcher)
peer.stream = stream peer.stream = stream
return nil return nil