diff --git a/server/internal/capture/broadcast.go b/server/internal/capture/broadcast.go index f1e81335..cfbeb7c4 100644 --- a/server/internal/capture/broadcast.go +++ b/server/internal/capture/broadcast.go @@ -13,6 +13,7 @@ import ( type BroacastManagerCtx struct { logger zerolog.Logger mu sync.Mutex + sampleChannel chan types.Sample pipeline *gst.Pipeline pipelineMu sync.Mutex @@ -31,6 +32,7 @@ func broadcastNew(pipelineFn func(url string) (string, error), defaultUrl string return &BroacastManagerCtx{ logger: logger, pipelineFn: pipelineFn, + sampleChannel: make(chan types.Sample), url: defaultUrl, started: defaultUrl != "", } @@ -97,7 +99,7 @@ func (manager *BroacastManagerCtx) createPipeline() error { Str("src", pipelineStr). Msgf("starting pipeline") - manager.pipeline, err = gst.CreatePipeline(pipelineStr) + manager.pipeline, err = gst.CreatePipeline(pipelineStr, manager.sampleChannel) if err != nil { return err } diff --git a/server/internal/capture/gst/gst.go b/server/internal/capture/gst/gst.go index 7b39f85b..59646ba0 100644 --- a/server/internal/capture/gst/gst.go +++ b/server/internal/capture/gst/gst.go @@ -37,7 +37,7 @@ func init() { registry = C.gst_registry_get() } -func CreatePipeline(pipelineStr string) (*Pipeline, error) { +func CreatePipeline(pipelineStr string, sampleChannel chan types.Sample) (*Pipeline, error) { id := atomic.AddInt32(&pSerial, 1) pipelineStrUnsafe := C.CString(pipelineStr) @@ -63,7 +63,7 @@ func CreatePipeline(pipelineStr string) (*Pipeline, error) { Int("pipeline_id", int(id)).Logger(), Src: pipelineStr, Ctx: ctx, - Sample: make(chan types.Sample), + Sample: sampleChannel, } pipelines[p.id] = p diff --git a/server/internal/capture/streamsink.go b/server/internal/capture/streamsink.go index 61eb315f..d895d189 100644 --- a/server/internal/capture/streamsink.go +++ b/server/internal/capture/streamsink.go @@ -15,6 +15,7 @@ import ( type StreamSinkManagerCtx struct { logger zerolog.Logger mu sync.Mutex + sampleChannel chan types.Sample codec codec.RTPCodec pipeline *gst.Pipeline @@ -35,6 +36,7 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), vide logger: logger, codec: codec, pipelineFn: pipelineFn, + sampleChannel: make(chan types.Sample), } return manager @@ -139,7 +141,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { Str("src", pipelineStr). Msgf("creating pipeline") - manager.pipeline, err = gst.CreatePipeline(pipelineStr) + manager.pipeline, err = gst.CreatePipeline(pipelineStr, manager.sampleChannel) if err != nil { return err } @@ -169,9 +171,5 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() { } func (manager *StreamSinkManagerCtx) GetSampleChannel() chan types.Sample { - if manager.pipeline != nil { - return manager.pipeline.Sample - } - - return nil + return manager.sampleChannel } diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index 79628165..693f5729 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -57,16 +57,10 @@ func (manager *WebRTCManager) Start() { go func() { for { - if manager.capture.Audio().GetSampleChannel() == nil { - // Pipeline not yet initialized - time.Sleep(50 * time.Millisecond) - continue - } - sample, ok := <-manager.capture.Audio().GetSampleChannel() if !ok { manager.logger.Debug().Msg("audio capture channel is closed") - continue // TOOD: Create this goroutine when creating the pipeline. + continue } err := manager.audioTrack.WriteSample(media.Sample(sample)) @@ -88,16 +82,10 @@ func (manager *WebRTCManager) Start() { go func() { for { - if manager.capture.Video().GetSampleChannel() == nil { - // Pipeline not yet initialized - time.Sleep(50 * time.Millisecond) - continue - } - sample, ok := <-manager.capture.Video().GetSampleChannel() if !ok { manager.logger.Debug().Msg("video capture channel is closed") - continue // TOOD: Create this goroutine when creating the pipeline. + continue } err := manager.videoTrack.WriteSample(media.Sample(sample))