From a6f117686f9219de7afecaed26fe4efaaa790f34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sat, 17 Sep 2022 17:52:55 +0200 Subject: [PATCH] create pipeline fn return error. --- internal/capture/manager.go | 24 ++++++++++++++---------- internal/capture/streamsink.go | 24 +++++++++++++----------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/internal/capture/manager.go b/internal/capture/manager.go index 61ef8edb..c1ae62e2 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -67,27 +67,31 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt for video_id, cnf := range config.VideoPipelines { pipelineConf := cnf - createPipeline := func() string { + createPipeline := func() (string, error) { if pipelineConf.GstPipeline != "" { - return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1) + return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1), nil } screen := desktop.GetScreenSize() pipeline, err := pipelineConf.GetPipeline(*screen) if err != nil { - logger.Panic().Err(err). - Str("video_id", video_id). - Msg("unable to get video pipeline") + return "", err } return fmt.Sprintf( "ximagesrc display-name=%s show-pointer=false use-damage=false "+ "%s ! appsink name=appsink", config.Display, pipeline, - ) + ), nil } // trigger function to catch evaluation errors at startup - pipeline := createPipeline() + pipeline, err := createPipeline() + if err != nil { + logger.Panic().Err(err). + Str("video_id", video_id). + Msg("failed to create video pipeline") + } + logger.Info(). Str("video_id", video_id). Str("pipeline", pipeline). @@ -104,9 +108,9 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt // sinks broadcast: broadcastNew(broadcastPipeline), screencast: screencastNew(config.ScreencastEnabled, screencastPipeline), - audio: streamSinkNew(config.AudioCodec, func() string { + audio: streamSinkNew(config.AudioCodec, func() (string, error) { if config.AudioPipeline != "" { - return config.AudioPipeline + return config.AudioPipeline, nil } return fmt.Sprintf( @@ -116,7 +120,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! queue "+ "! %s "+ "! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline, - ) + ), nil }, "audio"), videos: videos, videoIDs: config.VideoIDs, diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index 954ed452..a6ab78d8 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -23,10 +23,10 @@ type StreamSinkManagerCtx struct { mu sync.Mutex wg sync.WaitGroup - codec codec.RTPCodec - pipeline *gst.Pipeline - pipelineMu sync.Mutex - pipelineStr func() string + codec codec.RTPCodec + pipeline *gst.Pipeline + pipelineMu sync.Mutex + pipelineFn func() (string, error) listeners map[uintptr]*func(sample types.Sample) listenersMu sync.Mutex @@ -37,17 +37,17 @@ type StreamSinkManagerCtx struct { pipelinesActive prometheus.Gauge } -func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { +func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), video_id string) *StreamSinkManagerCtx { logger := log.With(). Str("module", "capture"). Str("submodule", "stream-sink"). Str("video_id", video_id).Logger() manager := &StreamSinkManagerCtx{ - logger: logger, - codec: codec, - pipelineStr: pipelineStr, - listeners: map[uintptr]*func(sample types.Sample){}, + logger: logger, + codec: codec, + pipelineFn: pipelineFn, + listeners: map[uintptr]*func(sample types.Sample){}, // metrics currentListeners: promauto.NewGauge(prometheus.GaugeOpts{ @@ -249,9 +249,11 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { return types.ErrCapturePipelineAlreadyExists } - var err error + pipelineStr, err := manager.pipelineFn() + if err != nil { + return err + } - pipelineStr := manager.pipelineStr() manager.logger.Info(). Str("codec", manager.codec.Name). Str("src", pipelineStr).