OBSPlugin/actions/InputDial/InputDial.py

221 lines
7.8 KiB
Python

from plugins.com_core447_OBSPlugin.OBSActionBase import OBSActionBase
from src.backend.DeckManagement.DeckController import DeckController
from src.backend.DeckManagement.InputIdentifier import Input, InputEvent
from src.backend.PageManagement.Page import Page
from src.backend.PluginManager.PluginBase import PluginBase
from GtkHelper.GtkHelper import ComboRow
import os
import threading
import math
# Import gtk modules
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gtk, Adw
class InputDial(OBSActionBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.muted = None
self.volume = None
self.last_muted = None
self.last_volume = None
def on_ready(self):
# Connect ot obs if not connected
if self.plugin_base.backend is not None:
if not self.plugin_base.get_connected():
self.reconnect_obs()
# Show current input volume
self.muted = None
self.volume = None
threading.Thread(target=self.show_current_input_volume, daemon=True, name="show_current_input_volume").start()
def show_current_input_volume(self):
if self.plugin_base.backend is None:
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
if not self.plugin_base.backend.get_connected():
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
if not self.get_settings().get("input"):
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
# update muted
status = self.plugin_base.backend.get_input_muted(self.get_settings().get("input"))
if status is None:
self.current_state = -1
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
self.muted = status["muted"]
# update volume
status = self.plugin_base.backend.get_input_volume(self.get_settings().get("input"))
if status is None:
self.current_state = -1
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
self.volume = self.db_to_volume(status["volume"])
# Now render the button
image = "input_muted.png" if self.muted else "input_unmuted.png"
label = f"{self.volume}%"
if self.last_muted != self.muted:
self.last_muted = self.muted
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", image), size=0.9)
if self.last_volume != self.volume:
self.last_volume = self.volume
self.set_label(label)
def get_config_rows(self) -> list:
super_rows = super().get_config_rows()
self.input_model = Gtk.StringList()
self.input_row = Adw.ComboRow(model=self.input_model, title=self.plugin_base.lm.get("actions.input-dial-row.label"))
self.connect_signals()
self.load_input_model()
self.load_configs()
super_rows.append(self.input_row)
return super_rows
def connect_signals(self):
self.input_row.connect("notify::selected", self.on_input_change)
def disconnect_signals(self):
try:
self.input_row.disconnect_by_func(self.on_input_change)
except TypeError as e:
pass
def load_input_model(self):
self.disconnect_signals()
# Clear model
while self.input_model.get_n_items() > 0:
self.input_model.remove(0)
# Load model
if self.plugin_base.backend.get_connected():
inputs = self.plugin_base.backend.get_inputs()
if inputs is None:
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
for input in inputs:
self.input_model.append(input)
self.connect_signals()
def load_configs(self):
self.load_selected_device()
def load_selected_device(self):
self.disconnect_signals()
settings = self.get_settings()
for i, input_name in enumerate(self.input_model):
if input_name.get_string() == settings.get("input"):
self.input_row.set_selected(i)
self.connect_signals()
return
self.input_row.set_selected(Gtk.INVALID_LIST_POSITION)
self.connect_signals()
def on_input_change(self, *args):
settings = self.get_settings()
selected_index = self.input_row.get_selected()
settings["input"] = self.input_model[selected_index].get_string()
self.set_settings(settings)
def event_callback(self, event: InputEvent, data: dict = None):
if event == Input.Key.Events.DOWN or event == Input.Dial.Events.DOWN:
self.mute_toggle()
if str(event) == str(Input.Dial.Events.TURN_CW):
self.volume_change(+5)
if str(event) == str(Input.Dial.Events.TURN_CCW):
self.volume_change(-5)
def mute_toggle(self):
if self.plugin_base.backend is None:
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
if not self.plugin_base.backend.get_connected():
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
input_name = self.get_settings().get("input")
if input_name in [None, ""]:
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
self.muted = not self.muted
self.plugin_base.backend.set_input_muted(input_name, self.muted)
self.on_tick()
def volume_change(self, diff):
if self.plugin_base.backend is None:
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
if not self.plugin_base.backend.get_connected():
self.current_state = None
self.show_error()
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
input_name = self.get_settings().get("input")
if input_name in [None, ""]:
self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "error.png"))
return
self.volume += diff
if self.volume < 0:
self.volume = 0
if self.volume > 100:
self.volume = 100
self.plugin_base.backend.set_input_volume(input_name, self.volume_to_db(self.volume))
self.on_tick()
def on_tick(self):
self.show_current_input_volume()
def reconnect_obs(self):
super().reconnect_obs()
if hasattr(self, "input_model"):
self.load_input_model()
self.load_configs()
self.muted = None
self.volume = None
def volume_to_db(self, vol):
if vol == 0:
return -100
if vol > 100:
return 0
return math.log(vol/100)*10/math.log(1.5)
def db_to_volume(self, db):
if db < -100:
return 0
if db > 0:
return 100
return math.floor(1.5**(db/10) * 100)