2021-09-27 11:50:49 +13:00
|
|
|
package webrtc
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"io"
|
|
|
|
"sync"
|
|
|
|
|
2022-07-04 10:38:46 +12:00
|
|
|
"github.com/pion/rtcp"
|
2021-09-27 11:50:49 +13:00
|
|
|
"github.com/pion/webrtc/v3"
|
|
|
|
"github.com/pion/webrtc/v3/pkg/media"
|
|
|
|
"github.com/rs/zerolog"
|
2022-03-20 23:27:41 +13:00
|
|
|
|
2022-07-14 10:58:22 +12:00
|
|
|
"github.com/demodesk/neko/pkg/types"
|
2021-09-27 11:50:49 +13:00
|
|
|
)
|
|
|
|
|
2021-12-02 08:30:18 +13:00
|
|
|
func (manager *WebRTCManagerCtx) newPeerStreamTrack(stream types.StreamSinkManager, logger zerolog.Logger) (*PeerStreamTrack, error) {
|
2021-09-27 11:50:49 +13:00
|
|
|
codec := stream.Codec()
|
|
|
|
|
|
|
|
id := codec.Type.String()
|
|
|
|
track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream")
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
logger = logger.With().Str("id", id).Logger()
|
|
|
|
|
2021-10-01 11:02:54 +13:00
|
|
|
peer := &PeerStreamTrack{
|
2021-09-27 11:50:49 +13:00
|
|
|
logger: logger,
|
|
|
|
track: track,
|
2022-03-27 11:20:38 +13:00
|
|
|
}
|
|
|
|
|
|
|
|
peer.listener = func(sample types.Sample) {
|
|
|
|
if peer.paused {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
err := track.WriteSample(media.Sample(sample))
|
|
|
|
if err != nil && errors.Is(err, io.ErrClosedPipe) {
|
|
|
|
logger.Warn().Err(err).Msg("pipeline failed to write")
|
|
|
|
}
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
|
2021-10-02 00:46:10 +13:00
|
|
|
err = peer.SetStream(stream)
|
|
|
|
return peer, err
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
|
2021-10-01 11:02:54 +13:00
|
|
|
type PeerStreamTrack struct {
|
2021-09-27 11:50:49 +13:00
|
|
|
logger zerolog.Logger
|
|
|
|
track *webrtc.TrackLocalStaticSample
|
2022-03-27 11:20:38 +13:00
|
|
|
paused bool
|
2021-09-27 11:50:49 +13:00
|
|
|
listener func(sample types.Sample)
|
|
|
|
|
2021-12-02 08:30:18 +13:00
|
|
|
stream types.StreamSinkManager
|
2021-10-01 11:02:54 +13:00
|
|
|
streamMu sync.Mutex
|
2022-07-04 10:38:46 +12:00
|
|
|
|
|
|
|
onRtcp func(rtcp.Packet)
|
|
|
|
onRtcpMu sync.RWMutex
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
|
2021-12-02 08:30:18 +13:00
|
|
|
func (peer *PeerStreamTrack) SetStream(stream types.StreamSinkManager) error {
|
2021-09-27 11:50:49 +13:00
|
|
|
peer.streamMu.Lock()
|
|
|
|
defer peer.streamMu.Unlock()
|
|
|
|
|
2021-10-02 00:46:10 +13:00
|
|
|
var err error
|
2021-09-27 11:50:49 +13:00
|
|
|
if peer.stream != nil {
|
2021-10-02 00:46:10 +13:00
|
|
|
err = peer.stream.MoveListenerTo(&peer.listener, stream)
|
|
|
|
} else {
|
2021-10-03 00:51:22 +13:00
|
|
|
err = stream.AddListener(&peer.listener)
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
|
2021-10-03 00:51:22 +13:00
|
|
|
if err == nil {
|
2021-10-02 00:46:10 +13:00
|
|
|
peer.stream = stream
|
2021-09-29 12:03:39 +13:00
|
|
|
}
|
2021-09-27 11:50:49 +13:00
|
|
|
|
2021-10-02 00:46:10 +13:00
|
|
|
return err
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
|
2021-10-01 11:02:54 +13:00
|
|
|
func (peer *PeerStreamTrack) RemoveStream() {
|
2021-09-27 11:50:49 +13:00
|
|
|
peer.streamMu.Lock()
|
|
|
|
defer peer.streamMu.Unlock()
|
|
|
|
|
|
|
|
if peer.stream != nil {
|
2021-10-08 08:41:19 +13:00
|
|
|
_ = peer.stream.RemoveListener(&peer.listener)
|
2021-10-02 00:46:10 +13:00
|
|
|
peer.stream = nil
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-01 11:02:54 +13:00
|
|
|
func (peer *PeerStreamTrack) AddToConnection(connection *webrtc.PeerConnection) error {
|
2021-09-27 11:50:49 +13:00
|
|
|
sender, err := connection.AddTrack(peer.track)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
rtcpBuf := make([]byte, 1500)
|
|
|
|
for {
|
2022-07-04 10:38:46 +12:00
|
|
|
n, _, err := sender.Read(rtcpBuf)
|
|
|
|
if err != nil {
|
|
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
peer.logger.Err(err).Msg("RTCP read error")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
packets, err := rtcp.Unmarshal(rtcpBuf[:n])
|
|
|
|
if err != nil {
|
|
|
|
peer.logger.Err(err).Msg("RTCP unmarshal error")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
peer.onRtcpMu.RLock()
|
|
|
|
handler := peer.onRtcp
|
|
|
|
peer.onRtcpMu.RUnlock()
|
|
|
|
|
|
|
|
for _, packet := range packets {
|
|
|
|
if handler != nil {
|
|
|
|
go handler(packet)
|
|
|
|
}
|
2021-09-27 11:50:49 +13:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2022-03-27 11:20:38 +13:00
|
|
|
|
|
|
|
func (peer *PeerStreamTrack) SetPaused(paused bool) {
|
|
|
|
peer.paused = paused
|
|
|
|
}
|
2022-07-04 10:38:46 +12:00
|
|
|
|
|
|
|
func (peer *PeerStreamTrack) OnRTCP(f func(rtcp.Packet)) {
|
|
|
|
peer.onRtcpMu.Lock()
|
|
|
|
defer peer.onRtcpMu.Unlock()
|
|
|
|
|
|
|
|
peer.onRtcp = f
|
|
|
|
}
|