Source code for timApp.plugin.containerLink

import json
import re
from dataclasses import dataclass
from functools import lru_cache
from re import Pattern
from typing import Optional, Any, Union

import requests
from flask import current_app
from requests import Response

from timApp.document.docsettings import DocSettings
from timApp.document.timjsonencoder import TimJsonEncoder
from timApp.markdown.dumboclient import call_dumbo, DumboOptions
from timApp.plugin.plugin import Plugin, AUTOMD
from timApp.plugin.pluginOutputFormat import PluginOutputFormat
from timApp.plugin.pluginexception import PluginException
from timApp.plugin.timtable import timTable
from timApp.util.logger import log_warning

# Domain names of the plugin services. Each one is used as the ``domain`` of
# one or more PluginReg entries and ends up in the URL built by PluginReg.host.
CSPLUGIN_DOMAIN = "csplugin"
DRAGPLUGIN_DOMAIN = "drag"
FEEDBACKPLUGIN_DOMAIN = "feedback"
FIELDPLUGIN_DOMAIN = "fields"
HASKELLPLUGIN_DOMAIN = "haskellplugins"
IMAGEXPLUGIN_DOMAIN = "imagex"
JSRUNNERPLUGIN_DOMAIN = "jsrunner"
PALIPLUGIN_DOMAIN = "pali"
SVNPLUGIN_DOMAIN = "showfile"

# Key of the markup section inside a rendered plugin dict.
MARKUP = "markup"

# Regular expressions for attribute names whose values are treated as markdown
# for the qst-based plugins (qst, mcq2, mmcq2). The patterns are joined into
# one alternation and applied with ``search``, so they match anywhere in a key.
QSTMDATTRS = [
    "header",
    "expl",
    "stem",
    "rows",
    "reason",
    "choices",
    "[0-9]",
    ".*[Tt]ext",
]

# Markdown attribute name patterns for the feedback, drag, field-type and
# goaltable plugins respectively.
FBMDATTRS = ["nextTask", "questionItems", "choices", "levels"]
DRAGATTRS = ["words"]
TEXTFIELDATTRS = ["header", "stem", "inputstem"]
GOALTABLEATTRS = ["goals", "stem", "header"]


[docs]@dataclass class PluginReg: name: str domain: str port: int = 5000 path: str = "/" automd: bool | None = None # if true, plugin is always automd # List of regexp for yaml attribute names that are handled as markdown when automd attribute is true or # plugin has automd=True regexattrs: list[str] | None = None skip_reqs: bool = False lazy: bool = True can_give_task: bool = False instance: Any | None = None # TODO get rid of this; only used by timtable @property def host(self) -> str: # TODO rename to url because contains path return f"http://{self.domain}:{self.port}{self.path}"
@lru_cache
def get_plugins() -> dict[str, PluginReg]:
    """Build the registry of all known plugins, keyed by plugin name.

    The port and domain of the plugins served by TIM itself are read from
    the Flask application config. The result is cached, so the registry is
    only constructed on the first call.

    :return: Mapping from plugin name to its registration record.
    """
    config = current_app.config
    qst_port = config["QST_PLUGIN_PORT"]
    internal_domain = config["INTERNAL_PLUGIN_DOMAIN"]

    # Attribute sets shared by several registrations.
    qst_md: dict[str, Any] = {"regexattrs": QSTMDATTRS, "automd": True}
    text_md: dict[str, Any] = {"regexattrs": TEXTFIELDATTRS, "automd": True}

    def cs_plugin(name: str, path: str) -> PluginReg:
        # Plugin served by the csplugin container.
        return PluginReg(name=name, domain=CSPLUGIN_DOMAIN, path=path)

    def svn_plugin(name: str, path: str) -> PluginReg:
        # Plugin served by the showfile container.
        return PluginReg(name=name, domain=SVNPLUGIN_DOMAIN, path=path)

    def field_plugin(name: str, path: str = "/", **extra: Any) -> PluginReg:
        # Plugin served by the fields container.
        return PluginReg(name=name, domain=FIELDPLUGIN_DOMAIN, path=path, **extra)

    def internal_plugin(name: str, path: str, **extra: Any) -> PluginReg:
        # Plugin served from the internal plugin domain on the qst port.
        return PluginReg(
            name=name, domain=internal_domain, port=qst_port, path=path, **extra
        )

    registrations = (
        cs_plugin("csPlugin", "/cs/"),
        cs_plugin("taunoPlugin", "/cs/tauno/"),
        cs_plugin("simcirPlugin", "/cs/simcir/"),
        cs_plugin("graphviz", "/cs/graphviz/"),
        svn_plugin("showCode", "/svn/"),
        svn_plugin("showImage", "/svn/image/"),
        svn_plugin("showImages", "/svn/multiimages/"),
        svn_plugin("showVideo", "/svn/video/"),
        svn_plugin("showPdf", "/svn/pdf/"),
        PluginReg(name="mcq", domain=HASKELLPLUGIN_DOMAIN, port=5001),
        PluginReg(name="mmcq", domain=HASKELLPLUGIN_DOMAIN, port=5002),
        internal_plugin("mcq2", "/qst/mcq/", **qst_md),
        internal_plugin("mmcq2", "/qst/mmcq/", **qst_md),
        PluginReg(name="pali", domain=PALIPLUGIN_DOMAIN),
        # TODO: field is just a dummy class to get route for /field - better solution is needed
        field_plugin("field", **text_md),
        field_plugin("textfield", "/tf/", **text_md),
        field_plugin("cbfield", "/cb/", **text_md),
        internal_plugin("cbcountfield", "/cbcountfield/", **text_md),
        field_plugin("rbfield", "/rb/", **text_md),
        field_plugin("numericfield", "/nf/", **text_md),
        field_plugin(
            "goaltable", "/goaltable/", regexattrs=GOALTABLEATTRS, automd=True
        ),
        field_plugin("multisave", "/ms/"),
        field_plugin("dropdown", "/dropdown/"),
        PluginReg(name="jsrunner", domain=JSRUNNERPLUGIN_DOMAIN),
        PluginReg(name="imagex", domain=IMAGEXPLUGIN_DOMAIN),
        internal_plugin("qst", "/qst/", **qst_md),
        internal_plugin("timMenu", "/timMenu/"),
        internal_plugin(
            "timTable", "/timTable/", instance=timTable.TimTable(), lazy=False
        ),
        internal_plugin("tableForm", "/tableForm/", lazy=False),
        internal_plugin("reviewcanvas", "/reviewcanvas/", lazy=False),
        internal_plugin("importData", "/importData/"),
        internal_plugin("userSelect", "/userSelect/"),
        internal_plugin("tape", "/tape/"),
        PluginReg(name="echo", domain="tim", path="/echoRequest/", skip_reqs=True),
        PluginReg(
            name="feedback",
            domain=FEEDBACKPLUGIN_DOMAIN,
            regexattrs=FBMDATTRS,
            automd=True,
        ),
        PluginReg(
            name="drag", domain=DRAGPLUGIN_DOMAIN, regexattrs=DRAGATTRS, automd=True
        ),
        internal_plugin("calendar", "/calendar/"),
    )

    registry: dict[str, PluginReg] = {}
    for registration in registrations:
        registry[registration.name] = registration
    return registry
@lru_cache
def get_plugin_regex_obj(plugin_name: str) -> Pattern:
    """Compile the markdown attribute patterns of a plugin into one regex.

    Each entry of the plugin's ``regexattrs`` becomes one alternative of a
    single alternation. The compiled pattern is cached per plugin name.

    :param plugin_name: Name of a registered plugin that defines ``regexattrs``.
    :return: The compiled pattern.
    """
    attr_patterns = get_plugin(plugin_name).regexattrs
    assert attr_patterns is not None
    alternation = ")|(".join(attr_patterns)
    return re.compile(f"(({alternation}))")
def call_plugin_generic(
    plugin: str,
    method: str,
    route: str,
    data: Any = None,
    headers: Any = None,
    params: Any = None,
    read_timeout: int = 30,
) -> Response:
    """Send an HTTP request to a route of the given plugin.

    :param plugin: Name of a registered plugin.
    :param method: HTTP method passed on to the request.
    :param route: Route appended to the plugin's base URL.
    :param data: Request body.
    :param headers: Request headers.
    :param params: Query parameters.
    :param read_timeout: Read timeout passed on to the request.
    :return: The response, when its status code is below 500.
    :raises PluginException: If the plugin is not registered, the connection
        fails, the read times out, or the status code is 500 or greater.
    """
    plug = get_plugin(plugin)
    host = plug.host
    if route == "multimd" and plugin in ("mmcq", "mcq"):
        # hack to handle mcq and mmcq in tim by qst
        host = get_plugin("qst").host + plugin + "/"
    url = host + route

    try:
        r = do_request(method, url, data, params, headers, read_timeout)
    except (
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ConnectionError,
    ) as e:
        log_warning(f"Connection failed to plugin {plugin}: {e}")
        # Chain the original error so the root cause stays in the traceback.
        raise PluginException(
            f"Connect timeout when calling {plugin} ({url})."
        ) from e
    except requests.exceptions.ReadTimeout as e:
        log_warning(f"Read timeout occurred for plugin {plugin} in route {route}: {e}")
        raise PluginException(f"Read timeout when calling {plugin} ({url}).") from e

    if r.status_code >= 500:
        raise PluginException(f"Got response with status code {r.status_code}")
    return r
def do_request(
    method: str,
    url: str,
    data: Any,
    params: Any,
    headers: Any,
    read_timeout: int,
) -> requests.Response:
    """Perform an HTTP request and mark the response as UTF-8 encoded.

    The connect timeout is read from the ``PLUGIN_CONNECT_TIMEOUT`` entry of
    the Flask application config; the read timeout is given by the caller.

    :return: The response with its ``encoding`` set to ``"utf-8"``.
    """
    connect_timeout = current_app.config["PLUGIN_CONNECT_TIMEOUT"]
    response = requests.request(
        method,
        url,
        data=data,
        params=params,
        headers=headers,
        timeout=(connect_timeout, read_timeout),
    )
    response.encoding = "utf-8"
    return response
# Module-level alias for the request function. Not used currently.
plugin_request_fn = do_request
def render_plugin(
    docsettings: DocSettings, plugin: Plugin, output_format: PluginOutputFormat
) -> str:
    """Render a single plugin by posting its JSON to the plugin's route.

    When the document settings enable plugin markdown, the plugin data is
    first converted in place through Dumbo.

    :param docsettings: Settings of the document containing the plugin.
    :param plugin: The plugin to render.
    :param output_format: Requested format; its value is used as the route.
    :return: Text of the plugin's response.
    """
    plugin_data = plugin.render_json()

    if docsettings.plugin_md():
        dumbo_options = plugin.par.get_dumbo_options(
            base_opts=docsettings.get_dumbo_options()
        )
        target = "md" if output_format == PluginOutputFormat.HTML else "latex"
        # convert_md replaces the contents of plugin_data in place.
        convert_md([plugin_data], options=dumbo_options, outtype=target)

    plugin_type = plugin.type
    route = output_format.value
    payload = json.dumps(plugin_data, cls=TimJsonEncoder)
    response = call_plugin_generic(
        plugin_type,
        "post",
        route,
        data=payload,
        headers={"Content-type": "application/json"},
    )
    return response.text
def call_mock_dumbo_s(s: str) -> str:
    """Apply the mock markdown conversion to one string.

    Backticks are removed and ``#`` and ``%`` are prefixed with a backslash.

    :param s: The string to convert.
    :return: The converted string.
    """
    s = s.replace("`", "")
    s = s.replace("#", "\\#")
    s = s.replace("%", "\\%")
    return s


def list_to_dumbo(markup_list: list[Any]) -> None:
    """Mock-convert every ``md:``-prefixed string in a list, in place.

    Nested dicts and lists are processed recursively. Strings starting with
    ``md:`` are replaced by the converted text without the prefix; all other
    values are left untouched.

    :param markup_list: The list to modify.
    """
    for index, val in enumerate(markup_list):
        if isinstance(val, dict):
            dict_to_dumbo(val)
        elif isinstance(val, list):
            # The original compared type(val) with the list instance itself,
            # which is never true, so nested lists were silently skipped.
            list_to_dumbo(val)
        elif isinstance(val, str) and val.startswith("md:"):
            markup_list[index] = call_mock_dumbo_s(val[3:])


def dict_to_dumbo(pm: dict) -> None:
    """Mock-convert every ``md:``-prefixed string value in a dict, in place.

    Nested dicts and lists are processed recursively. String values starting
    with ``md:`` are replaced by the converted text without the prefix; all
    other values are left untouched.

    :param pm: The dict to modify.
    """
    # Only values of existing keys are reassigned, so iterating is safe.
    for mkey, val in pm.items():
        if isinstance(val, dict):
            dict_to_dumbo(val)
        elif isinstance(val, list):
            list_to_dumbo(val)
        elif isinstance(val, str) and val.startswith("md:"):
            pm[mkey] = call_mock_dumbo_s(val[3:])
def convert_md(
    plugin_data: list[dict],
    options: DumboOptions,
    outtype: str = "md",
    plugin_opts: list[DumboOptions] | None = None,
) -> None:
    """Convert plugin dicts through Dumbo and overwrite them in place.

    :param plugin_data: Plugin dicts; each is cleared and refilled with the
        corresponding converted dict.
    :param options: Dumbo options for the call.
    :param outtype: Output type; selects the ``/{outtype}keys`` Dumbo route.
    :param plugin_opts: Optional per-plugin Dumbo options.
    """
    converted = call_dumbo(
        list(plugin_data),
        f"/{outtype}keys",
        options=options,
        data_opts=plugin_opts,
    )
    for target, rendered in zip(plugin_data, converted):
        # Keep the same dict objects so callers holding them see the result.
        target.clear()
        target.update(rendered)
def prepare_for_dumbo_attr_list_recursive(
    regex_obj: Pattern, plugin_data: dict
) -> None:
    """Prefix markdown attribute values of a dict with ``md:``, in place.

    Nested dicts are always descended into. String and list values are only
    processed when ``regex_obj`` finds a match in their key. Strings that
    already start with ``md:`` are left unchanged.

    :param regex_obj: Pattern searched in the string form of each key.
    :param plugin_data: The dict to modify.
    """
    for key, value in plugin_data.items():
        if isinstance(value, dict):
            prepare_for_dumbo_attr_list_recursive(regex_obj, value)
            continue
        if not isinstance(value, (list, str)):
            continue
        if regex_obj.search(str(key)) is None:
            continue
        if isinstance(value, list):
            prepare_for_dumbo_attr_list_list_recursive(regex_obj, value)
        elif not value.startswith("md:"):
            plugin_data[key] = "md:" + value
def prepare_for_dumbo_attr_list_list_recursive(regex_obj: Pattern, data: list) -> None:
    """Prefix every string in a list with ``md:``, in place.

    Nested dicts are handed to ``prepare_for_dumbo_attr_list_recursive`` and
    nested lists are processed recursively. Strings that already start with
    ``md:`` are left unchanged; other value types are ignored.

    :param regex_obj: Pattern passed on when descending into nested dicts.
    :param data: The list to modify.
    """
    for i, item in enumerate(data):
        if isinstance(item, dict):
            prepare_for_dumbo_attr_list_recursive(regex_obj, item)
        elif isinstance(item, list):
            # Recurse into the nested list itself. Recursing on ``data``
            # would process the same list again and never terminate.
            prepare_for_dumbo_attr_list_list_recursive(regex_obj, item)
        elif isinstance(item, str):
            if not item.startswith("md:"):
                data[i] = "md:" + item
[docs]def convert_tex_mock(plugin_data: dict | list) -> None: if isinstance(plugin_data, dict): dict_to_dumbo(plugin_data) return for p in plugin_data: pm = p["markup"] dict_to_dumbo(pm)
def render_plugin_multi(
    docsettings: DocSettings,
    plugin: str,
    plugin_data: list[Plugin],
    plugin_output_format: PluginOutputFormat = PluginOutputFormat.HTML,
    default_auto_md: bool = False,
) -> str:
    """Render several plugins of the same type with one call.

    :param docsettings: Settings of the document containing the plugins.
    :param plugin: Name of the registered plugin type.
    :param plugin_data: The plugins to render.
    :param plugin_output_format: Requested output format.
    :param default_auto_md: automd value used when the plugin registration
        does not define one.
    :return: Result of the registered instance's direct call when one exists
        and HTML is requested, otherwise the text of the plugin's response.
    :raises PluginException: If automd is requested for a plugin that has
        neither ``regexattrs`` nor a registered instance.
    """
    opts = docsettings.get_dumbo_options()
    plugin_dumbo_opts = [p.par.get_dumbo_options(base_opts=opts) for p in plugin_data]
    plugin_dicts = [p.render_json() for p in plugin_data]
    plugin_reg = get_plugin(plugin)
    plugin_automd = (
        plugin_reg.automd if plugin_reg.automd is not None else default_auto_md
    )
    regexattrs = plugin_reg.regexattrs
    for plug_dict in plugin_dicts:
        if has_auto_md(plug_dict[MARKUP], plugin_automd):
            if regexattrs is not None:
                regex_obj = get_plugin_regex_obj(plugin_reg.name)
                # use attribute list instead of calling the plugin
                prepare_for_dumbo_attr_list_recursive(regex_obj, plug_dict)
            elif plugin_reg.instance:
                try:
                    plugin_reg.instance.prepare_for_dumbo(plug_dict)
                except Exception as e:
                    # Best effort: one failing plugin must not stop the rest.
                    # Catch Exception (not a bare except) so that
                    # KeyboardInterrupt and SystemExit still propagate.
                    log_warning(f"prepare_for_dumbo failed for plugin {plugin}: {e}")
                    continue
            else:
                raise PluginException(
                    "automd for non-inner plugins not implemented yet"
                )  # TODO implement

    if docsettings.plugin_md():
        convert_md(
            plugin_dicts,
            options=opts,
            plugin_opts=plugin_dumbo_opts,
            outtype="md" if plugin_output_format == PluginOutputFormat.HTML else "latex",
        )

    if plugin_reg.instance and plugin_output_format == PluginOutputFormat.HTML:
        return plugin_reg.instance.multihtml_direct_call(plugin_dicts)

    return call_plugin_generic(
        plugin,
        "post",
        ("multimd" if plugin_output_format == PluginOutputFormat.MD else "multihtml"),
        data=json.dumps(plugin_dicts, cls=TimJsonEncoder),
        headers={"Content-type": "application/json"},
    ).text
def has_auto_md(data: dict, default: bool) -> bool:
    """Return the automd setting stored in ``data``.

    :param data: Dict that may contain the automd key.
    :param default: Value returned when the key is absent.
    """
    if AUTOMD in data:
        return data[AUTOMD]
    return default
def call_plugin_resource(
    plugin: str, filename: str, args: Any = None
) -> requests.Response:
    """Fetch a resource file from a plugin with a streaming GET request.

    :param plugin: Name of a registered plugin.
    :param filename: Resource path appended to the plugin's base URL.
    :param args: Query parameters for the request.
    :return: The response with its ``encoding`` set to ``"utf-8"``.
    :raises PluginException: If the plugin is not registered, is served from
        the internal plugin domain, or the request times out.
    """
    plug = get_plugin(plugin)
    # We need to avoid calling ourselves to avoid infinite request loop.
    if plug.host.startswith(f'http://{current_app.config["INTERNAL_PLUGIN_DOMAIN"]}'):
        raise PluginException("Plugin route not found")
    try:
        resp = requests.get(plug.host + filename, timeout=5, stream=True, params=args)
    except requests.exceptions.Timeout as e:
        # Chain the original error so the root cause stays in the traceback.
        raise PluginException("Could not connect to plugin: " + plugin) from e
    resp.encoding = "utf-8"
    return resp
def call_plugin_answer(plugin: str, answer_data: dict) -> str:
    """Send answer data to a plugin's ``answer`` route.

    The read timeout comes from ``timeout`` in the answer's markup. An int is
    used as is, a string of decimal digits is converted, and anything else
    falls back to 25. The value actually used is ``timeout + 5``, capped at
    120.

    :param plugin: Name of a registered plugin.
    :param answer_data: Answer payload; may contain a ``"markup"`` dict.
    :return: Text of the plugin's response.
    """
    markup = answer_data.get("markup") or {}
    timeout = markup.get("timeout")
    if not isinstance(timeout, int):
        # isdecimal() rather than isnumeric(): the latter also accepts
        # characters such as "½" or "²", which int() rejects with ValueError.
        if isinstance(timeout, str) and timeout.isdecimal():
            timeout = int(timeout)
        else:
            timeout = 25

    # use timeout + 5 so that plugin will realize the timeout first
    return call_plugin_generic(
        plugin,
        "put",
        "answer",
        json.dumps(answer_data, cls=TimJsonEncoder),
        headers={"Content-type": "application/json"},
        read_timeout=min(timeout + 5, 120),
    ).text
# Get lists of js and css files required by plugin, as well as list of Angular modules they define.
@lru_cache(maxsize=100)
def plugin_reqs(plugin: str) -> str:
    """Return the text of the plugin's ``reqs`` route.

    Results are cached for up to 100 distinct plugin names.

    :param plugin: Name of a registered plugin.
    """
    reqs_response = call_plugin_generic(plugin, "get", "reqs")
    return reqs_response.text
# Gets plugin info (host)
def get_plugin(plugin: str) -> PluginReg:
    """Look up the registration record of a plugin by name.

    :param plugin: Name of the plugin.
    :return: The matching registration record.
    :raises PluginException: If no plugin is registered under that name.
    """
    registration = get_plugins().get(plugin)
    if not registration:
        raise PluginException("Plugin does not exist.")
    return registration