diff --git a/internal/capture/manager.go b/internal/capture/manager.go index b5404f53..94c57184 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -122,7 +122,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt videoIDs: config.VideoIDs, // sources - webcam: streamSrcNew(map[string]string{ + webcam: streamSrcNew(config.WebcamEnabled, map[string]string{ codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) + "! rtpvp8depay " + @@ -130,7 +130,8 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! videoconvert " + "! videorate " + "! identity drop-allocation=true " + - "! v4l2sink sync=false device=/dev/video0", + fmt.Sprintf("! v4l2sink sync=false device=%s", config.WebcamDevice), + // TODO: Test this pipeline. codec.VP9().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + "! application/x-rtp " + "! rtpvp9depay " + @@ -138,7 +139,8 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! videoconvert " + "! videorate " + "! identity drop-allocation=true " + - "! v4l2sink sync=false device=/dev/video0", + fmt.Sprintf("! v4l2sink sync=false device=%s", config.WebcamDevice), + // TODO: Test this pipeline. codec.H264().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + "! application/x-rtp " + "! rtph264depay " + @@ -146,19 +148,20 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! videoconvert " + "! videorate " + "! identity drop-allocation=true " + - "! v4l2sink sync=false device=/dev/video0", + fmt.Sprintf("! v4l2sink sync=false device=%s", config.WebcamDevice), }, "webcam"), - microphone: streamSrcNew(map[string]string{ + microphone: streamSrcNew(config.MicrophoneEnabled, map[string]string{ codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=OPUS ", codec.Opus().PayloadType) + "! rtpopusdepay " + "! decodebin " + - "! pulsesink device=audio_input", + fmt.Sprintf("! pulsesink device=%s", config.MicrophoneDevice), + // TODO: Test this pipeline. codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + "! application/x-rtp clock-rate=8000 " + "! rtpg722depay " + "! decodebin " + - "! pulsesink device=audio_input", + fmt.Sprintf("! pulsesink device=%s", config.MicrophoneDevice), }, "microphone"), } } diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index 8a78f35f..aa8d44ba 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -131,7 +131,7 @@ func (manager *ScreencastManagerCtx) start() error { defer manager.mu.Unlock() if !manager.enabled { - return errors.New("screenshot pipeline not enabled") + return errors.New("screencast not enabled") } err := manager.createPipeline() diff --git a/internal/capture/streamsrc.go b/internal/capture/streamsrc.go index bbb5ce30..1c4b76bf 100644 --- a/internal/capture/streamsrc.go +++ b/internal/capture/streamsrc.go @@ -14,6 +14,7 @@ import ( type StreamSrcManagerCtx struct { logger zerolog.Logger + enabled bool codecPipeline map[string]string // codec -> pipeline codec codec.RTPCodec @@ -22,7 +23,7 @@ type StreamSrcManagerCtx struct { pipelineStr string } -func streamSrcNew(codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx { +func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx { logger := log.With(). Str("module", "capture"). Str("submodule", "stream-src"). @@ -30,6 +31,7 @@ func streamSrcNew(codecPipeline map[string]string, video_id string) *StreamSrcMa return &StreamSrcManagerCtx{ logger: logger, + enabled: enabled, codecPipeline: codecPipeline, } } @@ -52,6 +54,10 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error { return types.ErrCapturePipelineAlreadyExists } + if !manager.enabled { + return errors.New("stream-src not enabled") + } + found := false for codecName, pipeline := range manager.codecPipeline { if codecName == codec.Name { diff --git a/internal/config/capture.go b/internal/config/capture.go index d7d9b908..b7825054 100644 --- a/internal/config/capture.go +++ b/internal/config/capture.go @@ -33,11 +33,17 @@ type Capture struct { ScreencastRate string ScreencastQuality string ScreencastPipeline string + + WebcamEnabled bool + WebcamDevice string + + MicrophoneEnabled bool + MicrophoneDevice string } func (Capture) Init(cmd *cobra.Command) error { // audio - cmd.PersistentFlags().String("capture.audio.device", "audio_output.monitor", "audio device to capture") + cmd.PersistentFlags().String("capture.audio.device", "audio_output.monitor", "pulseaudio device to capture") if err := viper.BindPFlag("capture.audio.device", cmd.PersistentFlags().Lookup("capture.audio.device")); err != nil { return err } @@ -110,6 +116,28 @@ func (Capture) Init(cmd *cobra.Command) error { return err } + // webcam + cmd.PersistentFlags().Bool("capture.webcam.enabled", false, "enable webcam stream") + if err := viper.BindPFlag("capture.webcam.enabled", cmd.PersistentFlags().Lookup("capture.webcam.enabled")); err != nil { + return err + } + + cmd.PersistentFlags().String("capture.webcam.device", "/dev/video0", "v4l2sink device used for webcam") + if err := viper.BindPFlag("capture.webcam.device", cmd.PersistentFlags().Lookup("capture.webcam.device")); err != nil { + return err + } + + // microphone + cmd.PersistentFlags().Bool("capture.microphone.enabled", true, "enable microphone stream") + if err := viper.BindPFlag("capture.microphone.enabled", cmd.PersistentFlags().Lookup("capture.microphone.enabled")); err != nil { + return err + } + + cmd.PersistentFlags().String("capture.microphone.device", "audio_input", "pulseaudio device used for microphone") + if err := viper.BindPFlag("capture.microphone.device", cmd.PersistentFlags().Lookup("capture.microphone.device")); err != nil { + return err + } + return nil } @@ -174,4 +202,12 @@ func (s *Capture) Set() { s.ScreencastRate = viper.GetString("capture.screencast.rate") s.ScreencastQuality = viper.GetString("capture.screencast.quality") s.ScreencastPipeline = viper.GetString("capture.screencast.pipeline") + + // webcam + s.WebcamEnabled = viper.GetBool("capture.webcam.enabled") + s.WebcamDevice = viper.GetString("capture.webcam.device") + + // microphone + s.MicrophoneEnabled = viper.GetBool("capture.microphone.enabled") + s.MicrophoneDevice = viper.GetString("capture.microphone.device") } diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 299a7506..8829b028 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -157,24 +157,24 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin } connection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - if !session.Profile().CanShareMedia { - logger.Warn(). - Str("kind", track.Kind().String()). - Msgf("got track but share media is disabled for this session") + defer receiver.Stop() - receiver.Stop() - return - } - - logger.Info(). + logger := logger.With(). Str("kind", track.Kind().String()). Str("mime", track.Codec().RTPCodecCapability.MimeType). - Msgf("received new track") + Logger() + + logger.Info().Msgf("received new track") + + if !session.Profile().CanShareMedia { + logger.Warn().Msg("share media is disabled for this session") + return + } // parse codec codec, ok := codec.ParseRTC(track.Codec()) if !ok { - logger.Warn().Str("mime", track.Codec().RTPCodecCapability.MimeType).Msg("unknown codec") + logger.Warn().Msg("unknown codec") return } @@ -192,6 +192,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin logger.Err(err).Msg("failed to start pipeline") return } + defer srcManager.Stop() // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval ticker := time.NewTicker(time.Second * 3) @@ -199,9 +200,9 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin go func() { for range ticker.C { - rtcpSendErr := connection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) - if rtcpSendErr != nil { - fmt.Println(rtcpSendErr) + err := connection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) + if err != nil { + logger.Err(err).Msg("rtcp send err") } } }() @@ -210,16 +211,14 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin for { i, _, err := track.Read(buf) if err != nil { - logger.Warn().Err(err).Msg("failed read from pipeline") + logger.Warn().Err(err).Msg("failed read from track") break } srcManager.Push(buf[:i]) } - logger.Warn().Msg("src manager stream connection died, stopping") - srcManager.Stop() - logger.Warn().Msg("src manager stream stopped!!!!!!!!!!!!!!!") + logger.Info().Msg("track data finished") }) connection.OnDataChannel(func(dc *webrtc.DataChannel) {