From 161d121e5961f8d35f817c1450dc0cd199515016 Mon Sep 17 00:00:00 2001 From: mbattista Date: Wed, 25 Jan 2023 01:11:42 +0100 Subject: [PATCH] channel direct from the pipeline --- server/internal/capture/streamsink.go | 28 +++++---------------------- server/internal/webrtc/webrtc.go | 10 ++++++++++ 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/server/internal/capture/streamsink.go b/server/internal/capture/streamsink.go index 5a8c06f..01d2bd9 100644 --- a/server/internal/capture/streamsink.go +++ b/server/internal/capture/streamsink.go @@ -17,7 +17,6 @@ import ( type StreamSinkManagerCtx struct { logger zerolog.Logger mu sync.Mutex - wg sync.WaitGroup codec codec.RTPCodec pipeline *gst.Pipeline @@ -29,7 +28,6 @@ type StreamSinkManagerCtx struct { listenersMu sync.Mutex changeFramerate int16 - sampleChannel chan types.Sample } func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), video_id string) *StreamSinkManagerCtx { @@ -44,7 +42,6 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), vide pipelineFn: pipelineFn, changeFramerate: 0, adaptiveFramerate: false, - sampleChannel: make(chan types.Sample, 100), } return manager @@ -54,7 +51,6 @@ func (manager *StreamSinkManagerCtx) shutdown() { manager.logger.Info().Msgf("shutdown") manager.destroyPipeline() - manager.wg.Wait() } func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec { @@ -168,24 +164,6 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { manager.pipeline.AttachAppsink("appsink" + appsinkSubfix) manager.pipeline.Play() - manager.wg.Add(1) - pipeline := manager.pipeline - - go func() { - manager.logger.Debug().Msg("started emitting samples") - defer manager.wg.Done() - - for { - sample, ok := <-pipeline.Sample - if !ok { - manager.logger.Debug().Msg("stopped emitting samples") - return - } - - manager.sampleChannel <- sample - } - }() - return nil } @@ -203,7 +181,11 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() { } func (manager *StreamSinkManagerCtx) GetSampleChannel() (chan types.Sample) { - return manager.sampleChannel + if manager.pipeline != nil { + return manager.pipeline.Sample + } + + return nil } func (manager *StreamSinkManagerCtx) SetChangeFramerate(rate int16) { diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index f8c4004..c0d7e15 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -57,6 +57,11 @@ func (manager *WebRTCManager) Start() { go func() { for { + if manager.capture.Audio().GetSampleChannel() == nil { + // Pipeline not yet initialized + time.Sleep(50 * time.Millisecond) + continue + } newSample := <- manager.capture.Audio().GetSampleChannel() err := manager.audioTrack.WriteSample(media.Sample(newSample)) if err != nil && errors.Is(err, io.ErrClosedPipe) { @@ -77,6 +82,11 @@ func (manager *WebRTCManager) Start() { go func() { for { + if manager.capture.Video().GetSampleChannel() == nil { + // Pipeline not yet initialized + time.Sleep(50 * time.Millisecond) + continue + } newSample := <- manager.capture.Video().GetSampleChannel() err := manager.videoTrack.WriteSample(media.Sample(newSample)) if err != nil && errors.Is(err, io.ErrClosedPipe) {