Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import json | |
| import traceback | |
| class PluginManager(object): | |
| def __init__(self, APP_VERSION, PROD, CPU_ONLY, logger): | |
| super(PluginManager, self).__init__() | |
| self.APP_VERSION = APP_VERSION | |
| self.CPU_ONLY = CPU_ONLY | |
| self.PROD = PROD | |
| self.path = "./resources/app" if PROD else "." | |
| self.modules_path = "resources.app." if PROD else "" | |
| self.logger = logger | |
| self.setupModules = set() | |
| self.enabledPlugins = set() | |
| self.teardownModules = {} | |
| self.refresh_active_plugins() | |
| def reset_plugins (self): | |
| self.plugins = { | |
| "custom-event": [], | |
| "start": { | |
| "pre": [], | |
| "mid": [], | |
| "post": [] | |
| }, | |
| "load-model": { | |
| "pre": [], | |
| "mid": [], | |
| "post": [] | |
| }, | |
| "synth-line": { | |
| "pre": [], | |
| "mid": [], | |
| "pre_energy": [], | |
| "post": [] | |
| }, | |
| "batch-synth-line": { | |
| "pre": [], | |
| "mid": [], | |
| "post": [] | |
| }, | |
| "mp-output-audio": { | |
| "pre": [], | |
| "post": [] | |
| }, | |
| "arpabet-replace": { | |
| "pre": [], | |
| "post": [] | |
| }, | |
| "output-audio": { | |
| "pre": [], | |
| "mid": [], | |
| "post": [] | |
| }, | |
| } | |
| self.plugins_context_cache = {} | |
| for key in self.plugins.keys(): | |
| if key=="custom-event": | |
| pass | |
| else: | |
| self.plugins_context_cache[key] = {} | |
| for sub_key in self.plugins[key].keys(): | |
| self.plugins_context_cache[key][sub_key] = {} | |
| def set_context_cache (self, event, hook, plugin_id, data): | |
| self.plugins_context_cache[event][hook][plugin_id] = data | |
| def get_active_plugins_count (self): | |
| active_plugins = [] | |
| for _ in self.plugins["custom-event"]: | |
| active_plugins.append(["custom-event", None]) | |
| for key in self.plugins.keys(): | |
| if key=="custom-event": | |
| continue | |
| plugin_triggers = list(self.plugins[key].keys()) | |
| for trigger_type in plugin_triggers: | |
| for plugin in self.plugins[key][trigger_type]: | |
| active_plugins.append([key, trigger_type]) | |
| return len(active_plugins) | |
| # For ease of access | |
| def set_context (self, data): | |
| self.context = data | |
| def refresh_active_plugins (self): | |
| with open("plugins.txt") as f: | |
| lines = f.read().split("\n") | |
| removed_plugins = [] | |
| for line in lines: | |
| if not line.startswith("*") and line in self.enabledPlugins: | |
| removed_plugins.append(line) | |
| for plugin_id in removed_plugins: | |
| if plugin_id in self.teardownModules: | |
| for func in self.teardownModules[plugin_id]: | |
| params = {"logger": self.logger, "appVersion": self.APP_VERSION, "isCPUonly": self.CPU_ONLY, "isDev": not self.PROD} | |
| func(params) | |
| self.reset_plugins() | |
| status = [] | |
| for line in lines: | |
| if line.startswith("*"): | |
| plugin_id = line[1:] | |
| self.logger.info(f'plugin_id: {plugin_id}') | |
| try: | |
| with open(f'{self.path}/plugins/{plugin_id}/plugin.json') as f: | |
| plugin_json = f.read() | |
| plugin_json = json.loads(plugin_json) | |
| minVersionOk = checkVersionRequirements(plugin_json["min-app-version"] if "min-app-version" in plugin_json else None, self.APP_VERSION) | |
| maxVersionOk = checkVersionRequirements(plugin_json["max-app-version"] if "max-app-version" in plugin_json else None, self.APP_VERSION, True) | |
| if not minVersionOk or not maxVersionOk: | |
| self.logger.info(f'minVersionOk {minVersionOk}') | |
| self.logger.info(f'maxVersionOk {maxVersionOk}') | |
| continue | |
| for key in self.plugins.keys(): | |
| if key=="custom-event": | |
| self.load_module_function(plugin_json, plugin_id, ["back-end-hooks", "custom-event"], []) | |
| else: | |
| plugin_triggers = list(self.plugins[key].keys()) | |
| for trigger_type in plugin_triggers: | |
| self.load_module_function(plugin_json, plugin_id, ["back-end-hooks", key, trigger_type], []) | |
| self.enabledPlugins.add(plugin_id) | |
| status.append("OK") | |
| except: | |
| self.logger.info(traceback.format_exc()) | |
| status.append(plugin_id) | |
| return status | |
| def load_module_function (self, plugin_json, plugin_name, structure, structure2): | |
| if structure[0] in plugin_json and plugin_json[structure[0]] is not None: | |
| key = structure[0] | |
| structure2.append(key) | |
| plugin_json = plugin_json[key] | |
| del structure[0] | |
| if len(structure): | |
| return self.load_module_function(plugin_json, plugin_name, structure, structure2) | |
| else: | |
| file_name = plugin_json["file"] | |
| function = plugin_json["function"] | |
| if not file_name: | |
| return | |
| if file_name.endswith(".py"): | |
| setup = {"logger": self.logger, "appVersion": self.APP_VERSION, "isCPUonly": self.CPU_ONLY, "isDev": not self.PROD} | |
| with open(f'{self.path}/plugins/{plugin_name}/{file_name}') as f: | |
| locals_data = locals() | |
| locals_data["setupData"] = setup | |
| locals_data["plugins"] = self.plugins | |
| exec(f.read(), None, locals_data) | |
| import types | |
| for key in list(locals_data.keys()): | |
| if isinstance(locals_data[key], types.FunctionType): | |
| if key==function: | |
| if structure2[-1]=="custom-event": | |
| self.plugins[structure2[-1]].append([plugin_name, file_name, locals_data[key]]) | |
| else: | |
| self.plugins[structure2[-2]][structure2[-1]].append([plugin_name, file_name, locals_data[key]]) | |
| elif key=="setup": | |
| if f'{plugin_name}/{file_name}' not in self.setupModules: | |
| self.setupModules.add(f'{plugin_name}/{file_name}') | |
| locals_data[key](setup) | |
| elif key=="teardown": | |
| if plugin_name not in self.teardownModules.keys(): | |
| self.teardownModules[plugin_name] = [] | |
| self.teardownModules[plugin_name].append(locals_data[key]) | |
| else: | |
| self.logger.info(f'[Plugin: {plugin_name}]: Cannot import {file_name} file for {structure2[-1]} {structure2[-2]} entry-point: Only python files are supported right now.') | |
| def run_plugins (self, plist, event="", data=None): | |
| response = None | |
| if event=="custom-event" and "pluginId" not in data: | |
| self.logger.info(f'Custom event called, but "pluginId" not specified. Not running.') | |
| return None | |
| if len(plist): | |
| self.logger.info("Running plugins for event:" + event) | |
| for [plugin_name, file_name, function] in plist: | |
| if event=="custom-event" and plugin_name!=data["pluginId"]: | |
| continue | |
| hook, eventName = event.split(" ") | |
| if plugin_name in self.plugins_context_cache[eventName][hook].keys(): | |
| data["context_cache"] = self.plugins_context_cache[eventName][hook][plugin_name] | |
| # else: | |
| # data["context_cache"] = None | |
| try: | |
| self.logger.info(plugin_name) | |
| self.logger.set_logger_prefix(plugin_name) | |
| function(data) | |
| if "context_cache" in data.keys(): | |
| self.plugins_context_cache[eventName][hook][plugin_name] = data["context_cache"] | |
| self.logger.set_logger_prefix("") | |
| except: | |
| self.logger.info(f'[Plugin run error at event "{event}": {plugin_name}]') | |
| self.logger.info(traceback.format_exc()) | |
| return response | |
| def checkVersionRequirements (requirements, appVersion, checkMax=False): | |
| if not requirements: | |
| return True | |
| appVersionRequirement = [int(val) for val in str(requirements).split(".")] | |
| appVersionInts = [int(val) for val in str(appVersion).split(".")] | |
| appVersionOk = True | |
| if checkMax: | |
| if appVersionRequirement[0] >= appVersionInts[0]: | |
| if len(appVersionRequirement)>1 and int(appVersionRequirement[0])==appVersionInts[0]: | |
| if appVersionRequirement[1] >= appVersionInts[1]: | |
| if len(appVersionRequirement)>2 and int(appVersionRequirement[1])==appVersionInts[1]: | |
| if appVersionRequirement[2] >= appVersionInts[2]: | |
| pass | |
| else: | |
| appVersion = False | |
| else: | |
| appVersionOk = False | |
| else: | |
| appVersionOk = False | |
| else: | |
| if appVersionRequirement[0] <= appVersionInts[0]: | |
| if len(appVersionRequirement)>1 and int(appVersionRequirement[0])==appVersionInts[0]: | |
| if appVersionRequirement[1] <= appVersionInts[1]: | |
| if len(appVersionRequirement)>2 and int(appVersionRequirement[1])==appVersionInts[1]: | |
| if appVersionRequirement[2] <= appVersionInts[2]: | |
| pass | |
| else: | |
| appVersion = False | |
| else: | |
| appVersionOk = False | |
| else: | |
| appVersionOk = False | |
| return appVersionOk | |