OBSPlugin/main.py

201 lines
6.5 KiB
Python

from src.backend.PluginManager.ActionBase import ActionBase
from src.backend.PluginManager.PluginBase import PluginBase
# Import gtk modules
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gtk, Adw, Gdk
import sys
import os
import threading
from datetime import timedelta
from loguru import logger as log
from obswebsocket import events
# Add plugin to sys.paths
sys.path.append(os.path.dirname(__file__))
from OBSController import OBSController
from OBSActionBase import OBSActionBase
class ToggleRecord(OBSActionBase):
ACTION_NAME = "Toggle Record"
CONTROLS_KEY_IMAGE = True
def __init__(self, deck_controller, page, coords):
super().__init__(deck_controller=deck_controller, page=page, coords=coords)
def on_ready(self):
# Connect to obs if not connected
if not self.PLUGIN_BASE.obs.connected:
# self.PLUGIN_BASE.obs.connect_to(host="localhost", port=4444, timeout=3, legacy=False)
self.reconnect_obs()
# Show current rec status
self.show_current_rec_status()
# Register signal
self.PLUGIN_BASE.obs.register(self.on_state_change, events.RecordStateChanged)
def on_state_change(self, message):
state_string = message.datain["outputState"]
state = 0
if state_string == "OBS_WEBSOCKET_OUTPUT_STOPPED":
state = 0
elif state_string in ["OBS_WEBSOCKET_OUTPUT_STARTING", "OBS_WEBSOCKET_OUTPUT_STARTED", "OBS_WEBSOCKET_OUTPUT_RESUMED"]:
state = 1
elif state_string == "OBS_WEBSOCKET_OUTPUT_PAUSED":
state = 2
self.show_for_state(state)
def show_current_rec_status(self, new_paused = False):
if not self.PLUGIN_BASE.obs.connected:
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", "error.png"))
return
status = self.PLUGIN_BASE.obs.get_record_status()
if status is None:
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", "error.png"))
return
active = status.datain["outputActive"]
paused = status.datain["outputPaused"]
if paused:
self.show_for_state(2)
elif active:
self.show_for_state(1)
else:
self.show_for_state(0)
def show_for_state(self, state: int):
"""
0: Not Recording
1: Recording
2: Paused
3: Stopping in progress
"""
image = "record_inactive.png"
if state == 0:
self.set_bottom_label(None)
image = "record_inactive.png"
elif state == 1:
self.show_rec_time()
image = "record_active.png"
print("active")
elif state == 2:
self.show_rec_time()
image = "record_resume.png"
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", image))
def on_key_down(self):
if not self.PLUGIN_BASE.obs.connected:
return
self.PLUGIN_BASE.obs.toggle_record()
def on_tick(self):
self.show_current_rec_status()
def show_rec_time(self):
if not self.PLUGIN_BASE.obs.connected:
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", "error.png"))
return
status = self.PLUGIN_BASE.obs.get_record_status()
if status is None:
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", "error.png"))
return
active = status.datain["outputActive"]
if not active:
self.set_bottom_label(None)
return
self.set_bottom_label(status.datain["outputTimecode"][:-4], font_size=16)
class RecPlayPause(OBSActionBase):
ACTION_NAME = "Recording Play/Pause"
CONTROLS_KEY_IMAGE = True
def __init__(self, deck_controller, page, coords):
super().__init__(deck_controller=deck_controller, page=page, coords=coords)
def on_ready(self):
# Connect to obs if not connected
if not self.PLUGIN_BASE.obs.connected:
# self.PLUGIN_BASE.obs.connect_to(host="localhost", port=4444, timeout=3, legacy=False)
self.reconnect_obs()
# Show current rec status
self.show_current_rec_status()
# Register signal
self.PLUGIN_BASE.obs.register(self.on_state_change, events.RecordStateChanged)
def on_state_change(self, message):
state_string = message.datain["outputState"]
state = 0
if state_string in ["OBS_WEBSOCKET_OUTPUT_RESUMED", "OBS_WEBSOCKET_OUTPUT_STARTED"]:
state = 1
if state_string == "OBS_WEBSOCKET_OUTPUT_PAUSED":
state = 2
print(state_string)
self.show_for_state(state)
def show_current_rec_status(self, new_paused = False):
if not self.PLUGIN_BASE.obs.connected:
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", "error.png"))
return
status = self.PLUGIN_BASE.obs.get_record_status()
if status is None:
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", "error.png"))
return
active = status.datain["outputActive"]
paused = status.datain["outputPaused"]
if active and not paused:
self.show_for_state(1)
elif paused:
self.show_for_state(2)
else:
self.show_for_state(0)
def show_for_state(self, state: int):
"""
0: Not Recording
1: Recording
2: Paused
3: Stopping in progress
"""
image = "record_inactive.png"
if state == 1:
self.set_bottom_label("Pause", font_size=16)
image = "record_pause.png"
if state == 2:
self.set_bottom_label("Resume", font_size=16)
image = "record_resume.png"
else:
self.set_bottom_label(None)
self.set_key(media_path=os.path.join(self.PLUGIN_BASE.PATH, "assets", image))
def on_key_down(self):
if not self.PLUGIN_BASE.obs.connected:
return
self.PLUGIN_BASE.obs.toggle_record_pause()
def on_tick(self):
self.show_current_rec_status()
class OBS(PluginBase):
def __init__(self):
self.PLUGIN_NAME = "OBS"
self.GITHUB_REPO = "https://github.com/your-github-repo"
super().__init__()
self.obs = OBSController()
self.add_action(ToggleRecord)
self.add_action(RecPlayPause)
# Load custom css
self.add_css_stylesheet(os.path.join(self.PATH, "style.css"))