From f05889bd567a159930972d2eca206b929390acab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sat, 6 Feb 2021 12:52:02 +0100 Subject: [PATCH] Capture stream: Add- and RemoveListener. --- internal/capture/stream.go | 31 +++++++++++++++++++++++-------- internal/types/capture.go | 3 ++- internal/webrtc/manager.go | 8 ++++++-- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/internal/capture/stream.go b/internal/capture/stream.go index 1a34cbc7..1fb615c6 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -3,8 +3,8 @@ package capture import ( "fmt" "sync" + "reflect" - "github.com/kataras/go-events" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -20,7 +20,8 @@ type StreamManagerCtx struct { pipelineStr string pipeline *gst.Pipeline sample chan types.Sample - emmiter events.EventEmmiter + listeners map[uintptr]func(sample types.Sample) + emitMu sync.Mutex emitUpdate chan bool emitStop chan bool started bool @@ -31,7 +32,7 @@ func streamNew(codec codec.RTPCodec, pipelineStr string) *StreamManagerCtx { logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(), codec: codec, pipelineStr: pipelineStr, - emmiter: events.New(), + listeners: map[uintptr]func(sample types.Sample){}, emitUpdate: make(chan bool), emitStop: make(chan bool), started: false, @@ -48,7 +49,11 @@ func streamNew(codec codec.RTPCodec, pipelineStr string) *StreamManagerCtx { case <-manager.emitUpdate: manager.logger.Debug().Msg("update emitting samples") case sample := <-manager.sample: - manager.emmiter.Emit("sample", sample) + manager.emitMu.Lock() + for _, emit := range manager.listeners { + emit(sample) + } + manager.emitMu.Unlock() } } }() @@ -67,10 +72,20 @@ func (manager *StreamManagerCtx) Codec() codec.RTPCodec { return manager.codec } -func (manager *StreamManagerCtx) OnSample(listener func(sample types.Sample)) { - manager.emmiter.On("sample", func(payload ...interface{}) { - listener(payload[0].(types.Sample)) - }) +func (manager *StreamManagerCtx) AddListener(listener func(sample types.Sample)) { + manager.emitMu.Lock() + defer manager.emitMu.Unlock() + + ptr := reflect.ValueOf(listener).Pointer() + manager.listeners[ptr] = listener +} + +func (manager *StreamManagerCtx) RemoveListener(listener func(sample types.Sample)) { + manager.emitMu.Lock() + defer manager.emitMu.Unlock() + + ptr := reflect.ValueOf(listener).Pointer() + delete(manager.listeners, ptr) } func (manager *StreamManagerCtx) Start() error { diff --git a/internal/types/capture.go b/internal/types/capture.go index ead74036..9527ec22 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -23,7 +23,8 @@ type ScreencastManager interface { type StreamManager interface { Codec() codec.RTPCodec - OnSample(listener func(sample Sample)) + AddListener(listener func(sample Sample)) + RemoveListener(listener func(sample Sample)) Start() error Stop() diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index aaf9d81c..e50699d5 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -48,7 +48,7 @@ func (manager *WebRTCManagerCtx) Start() { manager.logger.Panic().Err(err).Msg("unable to create audio track") } - audio.OnSample(func(sample types.Sample) { + audio.AddListener(func(sample types.Sample) { if err := manager.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { manager.logger.Warn().Err(err).Msg("audio pipeline failed to write") } @@ -70,7 +70,7 @@ func (manager *WebRTCManagerCtx) Start() { manager.logger.Panic().Err(err).Str("videoID", videoID).Msg("unable to create video track") } - video.OnSample(func(sample types.Sample) { + video.AddListener(func(sample types.Sample) { if err := track.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { manager.logger.Warn().Err(err).Str("videoID", videoID).Msg("vide pipeline failed to write") } @@ -178,6 +178,10 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (*webrtc.Sess } } + connection.OnNegotiationNeeded(func() { + logger.Warn().Msg("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!negotiation needed!") + }) + connection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { switch state { case webrtc.PeerConnectionStateConnected: