From 77d2ef9aaf424f8be559e37d8d37e584be8986a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Wed, 29 Sep 2021 00:22:51 +0200 Subject: [PATCH] stream listener use channel dispatcher. --- internal/capture/stream.go | 26 ++++++++++++++++++++------ internal/types/capture.go | 5 +++-- internal/webrtc/peertrack.go | 4 ++-- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/internal/capture/stream.go b/internal/capture/stream.go index 44608cdb..1c0ee341 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -4,6 +4,7 @@ import ( "errors" "reflect" "sync" + "time" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -13,12 +14,14 @@ import ( "demodesk/neko/internal/types/codec" ) +const newListenerTimeout = 500 * time.Millisecond + type StreamManagerCtx struct { logger zerolog.Logger mu sync.Mutex wg sync.WaitGroup - codec codec.RTPCodec + codec codec.RTPCodec pipeline *gst.Pipeline pipelineMu sync.Mutex pipelineStr func() string @@ -92,9 +95,9 @@ func (manager *StreamManagerCtx) Codec() codec.RTPCodec { return manager.codec } -func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)) (addListener func(), err error) { +func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample)) (dispatcher chan interface{}, err error) { if listener == nil { - return addListener, errors.New("listener cannot be nil") + return dispatcher, errors.New("listener cannot be nil") } manager.mu.Lock() @@ -104,13 +107,22 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample) if manager.listenersCount == 1 { err := manager.createPipeline() if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) { - return addListener, err + return dispatcher, err } manager.logger.Info().Msgf("first listener, starting") } - return func() { + dispatcher = make(chan interface{}, 1) + go func() { + select { + case <-time.After(newListenerTimeout): + manager.logger.Warn().Msgf("add listener channel was not called, timeouted") + break + case <-dispatcher: + break + } + ptr := reflect.ValueOf(listener).Pointer() manager.listenersMu.Lock() @@ -118,7 +130,9 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample) manager.listenersMu.Unlock() manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener") - }, nil + }() + + return dispatcher, nil } func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) { diff --git a/internal/types/capture.go b/internal/types/capture.go index 84c35672..e308daa0 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -35,8 +35,9 @@ type ScreencastManager interface { type StreamManager interface { Codec() codec.RTPCodec - // starts pipeline if was not running before and returns register function - NewListener(listener *func(sample Sample)) (addListener func(), err error) + // starts pipeline if was not running before + // and returns dispatcher channel + NewListener(listener *func(sample Sample)) (dispatcher chan interface{}, err error) // stops pipeline if it was last listener RemoveListener(listener *func(sample Sample)) diff --git a/internal/webrtc/peertrack.go b/internal/webrtc/peertrack.go index ef7cf8c6..a2d1efc8 100644 --- a/internal/webrtc/peertrack.go +++ b/internal/webrtc/peertrack.go @@ -52,7 +52,7 @@ func (peer *PeerTrack) SetStream(stream types.StreamManager) error { defer peer.streamMu.Unlock() // prepare new listener - addListener, err := stream.NewListener(&peer.listener) + dispatcher, err := stream.NewListener(&peer.listener) if err != nil { return err } @@ -63,7 +63,7 @@ func (peer *PeerTrack) SetStream(stream types.StreamManager) error { } // add new listener - addListener() + close(dispatcher) peer.stream = stream return nil