Automatic SDP negotiation (#103)

This commit is contained in:
Miroslav Šedivý 2021-12-02 23:43:36 +01:00
parent a8542bc222
commit fed6ddbd4e
13 changed files with 232 additions and 47 deletions

View File

@ -2,7 +2,14 @@ import EventEmitter from 'eventemitter3'
import { OPCODE } from './data'
import { EVENT, WebSocketEvents } from './events'
import { WebSocketMessages, WebSocketPayloads, SignalProvidePayload, SignalCandidatePayload } from './messages'
import {
WebSocketMessages,
WebSocketPayloads,
SignalProvidePayload,
SignalCandidatePayload,
SignalOfferPayload,
SignalAnswerMessage,
} from './messages'
export interface BaseEvents {
info: (...message: any[]) => void
@ -180,7 +187,7 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
this._ws!.send(JSON.stringify({ event, ...payload }))
}
public async createPeer(sdp: string, lite: boolean, servers: RTCIceServer[]) {
public async createPeer(lite: boolean, servers: RTCIceServer[]) {
this.emit('debug', `creating peer`)
if (!this.socketOpen) {
this.emit(
@ -243,13 +250,32 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
}
this._peer.ontrack = this.onTrack.bind(this)
this._peer.addTransceiver('audio', { direction: 'recvonly' })
this._peer.addTransceiver('video', { direction: 'recvonly' })
this._peer.onnegotiationneeded = async () => {
this.emit('warn', `negotiation is needed`)
const d = await this._peer!.createOffer()
this._peer!.setLocalDescription(d)
this._ws!.send(
JSON.stringify({
event: EVENT.SIGNAL.OFFER,
sdp: d.sdp,
}),
)
}
this._channel = this._peer.createDataChannel('data')
this._channel.onerror = this.onError.bind(this)
this._channel.onmessage = this.onData.bind(this)
this._channel.onclose = this.onDisconnected.bind(this, new Error('peer data channel closed'))
}
public async setRemoteOffer(sdp: string) {
if (!this._peer) {
this.emit('warn', `attempting to set remote offer while disconnected`)
return
}
this._peer.setRemoteDescription({ type: 'offer', sdp })
@ -274,7 +300,16 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
}
}
private onMessage(e: MessageEvent) {
public async setRemoteAnswer(sdp: string) {
if (!this._peer) {
this.emit('warn', `attempting to set remote answer while disconnected`)
return
}
this._peer.setRemoteDescription({ type: 'answer', sdp })
}
private async onMessage(e: MessageEvent) {
const { event, ...payload } = JSON.parse(e.data) as WebSocketMessages
this.emit('debug', `received websocket event ${event} ${payload ? `with payload: ` : ''}`, payload)
@ -282,7 +317,20 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
if (event === EVENT.SIGNAL.PROVIDE) {
const { sdp, lite, ice, id } = payload as SignalProvidePayload
this._id = id
this.createPeer(sdp, lite, ice)
await this.createPeer(lite, ice)
await this.setRemoteOffer(sdp)
return
}
if (event === EVENT.SIGNAL.OFFER) {
const { sdp } = payload as SignalOfferPayload
await this.setRemoteOffer(sdp)
return
}
if (event === EVENT.SIGNAL.ANSWER) {
const { sdp } = payload as SignalAnswerMessage
await this.setRemoteAnswer(sdp)
return
}

View File

@ -14,6 +14,7 @@ export const EVENT = {
ERROR: 'system/error',
},
SIGNAL: {
OFFER: 'signal/offer',
ANSWER: 'signal/answer',
PROVIDE: 'signal/provide',
CANDIDATE: 'signal/candidate',
@ -81,7 +82,13 @@ export type ControlEvents =
export type SystemEvents = typeof EVENT.SYSTEM.DISCONNECT
export type MemberEvents = typeof EVENT.MEMBER.LIST | typeof EVENT.MEMBER.CONNECTED | typeof EVENT.MEMBER.DISCONNECTED
export type SignalEvents = typeof EVENT.SIGNAL.ANSWER | typeof EVENT.SIGNAL.PROVIDE | typeof EVENT.SIGNAL.CANDIDATE
export type SignalEvents =
| typeof EVENT.SIGNAL.OFFER
| typeof EVENT.SIGNAL.ANSWER
| typeof EVENT.SIGNAL.PROVIDE
| typeof EVENT.SIGNAL.CANDIDATE
export type ChatEvents = typeof EVENT.CHAT.MESSAGE | typeof EVENT.CHAT.EMOTE
export type ScreenEvents = typeof EVENT.SCREEN.CONFIGURATIONS | typeof EVENT.SCREEN.RESOLUTION | typeof EVENT.SCREEN.SET

View File

@ -14,6 +14,7 @@ import { Member, ScreenConfigurations, ScreenResolution } from './types'
export type WebSocketMessages =
| WebSocketMessage
| SignalProvideMessage
| SignalOfferMessage
| SignalAnswerMessage
| SignalCandidateMessage
| MemberListMessage
@ -26,6 +27,7 @@ export type WebSocketMessages =
export type WebSocketPayloads =
| SignalProvidePayload
| SignalOfferPayload
| SignalAnswerPayload
| SignalCandidatePayload
| MemberListPayload
@ -74,6 +76,14 @@ export interface SignalProvidePayload {
sdp: string
}
// signal/offer
export interface SignalOfferMessage extends WebSocketMessage, SignalOfferPayload {
event: typeof EVENT.SIGNAL.OFFER
}
export interface SignalOfferPayload {
sdp: string
}
// signal/answer
export interface SignalAnswerMessage extends WebSocketMessage, SignalAnswerPayload {
event: typeof EVENT.SIGNAL.ANSWER

View File

@ -1,5 +1,10 @@
# Changelog
## master branch
### Misc
- Automatic WebRTC SDP negotiation using onnegotiationneeded handlers. This allows adding/removing track on demand in a session.
## [n.eko v2.5](https://github.com/m1k1o/neko/releases/tag/v2.5)
### Bugs

View File

@ -104,11 +104,50 @@ func (session *Session) Send(v interface{}) error {
return session.socket.Send(v)
}
func (session *Session) SignalAnswer(sdp string) error {
func (session *Session) SignalLocalOffer(sdp string) error {
if session.peer == nil {
return nil
}
return session.peer.SignalAnswer(sdp)
session.logger.Info().Msg("signal update - LocalOffer")
return session.socket.Send(&message.SignalOffer{
Event: event.SIGNAL_OFFER,
SDP: sdp,
})
}
func (session *Session) SignalLocalAnswer(sdp string) error {
if session.peer == nil {
return nil
}
session.logger.Info().Msg("signal update - LocalAnswer")
return session.socket.Send(&message.SignalAnswer{
Event: event.SIGNAL_ANSWER,
SDP: sdp,
})
}
func (session *Session) SignalRemoteOffer(sdp string) error {
if session.peer == nil {
return nil
}
if err := session.peer.SetOffer(sdp); err != nil {
return err
}
sdp, err := session.peer.CreateAnswer()
if err != nil {
return err
}
session.logger.Info().Msg("signal update - RemoteOffer")
return session.SignalLocalAnswer(sdp)
}
func (session *Session) SignalRemoteAnswer(sdp string) error {
if session.peer == nil {
return nil
}
session.logger.Info().Msg("signal update - RemoteAnswer")
return session.peer.SetAnswer(sdp)
}
func (session *Session) SignalCandidate(data string) error {

View File

@ -6,8 +6,8 @@ const (
)
const (
SIGNAL_ANSWER = "signal/answer"
SIGNAL_OFFER = "signal/offer"
SIGNAL_ANSWER = "signal/answer"
SIGNAL_PROVIDE = "signal/provide"
SIGNAL_CANDIDATE = "signal/candidate"
)

View File

@ -24,6 +24,11 @@ type SignalProvide struct {
ICE []webrtc.ICEServer `json:"ice"`
}
type SignalOffer struct {
Event string `json:"event"`
SDP string `json:"sdp"`
}
type SignalAnswer struct {
Event string `json:"event"`
DisplayName string `json:"displayname"`

View File

@ -22,7 +22,10 @@ type Session interface {
Address() string
Kick(message string) error
Send(v interface{}) error
SignalAnswer(sdp string) error
SignalLocalOffer(sdp string) error
SignalLocalAnswer(sdp string) error
SignalRemoteOffer(sdp string) error
SignalRemoteAnswer(sdp string) error
SignalCandidate(data string) error
}

View File

@ -10,11 +10,16 @@ type Sample media.Sample
type WebRTCManager interface {
Start()
Shutdown() error
CreatePeer(id string, session Session) (string, bool, []webrtc.ICEServer, error)
CreatePeer(id string, session Session) (Peer, error)
ICELite() bool
ICEServers() []webrtc.ICEServer
}
type Peer interface {
SignalAnswer(sdp string) error
CreateOffer() (string, error)
CreateAnswer() (string, error)
SetOffer(sdp string) error
SetAnswer(sdp string) error
WriteData(v interface{}) error
Destroy() error
}

View File

@ -17,7 +17,39 @@ type Peer struct {
mu sync.Mutex
}
func (peer *Peer) SignalAnswer(sdp string) error {
func (peer *Peer) CreateOffer() (string, error) {
desc, err := peer.connection.CreateOffer(nil)
if err != nil {
return "", err
}
err = peer.connection.SetLocalDescription(desc)
if err != nil {
return "", err
}
return desc.SDP, nil
}
func (peer *Peer) CreateAnswer() (string, error) {
desc, err := peer.connection.CreateAnswer(nil)
if err != nil {
return "", err
}
err = peer.connection.SetLocalDescription(desc)
if err != nil {
return "", nil
}
return desc.SDP, nil
}
func (peer *Peer) SetOffer(sdp string) error {
return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer})
}
func (peer *Peer) SetAnswer(sdp string) error {
return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer})
}

View File

@ -74,7 +74,7 @@ func (manager *WebRTCManager) Shutdown() error {
return nil
}
func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (string, bool, []webrtc.ICEServer, error) {
func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (types.Peer, error) {
configuration := &webrtc.Configuration{
ICEServers: manager.config.ICEServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
@ -106,7 +106,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(&engine, i); err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
return nil, err
}
// Create API with MediaEngine and SettingEngine
@ -115,7 +115,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
// Create new peer connection
connection, err := api.NewPeerConnection(*configuration)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
return nil, err
}
negotiated := true
@ -123,7 +123,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
Negotiated: &negotiated,
})
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
return nil, err
}
connection.OnDataChannel(func(d *webrtc.DataChannel) {
@ -144,22 +144,12 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
rtpVideo, err := connection.AddTrack(manager.videoTrack)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
return nil, err
}
rtpAudio, err := connection.AddTrack(manager.audioTrack)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
description, err := connection.CreateOffer(nil)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
}
err = connection.SetLocalDescription(description)
if err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
return nil, err
}
connection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
@ -182,6 +172,32 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
}
})
peer := &Peer{
id: id,
api: api,
engine: &engine,
manager: manager,
settings: &settings,
connection: connection,
configuration: configuration,
}
connection.OnNegotiationNeeded(func() {
manager.logger.Warn().Msg("negotiation is needed")
sdp, err := peer.CreateOffer()
if err != nil {
manager.logger.Err(err).Msg("creating offer failed")
return
}
err = session.SignalLocalOffer(sdp)
if err != nil {
manager.logger.Warn().Err(err).Msg("sending SignalLocalOffer failed")
return
}
})
connection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
manager.logger.Info().Msg("sent all ICECandidates")
@ -200,16 +216,8 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
}
})
if err := session.SetPeer(&Peer{
id: id,
api: api,
engine: &engine,
manager: manager,
settings: &settings,
connection: connection,
configuration: configuration,
}); err != nil {
return "", manager.config.ICELite, manager.config.ICEServers, err
if err := session.SetPeer(peer); err != nil {
return nil, err
}
go func() {
@ -230,7 +238,15 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri
}
}()
return description.SDP, manager.config.ICELite, manager.config.ICEServers, nil
return peer, nil
}
func (manager *WebRTCManager) ICELite() bool {
return manager.config.ICELite
}
func (manager *WebRTCManager) ICEServers() []webrtc.ICEServer {
return manager.config.ICEServers
}
func (manager *WebRTCManager) createTrack(codecName string) (*webrtc.TrackLocalStaticSample, webrtc.RTPCodecParameters, error) {

View File

@ -60,11 +60,17 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
switch header.Event {
// Signal Events
case event.SIGNAL_OFFER:
payload := &message.SignalOffer{}
return errors.Wrapf(
utils.Unmarshal(payload, raw, func() error {
return h.signalRemoteOffer(id, session, payload)
}), "%s failed", header.Event)
case event.SIGNAL_ANSWER:
payload := &message.SignalAnswer{}
return errors.Wrapf(
utils.Unmarshal(payload, raw, func() error {
return h.signalAnswer(id, session, payload)
return h.signalRemoteAnswer(id, session, payload)
}), "%s failed", header.Event)
// Control Events

View File

@ -7,7 +7,12 @@ import (
)
func (h *MessageHandler) signalProvide(id string, session types.Session) error {
sdp, lite, ice, err := h.webrtc.CreatePeer(id, session)
peer, err := h.webrtc.CreatePeer(id, session)
if err != nil {
return err
}
sdp, err := peer.CreateOffer()
if err != nil {
return err
}
@ -16,8 +21,8 @@ func (h *MessageHandler) signalProvide(id string, session types.Session) error {
Event: event.SIGNAL_PROVIDE,
ID: id,
SDP: sdp,
Lite: lite,
ICE: ice,
Lite: h.webrtc.ICELite(),
ICE: h.webrtc.ICEServers(),
}); err != nil {
return err
}
@ -25,12 +30,16 @@ func (h *MessageHandler) signalProvide(id string, session types.Session) error {
return nil
}
func (h *MessageHandler) signalAnswer(id string, session types.Session, payload *message.SignalAnswer) error {
func (h *MessageHandler) signalRemoteOffer(id string, session types.Session, payload *message.SignalOffer) error {
return session.SignalRemoteOffer(payload.SDP)
}
func (h *MessageHandler) signalRemoteAnswer(id string, session types.Session, payload *message.SignalAnswer) error {
if err := session.SetName(payload.DisplayName); err != nil {
return err
}
if err := session.SignalAnswer(payload.SDP); err != nil {
if err := session.SignalRemoteAnswer(payload.SDP); err != nil {
return err
}