progress on server refactor

This commit is contained in:
Craig 2020-01-20 14:38:07 +00:00
parent 81abb88317
commit 78af798d68
14 changed files with 392 additions and 131 deletions

View File

@ -6,7 +6,6 @@ require (
github.com/go-chi/chi v4.0.3+incompatible
github.com/gorilla/websocket v1.4.1
github.com/kataras/go-events v0.0.2
github.com/matoous/go-nanoid v1.1.0
github.com/pion/logging v0.2.2
github.com/pion/webrtc/v2 v2.1.18
github.com/pkg/errors v0.8.1

View File

@ -81,8 +81,6 @@ github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzR
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA=
github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk=
github.com/matoous/go-nanoid v1.1.0 h1:B4BSMxTVgYrCHqtovL/adb8GFkE4mPCNntOOrdZLeCk=
github.com/matoous/go-nanoid v1.1.0/go.mod h1:L+uFUqrYwDgNWu5R02GrSxxcqX7ghiFuKPlKEOZ90GE=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=

View File

@ -1,15 +1,25 @@
package event
const SDP_REPLY = "sdp/reply"
const SDP_PROVIDE = "sdp/provide"
const CONTROL_RELEASE = "control/release"
const CONTROL_RELEASED = "control/released"
const CONTROL_REQUEST = "control/request"
const CONTROL_GIVE = "control/give"
const CONTROL_GIVEN = "control/given"
const CONTROL_LOCKED = "control/locked"
const CONTROL_REQUESTING = "control/requesting"
const SIGNAL_ANSWER = "signal/answer"
const SIGNAL_PROVIDE = "signal/provide"
const IDENTITY_PROVIDE = "identity/provide"
const IDENTITY_NAME = "identity/name"
const IDENTITY_DETAILS = "identity/details"
const MEMBER_LIST = "member/list"
const MEMBER_CONNECTED = "member/connected"
const MEMBER_DISCONNECTED = "member/disconnected"
const CONTROL_LOCKED = "control/locked"
const CONTROL_RELEASE = "control/release"
const CONTROL_REQUEST = "control/request"
const CONTROL_REQUESTING = "control/requesting"
// TODO
const ADMIN_BAN = "admin/ban"
const ADMIN_KICK = "admin/kick"
const ADMIN_LOCK = "admin/lock"
const ADMIN_MUTE = "admin/mute"
const ADMIN_UNMUTE = "admin/unmute"
const ADMIN_FORCE_CONTROL = "admin/force/control"
const ADMIN_FORCE_RELEASE = "admin/force/release"

View File

@ -1,15 +1,47 @@
package message
import "n.eko.moe/neko/internal/session"
type Message struct {
Event string `json:"event"`
}
type IdentityProvide struct {
type Identity struct {
Message
ID string `json:"id"`
}
type SDP struct {
type IdentityDetails struct {
Message
Username string `json:"username"`
}
type Signal struct {
Message
SDP string `json:"sdp"`
}
type Members struct {
Message
Memebers []*session.Session `json:"members"`
}
type Member struct {
Message
*session.Session
}
type MemberDisconnected struct {
Message
ID string `json:"id"`
}
type Control struct {
Message
ID string `json:"id"`
}
type Chat struct {
Message
ID string `json:"id"`
Content string `json:"content"`
}

View File

@ -26,9 +26,10 @@ type SessionManager struct {
func (m *SessionManager) New(id string, admin bool, socket *websocket.Conn) *Session {
session := &Session{
ID: id,
Admin: admin,
socket: socket,
ID: id,
Admin: admin,
socket: socket,
connected: false,
}
m.members[id] = session
@ -76,6 +77,17 @@ func (m *SessionManager) Get(id string) (*Session, bool) {
return session, ok
}
func (m *SessionManager) GetConnected() []*Session {
var sessions []*Session
for _, sess := range m.members {
if sess.connected {
sessions = append(sessions, sess)
}
}
return sessions
}
func (m *SessionManager) Set(id string, session *Session) {
m.members[id] = session
}
@ -115,6 +127,8 @@ func (m *SessionManager) SetName(id string, name string) (bool, error) {
session, ok := m.members[id]
if ok {
session.Name = name
session.connected = true
m.emmiter.Emit("connected", id, session)
return true, nil
}
@ -126,24 +140,21 @@ func (m *SessionManager) Clear() error {
}
func (m *SessionManager) Brodcast(v interface{}, exclude interface{}) error {
if exclude != nil {
for id, sess := range m.members {
for id, sess := range m.members {
if !sess.connected {
continue
}
if exclude != nil {
if in, _ := utils.ArrayIn(id, exclude); in {
continue
}
if err := sess.Send(v); err != nil {
return err
}
}
} else {
for _, sess := range m.members {
if err := sess.Send(v); err != nil {
return err
}
if err := sess.Send(v); err != nil {
return err
}
}
return nil
}
@ -165,6 +176,12 @@ func (m *SessionManager) OnCreated(listener func(id string, session *Session)) {
})
}
func (m *SessionManager) OnConnected(listener func(id string, session *Session)) {
m.emmiter.On("connected", func(payload ...interface{}) {
listener(payload[0].(string), payload[1].(*Session))
})
}
func (m *SessionManager) OnDestroy(listener func(id string)) {
m.emmiter.On("destroyed", func(payload ...interface{}) {
listener(payload[0].(string))

View File

@ -8,12 +8,13 @@ import (
)
type Session struct {
ID string
Name string
Admin bool
socket *websocket.Conn
peer *webrtc.PeerConnection
mu sync.Mutex
ID string `json:"id"`
Name string `json:"username"`
Admin bool `json:"admin"`
connected bool
socket *websocket.Conn
peer *webrtc.PeerConnection
mu sync.Mutex
}
// TODO: write to peer data channel

View File

@ -0,0 +1,98 @@
package utils
import (
"crypto/rand"
"fmt"
"math"
)
const (
defaultAlphabet = "_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" // len=64
defaultSize = 21
defaultMaskSize = 5
)
// Generator function
type Generator func([]byte) (int, error)
// BytesGenerator is the default bytes generator
var BytesGenerator Generator = rand.Read
func initMasks(params ...int) []uint {
var size int
if len(params) == 0 {
size = defaultMaskSize
} else {
size = params[0]
}
masks := make([]uint, size)
for i := 0; i < size; i++ {
shift := 3 + i
masks[i] = (2 << uint(shift)) - 1
}
return masks
}
func getMask(alphabet string, masks []uint) int {
for i := 0; i < len(masks); i++ {
curr := int(masks[i])
if curr >= len(alphabet)-1 {
return curr
}
}
return 0
}
// GenerateUID is a low-level function to change alphabet and ID size.
func GenerateUID(alphabet string, size int) (string, error) {
if len(alphabet) == 0 || len(alphabet) > 255 {
return "", fmt.Errorf("alphabet must not empty and contain no more than 255 chars. Current len is %d", len(alphabet))
}
if size <= 0 {
return "", fmt.Errorf("size must be positive integer")
}
masks := initMasks(size)
mask := getMask(alphabet, masks)
ceilArg := 1.6 * float64(mask*size) / float64(len(alphabet))
step := int(math.Ceil(ceilArg))
id := make([]byte, size)
bytes := make([]byte, step)
for j := 0; ; {
_, err := BytesGenerator(bytes)
if err != nil {
return "", err
}
for i := 0; i < step; i++ {
currByte := bytes[i] & byte(mask)
if currByte < byte(len(alphabet)) {
id[j] = alphabet[currByte]
j++
if j == size {
return string(id[:size]), nil
}
}
}
}
}
// NewUID generates secure URL-friendly unique ID.
func NewUID(param ...int) (string, error) {
var size int
if len(param) == 0 {
size = defaultSize
} else {
size = param[0]
}
bytes := make([]byte, size)
_, err := BytesGenerator(bytes)
if err != nil {
return "", err
}
id := make([]byte, size)
for i := 0; i < size; i++ {
id[i] = defaultAlphabet[bytes[i]&63]
}
return string(id[:size]), nil
}

View File

@ -25,7 +25,6 @@ type loggerFactory struct {
}
func (l loggerFactory) NewLogger(subsystem string) logging.LeveledLogger {
l.logger.Debug().Msgf("creating logger for %s", subsystem)
return logger{
logger: l.logger.With().Str("subsystem", subsystem).Logger(),
}

View File

@ -192,8 +192,8 @@ func (m *WebRTCManager) CreatePeer(id string, sdp string) error {
return err
}
if err := session.Send(message.SDP{
Message: message.Message{Event: event.SDP_REPLY},
if err := session.Send(message.Signal{
Message: message.Message{Event: event.SIGNAL_ANSWER},
SDP: answer.SDP,
}); err != nil {
return err

View File

@ -0,0 +1,78 @@
package websocket
import (
"n.eko.moe/neko/internal/event"
"n.eko.moe/neko/internal/message"
"n.eko.moe/neko/internal/session"
)
func (h *MessageHandler) controlRelease(id string, session *session.Session) error {
// check if session is host
if !h.sessions.IsHost(id) {
return nil
}
// release host
h.logger.Debug().Str("id", id).Msgf("host called %s", event.CONTROL_RELEASE)
h.sessions.ClearHost()
// tell everyone
if err := h.sessions.Brodcast(
message.Control{
Message: message.Message{Event: event.CONTROL_RELEASE},
ID: id,
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE)
return err
}
return nil
}
func (h *MessageHandler) controlRequest(id string, session *session.Session) error {
h.logger.Debug().Str("id", id).Msgf("user called %s", event.CONTROL_REQUEST)
// check for host
if !h.sessions.HasHost() {
// set host
h.sessions.SetHost(id)
// let everyone know
if err := h.sessions.Brodcast(
message.Control{
Message: message.Message{Event: event.CONTROL_LOCKED},
ID: id,
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_LOCKED)
return err
}
return nil
}
// get host
host, ok := h.sessions.GetHost()
if ok {
// tell session there is a host
if err := session.Send(message.Control{
Message: message.Message{Event: event.CONTROL_REQUEST},
ID: host.ID,
}); err != nil {
h.logger.Warn().Err(err).Str("id", id).Msgf("sending event %s has failed", event.CONTROL_REQUEST)
return err
}
// tell host session wants to be host
if err := host.Send(message.Control{
Message: message.Message{Event: event.CONTROL_REQUESTING},
ID: id,
}); err != nil {
h.logger.Warn().Err(err).Str("id", host.ID).Msgf("sending event %s has failed", event.CONTROL_REQUESTING)
return err
}
}
return nil
}

View File

@ -6,12 +6,12 @@ import (
"time"
"github.com/gorilla/websocket"
gonanoid "github.com/matoous/go-nanoid"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"n.eko.moe/neko/internal/config"
"n.eko.moe/neko/internal/session"
"n.eko.moe/neko/internal/utils"
"n.eko.moe/neko/internal/webrtc"
)
@ -38,8 +38,6 @@ func New(sessions *session.SessionManager, webrtc *webrtc.WebRTCManager, conf *c
// Send pings to peer with this period. Must be less than pongWait.
const pingPeriod = 60 * time.Second
const alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
type WebSocketHandler struct {
logger zerolog.Logger
upgrader websocket.Upgrader
@ -65,15 +63,23 @@ func (ws *WebSocketHandler) Start() error {
}()
ws.sessions.OnCreated(func(id string, session *session.Session) {
if err := ws.handler.Created(id, session); err != nil {
if err := ws.handler.SessionCreated(id, session); err != nil {
ws.logger.Warn().Str("id", id).Err(err).Msg("session created with and error")
} else {
ws.logger.Debug().Str("id", id).Msg("session created")
}
})
ws.sessions.OnConnected(func(id string, session *session.Session) {
if err := ws.handler.SessionConnected(id, session); err != nil {
ws.logger.Warn().Str("id", id).Err(err).Msg("session connected with and error")
} else {
ws.logger.Debug().Str("id", id).Msg("session connected")
}
})
ws.sessions.OnDestroy(func(id string) {
if err := ws.handler.Destroyed(id); err != nil {
if err := ws.handler.SessionDestroyed(id); err != nil {
ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error")
} else {
ws.logger.Debug().Str("id", id).Msg("session destroyed")
@ -122,7 +128,7 @@ func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) erro
Msg("session ended")
}()
if err = ws.handler.Connected(id, socket); err != nil {
if err = ws.handler.SocketConnected(id, socket); err != nil {
ws.logger.Error().Err(err).Msg("connection failed")
if err = socket.Close(); err != nil {
return err
@ -135,7 +141,7 @@ func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) erro
}
func (ws *WebSocketHandler) authenticate(r *http.Request) (string, bool, error) {
id, err := gonanoid.Generate(alphabet, 32)
id, err := utils.NewUID(32)
if err != nil {
return "", false, err
}
@ -165,7 +171,7 @@ func (ws *WebSocketHandler) handle(socket *websocket.Conn, id string) {
defer func() {
ticker.Stop()
ws.logger.Debug().Str("address", socket.RemoteAddr().String()).Msg("handle socket ending")
ws.handler.Disconnected(id)
ws.handler.SocketDisconnected(id)
}()
for {

View File

@ -0,0 +1,14 @@
package websocket
import (
"n.eko.moe/neko/internal/message"
"n.eko.moe/neko/internal/session"
)
func (h *MessageHandler) identityDetails(id string, session *session.Session, payload *message.IdentityDetails) error {
if _, err := h.sessions.SetName(id, payload.Username); err != nil {
return err
}
return nil
}

View File

@ -20,36 +20,14 @@ type MessageHandler struct {
webrtc *webrtc.WebRTCManager
}
func (h *MessageHandler) Connected(id string, socket *websocket.Conn) error {
func (h *MessageHandler) SocketConnected(id string, socket *websocket.Conn) error {
return nil
}
func (h *MessageHandler) Disconnected(id string) error {
func (h *MessageHandler) SocketDisconnected(id string) error {
return h.sessions.Destroy(id)
}
func (h *MessageHandler) Created(id string, session *session.Session) error {
if err := session.Send(message.IdentityProvide{
Message: message.Message{Event: event.IDENTITY_PROVIDE},
ID: id,
}); err != nil {
return err
}
return nil
}
func (h *MessageHandler) Destroyed(id string) error {
if h.sessions.IsHost(id) {
h.sessions.ClearHost()
if err := h.sessions.Brodcast(message.Message{Event: event.CONTROL_RELEASED}, []string{id}); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASED)
}
}
return nil
}
func (h *MessageHandler) Message(id string, raw []byte) error {
header := message.Message{}
if err := json.Unmarshal(raw, &header); err != nil {
@ -62,9 +40,18 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
}
switch header.Event {
case event.SDP_PROVIDE:
payload := message.SDP{}
return errors.Wrapf(utils.Unmarshal(&payload, raw, func() error { return h.webrtc.CreatePeer(id, payload.SDP) }), "%s failed", header.Event)
case event.SIGNAL_PROVIDE:
payload := message.Signal{}
return errors.Wrapf(
utils.Unmarshal(&payload, raw, func() error {
return h.webrtc.CreatePeer(id, payload.SDP)
}), "%s failed", header.Event)
case event.IDENTITY_DETAILS:
payload := &message.IdentityDetails{}
return errors.Wrapf(
utils.Unmarshal(payload, raw, func() error {
return h.identityDetails(id, session, payload)
}), "%s failed", header.Event)
case event.CONTROL_RELEASE:
return errors.Wrapf(h.controlRelease(id, session), "%s failed", header.Event)
case event.CONTROL_REQUEST:
@ -73,59 +60,3 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
return errors.Errorf("unknown message event %s", header.Event)
}
}
func (h *MessageHandler) controlRelease(id string, session *session.Session) error {
if !h.sessions.IsHost(id) {
return nil
}
h.logger.Debug().Str("id", id).Msgf("host called %s", event.CONTROL_RELEASED)
h.sessions.ClearHost()
if err := session.Send(message.Message{Event: event.CONTROL_RELEASE}); err != nil {
h.logger.Warn().Err(err).Str("id", id).Msgf("sending event %s has failed", event.CONTROL_RELEASE)
return err
}
if err := h.sessions.Brodcast(message.Message{Event: event.CONTROL_RELEASED}, []string{session.ID}); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASED)
return err
}
return nil
}
func (h *MessageHandler) controlRequest(id string, session *session.Session) error {
h.logger.Debug().Str("id", id).Msgf("user called %s", event.CONTROL_REQUEST)
if !h.sessions.HasHost() {
h.sessions.SetHost(id)
if err := session.Send(message.Message{Event: event.CONTROL_GIVE}); err != nil {
h.logger.Warn().Err(err).Str("id", id).Msgf("sending event %s has failed", event.CONTROL_GIVE)
return err
}
if err := h.sessions.Brodcast(message.Message{Event: event.CONTROL_GIVEN}, []string{session.ID}); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_GIVEN)
return err
}
return nil
}
if err := session.Send(message.Message{Event: event.CONTROL_LOCKED}); err != nil {
h.logger.Warn().Err(err).Str("id", id).Msgf("sending event %s has failed", event.CONTROL_LOCKED)
return err
}
host, ok := h.sessions.GetHost()
if ok {
if err := host.Send(message.Message{Event: event.CONTROL_REQUESTING}); err != nil {
h.logger.Warn().Err(err).Str("id", id).Msgf("sending event %s has failed", event.CONTROL_REQUESTING)
return err
}
}
return nil
}

View File

@ -0,0 +1,78 @@
package websocket
import (
"n.eko.moe/neko/internal/event"
"n.eko.moe/neko/internal/message"
"n.eko.moe/neko/internal/session"
)
func (h *MessageHandler) SessionCreated(id string, session *session.Session) error {
if err := session.Send(message.Identity{
Message: message.Message{Event: event.IDENTITY_PROVIDE},
ID: id,
}); err != nil {
return err
}
return nil
}
func (h *MessageHandler) SessionConnected(id string, session *session.Session) error {
// send list of members to session
if err := session.Send(message.Members{
Message: message.Message{Event: event.MEMBER_LIST},
Memebers: h.sessions.GetConnected(),
}); err != nil {
h.logger.Warn().Str("id", id).Err(err).Msgf("sending event %s has failed", event.MEMBER_LIST)
return err
}
// tell session there is a host
host, ok := h.sessions.GetHost()
if ok {
if err := session.Send(message.Control{
Message: message.Message{Event: event.CONTROL_LOCKED},
ID: host.ID,
}); err != nil {
h.logger.Warn().Str("id", id).Err(err).Msgf("sending event %s has failed", event.CONTROL_LOCKED)
return err
}
}
// let everyone know there is a new session
if err := h.sessions.Brodcast(
message.Member{
Message: message.Message{Event: event.MEMBER_CONNECTED},
Session: session,
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE)
return err
}
return nil
}
func (h *MessageHandler) SessionDestroyed(id string) error {
// clear host if exists
if h.sessions.IsHost(id) {
h.sessions.ClearHost()
if err := h.sessions.Brodcast(message.Control{
Message: message.Message{Event: event.CONTROL_RELEASE},
ID: id,
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE)
}
}
// let everyone know session disconnected
if err := h.sessions.Brodcast(
message.MemberDisconnected{
Message: message.Message{Event: event.MEMBER_DISCONNECTED},
ID: id,
}, nil); err != nil {
h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.MEMBER_DISCONNECTED)
return err
}
return nil
}