Add WebSocket heartbeat (#16)

* add heartbeat.

* rename.
This commit is contained in:
Miroslav Šedivý 2022-11-11 18:00:36 +01:00 committed by GitHub
parent b27b8e028d
commit 863de78b70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 4 deletions

View File

@ -1,5 +1,5 @@
import EventEmitter from 'eventemitter3'
import { SYSTEM_LOGS } from '../types/events'
import { SYSTEM_HEARTBEAT, SYSTEM_LOGS } from '../types/events'
import { Logger } from '../utils/logger'
export interface NekoWebSocketEvents {
@ -8,7 +8,13 @@ export interface NekoWebSocketEvents {
message: (event: string, payload: any) => void
}
const statusCodeMap = {
// how long can connection be idle before closing
const STALE_TIMEOUT_MS = 12_500 // 12.5 seconds
// how often should stale check be evaluated
const STALE_INTERVAL_MS = 7_000 // 7 seconds
const STATUS_CODE_MAP = {
1000: 'Normal Closure',
1001: 'Going Away',
1002: 'Protocol Error',
@ -29,6 +35,8 @@ const statusCodeMap = {
export class NekoWebSocket extends EventEmitter<NekoWebSocketEvents> {
private _ws?: WebSocket
private _stale_interval?: number
private _last_received?: Date
// eslint-disable-next-line
constructor(
@ -67,8 +75,8 @@ export class NekoWebSocket extends EventEmitter<NekoWebSocketEvents> {
this._ws.onclose = (e: CloseEvent) => {
let reason = 'close'
if (e.code in statusCodeMap) {
reason = statusCodeMap[e.code]
if (e.code in STATUS_CODE_MAP) {
reason = STATUS_CODE_MAP[e.code]
}
this.onDisconnected(reason)
@ -78,6 +86,13 @@ export class NekoWebSocket extends EventEmitter<NekoWebSocketEvents> {
}
public disconnect(reason: string) {
this._last_received = undefined
if (this._stale_interval) {
window.clearInterval(this._stale_interval)
this._stale_interval = undefined
}
if (typeof this._ws !== 'undefined') {
// unmount all events
this._ws.onopen = () => {}
@ -106,6 +121,10 @@ export class NekoWebSocket extends EventEmitter<NekoWebSocketEvents> {
private onMessage(e: MessageEvent) {
const { event, payload } = JSON.parse(e.data)
this._last_received = new Date()
// heartbeat only updates last_received
if (event == SYSTEM_HEARTBEAT) return
this._log.debug(`received websocket event`, { event, payload })
this.emit('message', event, payload)
}
@ -116,6 +135,10 @@ export class NekoWebSocket extends EventEmitter<NekoWebSocketEvents> {
return
}
// periodically check if connection is stale
if (this._stale_interval) window.clearInterval(this._stale_interval)
this._stale_interval = window.setInterval(this.onStaleCheck.bind(this), STALE_INTERVAL_MS)
this._log.info(`connected`)
this.emit('connected')
}
@ -126,4 +149,16 @@ export class NekoWebSocket extends EventEmitter<NekoWebSocketEvents> {
this._log.info(`disconnected`, { reason })
this.emit('disconnected', new Error(`connection ${reason}`))
}
private onStaleCheck() {
if (!this._last_received) return
// if we haven't received a message in specified time,
// assume the connection is dead
const diff = new Date().getTime() - this._last_received.getTime()
if (diff < STALE_TIMEOUT_MS) return
this._log.warn(`websocket connection is stale, disconnecting`)
this.onDisconnected('stale')
}
}

View File

@ -3,6 +3,7 @@ export const SYSTEM_ADMIN = 'system/admin'
export const SYSTEM_SETTINGS = 'system/settings'
export const SYSTEM_LOGS = 'system/logs'
export const SYSTEM_DISCONNECT = 'system/disconnect'
export const SYSTEM_HEARTBEAT = 'system/heartbeat'
export const SIGNAL_REQUEST = 'signal/request'
export const SIGNAL_RESTART = 'signal/restart'