Stream bucket manager (#12)

* refactor webrtc.

* bind track with a single connection.

* audio+video codec.

* move stream selection to bucket manager.

* audio w/o bucket manager.

* revert peer changes.

* return video IDs.

* destroy & recreate all.

* add video ID change.

* Track -> Recevier.
This commit is contained in:
Miroslav Šedivý 2022-10-17 13:39:31 +02:00 committed by GitHub
parent 095f9fe8ee
commit 5ad5daa6bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 438 additions and 356 deletions

View File

@ -0,0 +1,86 @@
package capture
import (
"errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
)
type BucketsManagerCtx struct {
logger zerolog.Logger
codec codec.RTPCodec
streams map[string]*StreamSinkManagerCtx
streamIDs []string
}
func bucketsNew(codec codec.RTPCodec, streams map[string]*StreamSinkManagerCtx, streamIDs []string) *BucketsManagerCtx {
logger := log.With().
Str("module", "capture").
Str("submodule", "buckets").
Logger()
return &BucketsManagerCtx{
logger: logger,
codec: codec,
streams: streams,
streamIDs: streamIDs,
}
}
func (m *BucketsManagerCtx) shutdown() {
m.logger.Info().Msgf("shutdown")
}
func (m *BucketsManagerCtx) destroyAll() {
for _, video := range m.streams {
if video.Started() {
video.destroyPipeline()
}
}
}
func (m *BucketsManagerCtx) recreateAll() error {
for _, video := range m.streams {
if video.Started() {
err := video.createPipeline()
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
return err
}
}
}
return nil
}
func (m *BucketsManagerCtx) IDs() []string {
return m.streamIDs
}
func (m *BucketsManagerCtx) Codec() codec.RTPCodec {
return m.codec
}
func (m *BucketsManagerCtx) SetReceiver(receiver types.Receiver) error {
receiver.OnVideoIdChange(func(videoID string) error {
videoStream, ok := m.streams[videoID]
if !ok {
return types.ErrWebRTCVideoNotFound
}
return receiver.SetStream(videoStream)
})
// TODO: Save receiver.
return nil
}
func (m *BucketsManagerCtx) RemoveReceiver(receiver types.Receiver) error {
// TODO: Unsubribe from OnVideoIdChange.
// TODO: Remove receiver.
receiver.RemoveStream()
return nil
}

View File

@ -21,8 +21,7 @@ type CaptureManagerCtx struct {
broadcast *BroacastManagerCtx broadcast *BroacastManagerCtx
screencast *ScreencastManagerCtx screencast *ScreencastManagerCtx
audio *StreamSinkManagerCtx audio *StreamSinkManagerCtx
videos map[string]*StreamSinkManagerCtx video *BucketsManagerCtx
videoIDs []string
// sources // sources
webcam *StreamSrcManagerCtx webcam *StreamSrcManagerCtx
@ -118,6 +117,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
"! appsink name=appsink", config.Display, config.ScreencastRate, config.ScreencastQuality, "! appsink name=appsink", config.Display, config.ScreencastRate, config.ScreencastQuality,
) )
}()), }()),
audio: streamSinkNew(config.AudioCodec, func() (string, error) { audio: streamSinkNew(config.AudioCodec, func() (string, error) {
if config.AudioPipeline != "" { if config.AudioPipeline != "" {
// replace {device} with valid device // replace {device} with valid device
@ -133,8 +133,7 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
"! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline, "! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline,
), nil ), nil
}, "audio"), }, "audio"),
videos: videos, video: bucketsNew(config.VideoCodec, videos, config.VideoIDs),
videoIDs: config.VideoIDs,
// sources // sources
webcam: streamSrcNew(config.WebcamEnabled, map[string]string{ webcam: streamSrcNew(config.WebcamEnabled, map[string]string{
@ -195,11 +194,7 @@ func (manager *CaptureManagerCtx) Start() {
} }
manager.desktop.OnBeforeScreenSizeChange(func() { manager.desktop.OnBeforeScreenSizeChange(func() {
for _, video := range manager.videos { manager.video.destroyAll()
if video.Started() {
video.destroyPipeline()
}
}
if manager.broadcast.Started() { if manager.broadcast.Started() {
manager.broadcast.destroyPipeline() manager.broadcast.destroyPipeline()
@ -211,13 +206,9 @@ func (manager *CaptureManagerCtx) Start() {
}) })
manager.desktop.OnAfterScreenSizeChange(func() { manager.desktop.OnAfterScreenSizeChange(func() {
for _, video := range manager.videos { err := manager.video.recreateAll()
if video.Started() { if err != nil {
err := video.createPipeline() manager.logger.Panic().Err(err).Msg("unable to recreate video pipelines")
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
manager.logger.Panic().Err(err).Msg("unable to recreate video pipeline")
}
}
} }
if manager.broadcast.Started() { if manager.broadcast.Started() {
@ -243,10 +234,7 @@ func (manager *CaptureManagerCtx) Shutdown() error {
manager.screencast.shutdown() manager.screencast.shutdown()
manager.audio.shutdown() manager.audio.shutdown()
manager.video.shutdown()
for _, video := range manager.videos {
video.shutdown()
}
manager.webcam.shutdown() manager.webcam.shutdown()
manager.microphone.shutdown() manager.microphone.shutdown()
@ -266,13 +254,8 @@ func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager {
return manager.audio return manager.audio
} }
func (manager *CaptureManagerCtx) Video(videoID string) (types.StreamSinkManager, bool) { func (manager *CaptureManagerCtx) Video() types.BucketsManager {
video, ok := manager.videos[videoID] return manager.video
return video, ok
}
func (manager *CaptureManagerCtx) VideoIDs() []string {
return manager.videoIDs
} }
func (manager *CaptureManagerCtx) Webcam() types.StreamSrcManager { func (manager *CaptureManagerCtx) Webcam() types.StreamSrcManager {

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -35,11 +36,37 @@ const keepAliveInterval = 2 * time.Second
const rtcpPLIInterval = 3 * time.Second const rtcpPLIInterval = 3 * time.Second
func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx { func New(desktop types.DesktopManager, capture types.CaptureManager, config *config.WebRTC) *WebRTCManagerCtx {
configuration := webrtc.Configuration{
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
if !config.ICELite {
ICEServers := []webrtc.ICEServer{}
for _, server := range config.ICEServers {
var credential any
if server.Credential != "" {
credential = server.Credential
} else {
credential = false
}
ICEServers = append(ICEServers, webrtc.ICEServer{
URLs: server.URLs,
Username: server.Username,
Credential: credential,
})
}
configuration.ICEServers = ICEServers
}
return &WebRTCManagerCtx{ return &WebRTCManagerCtx{
logger: log.With().Str("module", "webrtc").Logger(), logger: log.With().Str("module", "webrtc").Logger(),
config: config, config: config,
metrics: newMetrics(), metrics: newMetrics(),
webrtcConfiguration: configuration,
desktop: desktop, desktop: desktop,
capture: capture, capture: capture,
curImage: cursor.NewImage(desktop), curImage: cursor.NewImage(desktop),
@ -58,6 +85,8 @@ type WebRTCManagerCtx struct {
curImage *cursor.ImageCtx curImage *cursor.ImageCtx
curPosition *cursor.PositionCtx curPosition *cursor.PositionCtx
webrtcConfiguration webrtc.Configuration
tcpMux ice.TCPMux tcpMux ice.TCPMux
udpMux ice.UDPMux udpMux ice.UDPMux
@ -121,6 +150,70 @@ func (manager *WebRTCManagerCtx) ICEServers() []types.ICEServer {
return manager.config.ICEServers return manager.config.ICEServers
} }
func (manager *WebRTCManagerCtx) newPeerConnection(codecs []codec.RTPCodec, logger zerolog.Logger) (*webrtc.PeerConnection, error) {
// create media engine
engine := &webrtc.MediaEngine{}
for _, codec := range codecs {
if err := codec.Register(engine); err != nil {
return nil, err
}
}
// create setting engine
settings := webrtc.SettingEngine{
LoggerFactory: pionlog.New(logger),
}
settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval)
settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost)
settings.SetLite(manager.config.ICELite)
var networkType []webrtc.NetworkType
// udp candidates
if manager.udpMux != nil {
settings.SetICEUDPMux(manager.udpMux)
networkType = append(networkType,
webrtc.NetworkTypeUDP4,
webrtc.NetworkTypeUDP6,
)
} else if manager.config.EphemeralMax != 0 {
_ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax)
networkType = append(networkType,
webrtc.NetworkTypeUDP4,
webrtc.NetworkTypeUDP6,
)
}
// tcp candidates
if manager.tcpMux != nil {
settings.SetICETCPMux(manager.tcpMux)
networkType = append(networkType,
webrtc.NetworkTypeTCP4,
webrtc.NetworkTypeTCP6,
)
}
// enable support for TCP and UDP ICE candidates
settings.SetNetworkTypes(networkType)
// create interceptor registry
registry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil {
return nil, err
}
// create new API
api := webrtc.NewAPI(
webrtc.WithMediaEngine(engine),
webrtc.WithSettingEngine(settings),
webrtc.WithInterceptorRegistry(registry),
)
// create new peer connection
return api.NewPeerConnection(manager.webrtcConfiguration)
}
func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID string) (*webrtc.SessionDescription, error) { func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID string) (*webrtc.SessionDescription, error) {
id := atomic.AddInt32(&manager.peerId, 1) id := atomic.AddInt32(&manager.peerId, 1)
manager.metrics.NewConnection(session) manager.metrics.NewConnection(session)
@ -130,19 +223,16 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
logger.Info().Msg("creating webrtc peer") logger.Info().Msg("creating webrtc peer")
// all audios must have the same codec // all audios must have the same codec
audioStream := manager.capture.Audio() audio := manager.capture.Audio()
audioCodec := audio.Codec()
// all videos must have the same codec // all videos must have the same codec
videoStream, ok := manager.capture.Video(videoID) video := manager.capture.Video()
if !ok { videoCodec := video.Codec()
return nil, types.ErrWebRTCVideoNotFound
}
manager.metrics.SetVideoID(session, videoID)
connection, err := manager.newPeerConnection([]codec.RTPCodec{ connection, err := manager.newPeerConnection([]codec.RTPCodec{
audioStream.Codec(), audioCodec,
videoStream.Codec(), videoCodec,
}, logger) }, logger)
if err != nil { if err != nil {
return nil, err return nil, err
@ -166,26 +256,37 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
// audio track // audio track
audioTrack, err := manager.newPeerStreamTrack(audioStream, logger) audioTrack, err := NewTrack(logger, audioCodec, connection)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := audioTrack.AddToConnection(connection); err != nil { // set stream for audio track
err = audioTrack.SetStream(audio)
if err != nil {
return nil, err return nil, err
} }
// video track // video track
videoTrack, err := manager.newPeerStreamTrack(videoStream, logger) videoTrack, err := NewTrack(logger, videoCodec, connection)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := videoTrack.AddToConnection(connection); err != nil { // let video stream bucket manager handle stream subscriptions
err = video.SetReceiver(videoTrack)
if err != nil {
return nil, err return nil, err
} }
// set default video id
err = videoTrack.SetVideoID(videoID)
if err != nil {
return nil, err
}
manager.metrics.SetVideoID(session, videoID)
// data channel // data channel
dataChannel, err := connection.CreateDataChannel("data", nil) dataChannel, err := connection.CreateDataChannel("data", nil)
@ -198,13 +299,12 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
connection: connection, connection: connection,
dataChannel: dataChannel, dataChannel: dataChannel,
changeVideo: func(videoID string) error { changeVideo: func(videoID string) error {
videoStream, ok := manager.capture.Video(videoID) if err := videoTrack.SetVideoID(videoID); err != nil {
if !ok { return err
return types.ErrWebRTCVideoNotFound
} }
manager.metrics.SetVideoID(session, videoID) manager.metrics.SetVideoID(session, videoID)
return videoTrack.SetStream(videoStream) return nil
}, },
setPaused: func(isPaused bool) { setPaused: func(isPaused bool) {
videoTrack.SetPaused(isPaused) videoTrack.SetPaused(isPaused)
@ -318,7 +418,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, videoID strin
connection.Close() connection.Close()
case webrtc.PeerConnectionStateClosed: case webrtc.PeerConnectionStateClosed:
session.SetWebRTCConnected(peer, false) session.SetWebRTCConnected(peer, false)
videoTrack.RemoveStream() video.RemoveReceiver(videoTrack)
audioTrack.RemoveStream() audioTrack.RemoveStream()
} }

View File

@ -1,11 +1,14 @@
package webrtc package webrtc
import ( import (
"bytes"
"encoding/binary"
"sync" "sync"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/demodesk/neko/internal/webrtc/payload"
"github.com/demodesk/neko/pkg/types" "github.com/demodesk/neko/pkg/types"
) )
@ -136,6 +139,63 @@ func (peer *WebRTCPeerCtx) SetPaused(isPaused bool) error {
return nil return nil
} }
func (peer *WebRTCPeerCtx) SendCursorPosition(x, y int) error {
peer.mu.Lock()
defer peer.mu.Unlock()
if peer.dataChannel == nil {
return types.ErrWebRTCDataChannelNotFound
}
data := payload.CursorPosition{
Header: payload.Header{
Event: payload.OP_CURSOR_POSITION,
Length: 7,
},
X: uint16(x),
Y: uint16(y),
}
buffer := &bytes.Buffer{}
if err := binary.Write(buffer, binary.BigEndian, data); err != nil {
return err
}
return peer.dataChannel.Send(buffer.Bytes())
}
func (peer *WebRTCPeerCtx) SendCursorImage(cur *types.CursorImage, img []byte) error {
peer.mu.Lock()
defer peer.mu.Unlock()
if peer.dataChannel == nil {
return types.ErrWebRTCDataChannelNotFound
}
data := payload.CursorImage{
Header: payload.Header{
Event: payload.OP_CURSOR_IMAGE,
Length: uint16(11 + len(img)),
},
Width: cur.Width,
Height: cur.Height,
Xhot: cur.Xhot,
Yhot: cur.Yhot,
}
buffer := &bytes.Buffer{}
if err := binary.Write(buffer, binary.BigEndian, data); err != nil {
return err
}
if err := binary.Write(buffer, binary.BigEndian, img); err != nil {
return err
}
return peer.dataChannel.Send(buffer.Bytes())
}
func (peer *WebRTCPeerCtx) Destroy() { func (peer *WebRTCPeerCtx) Destroy() {
peer.mu.Lock() peer.mu.Lock()
defer peer.mu.Unlock() defer peer.mu.Unlock()

View File

@ -1,104 +0,0 @@
package webrtc
import (
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/rs/zerolog"
"github.com/demodesk/neko/internal/webrtc/pionlog"
"github.com/demodesk/neko/pkg/types/codec"
)
func (manager *WebRTCManagerCtx) newPeerConnection(codecs []codec.RTPCodec, logger zerolog.Logger) (*webrtc.PeerConnection, error) {
// create media engine
engine := &webrtc.MediaEngine{}
for _, codec := range codecs {
if err := codec.Register(engine); err != nil {
return nil, err
}
}
// create setting engine
settings := webrtc.SettingEngine{
LoggerFactory: pionlog.New(logger),
}
settings.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval)
settings.SetNAT1To1IPs(manager.config.NAT1To1IPs, webrtc.ICECandidateTypeHost)
settings.SetLite(manager.config.ICELite)
var networkType []webrtc.NetworkType
// udp candidates
if manager.udpMux != nil {
settings.SetICEUDPMux(manager.udpMux)
networkType = append(networkType,
webrtc.NetworkTypeUDP4,
webrtc.NetworkTypeUDP6,
)
} else if manager.config.EphemeralMax != 0 {
_ = settings.SetEphemeralUDPPortRange(manager.config.EphemeralMin, manager.config.EphemeralMax)
networkType = append(networkType,
webrtc.NetworkTypeUDP4,
webrtc.NetworkTypeUDP6,
)
}
// tcp candidates
if manager.tcpMux != nil {
settings.SetICETCPMux(manager.tcpMux)
networkType = append(networkType,
webrtc.NetworkTypeTCP4,
webrtc.NetworkTypeTCP6,
)
}
// enable support for TCP and UDP ICE candidates
settings.SetNetworkTypes(networkType)
// create interceptor registry
registry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(engine, registry); err != nil {
return nil, err
}
// create new API
api := webrtc.NewAPI(
webrtc.WithMediaEngine(engine),
webrtc.WithSettingEngine(settings),
webrtc.WithInterceptorRegistry(registry),
)
// create new peer connection
configuration := manager.peerConfiguration()
return api.NewPeerConnection(configuration)
}
func (manager *WebRTCManagerCtx) peerConfiguration() webrtc.Configuration {
if manager.config.ICELite {
return webrtc.Configuration{
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
}
ICEServers := []webrtc.ICEServer{}
for _, server := range manager.config.ICEServers {
var credential any
if server.Credential != "" {
credential = server.Credential
} else {
credential = false
}
ICEServers = append(ICEServers, webrtc.ICEServer{
URLs: server.URLs,
Username: server.Username,
Credential: credential,
})
}
return webrtc.Configuration{
ICEServers: ICEServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
}

View File

@ -1,137 +0,0 @@
package webrtc
import (
"errors"
"io"
"sync"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/rs/zerolog"
"github.com/demodesk/neko/pkg/types"
)
func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamSinkManager, logger zerolog.Logger) (*PeerStreamTrack, error) {
codec := stream.Codec()
id := codec.Type.String()
track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream")
if err != nil {
return nil, err
}
logger = logger.With().Str("id", id).Logger()
peer := &PeerStreamTrack{
logger: logger,
track: track,
}
peer.listener = func(sample types.Sample) {
if peer.paused {
return
}
err := track.WriteSample(media.Sample(sample))
if err != nil && errors.Is(err, io.ErrClosedPipe) {
logger.Warn().Err(err).Msg("pipeline failed to write")
}
}
err = peer.SetStream(stream)
return peer, err
}
type PeerStreamTrack struct {
logger zerolog.Logger
track *webrtc.TrackLocalStaticSample
paused bool
listener func(sample types.Sample)
stream types.StreamSinkManager
streamMu sync.Mutex
onRtcp func(rtcp.Packet)
onRtcpMu sync.RWMutex
}
func (peer *PeerStreamTrack) SetStream(stream types.StreamSinkManager) error {
peer.streamMu.Lock()
defer peer.streamMu.Unlock()
var err error
if peer.stream != nil {
err = peer.stream.MoveListenerTo(&peer.listener, stream)
} else {
err = stream.AddListener(&peer.listener)
}
if err == nil {
peer.stream = stream
}
return err
}
func (peer *PeerStreamTrack) RemoveStream() {
peer.streamMu.Lock()
defer peer.streamMu.Unlock()
if peer.stream != nil {
_ = peer.stream.RemoveListener(&peer.listener)
peer.stream = nil
}
}
func (peer *PeerStreamTrack) AddToConnection(connection *webrtc.PeerConnection) error {
sender, err := connection.AddTrack(peer.track)
if err != nil {
return err
}
go func() {
rtcpBuf := make([]byte, 1500)
for {
n, _, err := sender.Read(rtcpBuf)
if err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
peer.logger.Err(err).Msg("RTCP read error")
continue
}
packets, err := rtcp.Unmarshal(rtcpBuf[:n])
if err != nil {
peer.logger.Err(err).Msg("RTCP unmarshal error")
continue
}
peer.onRtcpMu.RLock()
handler := peer.onRtcp
peer.onRtcpMu.RUnlock()
for _, packet := range packets {
if handler != nil {
go handler(packet)
}
}
}
}()
return nil
}
func (peer *PeerStreamTrack) SetPaused(paused bool) {
peer.paused = paused
}
func (peer *PeerStreamTrack) OnRTCP(f func(rtcp.Packet)) {
peer.onRtcpMu.Lock()
defer peer.onRtcpMu.Unlock()
peer.onRtcp = f
}

View File

@ -1,66 +0,0 @@
package webrtc
import (
"bytes"
"encoding/binary"
"github.com/demodesk/neko/internal/webrtc/payload"
"github.com/demodesk/neko/pkg/types"
)
func (peer *WebRTCPeerCtx) SendCursorPosition(x, y int) error {
peer.mu.Lock()
defer peer.mu.Unlock()
if peer.dataChannel == nil {
return types.ErrWebRTCDataChannelNotFound
}
data := payload.CursorPosition{
Header: payload.Header{
Event: payload.OP_CURSOR_POSITION,
Length: 7,
},
X: uint16(x),
Y: uint16(y),
}
buffer := &bytes.Buffer{}
if err := binary.Write(buffer, binary.BigEndian, data); err != nil {
return err
}
return peer.dataChannel.Send(buffer.Bytes())
}
func (peer *WebRTCPeerCtx) SendCursorImage(cur *types.CursorImage, img []byte) error {
peer.mu.Lock()
defer peer.mu.Unlock()
if peer.dataChannel == nil {
return types.ErrWebRTCDataChannelNotFound
}
data := payload.CursorImage{
Header: payload.Header{
Event: payload.OP_CURSOR_IMAGE,
Length: uint16(11 + len(img)),
},
Width: cur.Width,
Height: cur.Height,
Xhot: cur.Xhot,
Yhot: cur.Yhot,
}
buffer := &bytes.Buffer{}
if err := binary.Write(buffer, binary.BigEndian, data); err != nil {
return err
}
if err := binary.Write(buffer, binary.BigEndian, img); err != nil {
return err
}
return peer.dataChannel.Send(buffer.Bytes())
}

148
internal/webrtc/track.go Normal file
View File

@ -0,0 +1,148 @@
package webrtc
import (
"errors"
"fmt"
"io"
"sync"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/rs/zerolog"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
)
type Track struct {
logger zerolog.Logger
track *webrtc.TrackLocalStaticSample
paused bool
listener func(sample types.Sample)
stream types.StreamSinkManager
streamMu sync.Mutex
onRtcp func(rtcp.Packet)
onRtcpMu sync.RWMutex
videoIdChange func(string) error
}
func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.PeerConnection) (*Track, error) {
id := codec.Type.String()
track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream")
if err != nil {
return nil, err
}
logger = logger.With().Str("id", id).Logger()
t := &Track{
logger: logger,
track: track,
}
t.listener = func(sample types.Sample) {
if t.paused {
return
}
err := track.WriteSample(media.Sample(sample))
if err != nil && errors.Is(err, io.ErrClosedPipe) {
logger.Warn().Err(err).Msg("pipeline failed to write")
}
}
sender, err := connection.AddTrack(t.track)
if err != nil {
return nil, err
}
go t.rtcpReader(sender)
return t, nil
}
func (t *Track) rtcpReader(sender *webrtc.RTPSender) {
rtcpBuf := make([]byte, 1500)
for {
n, _, err := sender.Read(rtcpBuf)
if err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
t.logger.Err(err).Msg("RTCP read error")
continue
}
packets, err := rtcp.Unmarshal(rtcpBuf[:n])
if err != nil {
t.logger.Err(err).Msg("RTCP unmarshal error")
continue
}
t.onRtcpMu.RLock()
handler := t.onRtcp
t.onRtcpMu.RUnlock()
for _, packet := range packets {
if handler != nil {
go handler(packet)
}
}
}
}
func (t *Track) SetStream(stream types.StreamSinkManager) error {
t.streamMu.Lock()
defer t.streamMu.Unlock()
var err error
if t.stream != nil {
err = t.stream.MoveListenerTo(&t.listener, stream)
} else {
err = stream.AddListener(&t.listener)
}
if err == nil {
t.stream = stream
}
return err
}
func (t *Track) RemoveStream() {
t.streamMu.Lock()
defer t.streamMu.Unlock()
if t.stream != nil {
_ = t.stream.RemoveListener(&t.listener)
t.stream = nil
}
}
func (t *Track) SetPaused(paused bool) {
t.paused = paused
}
func (t *Track) OnRTCP(f func(rtcp.Packet)) {
t.onRtcpMu.Lock()
defer t.onRtcpMu.Unlock()
t.onRtcp = f
}
func (t *Track) SetVideoID(videoID string) error {
if t.videoIdChange == nil {
return fmt.Errorf("video id change not supported")
}
return t.videoIdChange(videoID)
}
func (t *Track) OnVideoIdChange(f func(string) error) {
t.videoIdChange = f
}

View File

@ -15,7 +15,7 @@ func (h *MessageHandlerCtx) signalRequest(session types.Session, payload *messag
// use default first video, if not provided // use default first video, if not provided
if payload.Video == "" { if payload.Video == "" {
videos := h.capture.VideoIDs() videos := h.capture.Video().IDs()
payload.Video = videos[0] payload.Video = videos[0]
} }

View File

@ -49,7 +49,7 @@ func (h *MessageHandlerCtx) systemInit(session types.Session) error {
Settings: h.sessions.Settings(), Settings: h.sessions.Settings(),
ScreencastEnabled: h.capture.Screencast().Enabled(), ScreencastEnabled: h.capture.Screencast().Enabled(),
WebRTC: message.SystemWebRTC{ WebRTC: message.SystemWebRTC{
Videos: h.capture.VideoIDs(), Videos: h.capture.Video().IDs(),
}, },
}) })

View File

@ -19,6 +19,19 @@ var (
type Sample media.Sample type Sample media.Sample
type Receiver interface {
SetStream(stream StreamSinkManager) error
RemoveStream()
OnVideoIdChange(f func(string) error)
}
type BucketsManager interface {
IDs() []string
Codec() codec.RTPCodec
SetReceiver(receiver Receiver) error
RemoveReceiver(receiver Receiver) error
}
type BroadcastManager interface { type BroadcastManager interface {
Start(url string) error Start(url string) error
Stop() Stop()
@ -60,8 +73,7 @@ type CaptureManager interface {
Broadcast() BroadcastManager Broadcast() BroadcastManager
Screencast() ScreencastManager Screencast() ScreencastManager
Audio() StreamSinkManager Audio() StreamSinkManager
Video(videoID string) (StreamSinkManager, bool) Video() BucketsManager
VideoIDs() []string
Webcam() StreamSrcManager Webcam() StreamSrcManager
Microphone() StreamSrcManager Microphone() StreamSrcManager