Merge pull request #8 from m1k1o/broadcast-gui

Add Broadcast functionality
This commit is contained in:
Miroslav Šedivý 2020-09-27 01:37:33 +02:00 committed by GitHub
commit 1977e017dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 366 additions and 134 deletions

View File

@ -8,7 +8,7 @@ WORKDIR /src
# install dependencies # install dependencies
RUN set -eux; apt-get update; \ RUN set -eux; apt-get update; \
apt-get install -y --no-install-recommends git cmake make libx11-dev libxrandr-dev libxtst-dev \ apt-get install -y --no-install-recommends git cmake make libx11-dev libxrandr-dev libxtst-dev \
libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-base gstreamer1.0-plugins-good; \ libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly; \
# #
# install libclipboard # install libclipboard
set -eux; \ set -eux; \
@ -69,7 +69,7 @@ RUN set -eux; apt-get update; \
# #
# gst # gst
apt-get install -y --no-install-recommends libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \ apt-get install -y --no-install-recommends libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \
gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-pulseaudio; \ gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-pulseaudio; \
# #
# create a non-root user # create a non-root user
groupadd --gid $USER_GID $USERNAME; \ groupadd --gid $USER_GID $USERNAME; \

View File

@ -19,49 +19,49 @@
"lint": "vue-cli-service lint" "lint": "vue-cli-service lint"
}, },
"dependencies": { "dependencies": {
"@fortawesome/fontawesome-free": "^5.13.0", "@fortawesome/fontawesome-free": "^5.14.0",
"animejs": "^3.1.0", "animejs": "^3.2.0",
"axios": "^0.19.1", "axios": "^0.19.1",
"date-fns": "^2.11.1", "date-fns": "^2.16.1",
"emoji-datasource": "^5.0.1", "emoji-datasource": "^5.0.1",
"emojilib": "^2.4.0", "emojilib": "^2.4.0",
"eventemitter3": "^4.0.0", "eventemitter3": "^4.0.7",
"resize-observer-polyfill": "^1.5.1", "resize-observer-polyfill": "^1.5.1",
"simple-markdown": "^0.7.2", "simple-markdown": "^0.7.2",
"sweetalert2": "^9.10.9", "sweetalert2": "^9.17.2",
"typed-vuex": "^0.1.17", "typed-vuex": "^0.1.21",
"v-tooltip": "^2.0.3", "v-tooltip": "^2.0.3",
"vue": "^2.6.10", "vue": "^2.6.12",
"vue-class-component": "^7.2.3", "vue-class-component": "^7.2.6",
"vue-clickaway": "^2.2.2", "vue-clickaway": "^2.2.2",
"vue-context": "^5.1.0", "vue-context": "^5.2.0",
"vue-i18n": "^8.16.0", "vue-i18n": "^8.21.1",
"vue-notification": "^1.3.20", "vue-notification": "^1.3.20",
"vue-property-decorator": "^8.4.1", "vue-property-decorator": "^8.5.1",
"vuex": "^3.1.3" "vuex": "^3.5.1"
}, },
"devDependencies": { "devDependencies": {
"@types/animejs": "^3.1.0", "@types/animejs": "^3.1.2",
"@types/node": "^13.7.0", "@types/node": "^13.13.21",
"@types/vue": "^2.0.0", "@types/vue": "^2.0.0",
"@types/vue-clickaway": "^2.2.0", "@types/vue-clickaway": "^2.2.0",
"@typescript-eslint/eslint-plugin": "^2.26.0", "@typescript-eslint/eslint-plugin": "^2.34.0",
"@typescript-eslint/parser": "^2.26.0", "@typescript-eslint/parser": "^2.34.0",
"@vue/cli-plugin-babel": "^4.1.0", "@vue/cli-plugin-babel": "^4.5.6",
"@vue/cli-plugin-eslint": "^4.1.0", "@vue/cli-plugin-eslint": "^4.5.6",
"@vue/cli-plugin-typescript": "^4.1.0", "@vue/cli-plugin-typescript": "^4.5.6",
"@vue/cli-plugin-vuex": "^4.1.0", "@vue/cli-plugin-vuex": "^4.5.6",
"@vue/cli-service": "^4.1.0", "@vue/cli-service": "^4.5.6",
"@vue/eslint-config-prettier": "^6.0.0", "@vue/eslint-config-prettier": "^6.0.0",
"@vue/eslint-config-typescript": "^5.0.2", "@vue/eslint-config-typescript": "^5.1.0",
"eslint": "^6.8.0", "eslint": "^6.8.0",
"eslint-plugin-prettier": "^3.1.1", "eslint-plugin-prettier": "^3.1.4",
"eslint-plugin-vue": "^6.2.2", "eslint-plugin-vue": "^6.2.2",
"node-sass": "^4.12.0", "node-sass": "^4.14.1",
"prettier": "^2.0.2", "prettier": "^2.1.2",
"sass-loader": "^8.0.0", "sass-loader": "^8.0.0",
"ts-node": "^8.6.2", "ts-node": "^8.10.2",
"typescript": "^3.8.3", "typescript": "^3.9.7",
"vue-template-compiler": "^2.6.10" "vue-template-compiler": "^2.6.12"
} }
} }

View File

@ -48,6 +48,19 @@
<span /> <span />
</label> </label>
</li> </li>
<template v-if="admin">
<li>
<span>{{ $t('setting.broadcast_is_active') }}</span>
<label class="switch">
<input type="checkbox" v-model="broadcast_is_active" />
<span />
</label>
</li>
<li>
<span>{{ $t('setting.broadcast_url') }}</span>
<input v-model="broadcast_url" :disabled="broadcast_is_active" class="input">
</li>
</template>
<li v-if="connected"> <li v-if="connected">
<button @click.stop.prevent="logout">{{ $t('logout') }}</button> <button @click.stop.prevent="logout">{{ $t('logout') }}</button>
</li> </li>
@ -220,6 +233,30 @@
} }
} }
} }
.input {
display: block;
height: 30px;
text-align: right;
padding: 0 10px;
margin-left: 10px;
line-height: 30px;
text-overflow: ellipsis;
border: 1px solid transparent;
border-radius: 5px;
color: white;
background-color: $background-tertiary;
font-weight: lighter;
user-select: auto;
&::selection {
background: $text-normal;
}
&[disabled] {
background: none;
}
}
} }
} }
} }
@ -230,6 +267,12 @@
@Component({ name: 'neko-settings' }) @Component({ name: 'neko-settings' })
export default class extends Vue { export default class extends Vue {
private broadcast_url: string = '';
get admin() {
return this.$accessor.user.admin
}
get connected() { get connected() {
return this.$accessor.connected return this.$accessor.connected
} }
@ -282,6 +325,27 @@
return this.$accessor.settings.keyboard_layout return this.$accessor.settings.keyboard_layout
} }
get broadcast_is_active() {
return this.$accessor.settings.broadcast_is_active
}
set broadcast_is_active(value: boolean) {
if (value) {
this.$accessor.settings.broadcastCreate(this.broadcast_url)
} else {
this.$accessor.settings.broadcastDestroy()
}
}
get broadcast_url_remote() {
return this.$accessor.settings.broadcast_url
}
@Watch('broadcast_url_remote', { immediate: true })
onBroadcastUrlChange() {
this.broadcast_url = this.broadcast_url_remote
}
set keyboard_layout(value: string) { set keyboard_layout(value: string) {
this.$accessor.settings.setKeyboardLayout(value) this.$accessor.settings.setKeyboardLayout(value)
this.$accessor.remote.changeKeyboard() this.$accessor.remote.changeKeyboard()

View File

@ -61,6 +61,8 @@ export const setting = {
ignore_emotes: 'Ignore Emotes', ignore_emotes: 'Ignore Emotes',
chat_sound: 'Play Chat Sound', chat_sound: 'Play Chat Sound',
keyboard_layout: 'Change Keyboard Layout', keyboard_layout: 'Change Keyboard Layout',
broadcast_is_active: 'Broadcast Enabled',
broadcast_url: 'RTMP url',
} }
export const connection = { export const connection = {

View File

@ -38,6 +38,11 @@ export const EVENT = {
RESOLUTION: 'screen/resolution', RESOLUTION: 'screen/resolution',
SET: 'screen/set', SET: 'screen/set',
}, },
BROADCAST: {
STATUS: "broadcast/status",
CREATE: "broadcast/create",
DESTROY: "broadcast/destroy",
},
ADMIN: { ADMIN: {
BAN: 'admin/ban', BAN: 'admin/ban',
KICK: 'admin/kick', KICK: 'admin/kick',
@ -60,6 +65,7 @@ export type WebSocketEvents =
| SignalEvents | SignalEvents
| ChatEvents | ChatEvents
| ScreenEvents | ScreenEvents
| BroadcastEvents
| AdminEvents | AdminEvents
export type ControlEvents = export type ControlEvents =
@ -76,6 +82,11 @@ export type SignalEvents = typeof EVENT.SIGNAL.ANSWER | typeof EVENT.SIGNAL.PROV
export type ChatEvents = typeof EVENT.CHAT.MESSAGE | typeof EVENT.CHAT.EMOTE export type ChatEvents = typeof EVENT.CHAT.MESSAGE | typeof EVENT.CHAT.EMOTE
export type ScreenEvents = typeof EVENT.SCREEN.CONFIGURATIONS | typeof EVENT.SCREEN.RESOLUTION | typeof EVENT.SCREEN.SET export type ScreenEvents = typeof EVENT.SCREEN.CONFIGURATIONS | typeof EVENT.SCREEN.RESOLUTION | typeof EVENT.SCREEN.SET
export type BroadcastEvents =
| typeof EVENT.BROADCAST.STATUS
| typeof EVENT.BROADCAST.CREATE
| typeof EVENT.BROADCAST.DESTROY
export type AdminEvents = export type AdminEvents =
| typeof EVENT.ADMIN.BAN | typeof EVENT.ADMIN.BAN
| typeof EVENT.ADMIN.KICK | typeof EVENT.ADMIN.KICK

View File

@ -18,6 +18,7 @@ import {
ControlClipboardPayload, ControlClipboardPayload,
ScreenConfigurationsPayload, ScreenConfigurationsPayload,
ScreenResolutionPayload, ScreenResolutionPayload,
BroadcastStatusPayload,
AdminPayload, AdminPayload,
AdminTargetPayload, AdminTargetPayload,
} from './messages' } from './messages'
@ -326,6 +327,13 @@ export class NekoClient extends BaseClient implements EventEmitter<NekoEvents> {
}) })
} }
/////////////////////////////
// Broadcast Events
/////////////////////////////
protected [EVENT.BROADCAST.STATUS](payload: BroadcastStatusPayload) {
this.$accessor.settings.broadcastStatus(payload)
}
///////////////////////////// /////////////////////////////
// Admin Events // Admin Events
///////////////////////////// /////////////////////////////

View File

@ -37,6 +37,8 @@ export type WebSocketPayloads =
| ScreenResolutionPayload | ScreenResolutionPayload
| ScreenConfigurationsPayload | ScreenConfigurationsPayload
| AdminPayload | AdminPayload
| BroadcastStatusPayload
| BroadcastCreatePayload
export interface WebSocketMessage { export interface WebSocketMessage {
event: WebSocketEvents | string event: WebSocketEvents | string
@ -177,6 +179,18 @@ export interface ScreenConfigurationsPayload {
configurations: ScreenConfigurations configurations: ScreenConfigurations
} }
/*
BROADCAST PAYLOADS
*/
export interface BroadcastCreatePayload {
url: string
}
export interface BroadcastStatusPayload {
url: string
isActive: boolean
}
/* /*
ADMIN PAYLOADS ADMIN PAYLOADS
*/ */

View File

@ -1,5 +1,6 @@
import { getterTree, mutationTree, actionTree } from 'typed-vuex' import { getterTree, mutationTree, actionTree } from 'typed-vuex'
import { get, set } from '~/utils/localstorage' import { get, set } from '~/utils/localstorage'
import { EVENT } from '~/neko/events'
import { accessor } from '~/store' import { accessor } from '~/store'
export const namespaced = true export const namespaced = true
@ -18,6 +19,9 @@ export const state = () => {
keyboard_layout: get<string>('keyboard_layout', 'us'), keyboard_layout: get<string>('keyboard_layout', 'us'),
keyboard_layouts_list: {} as KeyboardLayouts, keyboard_layouts_list: {} as KeyboardLayouts,
broadcast_is_active: false,
broadcast_url: "",
} }
} }
@ -57,6 +61,10 @@ export const mutations = mutationTree(state, {
setKeyboardLayoutsList(state, value: KeyboardLayouts) { setKeyboardLayoutsList(state, value: KeyboardLayouts) {
state.keyboard_layouts_list = value state.keyboard_layouts_list = value
}, },
setBroadcastStatus(state, { url, isActive }) {
state.broadcast_url = url,
state.broadcast_is_active = isActive
},
}) })
export const actions = actionTree( export const actions = actionTree(
@ -71,5 +79,15 @@ export const actions = actionTree(
}) })
.catch(console.error) .catch(console.error)
}, },
broadcastStatus({ getters }, { url, isActive }) {
accessor.settings.setBroadcastStatus({ url, isActive })
},
broadcastCreate({ getters }, url: string) {
$client.sendMessage(EVENT.BROADCAST.CREATE, { url })
},
broadcastDestroy({ getters }) {
$client.sendMessage(EVENT.BROADCAST.DESTROY)
},
}, },
) )

View File

@ -12,9 +12,12 @@ services:
- SYS_ADMIN - SYS_ADMIN
environment: environment:
DISPLAY: :99.0 DISPLAY: :99.0
NEKO_SCREEN: '1280x720@30' NEKO_SCREEN: '1920x1080@30'
NEKO_PASSWORD: neko NEKO_PASSWORD: neko
NEKO_PASSWORD_ADMIN: admin NEKO_PASSWORD_ADMIN: admin
NEKO_BIND: :8080 NEKO_BIND: :8080
NEKO_EPR: 52000-52010 NEKO_EPR: 52000-52010
NEKO_NAT1TO1: 192.168.1.20 NEKO_NAT1TO1: 192.168.1.20
NEKO_BROADCAST: 'true'
NEKO_RTMP: 'rtmp://192.168.1.20/live/neko'

View File

@ -3,6 +3,7 @@ package broadcast
import ( import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"n.eko.moe/neko/internal/gst" "n.eko.moe/neko/internal/gst"
"n.eko.moe/neko/internal/types/config" "n.eko.moe/neko/internal/types/config"
) )
@ -10,34 +11,70 @@ import (
type BroadcastManager struct { type BroadcastManager struct {
logger zerolog.Logger logger zerolog.Logger
pipeline *gst.Pipeline pipeline *gst.Pipeline
config *config.Broadcast remote *config.Remote
enabled bool
url string
} }
func New(config *config.Broadcast) *BroadcastManager { func New(remote *config.Remote) *BroadcastManager {
return &BroadcastManager{ return &BroadcastManager{
logger: log.With().Str("module", "remote").Logger(), logger: log.With().Str("module", "remote").Logger(),
config: config, remote: remote,
enabled: false,
url: "",
} }
} }
func (manager *BroadcastManager) Start() { func (manager *BroadcastManager) Start() {
if !manager.enabled || manager.IsActive() {
return
}
var err error var err error
manager.pipeline, err = gst.CreateRTMPPipeline( manager.pipeline, err = gst.CreateRTMPPipeline(
manager.config.Device, manager.remote.Device,
manager.config.Display, manager.remote.Display,
manager.config.RTMP, manager.url,
) )
manager.logger.Info().
Str("audio_device", manager.remote.Device).
Str("video_display", manager.remote.Display).
Str("rtmp_pipeline_src", manager.pipeline.Src).
Msgf("RTMP pipeline is starting...")
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
return
} }
manager.pipeline.Start() manager.pipeline.Play()
} }
func (manager *BroadcastManager) Shutdown() error { func (manager *BroadcastManager) Stop() {
if manager.pipeline != nil { if !manager.IsActive() {
manager.pipeline.Stop() return
} }
return nil manager.pipeline.Stop()
manager.pipeline = nil
}
func (manager *BroadcastManager) IsActive() bool {
return manager.pipeline != nil
}
func (manager *BroadcastManager) Create(url string) {
manager.url = url
manager.enabled = true
manager.Start()
}
func (manager *BroadcastManager) Destroy() {
manager.Stop()
manager.enabled = false
}
func (manager *BroadcastManager) GetUrl() string {
return manager.url
} }

View File

@ -84,6 +84,10 @@ void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) {
gst_element_set_state(pipeline, GST_STATE_PLAYING); gst_element_set_state(pipeline, GST_STATE_PLAYING);
} }
void gstreamer_send_play_pipeline(GstElement *pipeline) {
gst_element_set_state(pipeline, GST_STATE_PLAYING);
}
void gstreamer_send_stop_pipeline(GstElement *pipeline) { void gstreamer_send_stop_pipeline(GstElement *pipeline) {
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(pipeline, GST_STATE_NULL);
} }

View File

@ -68,7 +68,8 @@ func init() {
func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineRTMP string) (*Pipeline, error) { func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineRTMP string) (*Pipeline, error) {
video := fmt.Sprintf(videoSrc, pipelineDisplay) video := fmt.Sprintf(videoSrc, pipelineDisplay)
audio := fmt.Sprintf(audioSrc, pipelineDevice) audio := fmt.Sprintf(audioSrc, pipelineDevice)
return CreatePipeline(fmt.Sprintf("%s ! x264enc ! flv. ! %s ! faac ! flv. ! flvmux name='flv' ! rtmpsink location='%s'", video, audio, pipelineRTMP), "", 0)
return CreatePipeline(fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video), "", 0)
} }
// CreateAppPipeline creates a GStreamer Pipeline // CreateAppPipeline creates a GStreamer Pipeline
@ -228,6 +229,11 @@ func (p *Pipeline) Start() {
C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id)) C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id))
} }
// Play starts the GStreamer Pipeline
func (p *Pipeline) Play() {
C.gstreamer_send_play_pipeline(p.Pipeline)
}
// Stop stops the GStreamer Pipeline // Stop stops the GStreamer Pipeline
func (p *Pipeline) Stop() { func (p *Pipeline) Stop() {
C.gstreamer_send_stop_pipeline(p.Pipeline) C.gstreamer_send_stop_pipeline(p.Pipeline)

View File

@ -11,6 +11,7 @@ extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int
GstElement *gstreamer_send_create_pipeline(char *pipeline); GstElement *gstreamer_send_create_pipeline(char *pipeline);
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId); void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId);
void gstreamer_send_play_pipeline(GstElement *pipeline);
void gstreamer_send_stop_pipeline(GstElement *pipeline); void gstreamer_send_stop_pipeline(GstElement *pipeline);
void gstreamer_send_start_mainloop(void); void gstreamer_send_start_mainloop(void);
void gstreamer_init(void); void gstreamer_init(void);

View File

@ -18,19 +18,21 @@ type RemoteManager struct {
video *gst.Pipeline video *gst.Pipeline
audio *gst.Pipeline audio *gst.Pipeline
config *config.Remote config *config.Remote
broadcast types.BroadcastManager
cleanup *time.Ticker cleanup *time.Ticker
shutdown chan bool shutdown chan bool
emmiter events.EventEmmiter emmiter events.EventEmmiter
streaming bool streaming bool
} }
func New(config *config.Remote) *RemoteManager { func New(config *config.Remote, broadcast types.BroadcastManager) *RemoteManager {
return &RemoteManager{ return &RemoteManager{
logger: log.With().Str("module", "remote").Logger(), logger: log.With().Str("module", "remote").Logger(),
cleanup: time.NewTicker(1 * time.Second), cleanup: time.NewTicker(1 * time.Second),
shutdown: make(chan bool), shutdown: make(chan bool),
emmiter: events.New(), emmiter: events.New(),
config: config, config: config,
broadcast: broadcast,
streaming: false, streaming: false,
} }
} }
@ -44,7 +46,16 @@ func (manager *RemoteManager) AudioCodec() string {
} }
func (manager *RemoteManager) Start() { func (manager *RemoteManager) Start() {
xorg.Display(manager.config.Display)
if !xorg.ValidScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) {
manager.logger.Warn().Msgf("invalid screen option %dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)
} else if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.createPipelines() manager.createPipelines()
manager.broadcast.Start()
go func() { go func() {
defer func() { defer func() {
@ -70,6 +81,8 @@ func (manager *RemoteManager) Shutdown() error {
manager.logger.Info().Msgf("remote shutting down") manager.logger.Info().Msgf("remote shutting down")
manager.video.Stop() manager.video.Stop()
manager.audio.Stop() manager.audio.Stop()
manager.broadcast.Stop()
manager.cleanup.Stop() manager.cleanup.Stop()
manager.shutdown <- true manager.shutdown <- true
return nil return nil
@ -88,6 +101,8 @@ func (manager *RemoteManager) OnAudioFrame(listener func(sample types.Sample)) {
} }
func (manager *RemoteManager) StartStream() { func (manager *RemoteManager) StartStream() {
manager.createPipelines()
manager.logger.Info(). manager.logger.Info().
Str("video_display", manager.config.Display). Str("video_display", manager.config.Display).
Str("video_codec", manager.config.VideoCodec). Str("video_codec", manager.config.VideoCodec).
@ -98,15 +113,6 @@ func (manager *RemoteManager) StartStream() {
Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)). Str("screen_resolution", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)).
Msgf("Pipelines starting...") Msgf("Pipelines starting...")
xorg.Display(manager.config.Display)
if !xorg.ValidScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate) {
manager.logger.Warn().Msgf("invalid screen option %dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)
} else if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.createPipelines()
manager.video.Start() manager.video.Start()
manager.audio.Start() manager.audio.Start()
manager.streaming = true manager.streaming = true
@ -140,7 +146,7 @@ func (manager *RemoteManager) createPipelines() {
manager.config.AudioParams, manager.config.AudioParams,
) )
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to screate audio pipeline") manager.logger.Panic().Err(err).Msg("unable to create audio pipeline")
} }
} }
@ -150,8 +156,12 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int)
} }
manager.video.Stop() manager.video.Stop()
manager.broadcast.Stop()
defer func() { defer func() {
manager.video.Start() manager.video.Start()
manager.broadcast.Start()
manager.logger.Info().Msg("starting video pipeline...") manager.logger.Info().Msg("starting video pipeline...")
}() }()
@ -159,17 +169,16 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int)
return err return err
} }
video, err := gst.CreateAppPipeline( var err error
manager.video, err = gst.CreateAppPipeline(
manager.config.VideoCodec, manager.config.VideoCodec,
manager.config.Display, manager.config.Display,
manager.config.VideoParams, manager.config.VideoParams,
) )
if err != nil { if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create new video pipeline") manager.logger.Panic().Err(err).Msg("unable to create new video pipeline")
} }
manager.video = video
return nil return nil
} }

View File

@ -0,0 +1,10 @@
package types
type BroadcastManager interface {
Start()
Stop()
IsActive() bool
Create(url string)
Destroy()
GetUrl() string
}

View File

@ -1,48 +0,0 @@
package config
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
type Broadcast struct {
Enabled bool
Display string
Device string
AudioParams string
VideoParams string
RTMP string
}
func (Broadcast) Init(cmd *cobra.Command) error {
cmd.PersistentFlags().Bool("broadcast", false, "use PCMA audio codec")
if err := viper.BindPFlag("broadcast", cmd.PersistentFlags().Lookup("broadcast")); err != nil {
return err
}
cmd.PersistentFlags().String("rtmp", "", "RMTP url for broadcasting")
if err := viper.BindPFlag("rtmp", cmd.PersistentFlags().Lookup("rtmp")); err != nil {
return err
}
cmd.PersistentFlags().String("cast_audio", "", "audio codec parameters to use for broadcasting")
if err := viper.BindPFlag("cast_audio", cmd.PersistentFlags().Lookup("cast_audio")); err != nil {
return err
}
cmd.PersistentFlags().String("cast_video", "", "video codec parameters to use for broadcasting")
if err := viper.BindPFlag("cast_video", cmd.PersistentFlags().Lookup("cast_video")); err != nil {
return err
}
return nil
}
func (s *Broadcast) Set() {
s.Enabled = viper.GetBool("broadcast")
s.Display = viper.GetString("display")
s.Device = viper.GetString("device")
s.AudioParams = viper.GetString("cast_audio")
s.VideoParams = viper.GetString("cast_video")
s.RTMP = viper.GetString("rtmp")
}

View File

@ -36,6 +36,12 @@ const (
SCREEN_SET = "screen/set" SCREEN_SET = "screen/set"
) )
const (
BORADCAST_STATUS = "broadcast/status"
BORADCAST_CREATE = "broadcast/create"
BORADCAST_DESTROY = "broadcast/destroy"
)
const ( const (
ADMIN_BAN = "admin/ban" ADMIN_BAN = "admin/ban"
ADMIN_KICK = "admin/kick" ADMIN_KICK = "admin/kick"

View File

@ -110,3 +110,14 @@ type ScreenConfigurations struct {
Event string `json:"event"` Event string `json:"event"`
Configurations map[int]types.ScreenConfiguration `json:"configurations"` Configurations map[int]types.ScreenConfiguration `json:"configurations"`
} }
type BroadcastStatus struct {
Event string `json:"event"`
URL string `json:"url"`
IsActive bool `json:"isActive"`
}
type BroadcastCreate struct {
Event string `json:"event"`
URL string `json:"url"`
}

View File

@ -0,0 +1,56 @@
package websocket
import (
"n.eko.moe/neko/internal/types"
"n.eko.moe/neko/internal/types/event"
"n.eko.moe/neko/internal/types/message"
)
func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Create(payload.URL)
if err := h.boradcastStatus(session); err != nil {
return err
}
return nil
}
func (h *MessageHandler) boradcastDestroy(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Destroy()
if err := h.boradcastStatus(session); err != nil {
return err
}
return nil
}
func (h *MessageHandler) boradcastStatus(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
if err := session.Send(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.broadcast.IsActive(),
URL: h.broadcast.GetUrl(),
}); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS)
return err
}
return nil
}

View File

@ -13,12 +13,13 @@ import (
) )
type MessageHandler struct { type MessageHandler struct {
logger zerolog.Logger logger zerolog.Logger
sessions types.SessionManager sessions types.SessionManager
webrtc types.WebRTCManager webrtc types.WebRTCManager
remote types.RemoteManager remote types.RemoteManager
banned map[string]bool broadcast types.BroadcastManager
locked bool banned map[string]bool
locked bool
} }
func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) { func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) {
@ -123,6 +124,16 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
return h.screenSet(id, session, payload) return h.screenSet(id, session, payload)
}), "%s failed", header.Event) }), "%s failed", header.Event)
// Boradcast Events
case event.BORADCAST_CREATE:
payload := &message.BroadcastCreate{}
return errors.Wrapf(
utils.Unmarshal(payload, raw, func() error {
return h.boradcastCreate(session, payload)
}), "%s failed", header.Event)
case event.BORADCAST_DESTROY:
return errors.Wrapf(h.boradcastDestroy(session), "%s failed", header.Event)
// Admin Events // Admin Events
case event.ADMIN_LOCK: case event.ADMIN_LOCK:
return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event) return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event)

View File

@ -17,6 +17,11 @@ func (h *MessageHandler) SessionCreated(id string, session types.Session) error
if err := h.screenConfigurations(id, session); err != nil { if err := h.screenConfigurations(id, session); err != nil {
return err return err
} }
// send broadcast status if admin
if err := h.boradcastStatus(session); err != nil {
return err
}
} }
return nil return nil

View File

@ -16,26 +16,27 @@ import (
"n.eko.moe/neko/internal/utils" "n.eko.moe/neko/internal/utils"
) )
func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { func New(sessions types.SessionManager, remote types.RemoteManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler {
logger := log.With().Str("module", "websocket").Logger() logger := log.With().Str("module", "websocket").Logger()
return &WebSocketHandler{ return &WebSocketHandler{
logger: logger, logger: logger,
conf: conf, conf: conf,
sessions: sessions, sessions: sessions,
remote: remote, remote: remote,
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { CheckOrigin: func(r *http.Request) bool {
return true return true
}, },
}, },
handler: &MessageHandler{ handler: &MessageHandler{
logger: logger.With().Str("subsystem", "handler").Logger(), logger: logger.With().Str("subsystem", "handler").Logger(),
remote: remote, remote: remote,
sessions: sessions, broadcast: broadcast,
webrtc: webrtc, sessions: sessions,
banned: make(map[string]bool), webrtc: webrtc,
locked: false, banned: make(map[string]bool),
locked: false,
}, },
} }
} }
@ -44,13 +45,13 @@ func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types
const pingPeriod = 60 * time.Second const pingPeriod = 60 * time.Second
type WebSocketHandler struct { type WebSocketHandler struct {
logger zerolog.Logger logger zerolog.Logger
upgrader websocket.Upgrader upgrader websocket.Upgrader
sessions types.SessionManager sessions types.SessionManager
remote types.RemoteManager remote types.RemoteManager
conf *config.WebSocket conf *config.WebSocket
handler *MessageHandler handler *MessageHandler
shutdown chan bool shutdown chan bool
} }
func (ws *WebSocketHandler) Start() error { func (ws *WebSocketHandler) Start() error {

View File

@ -6,6 +6,7 @@ import (
"os/signal" "os/signal"
"runtime" "runtime"
"n.eko.moe/neko/internal/broadcast"
"n.eko.moe/neko/internal/http" "n.eko.moe/neko/internal/http"
"n.eko.moe/neko/internal/remote" "n.eko.moe/neko/internal/remote"
"n.eko.moe/neko/internal/session" "n.eko.moe/neko/internal/session"
@ -107,6 +108,7 @@ type Neko struct {
server *http.Server server *http.Server
sessionManager *session.SessionManager sessionManager *session.SessionManager
remoteManager *remote.RemoteManager remoteManager *remote.RemoteManager
broadcastManager *broadcast.BroadcastManager
webRTCManager *webrtc.WebRTCManager webRTCManager *webrtc.WebRTCManager
webSocketHandler *websocket.WebSocketHandler webSocketHandler *websocket.WebSocketHandler
} }
@ -116,8 +118,9 @@ func (neko *Neko) Preflight() {
} }
func (neko *Neko) Start() { func (neko *Neko) Start() {
broadcastManager := broadcast.New(neko.Remote)
remoteManager := remote.New(neko.Remote) remoteManager := remote.New(neko.Remote, broadcastManager)
remoteManager.Start() remoteManager.Start()
sessionManager := session.New(remoteManager) sessionManager := session.New(remoteManager)
@ -125,7 +128,7 @@ func (neko *Neko) Start() {
webRTCManager := webrtc.New(sessionManager, remoteManager, neko.WebRTC) webRTCManager := webrtc.New(sessionManager, remoteManager, neko.WebRTC)
webRTCManager.Start() webRTCManager.Start()
webSocketHandler := websocket.New(sessionManager, remoteManager, webRTCManager, neko.WebSocket) webSocketHandler := websocket.New(sessionManager, remoteManager, broadcastManager, webRTCManager, neko.WebSocket)
webSocketHandler.Start() webSocketHandler.Start()
server := http.New(neko.Server, webSocketHandler) server := http.New(neko.Server, webSocketHandler)