From 3eedbbf8eae161a61987d79df3a2ceb857fee9e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sat, 17 Sep 2022 18:00:25 +0200 Subject: [PATCH] broadcast pipeline fn. --- internal/capture/broadcast.go | 25 +++++++++---------- internal/capture/manager.go | 46 +++++++++++++++++++---------------- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index 6f032b33..012505b9 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -1,7 +1,6 @@ package capture import ( - "strings" "sync" "github.com/prometheus/client_golang/prometheus" @@ -17,9 +16,9 @@ type BroacastManagerCtx struct { logger zerolog.Logger mu sync.Mutex - pipeline *gst.Pipeline - pipelineStr string - pipelineMu sync.Mutex + pipeline *gst.Pipeline + pipelineMu sync.Mutex + pipelineFn func(url string) (string, error) url string started bool @@ -29,17 +28,17 @@ type BroacastManagerCtx struct { pipelinesActive prometheus.Gauge } -func broadcastNew(pipelineStr string) *BroacastManagerCtx { +func broadcastNew(pipelineFn func(url string) (string, error), defaultUrl string) *BroacastManagerCtx { logger := log.With(). Str("module", "capture"). Str("submodule", "broadcast"). Logger() return &BroacastManagerCtx{ - logger: logger, - pipelineStr: pipelineStr, - url: "", - started: false, + logger: logger, + pipelineFn: pipelineFn, + url: defaultUrl, + started: defaultUrl != "", // metrics pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ @@ -119,10 +118,10 @@ func (manager *BroacastManagerCtx) createPipeline() error { return types.ErrCapturePipelineAlreadyExists } - var err error - - // replace {url} with valid URL - pipelineStr := strings.Replace(manager.pipelineStr, "{url}", manager.url, 1) + pipelineStr, err := manager.pipelineFn(manager.url) + if err != nil { + return err + } manager.logger.Info(). Str("src", pipelineStr). diff --git a/internal/capture/manager.go b/internal/capture/manager.go index c1ae62e2..61ecc2ee 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -32,25 +32,6 @@ type CaptureManagerCtx struct { func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { logger := log.With().Str("module", "capture").Logger() - broadcastPipeline := config.BroadcastPipeline - if broadcastPipeline == "" { - broadcastPipeline = fmt.Sprintf( - "flvmux name=mux ! rtmpsink location='{url} live=1' "+ - "pulsesrc device=%s "+ - "! audio/x-raw,channels=2 "+ - "! audioconvert "+ - "! queue "+ - "! voaacenc bitrate=%d "+ - "! mux. "+ - "ximagesrc display-name=%s show-pointer=true use-damage=false "+ - "! video/x-raw "+ - "! videoconvert "+ - "! queue "+ - "! x264enc threads=4 bitrate=%d key-int-max=15 byte-stream=true tune=zerolatency speed-preset=%s "+ - "! mux.", config.AudioDevice, config.BroadcastAudioBitrate*1000, config.Display, config.BroadcastVideoBitrate, config.BroadcastPreset, - ) - } - screencastPipeline := config.ScreencastPipeline if screencastPipeline == "" { screencastPipeline = fmt.Sprintf( @@ -69,6 +50,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt createPipeline := func() (string, error) { if pipelineConf.GstPipeline != "" { + // replace {display} with valid display return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1), nil } @@ -106,11 +88,33 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt desktop: desktop, // sinks - broadcast: broadcastNew(broadcastPipeline), + broadcast: broadcastNew(func(url string) (string, error) { + if config.BroadcastPipeline != "" { + // replace {url} with valid URL + return strings.Replace(config.BroadcastPipeline, "{url}", url, 1), nil + } + + return fmt.Sprintf( + "flvmux name=mux ! rtmpsink location='%s live=1' "+ + "pulsesrc device=%s "+ + "! audio/x-raw,channels=2 "+ + "! audioconvert "+ + "! queue "+ + "! voaacenc bitrate=%d "+ + "! mux. "+ + "ximagesrc display-name=%s show-pointer=true use-damage=false "+ + "! video/x-raw "+ + "! videoconvert "+ + "! queue "+ + "! x264enc threads=4 bitrate=%d key-int-max=15 byte-stream=true tune=zerolatency speed-preset=%s "+ + "! mux.", url, config.AudioDevice, config.BroadcastAudioBitrate*1000, config.Display, config.BroadcastVideoBitrate, config.BroadcastPreset, + ), nil + }, ""), screencast: screencastNew(config.ScreencastEnabled, screencastPipeline), audio: streamSinkNew(config.AudioCodec, func() (string, error) { if config.AudioPipeline != "" { - return config.AudioPipeline, nil + // replace {device} with valid device + return strings.Replace(config.AudioPipeline, "{device}", config.AudioDevice, 1), nil } return fmt.Sprintf(