diff --git a/internal/capture/manager.go b/internal/capture/manager.go
index d7d41a35..89bb7630 100644
--- a/internal/capture/manager.go
+++ b/internal/capture/manager.go
@@ -10,17 +10,23 @@ import (
 
 	"demodesk/neko/internal/config"
 	"demodesk/neko/internal/types"
+	"demodesk/neko/internal/types/codec"
 )
 
 type CaptureManagerCtx struct {
 	logger  zerolog.Logger
 	desktop types.DesktopManager
 
+	// sinks
 	broadcast  *BroacastManagerCtx
 	screencast *ScreencastManagerCtx
 	audio      *StreamSinkManagerCtx
 	videos     map[string]*StreamSinkManagerCtx
 	videoIDs   []string
+
+	// sources
+	webcam     *StreamSrcManagerCtx
+	microphone *StreamSrcManagerCtx
 }
 
 func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
@@ -95,6 +101,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
 		logger:  logger,
 		desktop: desktop,
 
+		// sinks
 		broadcast:  broadcastNew(broadcastPipeline),
 		screencast: screencastNew(config.ScreencastEnabled, screencastPipeline),
 		audio: streamSinkNew(config.AudioCodec, func() string {
@@ -113,6 +120,22 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
 		}, "audio"),
 		videos:   videos,
 		videoIDs: config.VideoIDs,
+
+		// sources
+		webcam: streamSrcNew(map[string]string{}, "webcam"), // TODO
+		microphone: streamSrcNew(map[string]string{
+			// NOTE(review): payload=111 is fixed here; confirm it matches the negotiated Opus payload type.
+			codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=src " +
+				"! application/x-rtp, payload=111, encoding-name=OPUS " +
+				"! rtpopusdepay " +
+				"! decodebin " +
+				"! pulsesink device=audio_input",
+			codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=src " +
+				"! application/x-rtp clock-rate=8000 " +
+				"! rtpg722depay " +
+				"! decodebin " +
+				"! pulsesink device=audio_input",
+		}, "microphone"),
 	}
 }
 
@@ -177,6 +200,9 @@ func (manager *CaptureManagerCtx) Shutdown() error {
 		video.shutdown()
 	}
 
+	manager.webcam.shutdown()
+	manager.microphone.shutdown()
+
 	return nil
 }
 
@@ -200,3 +226,13 @@ func (manager *CaptureManagerCtx) Video(videoID string) (types.StreamSinkManager
 func (manager *CaptureManagerCtx) VideoIDs() []string {
 	return manager.videoIDs
 }
+
+// Webcam returns the stream source manager registered for the webcam.
+func (manager *CaptureManagerCtx) Webcam() types.StreamSrcManager {
+	return manager.webcam
+}
+
+// Microphone returns the stream source manager registered for the microphone.
+func (manager *CaptureManagerCtx) Microphone() types.StreamSrcManager {
+	return manager.microphone
+}
diff --git a/internal/capture/streamsrc.go b/internal/capture/streamsrc.go
new file mode 100644
index 00000000..7630b8da
--- /dev/null
+++ b/internal/capture/streamsrc.go
@@ -0,0 +1,126 @@
+package capture
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
+
+	"demodesk/neko/internal/capture/gst"
+	"demodesk/neko/internal/types"
+	"demodesk/neko/internal/types/codec"
+)
+
+// StreamSrcManagerCtx pushes externally supplied media into a GStreamer
+// pipeline that is selected by the codec of the incoming stream.
+type StreamSrcManagerCtx struct {
+	logger        zerolog.Logger
+	codecPipeline map[string]string // codec -> pipeline
+
+	codec       codec.RTPCodec
+	pipeline    *gst.Pipeline
+	pipelineMu  sync.Mutex // guards codec, pipeline and pipelineStr
+	pipelineStr string
+}
+
+// streamSrcNew builds a stopped source manager. codecPipeline maps a codec
+// name to its pipeline description; videoID only labels the log output.
+func streamSrcNew(codecPipeline map[string]string, videoID string) *StreamSrcManagerCtx {
+	logger := log.With().
+		Str("module", "capture").
+		Str("submodule", "stream-src").
+		Str("video_id", videoID).Logger()
+
+	return &StreamSrcManagerCtx{
+		logger:        logger,
+		codecPipeline: codecPipeline,
+	}
+}
+
+// shutdown logs the shutdown and stops the pipeline if one is running.
+func (manager *StreamSrcManagerCtx) shutdown() {
+	manager.logger.Info().Msgf("shutdown")
+
+	manager.Stop()
+}
+
+// Codec returns the codec passed to the last successful Start.
+func (manager *StreamSrcManagerCtx) Codec() codec.RTPCodec {
+	manager.pipelineMu.Lock()
+	defer manager.pipelineMu.Unlock()
+
+	return manager.codec
+}
+
+// Start creates and plays the pipeline registered for codec. It returns
+// types.ErrCapturePipelineAlreadyExists when a pipeline is already running,
+// and an error when no pipeline is registered for the codec or the pipeline
+// cannot be created.
+func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
+	manager.pipelineMu.Lock()
+	defer manager.pipelineMu.Unlock()
+
+	if manager.pipeline != nil {
+		return types.ErrCapturePipelineAlreadyExists
+	}
+
+	pipelineStr, ok := manager.codecPipeline[codec.Name]
+	if !ok {
+		return errors.New("no pipeline found for a codec")
+	}
+
+	manager.logger.Info().
+		Str("codec", codec.Name).
+		Str("src", pipelineStr).
+		Msgf("creating pipeline")
+
+	pipeline, err := gst.CreatePipeline(pipelineStr)
+	if err != nil {
+		return err
+	}
+
+	// Publish the new state only after creation succeeded, so a failed Start
+	// leaves the manager stopped and its previous codec untouched.
+	manager.codec = codec
+	manager.pipelineStr = pipelineStr
+	manager.pipeline = pipeline
+
+	manager.pipeline.Play()
+	return nil
+}
+
+// Stop stops and discards the running pipeline; it is a no-op when stopped.
+func (manager *StreamSrcManagerCtx) Stop() {
+	manager.pipelineMu.Lock()
+	defer manager.pipelineMu.Unlock()
+
+	if manager.pipeline == nil {
+		return
+	}
+
+	manager.pipeline.Stop()
+	manager.logger.Info().Msgf("destroying pipeline")
+	manager.pipeline = nil
+}
+
+// Push hands bytes to the running pipeline; it is a no-op when stopped.
+func (manager *StreamSrcManagerCtx) Push(bytes []byte) {
+	manager.pipelineMu.Lock()
+	defer manager.pipelineMu.Unlock()
+
+	if manager.pipeline == nil {
+		return
+	}
+
+	manager.pipeline.Push("src", bytes)
+}
+
+// Started reports whether a pipeline is currently running.
+func (manager *StreamSrcManagerCtx) Started() bool {
+	manager.pipelineMu.Lock()
+	defer manager.pipelineMu.Unlock()
+
+	return manager.pipeline != nil
+}
diff --git a/internal/types/capture.go b/internal/types/capture.go
index 3fcc63fe..35d19fe9 100644
--- a/internal/types/capture.go
+++ b/internal/types/capture.go
@@ -43,6 +43,23 @@ type StreamSinkManager interface {
 	Started() bool
 }
 
+// StreamSrcManager accepts media pushed by a client and feeds it into a
+// capture pipeline chosen by codec.
+type StreamSrcManager interface {
+	// Codec returns the codec the source was last started with.
+	Codec() codec.RTPCodec
+
+	// Start creates and plays the pipeline registered for the given codec.
+	Start(codec codec.RTPCodec) error
+	// Stop destroys the running pipeline, if any.
+	Stop()
+	// Push hands a packet to the running pipeline, if any.
+	Push(bytes []byte)
+
+	// Started reports whether a pipeline is currently running.
+	Started() bool
+}
+
 type CaptureManager interface {
 	Start()
 	Shutdown() error
@@ -52,6 +69,9 @@ type CaptureManager interface {
 	Audio() StreamSinkManager
 	Video(videoID string) (StreamSinkManager, bool)
 	VideoIDs() []string
+
+	Webcam() StreamSrcManager
+	Microphone() StreamSrcManager
 }
 
 type VideoConfig struct {
diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go
index db1a3570..b7cae274 100644
--- a/internal/webrtc/manager.go
+++ b/internal/webrtc/manager.go
@@ -10,7 +10,6 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
 
-	"demodesk/neko/internal/capture/gst"
 	"demodesk/neko/internal/config"
 	"demodesk/neko/internal/types"
 	"demodesk/neko/internal/types/codec"
@@ -174,29 +173,22 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
 				Uint8("payload-type", uint8(track.PayloadType())).
 				Msgf("received new track")
 
-			pipelineStr := "appsrc format=time is-live=true do-timestamp=true name=src"
-
-			//add appropriate decoder
-			switch strings.ToLower(codecName) {
-			case "opus":
-				pipelineStr += fmt.Sprintf(" ! application/x-rtp, payload=%d, encoding-name=OPUS ! rtpopusdepay ! decodebin", track.PayloadType())
-			case "g722":
-				pipelineStr += " ! application/x-rtp clock-rate=8000 ! rtpg722depay ! decodebin"
-			default:
-				logger.Panic().Msgf("Unhandled codec %s", codecName)
-			}
-
-			pipelineStr += " ! pulsesink device=audio_input"
-			logger.Info().Str("pipeline", pipelineStr).Msg("create pipeline")
-
-			pipeline, err := gst.CreatePipeline(pipelineStr)
-			if err != nil {
-				logger.Err(err).Str("pipeline", pipelineStr).Msg("unable to create pipeline")
+			// parse codec
+			rtpCodec, ok := codec.ParseRTC(track.Codec())
+			if !ok {
+				logger.Warn().Str("mime", track.Codec().RTPCodecCapability.MimeType).Msg("unknown codec")
 				return
 			}
 
-			pipeline.Play()
-			defer pipeline.Stop()
+			// add microphone
+			microphone := manager.capture.Microphone()
+			if err := microphone.Start(rtpCodec); err != nil {
+				// Start failed, so this track owns no pipeline: bail out before the
+				// deferred Stop so a pipeline started by someone else is left alone.
+				logger.Err(err).Msg("unable to start microphone")
+				return
+			}
+			defer microphone.Stop() // TODO: Ensure no new publisher took over.
 
 			// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
 			ticker := time.NewTicker(time.Second * 3)
@@ -219,8 +211,10 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
 					break
 				}
 
-				pipeline.Push("src", buf[:i])
+				microphone.Push(buf[:i])
 			}
+
+			logger.Warn().Msg("microphone connection died")
 		})
 
 		connection.OnDataChannel(func(dc *webrtc.DataChannel) {