From d5bff32302899cbc03b0a32fcd2827bc9c450a92 Mon Sep 17 00:00:00 2001 From: Craig Date: Wed, 12 Feb 2020 23:13:33 +0000 Subject: [PATCH] server -> client signaling --- client/src/neko/base.ts | 93 ++++++---------- client/src/neko/events.ts | 6 -- client/src/neko/index.ts | 14 +-- client/src/neko/messages.ts | 34 +++--- server/internal/session/manager.go | 26 ----- server/internal/session/session.go | 23 ++-- server/internal/types/event/events.go | 5 - server/internal/types/message/messages.go | 11 +- server/internal/types/session.go | 6 +- server/internal/types/webrtc.go | 5 +- server/internal/webrtc/peer.go | 21 +--- server/internal/webrtc/tracks.go | 8 +- server/internal/webrtc/webrtc.go | 124 +++++++++++----------- server/internal/websocket/handler.go | 13 +-- server/internal/websocket/identity.go | 34 ------ server/internal/websocket/session.go | 8 +- server/internal/websocket/signal.go | 36 +++++++ 17 files changed, 181 insertions(+), 286 deletions(-) delete mode 100644 server/internal/websocket/identity.go create mode 100644 server/internal/websocket/signal.go diff --git a/client/src/neko/base.ts b/client/src/neko/base.ts index 8e1e908..311dbec 100644 --- a/client/src/neko/base.ts +++ b/client/src/neko/base.ts @@ -2,15 +2,7 @@ import EventEmitter from 'eventemitter3' import { OPCODE } from './data' import { EVENT, WebSocketEvents } from './events' -import { - WebSocketMessages, - WebSocketPayloads, - IdentityPayload, - SignalPayload, - MemberListPayload, - MemberPayload, - ControlPayload, -} from './messages' +import { WebSocketMessages, WebSocketPayloads, SignalProvidePayload } from './messages' export interface BaseEvents { info: (...message: any[]) => void @@ -26,6 +18,11 @@ export abstract class BaseClient extends EventEmitter { protected _timeout?: NodeJS.Timeout protected _username?: string protected _state: RTCIceConnectionState = 'disconnected' + protected _id = '' + + get id() { + return this._id + } get supported() { return typeof RTCPeerConnection !== 'undefined' && typeof RTCPeerConnection.prototype.addTransceiver !== 'undefined' @@ -93,6 +90,8 @@ export abstract class BaseClient extends EventEmitter { } this._state = 'disconnected' + this._username = undefined + this._id = '' } public sendData(event: 'wheel' | 'mousemove', data: { x: number; y: number }): void @@ -157,7 +156,7 @@ export abstract class BaseClient extends EventEmitter { this._ws!.send(JSON.stringify({ event, ...payload })) } - public createPeer() { + public createPeer(sdp: string) { this.emit('debug', `creating peer`) if (!this.socketOpen) { this.emit( @@ -175,18 +174,6 @@ export abstract class BaseClient extends EventEmitter { this._peer = new RTCPeerConnection() - this._peer.onicecandidate = event => { - if (event.candidate === null && this._peer!.localDescription) { - this.emit('debug', `sending event '${EVENT.SIGNAL.PROVIDE}' with payload`, this._peer!.localDescription.sdp) - this._ws!.send( - JSON.stringify({ - event: EVENT.SIGNAL.PROVIDE, - sdp: this._peer!.localDescription.sdp, - }), - ) - } - } - this._peer.onconnectionstatechange = event => { this.emit('debug', `peer connection state chagned`, this._peer ? this._peer.connectionState : undefined) } @@ -227,43 +214,40 @@ export abstract class BaseClient extends EventEmitter { this._channel.onmessage = this.onData.bind(this) this._channel.onclose = this.onDisconnected.bind(this, new Error('peer data channel closed')) + this._peer.setRemoteDescription({ type: 'offer', sdp }) this._peer - .createOffer() - .then(d => this._peer!.setLocalDescription(d)) + .createAnswer() + .then(d => { + this._peer!.setLocalDescription(d) + this._ws!.send( + JSON.stringify({ + event: EVENT.SIGNAL.ANSWER, + sdp: d.sdp, + username: this._username, + }), + ) + }) .catch(err => this.emit('error', err)) } - private setRemoteDescription(payload: SignalPayload) { - if (this.peerConnected) { - this.emit('warn', `attempting to set remote description while peer connected`, payload) - return - } - - this.emit('debug', `remote description recieved: \n`, payload.sdp) - this._peer!.setRemoteDescription({ type: 'answer', sdp: payload.sdp }) - } - private onMessage(e: MessageEvent) { const { event, ...payload } = JSON.parse(e.data) as WebSocketMessages this.emit('debug', `received websocket event ${event} ${payload ? `with payload: ` : ''}`, payload) - switch (event) { - case EVENT.IDENTITY.PROVIDE: - this[EVENT.IDENTITY.PROVIDE](payload as IdentityPayload) - this.createPeer() - break - case EVENT.SIGNAL.ANSWER: - this.setRemoteDescription(payload as SignalPayload) - break - default: - // @ts-ignore - if (typeof this[event] === 'function') { - // @ts-ignore - this[event](payload) - } else { - this[EVENT.MESSAGE](event, payload) - } + if (event === EVENT.SIGNAL.PROVIDE) { + const { sdp, id } = payload as SignalProvidePayload + this._id = id + this.createPeer(sdp) + return + } + + // @ts-ignore + if (typeof this[event] === 'function') { + // @ts-ignore + this[event](payload) + } else { + this[EVENT.MESSAGE](event, payload) } } @@ -295,14 +279,6 @@ export abstract class BaseClient extends EventEmitter { return } - this.emit('debug', `sending event '${EVENT.IDENTITY.DETAILS}' with payload`, { username: this._username }) - this._ws!.send( - JSON.stringify({ - event: EVENT.IDENTITY.DETAILS, - username: this._username, - }), - ) - this.emit('debug', `connected`) this[EVENT.CONNECTED]() } @@ -330,5 +306,4 @@ export abstract class BaseClient extends EventEmitter { protected abstract [EVENT.DISCONNECTED](reason?: Error): void protected abstract [EVENT.TRACK](event: RTCTrackEvent): void protected abstract [EVENT.DATA](data: any): void - protected abstract [EVENT.IDENTITY.PROVIDE](payload: IdentityPayload): void } diff --git a/client/src/neko/events.ts b/client/src/neko/events.ts index 2bce491..b61476a 100644 --- a/client/src/neko/events.ts +++ b/client/src/neko/events.ts @@ -15,10 +15,6 @@ export const EVENT = { ANSWER: 'signal/answer', PROVIDE: 'signal/provide', }, - IDENTITY: { - PROVIDE: 'identity/provide', - DETAILS: 'identity/details', - }, MEMBER: { LIST: 'member/list', CONNECTED: 'member/connected', @@ -59,7 +55,6 @@ export type Events = typeof EVENT export type WebSocketEvents = | SystemEvents | ControlEvents - | IdentityEvents | MemberEvents | SignalEvents | ChatEvents @@ -74,7 +69,6 @@ export type ControlEvents = | typeof EVENT.CONTROL.CLIPBOARD export type SystemEvents = typeof EVENT.SYSTEM.DISCONNECT -export type IdentityEvents = typeof EVENT.IDENTITY.PROVIDE | typeof EVENT.IDENTITY.DETAILS export type MemberEvents = typeof EVENT.MEMBER.LIST | typeof EVENT.MEMBER.CONNECTED | typeof EVENT.MEMBER.DISCONNECTED export type SignalEvents = typeof EVENT.SIGNAL.ANSWER | typeof EVENT.SIGNAL.PROVIDE export type ChatEvents = typeof EVENT.CHAT.MESSAGE | typeof EVENT.CHAT.EMOTE diff --git a/client/src/neko/index.ts b/client/src/neko/index.ts index d6fc7cc..2bc399f 100644 --- a/client/src/neko/index.ts +++ b/client/src/neko/index.ts @@ -7,7 +7,7 @@ import { accessor } from '~/store' import { DisconnectPayload, - IdentityPayload, + SignalProvidePayload, MemberListPayload, MemberDisconnectPayload, MemberPayload, @@ -28,10 +28,6 @@ export class NekoClient extends BaseClient implements EventEmitter { private $vue!: Vue private $accessor!: typeof accessor - private get id() { - return this.$accessor.user.id - } - init(vue: Vue) { this.$vue = vue this.$accessor = vue.$accessor @@ -72,6 +68,7 @@ export class NekoClient extends BaseClient implements EventEmitter { } protected [EVENT.CONNECTED]() { + this.$accessor.user.setMember(this.id) this.$accessor.setConnected(true) this.$accessor.setConnected(true) @@ -121,13 +118,6 @@ export class NekoClient extends BaseClient implements EventEmitter { }) } - ///////////////////////////// - // Identity Events - ///////////////////////////// - protected [EVENT.IDENTITY.PROVIDE]({ id }: IdentityPayload) { - this.$accessor.user.setMember(id) - } - ///////////////////////////// // Member Events ///////////////////////////// diff --git a/client/src/neko/messages.ts b/client/src/neko/messages.ts index 5c9b419..fdb81ee 100644 --- a/client/src/neko/messages.ts +++ b/client/src/neko/messages.ts @@ -3,7 +3,6 @@ import { WebSocketEvents, SystemEvents, ControlEvents, - IdentityEvents, MemberEvents, SignalEvents, ChatEvents, @@ -14,8 +13,8 @@ import { Member, ScreenConfigurations, ScreenResolution } from './types' export type WebSocketMessages = | WebSocketMessage - | IdentityMessage - | SignalMessage + | SignalProvideMessage + | SignalAnswerMessage | MemberListMessage | MembeConnectMessage | MembeDisconnectMessage @@ -25,8 +24,8 @@ export type WebSocketMessages = | ChatMessage export type WebSocketPayloads = - | IdentityPayload - | SignalPayload + | SignalProvidePayload + | SignalAnswerPayload | MemberListPayload | Member | ControlPayload @@ -53,26 +52,25 @@ export interface DisconnectPayload { message: string } -/* - IDENTITY MESSAGES/PAYLOADS -*/ -// identity/provide -export interface IdentityMessage extends WebSocketMessage, IdentityPayload { - event: typeof EVENT.IDENTITY.PROVIDE -} -export interface IdentityPayload { - id: string -} - /* SIGNAL MESSAGES/PAYLOADS */ +// signal/provide +export interface SignalProvideMessage extends WebSocketMessage, SignalProvidePayload { + event: typeof EVENT.SIGNAL.PROVIDE +} +export interface SignalProvidePayload { + id: string + sdp: string +} + // signal/answer -export interface SignalMessage extends WebSocketMessage, SignalPayload { +export interface SignalAnswerMessage extends WebSocketMessage, SignalAnswerPayload { event: typeof EVENT.SIGNAL.ANSWER } -export interface SignalPayload { +export interface SignalAnswerPayload { sdp: string + username: string } /* diff --git a/server/internal/session/manager.go b/server/internal/session/manager.go index 6673882..5a2548e 100644 --- a/server/internal/session/manager.go +++ b/server/internal/session/manager.go @@ -131,32 +131,6 @@ func (manager *SessionManager) Brodcast(v interface{}, exclude interface{}) erro return nil } -func (manager *SessionManager) WriteVideoSample(sample types.Sample) error { - for _, session := range manager.members { - if !session.connected { - continue - } - - if err := session.WriteVideoSample(sample); err != nil { - return err - } - } - return nil -} - -func (manager *SessionManager) WriteAudioSample(sample types.Sample) error { - for _, session := range manager.members { - if !session.connected { - continue - } - - if err := session.WriteAudioSample(sample); err != nil { - return err - } - } - return nil -} - func (manager *SessionManager) OnHost(listener func(id string)) { manager.emmiter.On("host", func(payload ...interface{}) { listener(payload[0].(string)) diff --git a/server/internal/session/session.go b/server/internal/session/session.go index f411d6d..e65cbc0 100644 --- a/server/internal/session/session.go +++ b/server/internal/session/session.go @@ -64,8 +64,6 @@ func (session *Session) SetMuted(muted bool) { func (session *Session) SetName(name string) error { session.name = name - session.connected = true - session.manager.emmiter.Emit("connected", session.id, session) return nil } @@ -79,6 +77,14 @@ func (session *Session) SetPeer(peer types.Peer) error { return nil } +func (session *Session) SetConnected(connected bool) error { + session.connected = connected + if connected { + session.manager.emmiter.Emit("connected", session.id, session) + } + return nil +} + func (session *Session) Kick(reason string) error { if session.socket == nil { return nil @@ -107,18 +113,11 @@ func (session *Session) Write(v interface{}) error { return session.socket.Send(v) } -func (session *Session) WriteVideoSample(sample types.Sample) error { - if session.peer == nil || !session.connected { +func (session *Session) SignalAnwser(sdp string) error { + if session.peer == nil { return nil } - return session.peer.WriteVideoSample(sample) -} - -func (session *Session) WriteAudioSample(sample types.Sample) error { - if session.peer == nil || !session.connected { - return nil - } - return session.peer.WriteAudioSample(sample) + return session.peer.SignalAnwser(sdp) } func (session *Session) destroy() error { diff --git a/server/internal/types/event/events.go b/server/internal/types/event/events.go index 7b52cc8..1ee171e 100644 --- a/server/internal/types/event/events.go +++ b/server/internal/types/event/events.go @@ -9,11 +9,6 @@ const ( SIGNAL_PROVIDE = "signal/provide" ) -const ( - IDENTITY_PROVIDE = "identity/provide" - IDENTITY_DETAILS = "identity/details" -) - const ( MEMBER_LIST = "member/list" MEMBER_CONNECTED = "member/connected" diff --git a/server/internal/types/message/messages.go b/server/internal/types/message/messages.go index 3cb18e7..d19040b 100644 --- a/server/internal/types/message/messages.go +++ b/server/internal/types/message/messages.go @@ -13,19 +13,16 @@ type Disconnect struct { Message string `json:"message"` } -type Identity struct { +type SignalProvide struct { Event string `json:"event"` ID string `json:"id"` + SDP string `json:"sdp"` } -type IdentityDetails struct { +type SignalAnswer struct { Event string `json:"event"` Username string `json:"username"` -} - -type Signal struct { - Event string `json:"event"` - SDP string `json:"sdp"` + SDP string `json:"sdp"` } type MembersList struct { diff --git a/server/internal/types/session.go b/server/internal/types/session.go index 4e57cf3..fbf34db 100644 --- a/server/internal/types/session.go +++ b/server/internal/types/session.go @@ -16,14 +16,14 @@ type Session interface { Member() *Member SetMuted(muted bool) SetName(name string) error + SetConnected(connected bool) error SetSocket(socket WebScoket) error SetPeer(peer Peer) error Address() *string Kick(message string) error Write(v interface{}) error Send(v interface{}) error - WriteAudioSample(sample Sample) error - WriteVideoSample(sample Sample) error + SignalAnwser(sdp string) error } type SessionManager interface { @@ -39,8 +39,6 @@ type SessionManager interface { Destroy(id string) error Clear() error Brodcast(v interface{}, exclude interface{}) error - WriteAudioSample(sample Sample) error - WriteVideoSample(sample Sample) error OnHost(listener func(id string)) OnHostCleared(listener func(id string)) OnDestroy(listener func(id string)) diff --git a/server/internal/types/webrtc.go b/server/internal/types/webrtc.go index c96572e..a507635 100644 --- a/server/internal/types/webrtc.go +++ b/server/internal/types/webrtc.go @@ -8,13 +8,12 @@ type Sample struct { type WebRTCManager interface { Start() Shutdown() error - CreatePeer(id string, sdp string) (string, Peer, error) + CreatePeer(id string, session Session) (string, error) ChangeScreenSize(width int, height int, rate int) error } type Peer interface { - WriteVideoSample(sample Sample) error - WriteAudioSample(sample Sample) error + SignalAnwser(sdp string) error WriteData(v interface{}) error Destroy() error } diff --git a/server/internal/webrtc/peer.go b/server/internal/webrtc/peer.go index 8357262..47a74d2 100644 --- a/server/internal/webrtc/peer.go +++ b/server/internal/webrtc/peer.go @@ -4,32 +4,17 @@ import ( "sync" "github.com/pion/webrtc/v2" - "github.com/pion/webrtc/v2/pkg/media" - "n.eko.moe/neko/internal/types" ) type Peer struct { id string - engine webrtc.MediaEngine - api *webrtc.API - video *webrtc.Track - audio *webrtc.Track + manager *WebRTCManager connection *webrtc.PeerConnection mu sync.Mutex } -func (peer *Peer) WriteAudioSample(sample types.Sample) error { - if err := peer.audio.WriteSample(media.Sample(sample)); err != nil { - return err - } - return nil -} - -func (peer *Peer) WriteVideoSample(sample types.Sample) error { - if err := peer.video.WriteSample(media.Sample(sample)); err != nil { - return err - } - return nil +func (peer *Peer) SignalAnwser(sdp string) error { + return peer.connection.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) } func (peer *Peer) WriteData(v interface{}) error { diff --git a/server/internal/webrtc/tracks.go b/server/internal/webrtc/tracks.go index 236421c..2ee3d01 100644 --- a/server/internal/webrtc/tracks.go +++ b/server/internal/webrtc/tracks.go @@ -7,9 +7,9 @@ import ( "github.com/pion/webrtc/v2" ) -func (m *WebRTCManager) createVideoTrack(engine webrtc.MediaEngine) (*webrtc.Track, error) { +func (m *WebRTCManager) createVideoTrack() (*webrtc.Track, error) { var codec *webrtc.RTPCodec - for _, videoCodec := range engine.GetCodecsByKind(webrtc.RTPCodecTypeVideo) { + for _, videoCodec := range m.engine.GetCodecsByKind(webrtc.RTPCodecTypeVideo) { if videoCodec.Name == m.videoPipeline.CodecName { codec = videoCodec break @@ -23,9 +23,9 @@ func (m *WebRTCManager) createVideoTrack(engine webrtc.MediaEngine) (*webrtc.Tra return webrtc.NewTrack(codec.PayloadType, rand.Uint32(), "stream", "stream", codec) } -func (m *WebRTCManager) createAudioTrack(engine webrtc.MediaEngine) (*webrtc.Track, error) { +func (m *WebRTCManager) createAudioTrack() (*webrtc.Track, error) { var codec *webrtc.RTPCodec - for _, videoCodec := range engine.GetCodecsByKind(webrtc.RTPCodecTypeAudio) { + for _, videoCodec := range m.engine.GetCodecsByKind(webrtc.RTPCodecTypeAudio) { if videoCodec.Name == m.audioPipeline.CodecName { codec = videoCodec break diff --git a/server/internal/webrtc/webrtc.go b/server/internal/webrtc/webrtc.go index b83fe2f..71608f4 100644 --- a/server/internal/webrtc/webrtc.go +++ b/server/internal/webrtc/webrtc.go @@ -2,10 +2,12 @@ package webrtc import ( "fmt" + "io" "strings" "time" "github.com/pion/webrtc/v2" + "github.com/pion/webrtc/v2/pkg/media" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -28,13 +30,22 @@ func New(sessions types.SessionManager, config *config.WebRTC) *WebRTCManager { settings.SetEphemeralUDPPortRange(config.EphemeralMin, config.EphemeralMax) settings.SetNAT1To1IPs(config.NAT1To1IPs, webrtc.ICECandidateTypeHost) + // Create MediaEngine based off sdp + engine := webrtc.MediaEngine{} + engine.RegisterDefaultCodecs() + + // Create API with MediaEngine and SettingEngine + api := webrtc.NewAPI(webrtc.WithMediaEngine(engine), webrtc.WithSettingEngine(settings)) + return &WebRTCManager{ logger: logger, settings: settings, cleanup: time.NewTicker(1 * time.Second), shutdown: make(chan bool), sessions: sessions, + engine: engine, config: config, + api: api, configuration: &webrtc.Configuration{ SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, }, @@ -44,18 +55,23 @@ func New(sessions types.SessionManager, config *config.WebRTC) *WebRTCManager { type WebRTCManager struct { logger zerolog.Logger settings webrtc.SettingEngine - sessions types.SessionManager + engine webrtc.MediaEngine + api *webrtc.API + videoTrack *webrtc.Track + audioTrack *webrtc.Track videoPipeline *gst.Pipeline audioPipeline *gst.Pipeline + sessions types.SessionManager cleanup *time.Ticker config *config.WebRTC + shutdown chan bool configuration *webrtc.Configuration } func (m *WebRTCManager) Start() { + // Set display and change to default resolution xorg.Display(m.config.Display) - if !xorg.ValidScreenSize(m.config.ScreenWidth, m.config.ScreenHeight, m.config.ScreenRate) { m.logger.Warn().Msgf("invalid screen option %dx%d@%d", m.config.ScreenWidth, m.config.ScreenHeight, m.config.ScreenRate) } else { @@ -64,31 +80,39 @@ func (m *WebRTCManager) Start() { } } + // Create video track/pipeline videoPipeline, err := gst.CreatePipeline( m.config.VideoCodec, m.config.Display, m.config.VideoParams, ) - if err != nil { m.logger.Panic().Err(err).Msg("unable to start webrtc manager") } + m.videoPipeline = videoPipeline + video, err := m.createVideoTrack() + if err != nil { + m.logger.Panic().Err(err).Msg("unable to start webrtc manager") + } + m.videoTrack = video + + // Create audio track/pipeline audioPipeline, err := gst.CreatePipeline( m.config.AudioCodec, m.config.Device, m.config.AudioParams, ) - if err != nil { m.logger.Panic().Err(err).Msg("unable to start webrtc manager") } - - m.videoPipeline = videoPipeline m.audioPipeline = audioPipeline - videoPipeline.Start() - audioPipeline.Start() + audio, err := m.createAudioTrack() + if err != nil { + m.logger.Panic().Err(err).Msg("unable to start webrtc manager") + } + m.audioTrack = audio go func() { defer func() { @@ -100,11 +124,11 @@ func (m *WebRTCManager) Start() { case <-m.shutdown: return case sample := <-m.videoPipeline.Sample: - if err := m.sessions.WriteVideoSample(sample); err != nil { + if err := m.videoTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { m.logger.Warn().Err(err).Msg("video pipeline failed to write") } case sample := <-m.audioPipeline.Sample: - if err := m.sessions.WriteAudioSample(sample); err != nil { + if err := m.audioTrack.WriteSample(media.Sample(sample)); err != nil && err != io.ErrClosedPipe { m.logger.Warn().Err(err).Msg("audio pipeline failed to write") } case <-m.cleanup.C: @@ -125,6 +149,10 @@ func (m *WebRTCManager) Start() { m.logger.Debug().Str("id", id).Msg("session destroyed") }) + // start pipelines + videoPipeline.Start() + audioPipeline.Start() + // TODO: log resolution, bit rate and codec parameters m.logger.Info(). Str("video_display", m.config.Display). @@ -147,64 +175,28 @@ func (m *WebRTCManager) Shutdown() error { return nil } -func (m *WebRTCManager) CreatePeer(id string, sdp string) (string, types.Peer, error) { - // Create SessionDescription - description := webrtc.SessionDescription{ - SDP: sdp, - Type: webrtc.SDPTypeOffer, - } - - // Create MediaEngine based off sdp - engine := webrtc.MediaEngine{} - engine.PopulateFromSDP(description) - - // Create API with MediaEngine and SettingEngine - api := webrtc.NewAPI(webrtc.WithMediaEngine(engine), webrtc.WithSettingEngine(m.settings)) - +func (m *WebRTCManager) CreatePeer(id string, session types.Session) (string, error) { // Create new peer connection - connection, err := api.NewPeerConnection(*m.configuration) + connection, err := m.api.NewPeerConnection(*m.configuration) if err != nil { - return "", nil, err + return "", err } - // Create video track - video, err := m.createVideoTrack(engine) - if err != nil { - return "", nil, err - } - - _, err = connection.AddTransceiverFromTrack(video, webrtc.RtpTransceiverInit{ + if _, err = connection.AddTransceiverFromTrack(m.videoTrack, webrtc.RtpTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionSendonly, - }) - - if err != nil { - return "", nil, err + }); err != nil { + return "", err } - // Create audio track - audio, err := m.createAudioTrack(engine) - if err != nil { - return "", nil, err - } - - _, err = connection.AddTransceiverFromTrack(audio, webrtc.RtpTransceiverInit{ + if _, err = connection.AddTransceiverFromTrack(m.audioTrack, webrtc.RtpTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionSendonly, - }) - - if err != nil { - return "", nil, err + }); err != nil { + return "", err } - // Set remote description - connection.SetRemoteDescription(description) - - answer, err := connection.CreateAnswer(nil) + description, err := connection.CreateOffer(nil) if err != nil { - return "", nil, err - } - - if err = connection.SetLocalDescription(answer); err != nil { - return "", nil, err + return "", err } connection.OnDataChannel(func(d *webrtc.DataChannel) { @@ -215,6 +207,7 @@ func (m *WebRTCManager) CreatePeer(id string, sdp string) (string, types.Peer, e }) }) + connection.SetLocalDescription(description) connection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { switch state { case webrtc.PeerConnectionStateDisconnected: @@ -224,18 +217,23 @@ func (m *WebRTCManager) CreatePeer(id string, sdp string) (string, types.Peer, e break case webrtc.PeerConnectionStateConnected: m.logger.Info().Str("id", id).Msg("peer connected") + if err = session.SetConnected(true); err != nil { + m.logger.Warn().Err(err).Msg("unable to set connected on peer") + m.sessions.Destroy(id) + } break } }) - return answer.SDP, &Peer{ + if err := session.SetPeer(&Peer{ id: id, - api: api, - engine: engine, - video: video, - audio: audio, + manager: m, connection: connection, - }, nil + }); err != nil { + return "", err + } + + return description.SDP, nil } func (m *WebRTCManager) ChangeScreenSize(width int, height int, rate int) error { diff --git a/server/internal/websocket/handler.go b/server/internal/websocket/handler.go index a09190e..73f8135 100644 --- a/server/internal/websocket/handler.go +++ b/server/internal/websocket/handler.go @@ -57,18 +57,11 @@ func (h *MessageHandler) Message(id string, raw []byte) error { switch header.Event { // Signal Events - case event.SIGNAL_PROVIDE: - payload := &message.Signal{} + case event.SIGNAL_ANSWER: + payload := &message.SignalAnswer{} return errors.Wrapf( utils.Unmarshal(payload, raw, func() error { - return h.createPeer(id, session, payload) - }), "%s failed", header.Event) - // Identity Events - case event.IDENTITY_DETAILS: - payload := &message.IdentityDetails{} - return errors.Wrapf( - utils.Unmarshal(payload, raw, func() error { - return h.identityDetails(id, session, payload) + return h.signalAnswer(id, session, payload) }), "%s failed", header.Event) // Control Events diff --git a/server/internal/websocket/identity.go b/server/internal/websocket/identity.go deleted file mode 100644 index a73cf8f..0000000 --- a/server/internal/websocket/identity.go +++ /dev/null @@ -1,34 +0,0 @@ -package websocket - -import ( - "n.eko.moe/neko/internal/types" - "n.eko.moe/neko/internal/types/event" - "n.eko.moe/neko/internal/types/message" -) - -func (h *MessageHandler) identityDetails(id string, session types.Session, payload *message.IdentityDetails) error { - if err := session.SetName(payload.Username); err != nil { - return err - } - return nil -} - -func (h *MessageHandler) createPeer(id string, session types.Session, payload *message.Signal) error { - sdp, peer, err := h.webrtc.CreatePeer(id, payload.SDP) - if err != nil { - return err - } - - if err := session.SetPeer(peer); err != nil { - return err - } - - if err := session.Send(message.Signal{ - Event: event.SIGNAL_ANSWER, - SDP: sdp, - }); err != nil { - return err - } - - return nil -} diff --git a/server/internal/websocket/session.go b/server/internal/websocket/session.go index 58289d7..8790757 100644 --- a/server/internal/websocket/session.go +++ b/server/internal/websocket/session.go @@ -7,10 +7,8 @@ import ( ) func (h *MessageHandler) SessionCreated(id string, session types.Session) error { - if err := session.Send(message.Identity{ - Event: event.IDENTITY_PROVIDE, - ID: id, - }); err != nil { + // send sdp and id over to client + if err := h.signalProvide(id, session); err != nil { return err } @@ -20,7 +18,7 @@ func (h *MessageHandler) SessionCreated(id string, session types.Session) error } if session.Admin() { - // send screen configurations + // send screen configurations if admin if err := h.screenConfigurations(id, session); err != nil { return err } diff --git a/server/internal/websocket/signal.go b/server/internal/websocket/signal.go new file mode 100644 index 0000000..e5b0dc1 --- /dev/null +++ b/server/internal/websocket/signal.go @@ -0,0 +1,36 @@ +package websocket + +import ( + "n.eko.moe/neko/internal/types" + "n.eko.moe/neko/internal/types/event" + "n.eko.moe/neko/internal/types/message" +) + +func (h *MessageHandler) signalProvide(id string, session types.Session) error { + sdp, err := h.webrtc.CreatePeer(id, session) + if err != nil { + return err + } + + if err := session.Send(message.SignalProvide{ + Event: event.SIGNAL_PROVIDE, + ID: id, + SDP: sdp, + }); err != nil { + return err + } + + return nil +} + +func (h *MessageHandler) signalAnswer(id string, session types.Session, payload *message.SignalAnswer) error { + if err := session.SetName(payload.Username); err != nil { + return err + } + + if err := session.SignalAnwser(payload.SDP); err != nil { + return err + } + + return nil +}