connection (WIP).

This commit is contained in:
Miroslav Šedivý 2021-07-15 23:58:45 +02:00
parent efe04c62ed
commit ad7e3d47c5

View File

@ -6,12 +6,8 @@ import * as EVENT from '../types/events'
import { NekoWebSocket } from './websocket' import { NekoWebSocket } from './websocket'
import { NekoWebRTC, WebRTCStats } from './webrtc' import { NekoWebRTC, WebRTCStats } from './webrtc'
import { Connection } from '../types/state' import { Connection } from '../types/state'
import { Reconnecter, ReconnecterAbstract } from '../utils/reconnecter'
const WEBSOCKET_TIMEOUT = 5000
const WEBSOCKET_RECONN_DELAY = 1500
const WEBRTC_TIMEOUT = 10000
const WEBRTC_RECONN_DELAY = 1500
const WEBRTC_RECONN_MAX_LOSS = 25 const WEBRTC_RECONN_MAX_LOSS = 25
const WEBRTC_RECONN_FAILED_ATTEMPTS = 5 const WEBRTC_RECONN_FAILED_ATTEMPTS = 5
@ -19,15 +15,85 @@ export interface NekoConnectionEvents {
disconnect: (error?: Error) => void disconnect: (error?: Error) => void
} }
class WebsocketReconnecter extends ReconnecterAbstract {
private _conn: NekoConnection
private _websocket: NekoWebSocket
constructor(conn: NekoConnection, websocket: NekoWebSocket) {
super()
// TODO: Antipattern.
this._conn = conn
// TODO: Unmount.
this._websocket = websocket
this._websocket.on('connected', () => this.emit('connect'))
this._websocket.on('disconnected', (error) => this.emit('disconnect', error))
}
public async connect() {
let url = this._conn.getUrl()
const token = this._conn.getToken()
if (token) {
url += '?token=' + encodeURIComponent(token)
}
this._websocket.connect(url)
}
public async disconnect() {
this._websocket.disconnect()
}
}
class WebrtcReconnecter extends ReconnecterAbstract {
private _conn: NekoConnection
private _websocket: NekoWebSocket
private _webrtc: NekoWebRTC
constructor(conn: NekoConnection, websocket: NekoWebSocket, webrtc: NekoWebRTC) {
super()
// TODO: Antipattern.
this._conn = conn
this._websocket = websocket
// TODO: Unmount.
this._webrtc = webrtc
this._webrtc.on('connected', () => this.emit('connect'))
this._webrtc.on('disconnected', (error) => this.emit('disconnect', error))
}
public async connect() {
this._websocket.send(EVENT.SIGNAL_REQUEST, { video: this._conn.getVideo() })
}
public async disconnect() {
this._webrtc.disconnect()
}
}
export class NekoConnection extends EventEmitter<NekoConnectionEvents> { export class NekoConnection extends EventEmitter<NekoConnectionEvents> {
private _activated = false
private _url = '' private _url = ''
private _token = '' private _token = ''
private _video = ''
private _log = new Logger('connection') private _log = new Logger('connection')
private _state: Connection private _state: Connection
public websocket = new NekoWebSocket() public websocket = new NekoWebSocket()
public _websocket_reconn = new Reconnecter(new WebsocketReconnecter(this, this.websocket), {
max_reconnects: 15,
timeout_ms: 5000,
backoff_ms: 1500,
})
public webrtc = new NekoWebRTC() public webrtc = new NekoWebRTC()
public _webrtc_reconn = new Reconnecter(new WebrtcReconnecter(this, this.websocket, this.webrtc), {
max_reconnects: 15,
timeout_ms: 10000,
backoff_ms: 1500,
})
constructor(state: Connection) { constructor(state: Connection) {
super() super()
@ -38,34 +104,41 @@ export class NekoConnection extends EventEmitter<NekoConnectionEvents> {
Vue.set(this._state, 'type', 'screencast') Vue.set(this._state, 'type', 'screencast')
// websocket // websocket
this.websocket.on('connected', () => { this._websocket_reconn.on('connect', () => {
if (this.websocket.connected && this.webrtc.connected) { if (this.websocket.connected && this.webrtc.connected) {
Vue.set(this._state, 'status', 'connected') Vue.set(this._state, 'status', 'connected')
} }
})
this.websocket.on('disconnected', () => {
if (this._state.status === 'connected') {
Vue.set(this._state, 'status', 'disconnected')
}
this._websocketReconnect() if (!this._webrtc_reconn.isConnected) {
this._webrtc_reconn.connect()
}
})
this._websocket_reconn.on('disconnect', () => {
if (this._state.status === 'connected') {
Vue.set(this._state, 'status', 'connecting')
}
})
this._websocket_reconn.on('close', (error) => {
this.emit('disconnect', error)
}) })
// webrtc // webrtc
this.webrtc.on('connected', () => { this._webrtc_reconn.on('connect', () => {
if (this.websocket.connected && this.webrtc.connected) { if (this.websocket.connected && this.webrtc.connected) {
Vue.set(this._state, 'status', 'connected') Vue.set(this._state, 'status', 'connected')
} }
Vue.set(this._state, 'type', 'webrtc') Vue.set(this._state, 'type', 'webrtc')
}) })
this.webrtc.on('disconnected', () => { this._webrtc_reconn.on('disconnect', () => {
if (this._state.status === 'connected') { if (this._state.status === 'connected') {
Vue.set(this._state, 'status', 'disconnected') Vue.set(this._state, 'status', 'connecting')
} }
Vue.set(this._state, 'type', 'screencast') Vue.set(this._state, 'type', 'screencast')
this._webrtcReconnect() })
this._webrtc_reconn.on('close', (error) => {
this.emit('disconnect', error)
}) })
let webrtcCongestion: number = 0 let webrtcCongestion: number = 0
@ -73,7 +146,7 @@ export class NekoConnection extends EventEmitter<NekoConnectionEvents> {
Vue.set(this._state.webrtc, 'stats', stats) Vue.set(this._state.webrtc, 'stats', stats)
// if automatic quality adjusting is turned off // if automatic quality adjusting is turned off
if (!this._state.webrtc.auto || !this.activated) return if (!this._state.webrtc.auto || !this._webrtc_reconn.isOpen) return
// if there are no or just one quality, no switching can be done // if there are no or just one quality, no switching can be done
if (this._state.webrtc.videos.length <= 1) return if (this._state.webrtc.videos.length <= 1) return
@ -105,11 +178,15 @@ export class NekoConnection extends EventEmitter<NekoConnectionEvents> {
} }
// try to reconnect // try to reconnect
this._webrtcReconnect() this._webrtc_reconn.reconnect()
} }
}) })
} }
public get activated() {
return this._websocket_reconn.isOpen && this._webrtc_reconn.isOpen
}
public setUrl(url: string) { public setUrl(url: string) {
this._url = url.replace(/^http/, 'ws').replace(/\/+$/, '') + '/api/ws' this._url = url.replace(/^http/, 'ws').replace(/\/+$/, '') + '/api/ws'
} }
@ -131,146 +208,32 @@ export class NekoConnection extends EventEmitter<NekoConnectionEvents> {
throw new Error('video id not found') throw new Error('video id not found')
} }
this.websocket.send(EVENT.SIGNAL_VIDEO, { video: video }) if (this.websocket.connected) {
this.websocket.send(EVENT.SIGNAL_VIDEO, { video })
}
this._video = video
}
public getVideo() {
return this._video
} }
public async connect(video?: string): Promise<void> { public async connect(video?: string): Promise<void> {
try { if (video) this._video = video
await this._websocketConnect()
if (video && !this._state.webrtc.videos.includes(video)) { this._webrtc_reconn.open(true)
throw new Error('video id not found') this._websocket_reconn.open()
}
await this._webrtcConnect(video)
this._activated = true
} catch (e) {
this.disconnect()
throw e
}
} }
public disconnect() { public disconnect() {
this._activated = false this._websocket_reconn.close()
this._webrtc_reconn.close()
this.webrtc.disconnect()
this.websocket.disconnect()
Vue.set(this._state, 'status', 'disconnected') Vue.set(this._state, 'status', 'disconnected')
this.emit('disconnect') this.emit('disconnect')
} }
public get activated() {
return this._activated
}
async _websocketConnect() {
Vue.set(this._state, 'status', 'connecting')
let url = this._url
if (this._token) {
url += '?token=' + encodeURIComponent(this._token)
}
this.websocket.connect(url)
await new Promise<void>((res, rej) => {
const timeout = window.setTimeout(() => {
this.websocket.disconnect()
rej(new Error('timeouted'))
}, WEBSOCKET_TIMEOUT)
this.websocket.once('connected', () => {
window.clearTimeout(timeout)
res()
})
this.websocket.once('disconnected', (reason) => {
window.clearTimeout(timeout)
rej(reason)
})
})
}
_websocketIsReconnecting = false
_websocketReconnect() {
if (this._websocketIsReconnecting) {
this._log.debug(`websocket reconnection already in progress`)
return
}
this._log.debug(`starting websocket reconnection`)
this.websocket.disconnect()
setTimeout(async () => {
while (this.activated) {
try {
await this._websocketConnect()
this._webrtcReconnect()
break
} catch (e) {
this._log.debug(`websocket reconnection failed`, e)
}
}
this._websocketIsReconnecting = false
this._log.debug(`websocket reconnection finished`)
}, WEBSOCKET_RECONN_DELAY)
}
async _webrtcConnect(video?: string) {
if (video && !this._state.webrtc.videos.includes(video)) {
throw new Error('video id not found')
}
Vue.set(this._state, 'status', 'connecting')
this.websocket.send(EVENT.SIGNAL_REQUEST, { video: video })
await new Promise<void>((res, rej) => {
const timeout = window.setTimeout(() => {
this.webrtc.disconnect()
rej(new Error('timeouted'))
}, WEBRTC_TIMEOUT)
this.webrtc.once('connected', () => {
window.clearTimeout(timeout)
res()
})
this.webrtc.once('disconnected', (reason) => {
window.clearTimeout(timeout)
rej(reason)
})
})
}
_webrtcIsReconnecting = false
_webrtcReconnect() {
if (this._webrtcIsReconnecting) {
this._log.debug(`webrtc reconnection already in progress`)
return
}
this._log.debug(`starting webrtc reconnection`)
this.webrtc.disconnect()
setTimeout(async () => {
let lastQuality: string | null = this._state.webrtc.video
while (this.activated && this.websocket.connected) {
try {
if (lastQuality != null) {
lastQuality = this._webrtcQualityDowngrade(lastQuality) || lastQuality
}
await this._webrtcConnect(lastQuality || undefined)
break
} catch (e) {
this._log.debug(`webrtc reconnection failed`, e)
}
}
this._webrtcIsReconnecting = false
this._log.debug(`webrtc reconnection finished`)
}, WEBRTC_RECONN_DELAY)
}
_webrtcQualityDowngrade(quality: string): string | undefined { _webrtcQualityDowngrade(quality: string): string | undefined {
// get index of selected or surrent quality // get index of selected or surrent quality
const index = this._state.webrtc.videos.indexOf(quality) const index = this._state.webrtc.videos.indexOf(quality)