From 82ca6e13ca75c37127a07d303b7be0df97df12cd Mon Sep 17 00:00:00 2001 From: Craig Date: Mon, 6 Apr 2020 03:42:42 +0000 Subject: [PATCH] RTMP Broadcast (WIP) --- server/internal/broadcast/manager.go | 40 +++++++++++++++++++ server/internal/gst/gst.go | 16 +++++++- server/internal/remote/manager.go | 6 +-- server/internal/types/config/broadcast.go | 48 +++++++++++++++++++++++ 4 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 server/internal/broadcast/manager.go create mode 100644 server/internal/types/config/broadcast.go diff --git a/server/internal/broadcast/manager.go b/server/internal/broadcast/manager.go new file mode 100644 index 00000000..0cb51858 --- /dev/null +++ b/server/internal/broadcast/manager.go @@ -0,0 +1,40 @@ +package broadcast + +import ( + "github.com/rs/zerolog" + "n.eko.moe/neko/internal/gst" + "n.eko.moe/neko/internal/types/config" +) + +type BroadcastManager struct { + logger zerolog.Logger + pipeline *gst.Pipeline + config *config.Broadcast +} + +func New(config *config.Broadcast) *BroadcastManager { + return &BroadcastManager{ + logger: log.With().Str("module", "remote").Logger(), + config: config + } +} + +func (manager *BroadcastManager) Start() { + var err error + manager.pipeline, err = gst.CreateRTMPPipeline( + manager.config.Device, + manager.config.Display, + manager.config.RTMP, + ) + if err != nil { + manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") + } + + manager.pipeline.Start() +} + +func (manager *BroadcastManager) Shutdown() error { + if (manager.pipeline != nil) { + manager.pipeline.Stop() + } +} diff --git a/server/internal/gst/gst.go b/server/internal/gst/gst.go index a2dd7c28..80c07979 100644 --- a/server/internal/gst/gst.go +++ b/server/internal/gst/gst.go @@ -64,8 +64,15 @@ func init() { registry = C.gst_registry_get() } -// CreatePipeline creates a GStreamer Pipeline -func CreatePipeline(codecName string, pipelineDevice string, pipelineSrc string) (*Pipeline, error) { +// CreateRTMPPipeline creates a GStreamer Pipeline +func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineRTMP string) (*Pipeline, error) { + video := fmt.Sprintf(videoSrc, pipelineDisplay) + audio := fmt.Sprintf(audioSrc, pipelineDevice) + return CreatePipeline(fmt.Sprintf("%s ! x264enc ! flv. ! %s ! faac ! flv. ! flvmux name='flv' ! rtmpsink location='%s'", video, audio, pipelineRTMP)) +} + +// CreateAppPipeline creates a GStreamer Pipeline +func CreateAppPipeline(codecName string, pipelineDevice string, pipelineSrc string) (*Pipeline, error) { pipelineStr := " ! appsink name=appsink" var clockRate float32 @@ -192,6 +199,11 @@ func CreatePipeline(codecName string, pipelineDevice string, pipelineSrc string) return nil, fmt.Errorf("unknown codec %s", codecName) } + return CreatePipeline(pipelineStr) +} + +// CreatePipeline creates a GStreamer Pipeline +func CreatePipeline(pipelineStr string) (*Pipeline, error) { pipelineStrUnsafe := C.CString(pipelineStr) defer C.free(unsafe.Pointer(pipelineStrUnsafe)) diff --git a/server/internal/remote/manager.go b/server/internal/remote/manager.go index 4c0114bf..31c72cb1 100644 --- a/server/internal/remote/manager.go +++ b/server/internal/remote/manager.go @@ -127,7 +127,7 @@ func (manager *RemoteManager) Streaming() bool { func (manager *RemoteManager) createPipelines() { var err error - manager.video, err = gst.CreatePipeline( + manager.video, err = gst.CreateAppPipeline( manager.config.VideoCodec, manager.config.Display, manager.config.VideoParams, @@ -136,7 +136,7 @@ func (manager *RemoteManager) createPipelines() { manager.logger.Panic().Err(err).Msg("unable to create video pipeline") } - manager.audio, err = gst.CreatePipeline( + manager.audio, err = gst.CreateAppPipeline( manager.config.AudioCodec, manager.config.Device, manager.config.AudioParams, @@ -161,7 +161,7 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) return err } - video, err := gst.CreatePipeline( + video, err := gst.CreateAppPipeline( manager.config.VideoCodec, manager.config.Display, manager.config.VideoParams, diff --git a/server/internal/types/config/broadcast.go b/server/internal/types/config/broadcast.go new file mode 100644 index 00000000..f31888a3 --- /dev/null +++ b/server/internal/types/config/broadcast.go @@ -0,0 +1,48 @@ +package config + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +type Broadcast struct { + Enabled string + Display string + Device string + AudioParams string + VideoParams string + RTMP string +} + +func (Broadcast) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().Bool("broadcast", false, "use PCMA audio codec") + if err := viper.BindPFlag("broadcast", cmd.PersistentFlags().Lookup("broadcast")); err != nil { + return err + } + + cmd.PersistentFlags().String("rtmp", "", "RMTP url for broadcasting") + if err := viper.BindPFlag("rtmp", cmd.PersistentFlags().Lookup("rtmp")); err != nil { + return err + } + + cmd.PersistentFlags().String("cast_audio", "", "audio codec parameters to use for broadcasting") + if err := viper.BindPFlag("cast_audio", cmd.PersistentFlags().Lookup("cast_audio")); err != nil { + return err + } + + cmd.PersistentFlags().String("cast_video", "", "video codec parameters to use for broadcasting") + if err := viper.BindPFlag("cast_video", cmd.PersistentFlags().Lookup("cast_video")); err != nil { + return err + } + + return nil +} + +func (s *Broadcast) Set() { + s.Enabled = viper.GetBool("broadcast") + s.Display = viper.GetString("display") + s.Device = viper.GetString("device") + s.AudioParams = viper.GetString("cast_audio") + s.VideoParams = viper.GetString("cast_video") + s.RTMP = viper.GetString("rtmp") +}