diff --git a/client/src/neko/base.ts b/client/src/neko/base.ts index 8de61736..31ec8add 100644 --- a/client/src/neko/base.ts +++ b/client/src/neko/base.ts @@ -2,7 +2,14 @@ import EventEmitter from 'eventemitter3' import { OPCODE } from './data' import { EVENT, WebSocketEvents } from './events' -import { WebSocketMessages, WebSocketPayloads, SignalProvidePayload, SignalCandidatePayload } from './messages' +import { + WebSocketMessages, + WebSocketPayloads, + SignalProvidePayload, + SignalCandidatePayload, + SignalOfferPayload, + SignalAnswerMessage, +} from './messages' export interface BaseEvents { info: (...message: any[]) => void @@ -180,7 +187,7 @@ export abstract class BaseClient extends EventEmitter { this._ws!.send(JSON.stringify({ event, ...payload })) } - public async createPeer(sdp: string, lite: boolean, servers: RTCIceServer[]) { + public async createPeer(lite: boolean, servers: RTCIceServer[]) { this.emit('debug', `creating peer`) if (!this.socketOpen) { this.emit( @@ -243,13 +250,32 @@ export abstract class BaseClient extends EventEmitter { } this._peer.ontrack = this.onTrack.bind(this) - this._peer.addTransceiver('audio', { direction: 'recvonly' }) - this._peer.addTransceiver('video', { direction: 'recvonly' }) + + this._peer.onnegotiationneeded = async () => { + this.emit('warn', `negotiation is needed`) + + const d = await this._peer!.createOffer() + this._peer!.setLocalDescription(d) + + this._ws!.send( + JSON.stringify({ + event: EVENT.SIGNAL.OFFER, + sdp: d.sdp, + }), + ) + } this._channel = this._peer.createDataChannel('data') this._channel.onerror = this.onError.bind(this) this._channel.onmessage = this.onData.bind(this) this._channel.onclose = this.onDisconnected.bind(this, new Error('peer data channel closed')) + } + + public async setRemoteOffer(sdp: string) { + if (!this._peer) { + this.emit('warn', `attempting to set remote offer while disconnected`) + return + } this._peer.setRemoteDescription({ type: 'offer', sdp }) @@ -274,7 +300,16 @@ export abstract class BaseClient extends EventEmitter { } } - private onMessage(e: MessageEvent) { + public async setRemoteAnswer(sdp: string) { + if (!this._peer) { + this.emit('warn', `attempting to set remote answer while disconnected`) + return + } + + this._peer.setRemoteDescription({ type: 'answer', sdp }) + } + + private async onMessage(e: MessageEvent) { const { event, ...payload } = JSON.parse(e.data) as WebSocketMessages this.emit('debug', `received websocket event ${event} ${payload ? `with payload: ` : ''}`, payload) @@ -282,7 +317,20 @@ export abstract class BaseClient extends EventEmitter { if (event === EVENT.SIGNAL.PROVIDE) { const { sdp, lite, ice, id } = payload as SignalProvidePayload this._id = id - this.createPeer(sdp, lite, ice) + await this.createPeer(lite, ice) + await this.setRemoteOffer(sdp) + return + } + + if (event === EVENT.SIGNAL.OFFER) { + const { sdp } = payload as SignalOfferPayload + await this.setRemoteOffer(sdp) + return + } + + if (event === EVENT.SIGNAL.ANSWER) { + const { sdp } = payload as SignalAnswerMessage + await this.setRemoteAnswer(sdp) return } diff --git a/client/src/neko/events.ts b/client/src/neko/events.ts index 5b43d6b5..a29f5805 100644 --- a/client/src/neko/events.ts +++ b/client/src/neko/events.ts @@ -14,6 +14,7 @@ export const EVENT = { ERROR: 'system/error', }, SIGNAL: { + OFFER: 'signal/offer', ANSWER: 'signal/answer', PROVIDE: 'signal/provide', CANDIDATE: 'signal/candidate', @@ -81,7 +82,13 @@ export type ControlEvents = export type SystemEvents = typeof EVENT.SYSTEM.DISCONNECT export type MemberEvents = typeof EVENT.MEMBER.LIST | typeof EVENT.MEMBER.CONNECTED | typeof EVENT.MEMBER.DISCONNECTED -export type SignalEvents = typeof EVENT.SIGNAL.ANSWER | typeof EVENT.SIGNAL.PROVIDE | typeof EVENT.SIGNAL.CANDIDATE + +export type SignalEvents = + | typeof EVENT.SIGNAL.OFFER + | typeof EVENT.SIGNAL.ANSWER + | typeof EVENT.SIGNAL.PROVIDE + | typeof EVENT.SIGNAL.CANDIDATE + export type ChatEvents = typeof EVENT.CHAT.MESSAGE | typeof EVENT.CHAT.EMOTE export type ScreenEvents = typeof EVENT.SCREEN.CONFIGURATIONS | typeof EVENT.SCREEN.RESOLUTION | typeof EVENT.SCREEN.SET diff --git a/client/src/neko/messages.ts b/client/src/neko/messages.ts index f9ad2b59..2fa31cd3 100644 --- a/client/src/neko/messages.ts +++ b/client/src/neko/messages.ts @@ -14,6 +14,7 @@ import { Member, ScreenConfigurations, ScreenResolution } from './types' export type WebSocketMessages = | WebSocketMessage | SignalProvideMessage + | SignalOfferMessage | SignalAnswerMessage | SignalCandidateMessage | MemberListMessage @@ -26,6 +27,7 @@ export type WebSocketMessages = export type WebSocketPayloads = | SignalProvidePayload + | SignalOfferPayload | SignalAnswerPayload | SignalCandidatePayload | MemberListPayload @@ -74,6 +76,14 @@ export interface SignalProvidePayload { sdp: string } +// signal/offer +export interface SignalOfferMessage extends WebSocketMessage, SignalOfferPayload { + event: typeof EVENT.SIGNAL.OFFER +} +export interface SignalOfferPayload { + sdp: string +} + // signal/answer export interface SignalAnswerMessage extends WebSocketMessage, SignalAnswerPayload { event: typeof EVENT.SIGNAL.ANSWER diff --git a/docs/changelog.md b/docs/changelog.md index 5ef0144c..2e166e32 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,10 @@ # Changelog +## master branch + +### Misc +- Automatic WebRTC SDP negotiation using onnegotiationneeded handlers. This allows adding/removing track on demand in a session. + ## [n.eko v2.5](https://github.com/m1k1o/neko/releases/tag/v2.5) ### Bugs diff --git a/server/internal/session/session.go b/server/internal/session/session.go index d699753a..ee4a0d9d 100644 --- a/server/internal/session/session.go +++ b/server/internal/session/session.go @@ -104,11 +104,50 @@ func (session *Session) Send(v interface{}) error { return session.socket.Send(v) } -func (session *Session) SignalAnswer(sdp string) error { +func (session *Session) SignalLocalOffer(sdp string) error { if session.peer == nil { return nil } - return session.peer.SignalAnswer(sdp) + session.logger.Info().Msg("signal update - LocalOffer") + return session.socket.Send(&message.SignalOffer{ + Event: event.SIGNAL_OFFER, + SDP: sdp, + }) +} + +func (session *Session) SignalLocalAnswer(sdp string) error { + if session.peer == nil { + return nil + } + + session.logger.Info().Msg("signal update - LocalAnswer") + return session.socket.Send(&message.SignalAnswer{ + Event: event.SIGNAL_ANSWER, + SDP: sdp, + }) +} + +func (session *Session) SignalRemoteOffer(sdp string) error { + if session.peer == nil { + return nil + } + if err := session.peer.SetOffer(sdp); err != nil { + return err + } + sdp, err := session.peer.CreateAnswer() + if err != nil { + return err + } + session.logger.Info().Msg("signal update - RemoteOffer") + return session.SignalLocalAnswer(sdp) +} + +func (session *Session) SignalRemoteAnswer(sdp string) error { + if session.peer == nil { + return nil + } + session.logger.Info().Msg("signal update - RemoteAnswer") + return session.peer.SetAnswer(sdp) } func (session *Session) SignalCandidate(data string) error { diff --git a/server/internal/types/event/events.go b/server/internal/types/event/events.go index 4b2e1875..c9c9ec15 100644 --- a/server/internal/types/event/events.go +++ b/server/internal/types/event/events.go @@ -6,8 +6,8 @@ const ( ) const ( - SIGNAL_ANSWER = "signal/answer" SIGNAL_OFFER = "signal/offer" + SIGNAL_ANSWER = "signal/answer" SIGNAL_PROVIDE = "signal/provide" SIGNAL_CANDIDATE = "signal/candidate" ) diff --git a/server/internal/types/message/messages.go b/server/internal/types/message/messages.go index 2ac7e9ff..4c40caa1 100644 --- a/server/internal/types/message/messages.go +++ b/server/internal/types/message/messages.go @@ -24,6 +24,11 @@ type SignalProvide struct { ICE []webrtc.ICEServer `json:"ice"` } +type SignalOffer struct { + Event string `json:"event"` + SDP string `json:"sdp"` +} + type SignalAnswer struct { Event string `json:"event"` DisplayName string `json:"displayname"` diff --git a/server/internal/types/session.go b/server/internal/types/session.go index bbba50c2..da4eb414 100644 --- a/server/internal/types/session.go +++ b/server/internal/types/session.go @@ -22,7 +22,10 @@ type Session interface { Address() string Kick(message string) error Send(v interface{}) error - SignalAnswer(sdp string) error + SignalLocalOffer(sdp string) error + SignalLocalAnswer(sdp string) error + SignalRemoteOffer(sdp string) error + SignalRemoteAnswer(sdp string) error SignalCandidate(data string) error } diff --git a/server/internal/types/webrtc.go b/server/internal/types/webrtc.go index a546ce88..bda3524a 100644 --- a/server/internal/types/webrtc.go +++ b/server/internal/types/webrtc.go @@ -10,11 +10,16 @@ type Sample media.Sample type WebRTCManager interface { Start() Shutdown() error - CreatePeer(id string, session Session) (string, bool, []webrtc.ICEServer, error) + CreatePeer(id string, session Session) (Peer, error) + ICELite() bool + ICEServers() []webrtc.ICEServer } type Peer interface { - SignalAnswer(sdp string) error + CreateOffer() (string, error) + CreateAnswer() (string, error) + SetOffer(sdp string) error + SetAnswer(sdp string) error WriteData(v interface{}) error Destroy() error } diff --git a/server/internal/webrtc/peer.go b/server/internal/webrtc/peer.go index b8d11332..180a433b 100644 --- a/server/internal/webrtc/peer.go +++ b/server/internal/webrtc/peer.go @@ -17,7 +17,39 @@ type Peer struct { mu sync.Mutex } -func (peer *Peer) SignalAnswer(sdp string) error { +func (peer *Peer) CreateOffer() (string, error) { + desc, err := peer.connection.CreateOffer(nil) + if err != nil { + return "", err + } + + err = peer.connection.SetLocalDescription(desc) + if err != nil { + return "", err + } + + return desc.SDP, nil +} + +func (peer *Peer) CreateAnswer() (string, error) { + desc, err := peer.connection.CreateAnswer(nil) + if err != nil { + return "", err + } + + err = peer.connection.SetLocalDescription(desc) + if err != nil { + return "", nil + } + + return desc.SDP, nil +} + +func (peer *Peer) SetOffer(sdp string) error { + return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}) +} + +func (peer *Peer) SetAnswer(sdp string) error { return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) } diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index 066d27c9..07613f5f 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -74,7 +74,7 @@ func (manager *WebRTCManager) Shutdown() error { return nil } -func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (string, bool, []webrtc.ICEServer, error) { +func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (types.Peer, error) { configuration := &webrtc.Configuration{ ICEServers: manager.config.ICEServers, SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, @@ -106,7 +106,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri i := &interceptor.Registry{} if err := webrtc.RegisterDefaultInterceptors(&engine, i); err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err + return nil, err } // Create API with MediaEngine and SettingEngine @@ -115,7 +115,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri // Create new peer connection connection, err := api.NewPeerConnection(*configuration) if err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err + return nil, err } negotiated := true @@ -123,7 +123,7 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri Negotiated: &negotiated, }) if err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err + return nil, err } connection.OnDataChannel(func(d *webrtc.DataChannel) { @@ -144,22 +144,12 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri rtpVideo, err := connection.AddTrack(manager.videoTrack) if err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err + return nil, err } rtpAudio, err := connection.AddTrack(manager.audioTrack) if err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err - } - - description, err := connection.CreateOffer(nil) - if err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err - } - - err = connection.SetLocalDescription(description) - if err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err + return nil, err } connection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { @@ -182,6 +172,32 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri } }) + peer := &Peer{ + id: id, + api: api, + engine: &engine, + manager: manager, + settings: &settings, + connection: connection, + configuration: configuration, + } + + connection.OnNegotiationNeeded(func() { + manager.logger.Warn().Msg("negotiation is needed") + + sdp, err := peer.CreateOffer() + if err != nil { + manager.logger.Err(err).Msg("creating offer failed") + return + } + + err = session.SignalLocalOffer(sdp) + if err != nil { + manager.logger.Warn().Err(err).Msg("sending SignalLocalOffer failed") + return + } + }) + connection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { manager.logger.Info().Msg("sent all ICECandidates") @@ -200,16 +216,8 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri } }) - if err := session.SetPeer(&Peer{ - id: id, - api: api, - engine: &engine, - manager: manager, - settings: &settings, - connection: connection, - configuration: configuration, - }); err != nil { - return "", manager.config.ICELite, manager.config.ICEServers, err + if err := session.SetPeer(peer); err != nil { + return nil, err } go func() { @@ -230,7 +238,15 @@ func (manager *WebRTCManager) CreatePeer(id string, session types.Session) (stri } }() - return description.SDP, manager.config.ICELite, manager.config.ICEServers, nil + return peer, nil +} + +func (manager *WebRTCManager) ICELite() bool { + return manager.config.ICELite +} + +func (manager *WebRTCManager) ICEServers() []webrtc.ICEServer { + return manager.config.ICEServers } func (manager *WebRTCManager) createTrack(codecName string) (*webrtc.TrackLocalStaticSample, webrtc.RTPCodecParameters, error) { diff --git a/server/internal/websocket/handler.go b/server/internal/websocket/handler.go index 693e572e..4dec3065 100644 --- a/server/internal/websocket/handler.go +++ b/server/internal/websocket/handler.go @@ -60,11 +60,17 @@ func (h *MessageHandler) Message(id string, raw []byte) error { switch header.Event { // Signal Events + case event.SIGNAL_OFFER: + payload := &message.SignalOffer{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.signalRemoteOffer(id, session, payload) + }), "%s failed", header.Event) case event.SIGNAL_ANSWER: payload := &message.SignalAnswer{} return errors.Wrapf( utils.Unmarshal(payload, raw, func() error { - return h.signalAnswer(id, session, payload) + return h.signalRemoteAnswer(id, session, payload) }), "%s failed", header.Event) // Control Events diff --git a/server/internal/websocket/signal.go b/server/internal/websocket/signal.go index 00bf9303..d416adb2 100644 --- a/server/internal/websocket/signal.go +++ b/server/internal/websocket/signal.go @@ -7,7 +7,12 @@ import ( ) func (h *MessageHandler) signalProvide(id string, session types.Session) error { - sdp, lite, ice, err := h.webrtc.CreatePeer(id, session) + peer, err := h.webrtc.CreatePeer(id, session) + if err != nil { + return err + } + + sdp, err := peer.CreateOffer() if err != nil { return err } @@ -16,8 +21,8 @@ func (h *MessageHandler) signalProvide(id string, session types.Session) error { Event: event.SIGNAL_PROVIDE, ID: id, SDP: sdp, - Lite: lite, - ICE: ice, + Lite: h.webrtc.ICELite(), + ICE: h.webrtc.ICEServers(), }); err != nil { return err } @@ -25,12 +30,16 @@ func (h *MessageHandler) signalProvide(id string, session types.Session) error { return nil } -func (h *MessageHandler) signalAnswer(id string, session types.Session, payload *message.SignalAnswer) error { +func (h *MessageHandler) signalRemoteOffer(id string, session types.Session, payload *message.SignalOffer) error { + return session.SignalRemoteOffer(payload.SDP) +} + +func (h *MessageHandler) signalRemoteAnswer(id string, session types.Session, payload *message.SignalAnswer) error { if err := session.SetName(payload.DisplayName); err != nil { return err } - if err := session.SignalAnswer(payload.SDP); err != nil { + if err := session.SignalRemoteAnswer(payload.SDP); err != nil { return err }