create pipeline fn return error.

This commit is contained in:
Miroslav Šedivý 2022-09-17 17:52:55 +02:00
parent d5863d8b95
commit a6f117686f
2 changed files with 27 additions and 21 deletions

View File

@ -67,27 +67,31 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
for video_id, cnf := range config.VideoPipelines { for video_id, cnf := range config.VideoPipelines {
pipelineConf := cnf pipelineConf := cnf
createPipeline := func() string { createPipeline := func() (string, error) {
if pipelineConf.GstPipeline != "" { if pipelineConf.GstPipeline != "" {
return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1) return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1), nil
} }
screen := desktop.GetScreenSize() screen := desktop.GetScreenSize()
pipeline, err := pipelineConf.GetPipeline(*screen) pipeline, err := pipelineConf.GetPipeline(*screen)
if err != nil { if err != nil {
logger.Panic().Err(err). return "", err
Str("video_id", video_id).
Msg("unable to get video pipeline")
} }
return fmt.Sprintf( return fmt.Sprintf(
"ximagesrc display-name=%s show-pointer=false use-damage=false "+ "ximagesrc display-name=%s show-pointer=false use-damage=false "+
"%s ! appsink name=appsink", config.Display, pipeline, "%s ! appsink name=appsink", config.Display, pipeline,
) ), nil
} }
// trigger function to catch evaluation errors at startup // trigger function to catch evaluation errors at startup
pipeline := createPipeline() pipeline, err := createPipeline()
if err != nil {
logger.Panic().Err(err).
Str("video_id", video_id).
Msg("failed to create video pipeline")
}
logger.Info(). logger.Info().
Str("video_id", video_id). Str("video_id", video_id).
Str("pipeline", pipeline). Str("pipeline", pipeline).
@ -104,9 +108,9 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
// sinks // sinks
broadcast: broadcastNew(broadcastPipeline), broadcast: broadcastNew(broadcastPipeline),
screencast: screencastNew(config.ScreencastEnabled, screencastPipeline), screencast: screencastNew(config.ScreencastEnabled, screencastPipeline),
audio: streamSinkNew(config.AudioCodec, func() string { audio: streamSinkNew(config.AudioCodec, func() (string, error) {
if config.AudioPipeline != "" { if config.AudioPipeline != "" {
return config.AudioPipeline return config.AudioPipeline, nil
} }
return fmt.Sprintf( return fmt.Sprintf(
@ -116,7 +120,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
"! queue "+ "! queue "+
"! %s "+ "! %s "+
"! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline, "! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline,
) ), nil
}, "audio"), }, "audio"),
videos: videos, videos: videos,
videoIDs: config.VideoIDs, videoIDs: config.VideoIDs,

View File

@ -23,10 +23,10 @@ type StreamSinkManagerCtx struct {
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
codec codec.RTPCodec codec codec.RTPCodec
pipeline *gst.Pipeline pipeline *gst.Pipeline
pipelineMu sync.Mutex pipelineMu sync.Mutex
pipelineStr func() string pipelineFn func() (string, error)
listeners map[uintptr]*func(sample types.Sample) listeners map[uintptr]*func(sample types.Sample)
listenersMu sync.Mutex listenersMu sync.Mutex
@ -37,17 +37,17 @@ type StreamSinkManagerCtx struct {
pipelinesActive prometheus.Gauge pipelinesActive prometheus.Gauge
} }
func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), video_id string) *StreamSinkManagerCtx {
logger := log.With(). logger := log.With().
Str("module", "capture"). Str("module", "capture").
Str("submodule", "stream-sink"). Str("submodule", "stream-sink").
Str("video_id", video_id).Logger() Str("video_id", video_id).Logger()
manager := &StreamSinkManagerCtx{ manager := &StreamSinkManagerCtx{
logger: logger, logger: logger,
codec: codec, codec: codec,
pipelineStr: pipelineStr, pipelineFn: pipelineFn,
listeners: map[uintptr]*func(sample types.Sample){}, listeners: map[uintptr]*func(sample types.Sample){},
// metrics // metrics
currentListeners: promauto.NewGauge(prometheus.GaugeOpts{ currentListeners: promauto.NewGauge(prometheus.GaugeOpts{
@ -249,9 +249,11 @@ func (manager *StreamSinkManagerCtx) createPipeline() error {
return types.ErrCapturePipelineAlreadyExists return types.ErrCapturePipelineAlreadyExists
} }
var err error pipelineStr, err := manager.pipelineFn()
if err != nil {
return err
}
pipelineStr := manager.pipelineStr()
manager.logger.Info(). manager.logger.Info().
Str("codec", manager.codec.Name). Str("codec", manager.codec.Name).
Str("src", pipelineStr). Str("src", pipelineStr).