remove config from capture managers.

This commit is contained in:
Miroslav Šedivý 2021-02-05 13:58:02 +01:00
parent 3515c67045
commit 18b6fa0a03
5 changed files with 133 additions and 132 deletions

View File

@ -3,30 +3,29 @@ package capture
import ( import (
"fmt" "fmt"
"sync" "sync"
"strings"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"demodesk/neko/internal/config"
"demodesk/neko/internal/capture/gst" "demodesk/neko/internal/capture/gst"
) )
type BroacastManagerCtx struct { type BroacastManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
config *config.Capture pipelineStr string
pipeline *gst.Pipeline pipeline *gst.Pipeline
enabled bool enabled bool
url string url string
} }
func broadcastNew(config *config.Capture) *BroacastManagerCtx { func broadcastNew(pipelineStr string) *BroacastManagerCtx {
return &BroacastManagerCtx{ return &BroacastManagerCtx{
logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(),
mu: sync.Mutex{}, pipelineStr: pipelineStr,
config: config, enabled: false,
enabled: false, url: "",
url: "",
} }
} }
@ -68,30 +67,24 @@ func (manager *BroacastManagerCtx) Url() string {
func (manager *BroacastManagerCtx) createPipeline() error { func (manager *BroacastManagerCtx) createPipeline() error {
if manager.pipeline != nil { if manager.pipeline != nil {
return fmt.Errorf("pipeline already running") return fmt.Errorf("pipeline already exists")
} }
var err error var err error
// replace {url} with valid URL
pipelineStr := strings.Replace(manager.pipelineStr, "{url}", manager.url, 1)
manager.logger.Info(). manager.logger.Info().
Str("audio_device", manager.config.Device). Str("str", pipelineStr).
Str("video_display", manager.config.Display). Msgf("starting pipeline")
Str("broadcast_pipeline", manager.config.BroadcastPipeline).
Msgf("creating pipeline")
manager.pipeline, err = gst.CreateRTMPPipeline(
manager.config.Device,
manager.config.Display,
manager.config.BroadcastPipeline,
manager.url,
)
manager.pipeline, err = gst.CreatePipeline(pipelineStr)
if err != nil { if err != nil {
return err return err
} }
manager.pipeline.Play() manager.pipeline.Play()
manager.logger.Info().Msgf("starting pipeline")
return nil return nil
} }
@ -101,6 +94,6 @@ func (manager *BroacastManagerCtx) destroyPipeline() {
} }
manager.pipeline.Stop() manager.pipeline.Stop()
manager.logger.Info().Msgf("stopping pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
} }

View File

@ -39,35 +39,36 @@ func init() {
registry = C.gst_registry_get() registry = C.gst_registry_get()
} }
// CreateRTMPPipeline creates a GStreamer Pipeline func GetRTMPPipeline(audioDevice string, videoDisplay string, pipelineSrc string) string {
func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineSrc string, pipelineRTMP string) (*Pipeline, error) { video := fmt.Sprintf(videoSrc, videoDisplay)
video := fmt.Sprintf(videoSrc, pipelineDisplay) audio := fmt.Sprintf(audioSrc, audioDevice)
audio := fmt.Sprintf(audioSrc, pipelineDevice)
var pipelineStr string var pipelineStr string
if pipelineSrc != "" { if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc, pipelineRTMP, pipelineDevice, pipelineDisplay) pipelineStr = fmt.Sprintf(pipelineSrc, audioDevice, videoDisplay)
} else { } else {
pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video) pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='{url} live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", audio, video)
} }
return CreatePipeline(pipelineStr) return pipelineStr
} }
// CreateJPEGPipeline creates a GStreamer Pipeline func GetJPEGPipeline(videoDisplay string, pipelineSrc string, rate string, quality string) string {
func CreateJPEGPipeline(pipelineDisplay string, pipelineSrc string, rate string, quality string) (*Pipeline, error) {
var pipelineStr string var pipelineStr string
if pipelineSrc != "" { if pipelineSrc != "" {
pipelineStr = fmt.Sprintf(pipelineSrc, pipelineDisplay) pipelineStr = fmt.Sprintf(pipelineSrc, videoDisplay)
} else { } else {
pipelineStr = fmt.Sprintf("ximagesrc display-name=%s show-pointer=true use-damage=false ! videoconvert ! videoscale ! videorate ! video/x-raw,framerate=%s ! jpegenc quality=%s" + appSink, pipelineDisplay, rate, quality) pipelineStr = fmt.Sprintf("ximagesrc display-name=%s show-pointer=true use-damage=false ! videoconvert ! videoscale ! videorate ! video/x-raw,framerate=%s ! jpegenc quality=%s" + appSink, videoDisplay, rate, quality)
} }
return CreatePipeline(pipelineStr) return pipelineStr
} }
// CreateAppPipeline creates a GStreamer Pipeline func GetAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineSrc string) (string, error) {
func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineSrc string) (*Pipeline, error) { if pipelineSrc != "" {
return fmt.Sprintf(pipelineSrc + appSink, pipelineDevice), nil
}
var pipelineStr string var pipelineStr string
switch codecRTP.Name { switch codecRTP.Name {
@ -75,7 +76,7 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS
// https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html // https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html
// gstreamer1.0-plugins-good // gstreamer1.0-plugins-good
if err := CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil { if err := CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil {
return nil, err return "", err
} }
pipelineStr = fmt.Sprintf(videoSrc + "vp8enc cpu-used=16 threads=4 deadline=1 error-resilient=partitions keyframe-max-dist=15 static-threshold=20" + appSink, pipelineDevice) pipelineStr = fmt.Sprintf(videoSrc + "vp8enc cpu-used=16 threads=4 deadline=1 error-resilient=partitions keyframe-max-dist=15 static-threshold=20" + appSink, pipelineDevice)
@ -83,14 +84,14 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS
// https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html // https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html
// gstreamer1.0-plugins-good // gstreamer1.0-plugins-good
if err := CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil { if err := CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil {
return nil, err return "", err
} }
pipelineStr = fmt.Sprintf(videoSrc + "vp9enc cpu-used=16 threads=4 deadline=1 keyframe-max-dist=15 static-threshold=20" + appSink, pipelineDevice) pipelineStr = fmt.Sprintf(videoSrc + "vp9enc cpu-used=16 threads=4 deadline=1 keyframe-max-dist=15 static-threshold=20" + appSink, pipelineDevice)
case "h264": case "h264":
var err error var err error
if err = CheckPlugins([]string{"ximagesrc"}); err != nil { if err = CheckPlugins([]string{"ximagesrc"}); err != nil {
return nil, err return "", err
} }
// https://gstreamer.freedesktop.org/documentation/x264/index.html // https://gstreamer.freedesktop.org/documentation/x264/index.html
@ -107,12 +108,12 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS
break break
} }
return nil, err return "", err
case "opus": case "opus":
// https://gstreamer.freedesktop.org/documentation/opus/opusenc.html // https://gstreamer.freedesktop.org/documentation/opus/opusenc.html
// gstreamer1.0-plugins-base // gstreamer1.0-plugins-base
if err := CheckPlugins([]string{"pulseaudio", "opus"}); err != nil { if err := CheckPlugins([]string{"pulseaudio", "opus"}); err != nil {
return nil, err return "", err
} }
pipelineStr = fmt.Sprintf(audioSrc + "opusenc bitrate=128000" + appSink, pipelineDevice) pipelineStr = fmt.Sprintf(audioSrc + "opusenc bitrate=128000" + appSink, pipelineDevice)
@ -120,7 +121,7 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS
// https://gstreamer.freedesktop.org/documentation/libav/avenc_g722.html // https://gstreamer.freedesktop.org/documentation/libav/avenc_g722.html
// gstreamer1.0-libav // gstreamer1.0-libav
if err := CheckPlugins([]string{"pulseaudio", "libav"}); err != nil { if err := CheckPlugins([]string{"pulseaudio", "libav"}); err != nil {
return nil, err return "", err
} }
pipelineStr = fmt.Sprintf(audioSrc + "avenc_g722" + appSink, pipelineDevice) pipelineStr = fmt.Sprintf(audioSrc + "avenc_g722" + appSink, pipelineDevice)
@ -128,7 +129,7 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS
// https://gstreamer.freedesktop.org/documentation/mulaw/mulawenc.html // https://gstreamer.freedesktop.org/documentation/mulaw/mulawenc.html
// gstreamer1.0-plugins-good // gstreamer1.0-plugins-good
if err := CheckPlugins([]string{"pulseaudio", "mulaw"}); err != nil { if err := CheckPlugins([]string{"pulseaudio", "mulaw"}); err != nil {
return nil, err return "", err
} }
pipelineStr = fmt.Sprintf(audioSrc + "audio/x-raw, rate=8000 ! mulawenc" + appSink, pipelineDevice) pipelineStr = fmt.Sprintf(audioSrc + "audio/x-raw, rate=8000 ! mulawenc" + appSink, pipelineDevice)
@ -136,19 +137,15 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS
// https://gstreamer.freedesktop.org/documentation/alaw/alawenc.html // https://gstreamer.freedesktop.org/documentation/alaw/alawenc.html
// gstreamer1.0-plugins-good // gstreamer1.0-plugins-good
if err := CheckPlugins([]string{"pulseaudio", "alaw"}); err != nil { if err := CheckPlugins([]string{"pulseaudio", "alaw"}); err != nil {
return nil, err return "", err
} }
pipelineStr = fmt.Sprintf(audioSrc + "audio/x-raw, rate=8000 ! alawenc" + appSink, pipelineDevice) pipelineStr = fmt.Sprintf(audioSrc + "audio/x-raw, rate=8000 ! alawenc" + appSink, pipelineDevice)
default: default:
return nil, fmt.Errorf("unknown codec %s", codecRTP.Name) return "", fmt.Errorf("unknown codec %s", codecRTP.Name)
} }
if pipelineSrc != "" { return pipelineStr, nil
pipelineStr = fmt.Sprintf(pipelineSrc + appSink, pipelineDevice)
}
return CreatePipeline(pipelineStr)
} }
// CreatePipeline creates a GStreamer Pipeline // CreatePipeline creates a GStreamer Pipeline

View File

@ -8,12 +8,12 @@ import (
"demodesk/neko/internal/types" "demodesk/neko/internal/types"
"demodesk/neko/internal/config" "demodesk/neko/internal/config"
"demodesk/neko/internal/capture/gst"
) )
type CaptureManagerCtx struct { type CaptureManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
config *config.Capture
desktop types.DesktopManager desktop types.DesktopManager
streaming bool streaming bool
broadcast *BroacastManagerCtx broadcast *BroacastManagerCtx
@ -23,16 +23,49 @@ type CaptureManagerCtx struct {
} }
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
logger := log.With().Str("module", "capture").Logger()
broadcastPipeline := gst.GetRTMPPipeline(
config.Device,
config.Display,
config.BroadcastPipeline,
)
screencastPipeline := gst.GetJPEGPipeline(
config.Display,
config.ScreencastPipeline,
config.ScreencastRate,
config.ScreencastQuality,
)
audioPipeline, err := gst.GetAppPipeline(
config.AudioCodec,
config.Device,
config.AudioParams,
)
if err != nil {
logger.Panic().Err(err).Msg("unable to get pipeline")
}
videoPipeline, err := gst.GetAppPipeline(
config.VideoCodec,
config.Display,
config.VideoParams,
)
if err != nil {
logger.Panic().Err(err).Msg("unable to get pipeline")
}
return &CaptureManagerCtx{ return &CaptureManagerCtx{
logger: log.With().Str("module", "capture").Logger(), logger: logger,
mu: sync.Mutex{},
config: config,
desktop: desktop, desktop: desktop,
streaming: false, streaming: false,
broadcast: broadcastNew(config), broadcast: broadcastNew(broadcastPipeline),
screencast: screencastNew(config), screencast: screencastNew(config.Screencast, screencastPipeline),
audio: streamNew(config.AudioCodec, config.Device, config.AudioParams), audio: streamNew(config.AudioCodec, audioPipeline),
video: streamNew(config.VideoCodec, config.Display, config.VideoParams), video: streamNew(config.VideoCodec, videoPipeline),
} }
} }

View File

@ -9,41 +9,38 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"demodesk/neko/internal/config"
"demodesk/neko/internal/types" "demodesk/neko/internal/types"
"demodesk/neko/internal/capture/gst" "demodesk/neko/internal/capture/gst"
) )
type ScreencastManagerCtx struct { type ScreencastManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
config *config.Capture pipelineStr string
pipeline *gst.Pipeline pipeline *gst.Pipeline
enabled bool enabled bool
started bool started bool
emitStop chan bool emitStop chan bool
emitUpdate chan bool emitUpdate chan bool
expired int32 expired int32
sample chan types.Sample sample chan types.Sample
image types.Sample image types.Sample
} }
func screencastNew(config *config.Capture) *ScreencastManagerCtx { const screencastTimeout = 5 * time.Second
manager := &ScreencastManagerCtx{
logger: log.With().Str("module", "capture").Str("submodule", "screencast").Logger(),
config: config,
enabled: config.Screencast,
started: false,
emitStop: make(chan bool),
emitUpdate: make(chan bool),
}
if !manager.enabled { func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
return manager manager := &ScreencastManagerCtx{
logger: log.With().Str("module", "capture").Str("submodule", "screencast").Logger(),
pipelineStr: pipelineStr,
enabled: enabled,
started: false,
emitStop: make(chan bool),
emitUpdate: make(chan bool),
} }
go func() { go func() {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(screencastTimeout)
manager.logger.Debug().Msg("started emitting samples") manager.logger.Debug().Msg("started emitting samples")
for { for {
@ -133,31 +130,20 @@ func (manager *ScreencastManagerCtx) stop() {
func (manager *ScreencastManagerCtx) createPipeline() error { func (manager *ScreencastManagerCtx) createPipeline() error {
if manager.pipeline != nil { if manager.pipeline != nil {
return fmt.Errorf("pipeline already running") return fmt.Errorf("pipeline already exists")
} }
var err error var err error
manager.logger.Info(). manager.logger.Info().
Str("video_display", manager.config.Display). Str("str", manager.pipelineStr).
Str("screencast_pipeline", manager.config.ScreencastPipeline).
Msgf("creating pipeline") Msgf("creating pipeline")
manager.pipeline, err = gst.CreateJPEGPipeline( manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
manager.config.Display,
manager.config.ScreencastPipeline,
manager.config.ScreencastRate,
manager.config.ScreencastQuality,
)
if err != nil { if err != nil {
return err return err
} }
manager.logger.Info().
Str("src", manager.pipeline.Src).
Msgf("starting pipeline")
manager.pipeline.Start() manager.pipeline.Start()
manager.sample = manager.pipeline.Sample manager.sample = manager.pipeline.Sample
manager.emitUpdate <-true manager.emitUpdate <-true
@ -170,6 +156,6 @@ func (manager *ScreencastManagerCtx) destroyPipeline() {
} }
manager.pipeline.Stop() manager.pipeline.Stop()
manager.logger.Info().Msgf("stopping pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
} }

View File

@ -14,30 +14,27 @@ import (
) )
type StreamManagerCtx struct { type StreamManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
codec codec.RTPCodec codec codec.RTPCodec
pipelineDevice string pipelineStr string
pipelineSrc string pipeline *gst.Pipeline
pipeline *gst.Pipeline sample chan types.Sample
sample chan types.Sample emmiter events.EventEmmiter
emmiter events.EventEmmiter emitUpdate chan bool
emitUpdate chan bool emitStop chan bool
emitStop chan bool enabled bool
enabled bool
} }
func streamNew(codec codec.RTPCodec, pipelineDevice string, pipelineSrc string) *StreamManagerCtx { func streamNew(codec codec.RTPCodec, pipelineStr string) *StreamManagerCtx {
manager := &StreamManagerCtx{ manager := &StreamManagerCtx{
logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(), logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(),
mu: sync.Mutex{}, codec: codec,
codec: codec, pipelineStr: pipelineStr,
pipelineDevice: pipelineDevice, emmiter: events.New(),
pipelineSrc: pipelineSrc, emitUpdate: make(chan bool),
emmiter: events.New(), emitStop: make(chan bool),
emitUpdate: make(chan bool), enabled: false,
emitStop: make(chan bool),
enabled: false,
} }
go func() { go func() {
@ -103,7 +100,7 @@ func (manager *StreamManagerCtx) Enabled() bool {
func (manager *StreamManagerCtx) createPipeline() error { func (manager *StreamManagerCtx) createPipeline() error {
if manager.pipeline != nil { if manager.pipeline != nil {
return fmt.Errorf("pipeline already running") return fmt.Errorf("pipeline already exists")
} }
var err error var err error
@ -111,19 +108,14 @@ func (manager *StreamManagerCtx) createPipeline() error {
codec := manager.Codec() codec := manager.Codec()
manager.logger.Info(). manager.logger.Info().
Str("codec", codec.Name). Str("codec", codec.Name).
Str("device", manager.pipelineDevice). Str("src", manager.pipelineStr).
Str("src", manager.pipelineSrc).
Msgf("creating pipeline") Msgf("creating pipeline")
manager.pipeline, err = gst.CreateAppPipeline(codec, manager.pipelineDevice, manager.pipelineSrc) manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
if err != nil { if err != nil {
return err return err
} }
manager.logger.Info().
Str("src", manager.pipeline.Src).
Msgf("starting pipeline")
manager.pipeline.Start() manager.pipeline.Start()
manager.sample = manager.pipeline.Sample manager.sample = manager.pipeline.Sample
@ -137,6 +129,6 @@ func (manager *StreamManagerCtx) destroyPipeline() {
} }
manager.pipeline.Stop() manager.pipeline.Stop()
manager.logger.Info().Msgf("stopping pipeline") manager.logger.Info().Msgf("destroying pipeline")
manager.pipeline = nil manager.pipeline = nil
} }