"""Functions for dealing with plugin paragraphs."""
import json
from collections import OrderedDict, defaultdict
from dataclasses import dataclass
from itertools import chain
from typing import Optional, Union, DefaultDict
from xml.sax.saxutils import quoteattr
import attr
import yaml
import yaml.parser
from sqlalchemy import func
from timApp.answer.answer import Answer
from timApp.answer.answers import valid_answers_query, valid_taskid_filter
from timApp.auth.accesshelper import has_edit_access, verify_view_access
from timApp.document.docentry import DocEntry
from timApp.document.docparagraph import DocParagraph
from timApp.document.docsettings import DocSettings
from timApp.document.document import Document
from timApp.document.macroinfo import MacroInfo
from timApp.document.randutils import hashfunc
from timApp.document.usercontext import UserContext
from timApp.document.viewcontext import ViewContext
from timApp.document.yamlblock import YamlBlock
from timApp.markdown.dumboclient import call_dumbo
from timApp.plugin.containerLink import plugin_reqs, get_plugin
from timApp.plugin.containerLink import render_plugin_multi, render_plugin, get_plugins
from timApp.plugin.plugin import (
Plugin,
PluginRenderOptions,
load_markup_from_yaml,
expand_macros_for_plugin,
find_inline_plugins,
InlinePlugin,
finalize_inline_yaml,
PluginWrap,
WANT_FIELDS,
find_task_ids,
get_simple_hash_from_par_and_user,
)
from timApp.plugin.pluginOutputFormat import PluginOutputFormat
from timApp.plugin.pluginexception import PluginException
from timApp.plugin.taskid import TaskId
from timApp.printing.printsettings import PrintFormat
from timApp.util.get_fields import (
get_fields_and_users,
RequestedGroups,
GetFieldsAccess,
)
from timApp.util.rndutils import SeedClass
from timApp.util.timtiming import taketime
from timApp.util.utils import get_error_tex, Range, get_error_html_block, get_error_html
from tim_common.html_sanitize import sanitize_html
[docs]def get_error_plugin(
plugin_name,
message,
response=None,
plugin_output_format: PluginOutputFormat = PluginOutputFormat.HTML,
inline=False,
):
"""
:param response:
:type message: str
:type plugin_name: str
"""
error_message = "Plugin " + (f"{plugin_name} " if plugin_name else "") + "error:"
if plugin_output_format == PluginOutputFormat.MD:
return get_error_tex(error_message, message)
return (
get_error_html_block(error_message, message, response)
if not inline
else get_error_html(f"{error_message} {message}", response)
)
PluginOrError = Union[Plugin, str] # str represent HTML markup of error
AnswerMap = dict[str, tuple[Answer, int]]
ErrorMap = dict[Range, tuple[str, str]]
[docs]@attr.s
class PluginPlacement:
"""Represents the position(s) of plugin(s) in a block.
Can be either:
* a block-level (traditional) plugin, or
* one or more inlineplugins.
In case of a block-level plugin, the range spans the entire block's expanded markdown.
"""
plugins: dict[Range, Plugin] = attr.ib(kw_only=True) # ordered
errors: ErrorMap = attr.ib(kw_only=True) # ordered
block: DocParagraph = attr.ib(kw_only=True)
"""The block where the plugins are."""
expanded_md: str = attr.ib(kw_only=True)
"""Expanded markdown of the containing block."""
is_block_plugin: bool = attr.ib(kw_only=True)
"""Whether this is a block-level plugin."""
output_format: PluginOutputFormat = attr.ib(kw_only=True)
[docs] def get_block_plugin(self):
if not self.is_block_plugin:
return None
try:
return next(iter(self.plugins.values()))
except StopIteration:
return None
[docs] def set_error(self, r: Range, err: str):
p = self.plugins.pop(r)
self.errors[r] = err, p.type
[docs] def set_output(self, r: Range, out: str):
self.plugins[r].set_output(out)
[docs] def get_block_output(
self, extract_plugins: bool = False
) -> tuple[str, dict[str, str]]:
"""TODO: this did not help very much
if self.is_block_plugin:
idx = next(iter(self.plugins))
p = self.plugins[idx]
return p.get_final_output().strip()
"""
sorted_ranges = sorted(
chain(self.plugins.keys(), self.errors.keys()),
key=lambda r: r[0],
reverse=True,
)
block_codes: dict[str, str] = {}
out_md = self.expanded_md
for sr in sorted_ranges:
p = self.plugins.get(sr)
if not p:
err, name = self.errors[sr]
h = get_error_plugin(
name,
err,
plugin_output_format=self.output_format,
inline=extract_plugins,
)
else:
h = (
p.get_final_output().strip()
) # allow inlineplugins to come close each other
if extract_plugins:
# We can't pass plugin HTML directly into Dumbo
# because Pandoc does not know how to parse custom HTML elements
# Instead we temporarily encode the plugin HTML into
# a special code that is passed through Pandoc unchanged
# We pick a special prefix that is far enough from
# anything that Pandoc could process in a special manner
h_code = f"§§plugin_html_{hashfunc(h)}"
block_codes[h_code] = h
h = h_code
start, end = sr
out_md = out_md[:start] + h + out_md[end:]
return out_md, block_codes
[docs] @staticmethod
def from_par(
block: DocParagraph,
load_states: bool,
macroinfo: MacroInfo,
plugin_opts: PluginRenderOptions,
user_ctx: UserContext,
view_ctx: ViewContext,
settings: DocSettings,
answer_map: AnswerMap,
custom_answer: Answer | None,
output_format: PluginOutputFormat,
) -> Optional["PluginPlacement"]:
plugin_name = block.get_attr("plugin")
defaultplugin = block.get_attr("defaultplugin")
if not plugin_name and not defaultplugin:
return None
new_seed = False
rnd_seed = None
answer_and_cnt = None
ask_next = False
if rnd_seed is None:
rnd_seed = get_simple_hash_from_par_and_user(
block,
user_ctx,
) # TODO: RND_SEED: get users seed for this plugin
# TODO: if possible to look from markup newtask: true, this is not needed
if block.is_new_task():
if block.answer_nr is not None and not block.ask_new:
rnd_seed = SeedClass(rnd_seed, block.answer_nr)
else: # try with length of answers
task_id = block.get_attr("taskId")
doc_id = str(block.doc.doc_id)
if task_id:
answer_and_cnt = answer_map.get(doc_id + "." + task_id, None)
if answer_and_cnt:
cnt = answer_and_cnt[1]
if cnt > 0:
rnd_seed = SeedClass(rnd_seed, cnt)
ask_next = True
new_seed = True
rnd_error = None
try:
if (
block.insert_rnds(rnd_seed) and new_seed
): # do not change order! inserts must be done
# TODO: RND_SEED save rnd_seed to user data
pass
except ValueError as e:
rnd_error = str(e)
errs = OrderedDict()
plugs = OrderedDict()
is_block_plugin = bool(plugin_name)
if rnd_error:
md = block.get_expanded_markdown(macroinfo)
errs[0, len(md)] = rnd_error, plugin_name or defaultplugin
elif plugin_name:
# We want the expanded markdown here, so can't call Plugin.from_paragraph[_macros] directly.
macros = macroinfo.get_macros()
md = expand_macros_for_plugin(block, macros, macroinfo.jinja_env)
p_range = 0, len(md)
try:
vals = load_markup_from_yaml(
md, settings.global_plugin_attrs(), block.get_attr("plugin")
)
if ask_next:
block.ask_new = True
if vals.get("initNewAnswer", None) == "":
load_states = False
if plugin_name in WANT_FIELDS and "fields" in vals and user_ctx:
data, aliases, field_names, _ = get_fields_and_users(
vals["fields"],
RequestedGroups([user_ctx.user.get_personal_group()]),
block.doc.docinfo,
user_ctx.logged_user,
view_ctx,
add_missing_fields=True,
access_option=GetFieldsAccess.from_bool(
True
), # TODO: the user selected from User list
)
df = data[0]["fields"]
da = []
labels = []
for fn in field_names:
da.append(df.get(fn, 0))
labels.append(fn)
vals["fielddata"] = {
"data": data[0]["fields"],
"aliases": aliases,
"fieldnames": field_names,
"graphdata": {"data": da, "labels": labels},
}
except PluginException as e:
errs[p_range] = str(e), plugin_name
else:
taskid = block.get_attr("taskId")
try:
tid = (
TaskId.parse(
taskid, require_doc_id=False, allow_block_hint=False
)
if taskid
else None
)
except PluginException as e:
errs[p_range] = str(e), plugin_name
else:
if check_task_access(errs, p_range, plugin_name, tid):
try:
plugs[p_range] = Plugin(
tid,
vals,
plugin_name,
par=block,
)
except PluginException as e:
errs[p_range] = str(e), plugin_name
else:
md = None
for task_id, p_yaml, p_range, md in find_inline_plugins(block, macroinfo):
plugin_type = ""
try:
task_id = task_id.validate()
plugin_type = task_id.plugin_type or defaultplugin
y = load_markup_from_yaml(
finalize_inline_yaml(p_yaml),
settings.global_plugin_attrs(),
plugin_type,
)
except PluginException as e:
errs[p_range] = str(e), plugin_type
continue
if not check_task_access(errs, p_range, plugin_type, task_id):
continue
try:
plug = InlinePlugin(
task_id=task_id,
values=y,
plugin_type=plugin_type,
p_range=p_range,
par=block,
)
except PluginException as e:
errs[p_range] = str(e), plugin_type
continue
plugs[p_range] = plug
if md is None:
# Can happen if inline plugin block has no plugins.
md = block.get_expanded_markdown(macroinfo)
for p in plugs.values():
if p.type == "qst":
p.values["isTask"] = not block.is_question()
if load_states:
if (
custom_answer is not None
and custom_answer.task_id == p.task_id.doc_task
):
answer_and_cnt = custom_answer, custom_answer.get_answer_number()
elif p.task_id:
answer_and_cnt = answer_map.get(p.task_id.doc_task, None)
p.set_render_options(
answer_and_cnt if load_states and answer_and_cnt is not None else None,
plugin_opts,
)
return PluginPlacement(
block=block,
errors=errs,
expanded_md=md,
plugins=plugs,
is_block_plugin=is_block_plugin,
output_format=output_format,
)
[docs]def check_task_access(errs: ErrorMap, p_range: Range, plugin_name: str, tid: TaskId):
if tid and tid.doc_id:
b = DocEntry.find_by_id(tid.doc_id)
if b:
has_access = verify_view_access(b, require=False)
if not has_access:
errs[p_range] = (
"Task id refers to another document, "
"but you do not have access to that document."
), plugin_name
return False
else:
errs[p_range] = "Task id refers to a non-existent document.", plugin_name
return False
return True
KeyType = tuple[int, Range]
[docs]def get_answers(user, task_ids, answer_map):
col = func.max(Answer.id).label("col")
cnt = func.count(Answer.id).label("cnt")
if user is None:
sub = (
valid_answers_query(task_ids)
.add_columns(col, cnt)
.with_entities(col, cnt)
.group_by(Answer.task_id)
.subquery()
)
else:
sub = (
user.answers.filter(valid_taskid_filter(task_ids))
.add_columns(col, cnt)
.with_entities(col, cnt)
.group_by(Answer.task_id)
.subquery()
)
answers: list[tuple[Answer, int]] = (
Answer.query.join(sub, Answer.id == sub.c.col)
.with_entities(Answer, sub.c.cnt)
.all()
)
for answer, cnt in answers:
answer_map[answer.task_id] = answer, cnt
return cnt, answers
[docs]@dataclass
class PluginifyResult:
pars: list[DocParagraph]
js_paths: list[str]
css_paths: list[str]
custom_answer_plugin: Plugin | None
all_plugins: list[Plugin]
has_errors: bool
[docs]def pluginify(
doc: Document,
pars: list[DocParagraph],
user_ctx: UserContext,
view_ctx: ViewContext,
custom_answer: Answer | None = None,
task_id: TaskId | None = None,
sanitize=True,
do_lazy=False,
load_states=True,
review=False,
pluginwrap=PluginWrap.Full,
output_format: PluginOutputFormat = PluginOutputFormat.HTML,
user_print: bool = False,
target_format: PrintFormat = PrintFormat.LATEX,
) -> PluginifyResult:
"""
"Pluginifies" the specified DocParagraphs by calling the corresponding plugin route for each plugin
paragraph.
:param view_ctx: The view context.
:param doc: Document / DocumentVersion object.
:param pars: A list of DocParagraphs to be processed.
:param user_ctx: The user context.
:param custom_answer: Optional answer that will used as the state for the plugin instead of answer database.
:param task_id: Optional taskId for plugin which will load it's current state (returned as custom_answer_plugin)
If custom_answer or task_id is specified, the expression len(blocks) MUST be 1.
:param sanitize: Whether the blocks should be sanitized before processing.
:param do_lazy: Whether to use lazy versions of the plugins.
:param output_format: Desired output format (html/md) for plugins
:param user_print: Whether the plugins should output the original values or user's input (when exporting markdown).
:param target_format: for MD-print what exact format to use
:return: Processed HTML blocks along with JavaScript and CSS stylesheet dependencies.
"""
taketime("answ", "start")
if not view_ctx.preview and has_edit_access(doc.get_docinfo()):
for p in pars:
if p.is_translation_out_of_date():
p.add_class("troutofdate")
else:
if p.is_translation_unchecked():
p.add_class("checktr")
if sanitize:
for par in pars:
par.sanitize_html()
# init these for performance as they stay the same for all pars
md_out = output_format == PluginOutputFormat.MD
html_out = False if md_out else (output_format == PluginOutputFormat.HTML)
html_pars = [par.prepare(view_ctx, use_md=md_out) for par in pars]
if custom_answer is not None or task_id is not None:
if len(pars) != 1:
raise PluginException("len(blocks) must be 1 if custom state is specified")
plugins: DefaultDict[str, dict[KeyType, Plugin]] = defaultdict(OrderedDict)
answer_map: AnswerMap = {}
plugin_opts = PluginRenderOptions(
do_lazy=do_lazy,
user_print=user_print,
preview=view_ctx.preview,
target_format=target_format,
output_format=output_format,
user_ctx=user_ctx,
review=review,
wraptype=pluginwrap,
viewmode=view_ctx.viewmode,
)
if load_states and custom_answer is None and user_ctx.user.logged_in:
# TODO: could this return also the plugins, then there is no need for other iteration
task_ids, _, _ = find_task_ids(
pars, view_ctx, user_ctx, check_access=user_ctx.is_different
)
get_answers(user_ctx.user, task_ids, answer_map)
# get_answers(User.get_by_id(user_ctx.user.id), task_ids, answer_map)
# db.session.close()
# TODO: RND_SEED get all users rand_seeds for this doc's tasks. New table?
placements = {}
dumbo_opts = OrderedDict()
custom_answer_plugin = None
has_errors = False
for idx, block in enumerate(pars):
is_gamified = block.get_attr("gamification")
is_gamified = not not is_gamified
settings = block.doc.get_settings()
macroinfo = settings.get_macroinfo(view_ctx, user_ctx=user_ctx)
if is_gamified:
md = block.get_expanded_markdown(macroinfo=macroinfo)
try:
# TODO: Gamification map should be its own plugin
gd = YamlBlock.from_markdown(md).values
runner = "gamification-map"
html_pars[
idx
].output = f"<{runner} data={quoteattr(json.dumps(gd))}></{runner}>"
except yaml.YAMLError as e:
has_errors = True
html_pars[idx].output = (
'<div class="error"><p>Gamification error:</p><pre>'
+ str(e)
+ "</pre><p>From block:</p><pre>"
+ md
+ "</pre></div>"
)
pplace = PluginPlacement.from_par(
block=block,
load_states=load_states,
macroinfo=macroinfo,
plugin_opts=plugin_opts,
user_ctx=user_ctx,
view_ctx=view_ctx,
settings=settings,
answer_map=answer_map,
custom_answer=custom_answer,
output_format=output_format,
)
if pplace:
placements[idx] = pplace
for r, p in pplace.plugins.items():
plugins[p.type][idx, r] = p
if (custom_answer and p.task_id.doc_task == custom_answer.task_id) or (
task_id and p.task_id.doc_task == task_id
):
custom_answer_plugin = p
if not pplace.is_block_plugin:
dumbo_opts[idx] = block.get_dumbo_options(
base_opts=settings.get_dumbo_options()
)
else:
if block.nocache and not is_gamified: # get_nocache():
# if block.get_nocache():
texts = [block.get_expanded_markdown(macroinfo)]
htmls = call_dumbo(
texts,
options=block.get_dumbo_options(
base_opts=settings.get_dumbo_options()
),
)
html_pars[idx].output = sanitize_html(
htmls[0]
) # to collect all together before dumbo
# taketime("answ", "markup", len(plugins))
js_paths = []
css_paths = []
# TODO: Get plugin values before 1st answer query and loop for special cases
# (these tasks could have been omitted from 1st answer query)
glb_task_ids = []
glb_plugins_to_change = []
curruser_task_ids = []
curruser_plugins_to_change = []
taketime("glb/ucu", "GLO/currUser")
for plugin_name, plugin_block_map in plugins.items():
for _, plugin in plugin_block_map.items():
plugin.values.pop("postprogram", None)
plugin.values.pop("preprogram", None)
if not plugin.task_id:
continue
if plugin.task_id.is_global:
glb_task_ids.append(plugin.task_id)
glb_plugins_to_change.append(plugin)
elif plugin.known.useCurrentUser and user_ctx.is_different:
curruser_task_ids.append(plugin.task_id)
curruser_plugins_to_change.append(plugin)
if glb_task_ids:
get_answers(None, glb_task_ids, answer_map)
for p in glb_plugins_to_change:
a = answer_map.get(p.task_id.doc_task, None)
if not a:
continue
p.answer = a[0]
p.answer_count = a[1]
if curruser_task_ids:
for tid in curruser_task_ids:
answer_map.pop(tid.doc_task, None)
get_answers(user_ctx.logged_user, curruser_task_ids, answer_map)
for p in curruser_plugins_to_change:
p.options.user_ctx = UserContext.from_one_user(user_ctx.logged_user)
a = answer_map.get(p.task_id.doc_task, None)
if not a:
p.answer = None
p.answer_count = None
continue
p.answer = a[0]
p.answer_count = a[1]
# p.options.__setattr__("user", current_user)
taketime("glb/ucu", "done")
settings = doc.get_settings()
all_plugins = []
for plugin_name, plugin_block_map in plugins.items():
taketime("plg", plugin_name)
try:
plugin = get_plugin(plugin_name)
plugin_lazy = plugin.lazy
plugin_block_map_vals = [*plugin_block_map.values()]
for p in plugin_block_map_vals:
all_plugins.append(p)
resp = plugin_reqs(plugin_name)
except PluginException as e:
has_errors = True
for idx, r in plugin_block_map.keys():
placements[idx].set_error(r, str(e))
continue
# taketime("plg e", plugin_name)
try:
reqs = json.loads(resp)
plugin.can_give_task = reqs.get("canGiveTask", False)
if plugin_name == "mmcq" or plugin_name == "mcq":
reqs["multihtml"] = True
reqs["multimd"] = True
except ValueError as e:
has_errors = True
for idx, r in plugin_block_map.keys():
placements[idx].set_error(
r, f"Failed to parse JSON from plugin reqs route: {e}"
)
continue
plugin_js_files, plugin_css_files = plugin_deps(reqs)
for src in plugin_js_files:
if src.startswith("http") or src.startswith("/"): # absolute URL
js_paths.append(src)
elif src.endswith(".js"): # relative JS URL
js_paths.append(f"/{plugin_name}/{src}")
else: # module name
js_paths.append(src)
for src in plugin_css_files:
if src.startswith("http") or src.startswith("/"):
css_paths.append(src)
else:
css_paths.append(f"/{plugin_name}/{src}")
# Remove duplicates, preserving order TODO: could this be done out of the loop?
# taketime("rmv", "Remove dupl")
js_paths = list(OrderedDict.fromkeys(js_paths))
css_paths = list(OrderedDict.fromkeys(css_paths))
default_auto_md = reqs.get("default_automd", False)
if (html_out and reqs.get("multihtml")) or (md_out and reqs.get("multimd")):
try:
# taketime("plg m", plugin_name)
response = render_plugin_multi(
settings,
plugin_name,
list(plugin_block_map.values()),
plugin_output_format=output_format,
default_auto_md=default_auto_md,
)
taketime("plg e", plugin_name)
except PluginException as e:
has_errors = True
for idx, r in plugin_block_map.keys():
placements[idx].set_error(r, str(e))
continue
try:
plugin_htmls = json.loads(response)
except ValueError as e:
has_errors = True
for idx, r in plugin_block_map.keys():
placements[idx].set_error(
r, f"Failed to parse plugin response from multihtml route: {e}"
)
continue
if not isinstance(plugin_htmls, list):
for ((idx, r), plugin) in plugin_block_map.items():
plugin.plugin_lazy = plugin_lazy
placements[idx].set_error(
r,
f"Multihtml response of {plugin_name} was not a list: {plugin_htmls}",
)
else:
for ((idx, r), plugin), html in zip(
plugin_block_map.items(), plugin_htmls
):
plugin.plugin_lazy = plugin_lazy
placements[idx].set_output(r, html)
else:
for (idx, r), plugin in plugin_block_map.items():
if md_out:
err_msg_md = (
"Plugin does not support printing yet. "
"Please refer to TIM help pages if you want to learn how you can manually "
"define what to print here."
)
placements[idx].set_error(r, err_msg_md)
else:
try:
html = render_plugin(
docsettings=settings,
plugin=plugin,
output_format=output_format,
)
except PluginException as e:
has_errors = True
placements[idx].set_error(r, str(e))
continue
placements[idx].set_output(r, html)
taketime("plg m", "Plugins done")
taketime("plc", "Placement start")
for idx, place in placements.items():
par = html_pars[idx]
pass_to_dumbo = idx in dumbo_opts
output, plugin_htmls = place.get_block_output(extract_plugins=pass_to_dumbo)
par.output = output
if pass_to_dumbo:
par.plugin_htmls = plugin_htmls
taketime("plc", "Placement done")
# inline plugin blocks need to go through Dumbo to process MD
if output_format == PluginOutputFormat.HTML:
htmls_to_dumbo = []
settings_to_dumbo = []
taketime("dumbo", "start 1")
for k, v in dumbo_opts.items():
htmls_to_dumbo.append({"content": html_pars[k].output, **v.dict()})
settings_to_dumbo.append(v)
taketime("dumbo", "start 2")
for h, (idx, s) in zip(
call_dumbo(htmls_to_dumbo, options=doc.get_settings().get_dumbo_options()),
dumbo_opts.items(),
):
par = html_pars[idx]
for plugin_key, plugin_html in par.plugin_htmls.items():
h = h.replace(plugin_key, plugin_html)
par.plugin_htmls = None
par.output = sanitize_html(h)
taketime("phtml done")
return PluginifyResult(
pars=pars,
js_paths=js_paths,
css_paths=css_paths,
custom_answer_plugin=custom_answer_plugin,
all_plugins=all_plugins,
has_errors=has_errors,
)
[docs]def get_all_reqs():
allreqs = {}
for plugin, vals in get_plugins().items():
if vals.skip_reqs:
continue
try:
resp = plugin_reqs(plugin)
except PluginException:
continue
try:
reqs = json.loads(resp)
allreqs[plugin] = reqs
except ValueError:
continue
return allreqs
[docs]def plugin_deps(p: dict) -> tuple[list[str], list[str]]:
"""
:param p: is json of plugin requirements of the form:
{"js": ["js.js"], "css":["css.css"]}
"""
js_files = []
css_files = []
if "css" in p:
for cssF in p["css"]:
css_files.append(cssF)
if "js" in p:
for jsF in p["js"]:
js_files.append(jsF)
return js_files, css_files