diff --git a/server/internal/capture/broadcast.go b/server/internal/capture/broadcast.go index cfbeb7c..f1e8133 100644 --- a/server/internal/capture/broadcast.go +++ b/server/internal/capture/broadcast.go @@ -13,7 +13,6 @@ import ( type BroacastManagerCtx struct { logger zerolog.Logger mu sync.Mutex - sampleChannel chan types.Sample pipeline *gst.Pipeline pipelineMu sync.Mutex @@ -32,7 +31,6 @@ func broadcastNew(pipelineFn func(url string) (string, error), defaultUrl string return &BroacastManagerCtx{ logger: logger, pipelineFn: pipelineFn, - sampleChannel: make(chan types.Sample), url: defaultUrl, started: defaultUrl != "", } @@ -99,7 +97,7 @@ func (manager *BroacastManagerCtx) createPipeline() error { Str("src", pipelineStr). Msgf("starting pipeline") - manager.pipeline, err = gst.CreatePipeline(pipelineStr, manager.sampleChannel) + manager.pipeline, err = gst.CreatePipeline(pipelineStr) if err != nil { return err } diff --git a/server/internal/capture/gst/gst.go b/server/internal/capture/gst/gst.go index 5bb4cf0..c362014 100644 --- a/server/internal/capture/gst/gst.go +++ b/server/internal/capture/gst/gst.go @@ -37,7 +37,7 @@ func init() { registry = C.gst_registry_get() } -func CreatePipeline(pipelineStr string, sampleChannel chan types.Sample) (*Pipeline, error) { +func CreatePipeline(pipelineStr string) (*Pipeline, error) { id := atomic.AddInt32(&pSerial, 1) pipelineStrUnsafe := C.CString(pipelineStr) @@ -61,19 +61,20 @@ func CreatePipeline(pipelineStr string, sampleChannel chan types.Sample) (*Pipel Str("module", "capture"). Str("submodule", "gstreamer"). Int("pipeline_id", int(id)).Logger(), - Src: pipelineStr, - Ctx: ctx, - Sample: sampleChannel, + Src: pipelineStr, + Ctx: ctx, } pipelines[p.id] = p return p, nil } -func (p *Pipeline) AttachAppsink(sinkName string) { +func (p *Pipeline) AttachAppsink(sinkName string, sampleChannel chan types.Sample) { sinkNameUnsafe := C.CString(sinkName) defer C.free(unsafe.Pointer(sinkNameUnsafe)) + p.Sample = sampleChannel + C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe) } diff --git a/server/internal/capture/streamsink.go b/server/internal/capture/streamsink.go index d895d18..2aa3b36 100644 --- a/server/internal/capture/streamsink.go +++ b/server/internal/capture/streamsink.go @@ -13,9 +13,9 @@ import ( ) type StreamSinkManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - sampleChannel chan types.Sample + logger zerolog.Logger + mu sync.Mutex + sampleChannel chan types.Sample codec codec.RTPCodec pipeline *gst.Pipeline @@ -33,9 +33,9 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), vide Str("video_id", video_id).Logger() manager := &StreamSinkManagerCtx{ - logger: logger, - codec: codec, - pipelineFn: pipelineFn, + logger: logger, + codec: codec, + pipelineFn: pipelineFn, sampleChannel: make(chan types.Sample), } @@ -141,7 +141,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { Str("src", pipelineStr). Msgf("creating pipeline") - manager.pipeline, err = gst.CreatePipeline(pipelineStr, manager.sampleChannel) + manager.pipeline, err = gst.CreatePipeline(pipelineStr) if err != nil { return err } @@ -151,7 +151,7 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { appsinkSubfix = "video" } - manager.pipeline.AttachAppsink("appsink" + appsinkSubfix) + manager.pipeline.AttachAppsink("appsink"+appsinkSubfix, manager.sampleChannel) manager.pipeline.Play() return nil