neko/internal/webrtc/peerstreamtrack.go

138 lines
2.6 KiB
Go
Raw Normal View History

2021-09-27 00:50:49 +02:00
package webrtc
import (
"errors"
"io"
"sync"
2022-07-04 00:38:46 +02:00
"github.com/pion/rtcp"
2021-09-27 00:50:49 +02:00
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/rs/zerolog"
"github.com/demodesk/neko/pkg/types"
2021-09-27 00:50:49 +02:00
)
2021-12-01 20:30:18 +01:00
func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamSinkManager, logger zerolog.Logger) (*PeerStreamTrack, error) {
2021-09-27 00:50:49 +02:00
codec := stream.Codec()
id := codec.Type.String()
track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream")
if err != nil {
return nil, err
}
logger = logger.With().Str("id", id).Logger()
2021-10-01 00:02:54 +02:00
peer := &PeerStreamTrack{
2021-09-27 00:50:49 +02:00
logger: logger,
track: track,
2022-03-26 23:20:38 +01:00
}
peer.listener = func(sample types.Sample) {
if peer.paused {
return
}
err := track.WriteSample(media.Sample(sample))
if err != nil && errors.Is(err, io.ErrClosedPipe) {
logger.Warn().Err(err).Msg("pipeline failed to write")
}
2021-09-27 00:50:49 +02:00
}
2021-10-01 13:46:10 +02:00
err = peer.SetStream(stream)
return peer, err
2021-09-27 00:50:49 +02:00
}
2021-10-01 00:02:54 +02:00
type PeerStreamTrack struct {
2021-09-27 00:50:49 +02:00
logger zerolog.Logger
track *webrtc.TrackLocalStaticSample
2022-03-26 23:20:38 +01:00
paused bool
2021-09-27 00:50:49 +02:00
listener func(sample types.Sample)
2021-12-01 20:30:18 +01:00
stream types.StreamSinkManager
2021-10-01 00:02:54 +02:00
streamMu sync.Mutex
2022-07-04 00:38:46 +02:00
onRtcp func(rtcp.Packet)
onRtcpMu sync.RWMutex
2021-09-27 00:50:49 +02:00
}
2021-12-01 20:30:18 +01:00
func (peer *PeerStreamTrack) SetStream(stream types.StreamSinkManager) error {
2021-09-27 00:50:49 +02:00
peer.streamMu.Lock()
defer peer.streamMu.Unlock()
2021-10-01 13:46:10 +02:00
var err error
2021-09-27 00:50:49 +02:00
if peer.stream != nil {
2021-10-01 13:46:10 +02:00
err = peer.stream.MoveListenerTo(&peer.listener, stream)
} else {
2021-10-02 13:51:22 +02:00
err = stream.AddListener(&peer.listener)
2021-09-27 00:50:49 +02:00
}
2021-10-02 13:51:22 +02:00
if err == nil {
2021-10-01 13:46:10 +02:00
peer.stream = stream
2021-09-29 01:03:39 +02:00
}
2021-09-27 00:50:49 +02:00
2021-10-01 13:46:10 +02:00
return err
2021-09-27 00:50:49 +02:00
}
2021-10-01 00:02:54 +02:00
func (peer *PeerStreamTrack) RemoveStream() {
2021-09-27 00:50:49 +02:00
peer.streamMu.Lock()
defer peer.streamMu.Unlock()
if peer.stream != nil {
2021-10-07 21:41:19 +02:00
_ = peer.stream.RemoveListener(&peer.listener)
2021-10-01 13:46:10 +02:00
peer.stream = nil
2021-09-27 00:50:49 +02:00
}
}
2021-10-01 00:02:54 +02:00
func (peer *PeerStreamTrack) AddToConnection(connection *webrtc.PeerConnection) error {
2021-09-27 00:50:49 +02:00
sender, err := connection.AddTrack(peer.track)
if err != nil {
return err
}
go func() {
rtcpBuf := make([]byte, 1500)
for {
2022-07-04 00:38:46 +02:00
n, _, err := sender.Read(rtcpBuf)
if err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
peer.logger.Err(err).Msg("RTCP read error")
continue
}
packets, err := rtcp.Unmarshal(rtcpBuf[:n])
if err != nil {
peer.logger.Err(err).Msg("RTCP unmarshal error")
continue
}
peer.onRtcpMu.RLock()
handler := peer.onRtcp
peer.onRtcpMu.RUnlock()
for _, packet := range packets {
if handler != nil {
go handler(packet)
}
2021-09-27 00:50:49 +02:00
}
}
}()
return nil
}
2022-03-26 23:20:38 +01:00
func (peer *PeerStreamTrack) SetPaused(paused bool) {
peer.paused = paused
}
2022-07-04 00:38:46 +02:00
func (peer *PeerStreamTrack) OnRTCP(f func(rtcp.Packet)) {
peer.onRtcpMu.Lock()
defer peer.onRtcpMu.Unlock()
peer.onRtcp = f
}