add sessions cursors.

This commit is contained in:
Miroslav Šedivý 2021-10-24 01:09:41 +02:00
parent 318b833b30
commit 0b5e064cab
7 changed files with 97 additions and 29 deletions

View File

@ -15,14 +15,12 @@ import (
func New(config *config.Session) *SessionManagerCtx {
manager := &SessionManagerCtx{
logger: log.With().Str("module", "session").Logger(),
config: config,
host: nil,
hostMu: sync.Mutex{},
tokens: make(map[string]string),
sessions: make(map[string]*SessionCtx),
sessionsMu: sync.Mutex{},
emmiter: events.New(),
logger: log.With().Str("module", "session").Logger(),
config: config,
tokens: make(map[string]string),
sessions: make(map[string]*SessionCtx),
cursors: make(map[types.Session]types.Cursor),
emmiter: events.New(),
}
// create API session
@ -48,13 +46,19 @@ func New(config *config.Session) *SessionManagerCtx {
}
type SessionManagerCtx struct {
logger zerolog.Logger
config *config.Session
host types.Session
hostMu sync.Mutex
logger zerolog.Logger
config *config.Session
tokens map[string]string
sessions map[string]*SessionCtx
sessionsMu sync.Mutex
host types.Session
hostMu sync.Mutex
cursors map[types.Session]types.Cursor
cursorsMu sync.Mutex
emmiter events.EventEmmiter
apiSession *SessionCtx
}
@ -193,6 +197,32 @@ func (manager *SessionManagerCtx) ClearHost() {
manager.SetHost(nil)
}
// ---
// cursors
// ---
func (manager *SessionManagerCtx) SetCursor(x, y int, session types.Session) {
manager.cursorsMu.Lock()
defer manager.cursorsMu.Unlock()
pos, ok := manager.cursors[session]
if ok {
pos.X, pos.Y = x, y
} else {
manager.cursors[session] = types.Cursor{X: x, Y: y}
}
}
func (manager *SessionManagerCtx) PopCursors() map[types.Session]types.Cursor {
manager.cursorsMu.Lock()
defer manager.cursorsMu.Unlock()
cursors := manager.cursors
manager.cursors = make(map[types.Session]types.Cursor)
return cursors
}
// ---
// broadcasts
// ---

View File

@ -16,10 +16,6 @@ type SessionCtx struct {
profile types.MemberProfile
state types.SessionState
positionX int
positionY int
positionMu sync.Mutex
websocketPeer types.WebSocketPeer
websocketMu sync.Mutex
@ -57,16 +53,8 @@ func (session *SessionCtx) IsHost() bool {
return session.manager.GetHost() == session
}
// ---
// cursor position
// ---
func (session *SessionCtx) SetPosition(x, y int) {
session.positionMu.Lock()
defer session.positionMu.Unlock()
session.positionX = x
session.positionY = y
func (session *SessionCtx) SetCursor(x, y int) {
session.manager.SetCursor(x, y, session)
}
// ---

View File

@ -21,6 +21,7 @@ const (
SESSION_DELETED = "session/deleted"
SESSION_PROFILE = "session/profile"
SESSION_STATE = "session/state"
SESSION_CURSORS = "session/cursors"
)
const (

View File

@ -87,6 +87,12 @@ type SessionData struct {
State types.SessionState `json:"state"`
}
type SessionCursor struct {
ID string `json:"id"`
X uint16 `json:"x"`
Y uint16 `json:"y"`
}
/////////////////////////////
// Control
/////////////////////////////

View File

@ -12,6 +12,11 @@ var (
ErrSessionLoginDisabled = errors.New("session login disabled")
)
type Cursor struct {
X int
Y int
}
type SessionState struct {
IsConnected bool `json:"is_connected"`
IsWatching bool `json:"is_watching"`
@ -23,8 +28,8 @@ type Session interface {
State() SessionState
IsHost() bool
// cursor position
SetPosition(x, y int)
// cursor
SetCursor(x, y int)
// websocket
SetWebSocketPeer(websocketPeer WebSocketPeer)
@ -50,6 +55,9 @@ type SessionManager interface {
GetHost() Session
ClearHost()
SetCursor(x, y int, session Session)
PopCursors() map[Session]Cursor
Broadcast(event string, payload interface{}, exclude interface{})
AdminBroadcast(event string, payload interface{}, exclude interface{})

View File

@ -52,7 +52,7 @@ func (manager *WebRTCManagerCtx) handle(data []byte, session types.Session) erro
// handle inactive cursor movement
if session.Profile().CanHost {
session.SetPosition(x, y)
session.SetCursor(x, y)
}
return nil

View File

@ -132,6 +132,41 @@ func (manager *WebSocketManagerCtx) Start() {
manager.fileChooserDialogEvents()
manager.wg.Add(1)
go func() {
defer manager.wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-manager.shutdown:
return
case <-ticker.C:
cursorsMap := manager.sessions.PopCursors()
if len(cursorsMap) == 0 {
continue
}
cursors := []message.SessionCursor{}
for session, cursor := range cursorsMap {
cursors = append(
cursors,
message.SessionCursor{
ID: session.ID(),
X: uint16(cursor.X),
Y: uint16(cursor.Y),
},
)
}
// TODO: Send to subscribers only.
manager.sessions.AdminBroadcast(event.SESSION_CURSORS, cursors, nil)
}
}
}()
manager.logger.Info().Msg("websocket starting")
}