Temporary websocket disconnect handling (#6)

* fix websocket close log error.

* logger session interface no pointer.

* websocket delayet disconnect.

* session host: save id not pointer to a session.

* fix if hostId not stored.
This commit is contained in:
Miroslav Šedivý 2022-08-26 20:16:40 +02:00 committed by GitHub
parent 5612b80634
commit 691150900b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 122 additions and 61 deletions

View File

@ -19,23 +19,22 @@ type ControlTargetPayload struct {
} }
func (h *RoomHandler) controlStatus(w http.ResponseWriter, r *http.Request) error { func (h *RoomHandler) controlStatus(w http.ResponseWriter, r *http.Request) error {
host := h.sessions.GetHost() host, hasHost := h.sessions.GetHost()
if host != nil { var hostId string
return utils.HttpSuccess(w, ControlStatusPayload{ if hasHost {
HasHost: true, hostId = host.ID()
HostId: host.ID(),
})
} }
return utils.HttpSuccess(w, ControlStatusPayload{ return utils.HttpSuccess(w, ControlStatusPayload{
HasHost: false, HasHost: hasHost,
HostId: hostId,
}) })
} }
func (h *RoomHandler) controlRequest(w http.ResponseWriter, r *http.Request) error { func (h *RoomHandler) controlRequest(w http.ResponseWriter, r *http.Request) error {
host := h.sessions.GetHost() _, hasHost := h.sessions.GetHost()
if host != nil { if hasHost {
return utils.HttpUnprocessableEntity("there is already a host") return utils.HttpUnprocessableEntity("there is already a host")
} }
@ -82,9 +81,9 @@ func (h *RoomHandler) controlGive(w http.ResponseWriter, r *http.Request) error
} }
func (h *RoomHandler) controlReset(w http.ResponseWriter, r *http.Request) error { func (h *RoomHandler) controlReset(w http.ResponseWriter, r *http.Request) error {
host := h.sessions.GetHost() _, hasHost := h.sessions.GetHost()
if host != nil { if hasHost {
h.desktop.ResetKeys() h.desktop.ResetKeys()
h.sessions.ClearHost() h.sessions.ClearHost()
} }

View File

@ -49,7 +49,7 @@ type logEntry struct {
logger zerolog.Logger logger zerolog.Logger
err error err error
panic *logPanic panic *logPanic
session *types.Session session types.Session
} }
type logPanic struct { type logPanic struct {
@ -69,7 +69,7 @@ func (e *logEntry) Error(err error) {
} }
func (e *logEntry) SetSession(session types.Session) { func (e *logEntry) SetSession(session types.Session) {
e.session = &session e.session = session
} }
func (e *logEntry) Write(status, bytes int, header http.Header, elapsed time.Duration, extra any) { func (e *logEntry) Write(status, bytes int, header http.Header, elapsed time.Duration, extra any) {
@ -83,7 +83,7 @@ func (e *logEntry) Write(status, bytes int, header http.Header, elapsed time.Dur
// add session ID to logs (if exists) // add session ID to logs (if exists)
if e.session != nil { if e.session != nil {
logger = logger.With().Str("session_id", (*e.session).ID()).Logger() logger = logger.With().Str("session_id", e.session.ID()).Logger()
} }
// handle panic error message // handle panic error message

View File

@ -3,6 +3,7 @@ package session
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"github.com/kataras/go-events" "github.com/kataras/go-events"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -62,8 +63,7 @@ type SessionManagerCtx struct {
sessions map[string]*SessionCtx sessions map[string]*SessionCtx
sessionsMu sync.Mutex sessionsMu sync.Mutex
host types.Session hostId atomic.Value
hostMu sync.Mutex
cursors map[types.Session][]types.Cursor cursors map[types.Session][]types.Cursor
cursorsMu sync.Mutex cursorsMu sync.Mutex
@ -188,24 +188,33 @@ func (manager *SessionManagerCtx) List() []types.Session {
// --- // ---
func (manager *SessionManagerCtx) SetHost(host types.Session) { func (manager *SessionManagerCtx) SetHost(host types.Session) {
manager.hostMu.Lock() var hostId string
manager.host = host if host != nil {
manager.hostMu.Unlock() hostId = host.ID()
}
manager.hostId.Store(hostId)
manager.emmiter.Emit("host_changed", host) manager.emmiter.Emit("host_changed", host)
} }
func (manager *SessionManagerCtx) GetHost() types.Session { func (manager *SessionManagerCtx) GetHost() (types.Session, bool) {
manager.hostMu.Lock() hostId, ok := manager.hostId.Load().(string)
defer manager.hostMu.Unlock() if !ok || hostId == "" {
return nil, false
}
return manager.host return manager.Get(hostId)
} }
func (manager *SessionManagerCtx) ClearHost() { func (manager *SessionManagerCtx) ClearHost() {
manager.SetHost(nil) manager.SetHost(nil)
} }
func (manager *SessionManagerCtx) isHost(host types.Session) bool {
hostId, ok := manager.hostId.Load().(string)
return ok && hostId == host.ID()
}
// --- // ---
// cursors // cursors
// --- // ---

View File

@ -2,6 +2,7 @@ package session
import ( import (
"sync" "sync"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -9,6 +10,10 @@ import (
"github.com/demodesk/neko/pkg/types/event" "github.com/demodesk/neko/pkg/types/event"
) )
// client is expected to reconnect within 5 second
// if some unexpected websocket disconnect happens
const WS_DELAYED_DURATION = 5 * time.Second
type SessionCtx struct { type SessionCtx struct {
id string id string
token string token string
@ -20,6 +25,10 @@ type SessionCtx struct {
websocketPeer types.WebSocketPeer websocketPeer types.WebSocketPeer
websocketMu sync.Mutex websocketMu sync.Mutex
// websocket delayed set connected events
wsDelayedMu sync.Mutex
wsDelayedTimer *time.Timer
webrtcPeer types.WebRTCPeer webrtcPeer types.WebRTCPeer
webrtcMu sync.Mutex webrtcMu sync.Mutex
} }
@ -56,7 +65,7 @@ func (session *SessionCtx) State() types.SessionState {
} }
func (session *SessionCtx) IsHost() bool { func (session *SessionCtx) IsHost() bool {
return session.manager.GetHost() == session return session.manager.isHost(session)
} }
func (session *SessionCtx) PrivateModeEnabled() bool { func (session *SessionCtx) PrivateModeEnabled() bool {
@ -83,7 +92,7 @@ func (session *SessionCtx) SetWebSocketPeer(websocketPeer types.WebSocketPeer) {
} }
} }
func (session *SessionCtx) SetWebSocketConnected(websocketPeer types.WebSocketPeer, connected bool) { func (session *SessionCtx) SetWebSocketConnected(websocketPeer types.WebSocketPeer, connected bool, delayed bool) {
session.websocketMu.Lock() session.websocketMu.Lock()
isCurrentPeer := websocketPeer == session.websocketPeer isCurrentPeer := websocketPeer == session.websocketPeer
session.websocketMu.Unlock() session.websocketMu.Unlock()
@ -94,8 +103,36 @@ func (session *SessionCtx) SetWebSocketConnected(websocketPeer types.WebSocketPe
session.logger.Info(). session.logger.Info().
Bool("connected", connected). Bool("connected", connected).
Bool("delayed", delayed).
Msg("set websocket connected") Msg("set websocket connected")
//
// ws delayed
//
var wsDelayedTimer *time.Timer
if delayed {
wsDelayedTimer = time.AfterFunc(WS_DELAYED_DURATION, func() {
session.SetWebSocketConnected(websocketPeer, connected, false)
})
}
session.wsDelayedMu.Lock()
if session.wsDelayedTimer != nil {
session.wsDelayedTimer.Stop()
}
session.wsDelayedTimer = wsDelayedTimer
session.wsDelayedMu.Unlock()
if delayed {
return
}
//
// not delayed
//
session.state.IsConnected = connected session.state.IsConnected = connected
if connected { if connected {

View File

@ -13,8 +13,8 @@ func (manager *WebSocketManagerCtx) fileChooserDialogEvents() {
manager.desktop.OnFileChooserDialogOpened(func() { manager.desktop.OnFileChooserDialogOpened(func() {
manager.logger.Info().Msg("file chooser dialog opened") manager.logger.Info().Msg("file chooser dialog opened")
host := manager.sessions.GetHost() host, hasHost := manager.sessions.GetHost()
if host == nil { if !hasHost {
manager.logger.Warn().Msg("no host for file chooser dialog found, closing") manager.logger.Warn().Msg("no host for file chooser dialog found, closing")
go manager.desktop.CloseFileChooserDialog() go manager.desktop.CloseFileChooserDialog()
return return

View File

@ -42,7 +42,7 @@ func (h *MessageHandlerCtx) controlRequest(session types.Session) error {
if !h.sessions.Settings().ImplicitHosting { if !h.sessions.Settings().ImplicitHosting {
// tell session if there is a host // tell session if there is a host
if host := h.sessions.GetHost(); host != nil { if host, hasHost := h.sessions.GetHost(); hasHost {
session.Send( session.Send(
event.CONTROL_HOST, event.CONTROL_HOST,
message.ControlHost{ message.ControlHost{

View File

@ -12,14 +12,16 @@ import (
) )
func (h *MessageHandlerCtx) systemInit(session types.Session) error { func (h *MessageHandlerCtx) systemInit(session types.Session) error {
host := h.sessions.GetHost() host, hasHost := h.sessions.GetHost()
controlHost := message.ControlHost{ var hostID string
HasHost: host != nil, if hasHost {
hostID = host.ID()
} }
if controlHost.HasHost { controlHost := message.ControlHost{
controlHost.HostID = host.ID() HasHost: hasHost,
HostID: hostID,
} }
size := h.desktop.GetScreenSize() size := h.desktop.GetScreenSize()

View File

@ -132,8 +132,8 @@ func (manager *WebSocketManagerCtx) Start() {
}) })
manager.desktop.OnClipboardUpdated(func() { manager.desktop.OnClipboardUpdated(func() {
session := manager.sessions.GetHost() host, hasHost := manager.sessions.GetHost()
if session == nil || !session.Profile().CanAccessClipboard { if !hasHost || !host.Profile().CanAccessClipboard {
return return
} }
@ -145,7 +145,7 @@ func (manager *WebSocketManagerCtx) Start() {
return return
} }
session.Send( host.Send(
event.CLIPBOARD_UPDATED, event.CLIPBOARD_UPDATED,
message.ClipboardData{ message.ClipboardData{
Text: data.Text, Text: data.Text,
@ -232,26 +232,47 @@ func (manager *WebSocketManagerCtx) connect(connection *websocket.Conn, r *http.
Str("agent", r.UserAgent()). Str("agent", r.UserAgent()).
Msg("connection started") Msg("connection started")
session.SetWebSocketConnected(peer, true) session.SetWebSocketConnected(peer, true, false)
defer func() { // this is a blocking function that lives
logger.Info(). // throughout whole websocket connection
Str("address", connection.RemoteAddr().String()). err = manager.handle(connection, peer, session)
Str("agent", r.UserAgent()).
Msg("connection ended")
session.SetWebSocketConnected(peer, false) logger.Info().
}() Str("address", connection.RemoteAddr().String()).
Str("agent", r.UserAgent()).
Msg("connection ended")
manager.handle(connection, peer, session) delayedDisconnect := false
e, ok := err.(*websocket.CloseError)
if !ok {
logger.Err(err).Msg("read message error")
// client is expected to reconnect soon
delayedDisconnect = true
} else {
switch e.Code {
case websocket.CloseNormalClosure:
logger.Info().Str("reason", e.Text).Msg("websocket close")
case websocket.CloseGoingAway:
logger.Info().Str("reason", "going away").Msg("websocket close")
default:
logger.Warn().Err(err).Msg("websocket close")
// abnormal websocket closure:
// client is expected to reconnect soon
delayedDisconnect = true
}
}
session.SetWebSocketConnected(peer, false, delayedDisconnect)
} }
func (manager *WebSocketManagerCtx) handle(connection *websocket.Conn, peer types.WebSocketPeer, session types.Session) { func (manager *WebSocketManagerCtx) handle(connection *websocket.Conn, peer types.WebSocketPeer, session types.Session) error {
// add session id to logger context // add session id to logger context
logger := manager.logger.With().Str("session_id", session.ID()).Logger() logger := manager.logger.With().Str("session_id", session.ID()).Logger()
bytes := make(chan []byte) bytes := make(chan []byte)
cancel := make(chan struct{}) cancel := make(chan error)
ticker := time.NewTicker(pingPeriod) ticker := time.NewTicker(pingPeriod)
defer ticker.Stop() defer ticker.Stop()
@ -263,13 +284,7 @@ func (manager *WebSocketManagerCtx) handle(connection *websocket.Conn, peer type
for { for {
_, raw, err := connection.ReadMessage() _, raw, err := connection.ReadMessage()
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { cancel <- err
logger.Warn().Err(err).Msg("read message error")
} else {
logger.Debug().Err(err).Msg("read message error")
}
close(cancel)
break break
} }
@ -306,15 +321,14 @@ func (manager *WebSocketManagerCtx) handle(connection *websocket.Conn, peer type
if !handled { if !handled {
logger.Warn().Str("event", data.Event).Msg("unhandled message") logger.Warn().Str("event", data.Event).Msg("unhandled message")
} }
case <-cancel: case err := <-cancel:
return return err
case <-manager.shutdown: case <-manager.shutdown:
peer.Destroy("connection shutdown") peer.Destroy("connection shutdown")
return return nil
case <-ticker.C: case <-ticker.C:
if err := peer.Ping(); err != nil { if err := peer.Ping(); err != nil {
logger.Err(err).Msg("ping message has failed") return err
return
} }
} }
} }

View File

@ -41,7 +41,7 @@ type Session interface {
// websocket // websocket
SetWebSocketPeer(websocketPeer WebSocketPeer) SetWebSocketPeer(websocketPeer WebSocketPeer)
SetWebSocketConnected(websocketPeer WebSocketPeer, connected bool) SetWebSocketConnected(websocketPeer WebSocketPeer, connected bool, delayed bool)
GetWebSocketPeer() WebSocketPeer GetWebSocketPeer() WebSocketPeer
Send(event string, payload any) Send(event string, payload any)
@ -60,7 +60,7 @@ type SessionManager interface {
List() []Session List() []Session
SetHost(host Session) SetHost(host Session)
GetHost() Session GetHost() (Session, bool)
ClearHost() ClearHost()
SetCursor(cursor Cursor, session Session) SetCursor(cursor Cursor, session Session)