broadcast pipeline fn.

This commit is contained in:
Miroslav Šedivý 2022-09-17 18:00:25 +02:00
parent a6f117686f
commit 3eedbbf8ea
2 changed files with 37 additions and 34 deletions

View File

@ -1,7 +1,6 @@
package capture package capture
import ( import (
"strings"
"sync" "sync"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -17,9 +16,9 @@ type BroacastManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
pipeline *gst.Pipeline pipeline *gst.Pipeline
pipelineStr string pipelineMu sync.Mutex
pipelineMu sync.Mutex pipelineFn func(url string) (string, error)
url string url string
started bool started bool
@ -29,17 +28,17 @@ type BroacastManagerCtx struct {
pipelinesActive prometheus.Gauge pipelinesActive prometheus.Gauge
} }
func broadcastNew(pipelineStr string) *BroacastManagerCtx { func broadcastNew(pipelineFn func(url string) (string, error), defaultUrl string) *BroacastManagerCtx {
logger := log.With(). logger := log.With().
Str("module", "capture"). Str("module", "capture").
Str("submodule", "broadcast"). Str("submodule", "broadcast").
Logger() Logger()
return &BroacastManagerCtx{ return &BroacastManagerCtx{
logger: logger, logger: logger,
pipelineStr: pipelineStr, pipelineFn: pipelineFn,
url: "", url: defaultUrl,
started: false, started: defaultUrl != "",
// metrics // metrics
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
@ -119,10 +118,10 @@ func (manager *BroacastManagerCtx) createPipeline() error {
return types.ErrCapturePipelineAlreadyExists return types.ErrCapturePipelineAlreadyExists
} }
var err error pipelineStr, err := manager.pipelineFn(manager.url)
if err != nil {
// replace {url} with valid URL return err
pipelineStr := strings.Replace(manager.pipelineStr, "{url}", manager.url, 1) }
manager.logger.Info(). manager.logger.Info().
Str("src", pipelineStr). Str("src", pipelineStr).

View File

@ -32,25 +32,6 @@ type CaptureManagerCtx struct {
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
logger := log.With().Str("module", "capture").Logger() logger := log.With().Str("module", "capture").Logger()
broadcastPipeline := config.BroadcastPipeline
if broadcastPipeline == "" {
broadcastPipeline = fmt.Sprintf(
"flvmux name=mux ! rtmpsink location='{url} live=1' "+
"pulsesrc device=%s "+
"! audio/x-raw,channels=2 "+
"! audioconvert "+
"! queue "+
"! voaacenc bitrate=%d "+
"! mux. "+
"ximagesrc display-name=%s show-pointer=true use-damage=false "+
"! video/x-raw "+
"! videoconvert "+
"! queue "+
"! x264enc threads=4 bitrate=%d key-int-max=15 byte-stream=true tune=zerolatency speed-preset=%s "+
"! mux.", config.AudioDevice, config.BroadcastAudioBitrate*1000, config.Display, config.BroadcastVideoBitrate, config.BroadcastPreset,
)
}
screencastPipeline := config.ScreencastPipeline screencastPipeline := config.ScreencastPipeline
if screencastPipeline == "" { if screencastPipeline == "" {
screencastPipeline = fmt.Sprintf( screencastPipeline = fmt.Sprintf(
@ -69,6 +50,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
createPipeline := func() (string, error) { createPipeline := func() (string, error) {
if pipelineConf.GstPipeline != "" { if pipelineConf.GstPipeline != "" {
// replace {display} with valid display
return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1), nil return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1), nil
} }
@ -106,11 +88,33 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
desktop: desktop, desktop: desktop,
// sinks // sinks
broadcast: broadcastNew(broadcastPipeline), broadcast: broadcastNew(func(url string) (string, error) {
if config.BroadcastPipeline != "" {
// replace {url} with valid URL
return strings.Replace(config.BroadcastPipeline, "{url}", url, 1), nil
}
return fmt.Sprintf(
"flvmux name=mux ! rtmpsink location='%s live=1' "+
"pulsesrc device=%s "+
"! audio/x-raw,channels=2 "+
"! audioconvert "+
"! queue "+
"! voaacenc bitrate=%d "+
"! mux. "+
"ximagesrc display-name=%s show-pointer=true use-damage=false "+
"! video/x-raw "+
"! videoconvert "+
"! queue "+
"! x264enc threads=4 bitrate=%d key-int-max=15 byte-stream=true tune=zerolatency speed-preset=%s "+
"! mux.", url, config.AudioDevice, config.BroadcastAudioBitrate*1000, config.Display, config.BroadcastVideoBitrate, config.BroadcastPreset,
), nil
}, ""),
screencast: screencastNew(config.ScreencastEnabled, screencastPipeline), screencast: screencastNew(config.ScreencastEnabled, screencastPipeline),
audio: streamSinkNew(config.AudioCodec, func() (string, error) { audio: streamSinkNew(config.AudioCodec, func() (string, error) {
if config.AudioPipeline != "" { if config.AudioPipeline != "" {
return config.AudioPipeline, nil // replace {device} with valid device
return strings.Replace(config.AudioPipeline, "{device}", config.AudioDevice, 1), nil
} }
return fmt.Sprintf( return fmt.Sprintf(