diff --git a/internal/capture/stream.go b/internal/capture/stream.go index 1fb615c6..03cf3f0d 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -20,7 +20,7 @@ type StreamManagerCtx struct { pipelineStr string pipeline *gst.Pipeline sample chan types.Sample - listeners map[uintptr]func(sample types.Sample) + listeners map[uintptr]*func(sample types.Sample) emitMu sync.Mutex emitUpdate chan bool emitStop chan bool @@ -32,7 +32,7 @@ func streamNew(codec codec.RTPCodec, pipelineStr string) *StreamManagerCtx { logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(), codec: codec, pipelineStr: pipelineStr, - listeners: map[uintptr]func(sample types.Sample){}, + listeners: map[uintptr]*func(sample types.Sample){}, emitUpdate: make(chan bool), emitStop: make(chan bool), started: false, @@ -51,7 +51,7 @@ func streamNew(codec codec.RTPCodec, pipelineStr string) *StreamManagerCtx { case sample := <-manager.sample: manager.emitMu.Lock() for _, emit := range manager.listeners { - emit(sample) + (*emit)(sample) } manager.emitMu.Unlock() } @@ -72,20 +72,24 @@ func (manager *StreamManagerCtx) Codec() codec.RTPCodec { return manager.codec } -func (manager *StreamManagerCtx) AddListener(listener func(sample types.Sample)) { +func (manager *StreamManagerCtx) AddListener(listener *func(sample types.Sample)) { manager.emitMu.Lock() defer manager.emitMu.Unlock() - ptr := reflect.ValueOf(listener).Pointer() - manager.listeners[ptr] = listener + if listener != nil { + ptr := reflect.ValueOf(listener).Pointer() + manager.listeners[ptr] = listener + } } -func (manager *StreamManagerCtx) RemoveListener(listener func(sample types.Sample)) { +func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) { manager.emitMu.Lock() defer manager.emitMu.Unlock() - ptr := reflect.ValueOf(listener).Pointer() - delete(manager.listeners, ptr) + if listener != nil { + ptr := reflect.ValueOf(listener).Pointer() + delete(manager.listeners, ptr) + } } func (manager *StreamManagerCtx) Start() error { diff --git a/internal/types/capture.go b/internal/types/capture.go index 9527ec22..71afa6f1 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -23,8 +23,8 @@ type ScreencastManager interface { type StreamManager interface { Codec() codec.RTPCodec - AddListener(listener func(sample Sample)) - RemoveListener(listener func(sample Sample)) + AddListener(listener *func(sample Sample)) + RemoveListener(listener *func(sample Sample)) Start() error Stop() diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 17748467..decdc992 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -55,9 +55,9 @@ func (manager *WebRTCManagerCtx) Start() { } } - audio.AddListener(listener) + audio.AddListener(&listener) manager.unsubscribe = append(manager.unsubscribe, func(){ - audio.RemoveListener(listener) + audio.RemoveListener(&listener) }) videoIDs := manager.capture.VideoIDs() @@ -82,9 +82,9 @@ func (manager *WebRTCManagerCtx) Start() { } } - video.AddListener(listener) + video.AddListener(&listener) manager.unsubscribe = append(manager.unsubscribe, func(){ - video.RemoveListener(listener) + video.RemoveListener(&listener) }) manager.videoTracks[videoID] = track