diff --git a/server/internal/broadcast/manager.go b/server/internal/broadcast/manager.go index 753b53bc..ae5a73d6 100644 --- a/server/internal/broadcast/manager.go +++ b/server/internal/broadcast/manager.go @@ -3,6 +3,7 @@ package broadcast import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "n.eko.moe/neko/internal/gst" "n.eko.moe/neko/internal/types/config" ) @@ -10,34 +11,49 @@ import ( type BroadcastManager struct { logger zerolog.Logger pipeline *gst.Pipeline + remote *config.Remote config *config.Broadcast } -func New(config *config.Broadcast) *BroadcastManager { +func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager { return &BroadcastManager{ logger: log.With().Str("module", "remote").Logger(), + remote: remote, config: config, } } func (manager *BroadcastManager) Start() { + if !manager.config.Enabled { + return + } + var err error manager.pipeline, err = gst.CreateRTMPPipeline( - manager.config.Device, - manager.config.Display, + manager.remote.Device, + manager.remote.Display, manager.config.RTMP, ) + + manager.logger.Info(). + Str("audio_device", manager.remote.Device). + Str("video_display", manager.remote.Display). + Str("rtmp_pipeline_src", manager.pipeline.Src). + Msgf("RTMP pipeline is starting...") + if err != nil { manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") + return } - manager.pipeline.Start() + manager.pipeline.Play() } -func (manager *BroadcastManager) Shutdown() error { - if manager.pipeline != nil { - manager.pipeline.Stop() +func (manager *BroadcastManager) Stop() { + if manager.pipeline == nil { + return } - return nil + manager.pipeline.Stop() + manager.pipeline = nil } diff --git a/server/internal/gst/gst.go b/server/internal/gst/gst.go index 8999cc71..e2679de5 100644 --- a/server/internal/gst/gst.go +++ b/server/internal/gst/gst.go @@ -69,7 +69,7 @@ func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineR video := fmt.Sprintf(videoSrc, pipelineDisplay) audio := fmt.Sprintf(audioSrc, pipelineDevice) - return CreatePipeline(fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s' live=1 %s voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video), "", 0) + return CreatePipeline(fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video), "", 0) } // CreateAppPipeline creates a GStreamer Pipeline diff --git a/server/internal/remote/manager.go b/server/internal/remote/manager.go index 9ad36d92..3ed2f005 100644 --- a/server/internal/remote/manager.go +++ b/server/internal/remote/manager.go @@ -17,16 +17,15 @@ type RemoteManager struct { logger zerolog.Logger video *gst.Pipeline audio *gst.Pipeline - rtmp *gst.Pipeline config *config.Remote - broadcast *config.Broadcast + broadcast types.BroadcastManager cleanup *time.Ticker shutdown chan bool emmiter events.EventEmmiter streaming bool } -func New(config *config.Remote, broadcast *config.Broadcast) *RemoteManager { +func New(config *config.Remote, broadcast types.BroadcastManager) *RemoteManager { return &RemoteManager{ logger: log.With().Str("module", "remote").Logger(), cleanup: time.NewTicker(1 * time.Second), @@ -47,7 +46,16 @@ func (manager *RemoteManager) AudioCodec() string { } func (manager *RemoteManager) Start() { + xorg.Display(manager.config.Display) + + if !xorg.ValidScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) { + manager.logger.Warn().Msgf("invalid screen option %dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) + } else if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil { + manager.logger.Warn().Err(err).Msg("unable to change screen size") + } + manager.createPipelines() + manager.broadcast.Start() go func() { defer func() { @@ -73,10 +81,7 @@ func (manager *RemoteManager) Shutdown() error { manager.logger.Info().Msgf("remote shutting down") manager.video.Stop() manager.audio.Stop() - - if manager.broadcast.Enabled { - manager.rtmp.Stop() - } + manager.broadcast.Stop() manager.cleanup.Stop() manager.shutdown <- true @@ -96,6 +101,8 @@ func (manager *RemoteManager) OnAudioFrame(listener func(sample types.Sample)) { } func (manager *RemoteManager) StartStream() { + manager.createPipelines() + manager.logger.Info(). Str("video_display", manager.config.Display). Str("video_codec", manager.config.VideoCodec). @@ -106,28 +113,8 @@ func (manager *RemoteManager) StartStream() { Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)). Msgf("Pipelines starting...") - if manager.broadcast.Enabled { - manager.logger.Info(). - Str("rtmp_pipeline_src", manager.rtmp.Src). - Msgf("Prtmp pipeline is starting...") - } - - xorg.Display(manager.config.Display) - - if !xorg.ValidScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) { - manager.logger.Warn().Msgf("invalid screen option %dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) - } else if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil { - manager.logger.Warn().Err(err).Msg("unable to change screen size") - } - - manager.createPipelines() manager.video.Start() manager.audio.Start() - - if manager.broadcast.Enabled { - manager.rtmp.Play() - } - manager.streaming = true } @@ -135,11 +122,6 @@ func (manager *RemoteManager) StopStream() { manager.logger.Info().Msgf("Pipelines shutting down...") manager.video.Stop() manager.audio.Stop() - - if manager.broadcast.Enabled { - manager.rtmp.Stop() - } - manager.streaming = false } @@ -166,17 +148,6 @@ func (manager *RemoteManager) createPipelines() { if err != nil { manager.logger.Panic().Err(err).Msg("unable to create audio pipeline") } - - if manager.broadcast.Enabled { - manager.rtmp, err = gst.CreateRTMPPipeline( - manager.config.Device, - manager.config.Display, - manager.broadcast.RTMP, - ) - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") - } - } } func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) error { @@ -185,17 +156,11 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) } manager.video.Stop() - - if manager.broadcast.Enabled { - manager.rtmp.Stop() - } + manager.broadcast.Stop() defer func() { manager.video.Start() - - if manager.broadcast.Enabled { - manager.rtmp.Play() - } + manager.broadcast.Start() manager.logger.Info().Msg("starting video pipeline...") }() @@ -204,7 +169,8 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) return err } - video, err := gst.CreateAppPipeline( + var err error + manager.video, err = gst.CreateAppPipeline( manager.config.VideoCodec, manager.config.Display, manager.config.VideoParams, @@ -212,19 +178,6 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) if err != nil { manager.logger.Panic().Err(err).Msg("unable to create new video pipeline") } - manager.video = video - - if manager.broadcast.Enabled { - rtmp, err := gst.CreateRTMPPipeline( - manager.config.Device, - manager.config.Display, - manager.broadcast.RTMP, - ) - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create new rtmp pipeline") - } - manager.rtmp = rtmp - } return nil } diff --git a/server/internal/types/broadcast.go b/server/internal/types/broadcast.go new file mode 100644 index 00000000..7b4d698a --- /dev/null +++ b/server/internal/types/broadcast.go @@ -0,0 +1,6 @@ +package types + +type BroadcastManager interface { + Start() + Stop() +} diff --git a/server/internal/types/config/broadcast.go b/server/internal/types/config/broadcast.go index d01e5282..bfef102c 100644 --- a/server/internal/types/config/broadcast.go +++ b/server/internal/types/config/broadcast.go @@ -7,15 +7,13 @@ import ( type Broadcast struct { Enabled bool - // Display string - // Device string + RTMP string // AudioParams string // VideoParams string - RTMP string } func (Broadcast) Init(cmd *cobra.Command) error { - cmd.PersistentFlags().Bool("broadcast", false, "use PCMA audio codec") + cmd.PersistentFlags().Bool("broadcast", false, "turn on boradcasting") if err := viper.BindPFlag("broadcast", cmd.PersistentFlags().Lookup("broadcast")); err != nil { return err } @@ -25,24 +23,22 @@ func (Broadcast) Init(cmd *cobra.Command) error { return err } - cmd.PersistentFlags().String("cast_audio", "", "audio codec parameters to use for broadcasting") - if err := viper.BindPFlag("cast_audio", cmd.PersistentFlags().Lookup("cast_audio")); err != nil { - return err - } + // cmd.PersistentFlags().String("cast_audio", "", "audio codec parameters to use for broadcasting") + // if err := viper.BindPFlag("cast_audio", cmd.PersistentFlags().Lookup("cast_audio")); err != nil { + // return err + // } - cmd.PersistentFlags().String("cast_video", "", "video codec parameters to use for broadcasting") - if err := viper.BindPFlag("cast_video", cmd.PersistentFlags().Lookup("cast_video")); err != nil { - return err - } + // cmd.PersistentFlags().String("cast_video", "", "video codec parameters to use for broadcasting") + // if err := viper.BindPFlag("cast_video", cmd.PersistentFlags().Lookup("cast_video")); err != nil { + // return err + // } return nil } func (s *Broadcast) Set() { s.Enabled = viper.GetBool("broadcast") - // s.Display = viper.GetString("display") - // s.Device = viper.GetString("device") + s.RTMP = viper.GetString("rtmp") // s.AudioParams = viper.GetString("cast_audio") // s.VideoParams = viper.GetString("cast_video") - s.RTMP = viper.GetString("rtmp") } diff --git a/server/neko.go b/server/neko.go index e962edbd..447979b8 100644 --- a/server/neko.go +++ b/server/neko.go @@ -6,6 +6,7 @@ import ( "os/signal" "runtime" + "n.eko.moe/neko/internal/broadcast" "n.eko.moe/neko/internal/http" "n.eko.moe/neko/internal/remote" "n.eko.moe/neko/internal/session" @@ -109,6 +110,7 @@ type Neko struct { server *http.Server sessionManager *session.SessionManager remoteManager *remote.RemoteManager + broadcastManager *broadcast.BroadcastManager webRTCManager *webrtc.WebRTCManager webSocketHandler *websocket.WebSocketHandler } @@ -118,8 +120,9 @@ func (neko *Neko) Preflight() { } func (neko *Neko) Start() { + broadcastManager := broadcast.New(neko.Remote, neko.Broadcast) - remoteManager := remote.New(neko.Remote, neko.Broadcast) + remoteManager := remote.New(neko.Remote, broadcastManager) remoteManager.Start() sessionManager := session.New(remoteManager)