From 75b1e0887483595a35b3abdd75c71ebc51e142c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 28 Nov 2021 23:16:26 +0100 Subject: [PATCH] virtual microphone OnTrack. --- internal/webrtc/manager.go | 68 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index 9a8da87e..db1a3570 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -5,10 +5,12 @@ import ( "strings" "time" + "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "demodesk/neko/internal/capture/gst" "demodesk/neko/internal/config" "demodesk/neko/internal/types" "demodesk/neko/internal/types/codec" @@ -155,8 +157,70 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin iceTrickle: manager.config.ICETrickle, } - connection.OnTrack(func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver) { - logger.Info().Interface("track", tr).Interface("receiver", r).Msg("got remote track") + connection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + // if we did not get audio + if track.Kind() != webrtc.RTPCodecTypeAudio { + logger.Warn().Msg("got remote track, but not audio") + return + } + + codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] + + logger.Info(). + Str("stream-id", track.StreamID()). + Str("id", track.ID()). + Str("rid", track.RID()). + Str("codec", codecName). + Uint8("payload-type", uint8(track.PayloadType())). + Msgf("received new track") + + pipelineStr := "appsrc format=time is-live=true do-timestamp=true name=src" + + //add appropriate decoder + switch strings.ToLower(codecName) { + case "opus": + pipelineStr += fmt.Sprintf(" ! application/x-rtp, payload=%d, encoding-name=OPUS ! rtpopusdepay ! decodebin", track.PayloadType()) + case "g722": + pipelineStr += " ! application/x-rtp clock-rate=8000 ! rtpg722depay ! decodebin" + default: + logger.Panic().Msgf("Unhandled codec %s", codecName) + } + + pipelineStr += " ! pulsesink device=audio_input" + logger.Info().Str("pipeline", pipelineStr).Msg("create pipeline") + + pipeline, err := gst.CreatePipeline(pipelineStr) + if err != nil { + logger.Err(err).Str("pipeline", pipelineStr).Msg("unable to create pipeline") + return + } + + pipeline.Play() + defer pipeline.Stop() + + // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval + ticker := time.NewTicker(time.Second * 3) + defer ticker.Stop() + + go func() { + for range ticker.C { + rtcpSendErr := connection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) + if rtcpSendErr != nil { + fmt.Println(rtcpSendErr) + } + } + }() + + buf := make([]byte, 1400) + for { + i, _, err := track.Read(buf) + if err != nil { + logger.Warn().Err(err).Msg("failed read from pipeline") + break + } + + pipeline.Push("src", buf[:i]) + } }) connection.OnDataChannel(func(dc *webrtc.DataChannel) {