Source code for timApp.document.docparagraph

from __future__ import annotations

import json
import os
import shelve
from collections import defaultdict
from copy import copy
from typing import TYPE_CHECKING

import commonmark
import filelock
from commonmark.node import Node
from jinja2.sandbox import SandboxedEnvironment

from timApp.document.documentparser import DocumentParser
from timApp.document.documentparseroptions import DocumentParserOptions
from timApp.document.documentwriter import DocumentWriter
from timApp.document.macroinfo import MacroInfo
from timApp.document.par_basic_data import ParBasicData
from timApp.document.preloadoption import PreloadOption
from timApp.document.prepared_par import PreparedPar
from timApp.document.randutils import random_id, hashfunc
from timApp.document.viewcontext import ViewContext, default_view_ctx
from timApp.markdown.autocounters import TimSandboxedEnvironment
from timApp.markdown.dumboclient import DumboOptions, MathType, InputFormat
from timApp.markdown.markdownconverter import (
    par_list_to_html_list,
    expand_macros,
    format_heading,
    AutoCounters,
)
from timApp.timdb.exceptions import TimDbException, InvalidReferenceException
from timApp.timtypes import DocumentType
from timApp.util.rndutils import get_rands_as_dict, SeedType
from timApp.util.utils import count_chars_from_beginning, get_error_html, title_to_id
from tim_common.html_sanitize import sanitize_html, strip_div
from tim_common.utils import parse_bool

if TYPE_CHECKING:
    from timApp.document.document import Document
    from timApp.document.docinfo import DocInfo

SKIPPED_ATTRS = {"r", "rd", "rp", "ra", "rt", "mt", "settings"}

# TODO: a bit short name for global variable
se = SandboxedEnvironment(autoescape=True)


# TODO: Make this a dataclass as soon as __slots__ is supported for dataclasses (coming in Python 3.10 maybe).
[docs]class DocParagraph: """Represents a paragraph that is associated with a :class:`Document`. See :doc:`docparagraph` for more info.""" __slots__ = { "__is_ref", "__is_setting", "__rands", "__rnd_seed", "answer_nr", "ask_new", "attrs", "doc", "prepared_par", "html", "html_sanitized", "nomacros", "original", "preamble_doc", "prev_deref", "ref_chain", "ref_doc", "ref_pars", "was_invalid", # persistent attributes (stored on disk): "attrs", "hash", # stored as 't' "html_cache", # stored as 'h' "id", "md", } def __init__(self, doc: Document): """Constructs a DocParagraph. :param doc: The Document object to which this paragraph is connected. """ self.doc: Document = doc self.prev_deref: DocParagraph | None = None self.ref_doc = None self.original: DocParagraph | None = None self.html_sanitized = False self.html = None self.prepared_par: PreparedPar | None = None # Cache for referenced paragraphs. Keys {True, False} correspond to the values of set_html parameter in # get_referenced_pars. self.ref_pars = {} self.__rands = None # random number macros for this pg self.__rnd_seed = 0 self.attrs: dict[str, str] | None = None self.nomacros = None self.ref_chain = None self.answer_nr: int | None = None # needed if variable tasks, None = not task at all or not variable task self.ask_new: bool | None = None # to send for plugins to force new question self.html_cache = None def __eq__(self, other): if isinstance(other, self.__class__): return self.is_identical_to(other) return NotImplemented def __ne__(self, other): if isinstance(other, self.__class__): return not (self == other) return NotImplemented
[docs] @staticmethod def help_par(): """Returns a dummy paragraph with id 'HELP_PAR' that is used as a placeholder for an empty document.""" return DocParagraph.create(doc=None, par_id="HELP_PAR")
[docs] @classmethod def create( cls, doc: Document | None, par_id: str | None = None, md: str = "", par_hash: str | None = None, html: str | None = None, attrs: dict | None = None, ) -> DocParagraph: """Creates a DocParagraph from the given parameters. :param doc: The Document object to which this paragraph is connected. :param par_id: The paragraph id or None if it should be autogenerated. :param md: The markdown content. :param par_hash: The hash for the paragraph or None if it should be computed. :param html: The HTML for the paragraph or None if it should be generated based on markdown. :param attrs: The attributes for the paragraph. :return: The created DocParagraph. """ par = DocParagraph(doc) par.html = html par.id = random_id() if par_id is None else par_id par.md = md par.hash = hashfunc(md, attrs) if par_hash is None else par_hash par.attrs = attrs or {} par._cache_props() return par
@property def nocache(self): return self.attrs.get("nocache", False)
[docs] def create_reference( self, doc, translator: str | None = None, r: str | None = None, add_rd: bool = True, ) -> DocParagraph: """Creates a reference paragraph to this paragraph. :param doc: The Document object in which the reference paragraph will reside. :param r: The kind of the reference. :param add_rd: If True, sets the rd attribute for the reference paragraph. :param translator: The name of the machine translator set to mt on machine translation. :return: The created DocParagraph. """ return create_reference( doc, doc_id=self.get_doc_id(), par_id=self.get_id(), r=r, add_rd=add_rd, translator=translator, )
[docs] @staticmethod def create_area_reference( doc: Document, area_name: str, r: str | None = None, rd: int | None = None ) -> DocParagraph: """Creates an area reference paragraph. :param area_name: The name of the area. :param doc: The Document object in which the reference paragraph will reside. :param r: The kind of the reference. :param rd: ID of the referenced document. :return: The created DocParagraph. """ par = DocParagraph.create(doc) par.set_attr("r", r) doc_id = doc.doc_id if rd is None else rd par.set_attr("rd", str(doc_id) if doc_id is not None else None) par.set_attr("ra", area_name) par.set_attr("rp", None) par._cache_props() return par
[docs] @classmethod def from_dict(cls, doc, d: dict) -> DocParagraph: """Creates a paragraph from a dictionary. :param doc: The Document object in which the paragraph will reside. :param d: The dictionary. :return: The created DocParagraph. """ par = DocParagraph(doc) par.id = d["id"] par.md = d["md"] par.attrs = d.get("attrs", {}) par.html_cache = d.get("h") par._cache_props() par._compute_hash() return par
[docs] def no_macros(self): nm = self.attrs.get("nomacros", None) if nm is not None: nm = nm.lower() return nm != "false" return self.doc.get_settings().nomacros(False)
[docs] def is_new_task(self): return self.attrs.get("seed", "") == "answernr"
[docs] @staticmethod def is_no_macros(settings, doc_macros): nm = settings.get("nomacros") if nm is not None: nm = nm.lower() return nm != "false" return doc_macros
[docs] @classmethod def get_latest(cls, doc, par_id: str) -> DocParagraph: """Retrieves the latest paragraph version from the data store. :param doc: The Document object for which to retrieve the paragraph. :param par_id: The paragraph id. :return: The retrieved DocParagraph. """ try: t = os.readlink(cls._get_path(doc, par_id, "current")) return cls.get(doc, par_id, t) except FileNotFoundError: doc._raise_not_found(par_id)
[docs] @classmethod def get(cls, doc, par_id: str, t: str) -> DocParagraph: """Retrieves a specific paragraph version from the data store. :param doc: The Document object for which to retrieve the paragraph. :param par_id: The paragraph id. :param t: The paragraph hash. :return: The retrieved DocParagraph. """ try: with open(cls._get_path(doc, par_id, t)) as f: return cls.from_dict(doc, json.loads(f.read())) except FileNotFoundError: doc._raise_not_found(par_id)
@classmethod def _get_path(cls, doc, par_id: str, t: str) -> str: """Returns the filesystem location for a specific paragraph version. :param doc: The Document object in which the paragraph resides. :param par_id: The paragraph id. :param t: The paragraph hash. :return: The filesystem location for the paragraph. """ from timApp.timdb.dbaccess import get_files_path froot = get_files_path() # For performance, we use string concatenation. The "/" operator of Path is slower # and it shows in perf profiles. return f"{froot}/pars/{doc.doc_id}/{par_id}/{t}" @classmethod def _get_base_path(cls, doc, par_id: str) -> str: """Returns the filesystem location for the versions of a given paragraph. :param doc: The Document object in which the paragraph resides. :param par_id: The paragraph id. :return: The filesystem location for the versions of the paragraph. """ from timApp.timdb.dbaccess import get_files_path froot = get_files_path() return (froot / "pars" / str(doc.doc_id) / par_id).as_posix()
[docs] def dict(self, include_html_cache: bool = False) -> dict: """Returns the persistent data as a dict.""" d = dict( attrs=self.attrs, id=self.id, md=self.md, t=self.hash, ) if include_html_cache and self.html_cache: d["h"] = self.html_cache return d
[docs] def get_basic_data(self): return ParBasicData( attrs=self.attrs, doc_id=self.doc.doc_id, hash=self.hash, id=self.id, md=self.md, )
[docs] def prepare( self, view_ctx: ViewContext, use_md: bool = False, cache: bool = True ) -> PreparedPar: """Returns the corresponding PreparedPar.""" if self.prepared_par: return self.prepared_par if self.original: basic_data = self.original.get_basic_data() target_data = self.get_basic_data() target_data.doc_id = self.ref_doc.doc_id else: basic_data = self.get_basic_data() target_data = None if use_md: output = self.md else: try: output = self.get_html(view_ctx, no_persist=True) except Exception as e: output = get_error_html(e) preamble = self.from_preamble() class_str = "par" if not self.get_attr("area"): if classes := self.classes: for c in classes: class_str += " " + c if self.is_question(): class_str += " questionPar" else: plugintype = self.get_attr("plugin") if plugintype: class_str += f" {plugintype}" if preamble: class_str += " preamble" fd = PreparedPar( data=basic_data, target=target_data, output=output, html_class=class_str, from_preamble=preamble.path if preamble else None, ) if cache: self.prepared_par = fd return fd
def _cache_props(self): """Caches some boolean properties about this paragraph in internal attributes.""" self.__is_ref = self.is_par_reference() or self.is_area_reference() self.__is_setting = "settings" in self.attrs
[docs] def get_doc_id(self) -> int: """Returns the Document id to which this paragraph is attached.""" return self.doc.doc_id
[docs] def get_id(self) -> str: """Returns the id of this paragraph.""" return self.id
[docs] def is_identical_to(self, par: DocParagraph): return self.is_same_as(par) and self.get_id() == par.get_id()
[docs] def is_different_from(self, par: DocParagraph) -> bool: """Determines whether the given paragraph is different from this paragraph content-wise.""" return not self.is_same_as(par)
[docs] def is_same_as(self, par: DocParagraph) -> bool: """Determines whether the given paragraph is same as this paragraph content-wise.""" return self.get_hash() == par.get_hash() and self.attrs == par.attrs
[docs] def is_same_as_html(self, par: DocParagraph, view_ctx: ViewContext): return self.is_same_as(par) and self.get_html( view_ctx, no_persist=True ) == par.get_html(view_ctx, no_persist=True)
[docs] def get_hash(self) -> str: """Returns the hash of this paragraph.""" return self.hash
[docs] def get_markdown(self) -> str: """Returns the markdown of this paragraph.""" return self.md
[docs] def insert_rnds(self, rnd_seed: SeedType | None) -> bool: """Inserts Jinja rnd variable as a list of random numbers based to attribute rnd and rnd_seed return True if attribute rnd found and OK, else False """ self.__rands, self.__rnd_seed, state = get_rands_as_dict( self.attrs, rnd_seed, None ) if self.__rands is None: return False return True
[docs] def get_rands(self): return self.__rands
[docs] def get_nomacros(self): if self.nomacros is not None: return self.nomacros self.nomacros = self.no_macros() return self.nomacros
[docs] def get_auto_id(self) -> str: task_id = self.attrs.get("taskId", None) if task_id: return task_id return self.id
[docs] def get_expanded_markdown( self, macroinfo: MacroInfo, ignore_errors: bool = False, ) -> str: """Returns the macro-processed markdown for this paragraph. :param macroinfo: The MacroInfo to use. If None, the MacroInfo is taken from the document that has the paragraph. :param ignore_errors: Whether or not to ignore errors when expanding the macros :return: The expanded markdown. """ md = self.md if self.get_nomacros(): return md settings = self.doc.get_settings() macros = macroinfo.get_macros() env = macroinfo.jinja_env counters = env.counters if counters: counters.task_id = self.get_auto_id() counters.is_plugin = self.is_plugin() try: if self.insert_rnds( md + macros.get("username", "") ): # TODO: RND_SEED: check what seed should be used, is this used to plugins? macros = {**macros, **self.__rands} except Exception as err: # raise Exception('Error in rnd: ' + str(err)) from err pass # TODO: show exception to user! return expand_macros( md, macros, settings, ignore_errors=ignore_errors, env=env, )
[docs] def get_title(self) -> str | None: """Attempts heuristically to return a title for this paragraph. :return: The title for this paragraph or None if there is no sensible title. """ md = self.md if len(md) < 3 or md[0] != "#" or md[1] == "-": return None attr_index = md.find("{") return md[2:attr_index].strip() if attr_index > 0 else md[2:].strip()
[docs] def get_exported_markdown(self, skip_tr=False, export_ids=False) -> str: """Returns the markdown in exported form for this paragraph.""" if (not skip_tr) and self.is_par_reference() and self.is_translation(): # This gives a default translation based on the source paragraph # todo: same for area reference data = [] try: ref_pars = self.get_referenced_pars() except InvalidReferenceException: pass else: for par in ref_pars: d = self.dict() md = par.md if md: d["md"] = md data.append(d) return DocumentWriter( data, export_hashes=False, export_ids=export_ids ).get_text() return DocumentWriter( [self.dict()], export_hashes=False, export_ids=export_ids ).get_text(DocumentParserOptions.single_paragraph())
def __get_setting_html(self) -> str: """Returns the HTML for the settings paragraph.""" from timApp.document.docsettings import DocSettings try: DocSettings.from_paragraph(self) except TimDbException as e: return f'<div class="pluginError">Invalid settings: {e}</div>' return se.from_string("<pre>{{yml}}</pre>").render(yml=self.md)
[docs] def get_html(self, view_ctx: ViewContext, no_persist: bool = True) -> str: """Returns the html for the paragraph.""" if self.html is not None: return self.html if self.is_plugin() or self.has_plugins(): return self._set_html("") if self.is_setting(): return self._set_html(self.__get_setting_html()) context_par = ( self.doc.get_previous_par(self, get_last_if_no_prev=False) if no_persist else None ) preload_pars = ( self.doc.get_paragraphs() if self.doc.preload_option == PreloadOption.all else [self] ) DocParagraph.preload_htmls( preload_pars, self.doc.get_settings(), view_ctx, context_par=context_par, persist=not no_persist, ) # This DocParagraph instance is not necessarily the same as what self.doc contains. In that case, we copy the # HTML from the doc's equivalent paragraph. if self.html is None: self.html = self.doc.par_map[self.get_id()]["c"].html assert self.html is not None return self.html
[docs] @classmethod def preload_htmls( cls, pars: list[DocParagraph], settings, view_ctx: ViewContext, clear_cache: bool = False, context_par: DocParagraph | None = None, persist: bool | None = True, ): """Loads the HTML for each paragraph in the given list. :param view_ctx: :param context_par: The context paragraph. Required only for previewing for now. :param persist: Whether the result of preloading should be saved to disk. :param clear_cache: Whether all caches should be refreshed. :param settings: The document settings. :param pars: Paragraphs to preload. :return: A list of paragraphs whose HTML changed as the result of preloading. """ if not pars: return [] doc_id = pars[0].doc.doc_id macro_cache_file = f"/tmp/tim_auto_macros_{doc_id}" heading_cache_file = f"/tmp/heading_cache_{doc_id}" first_pars = [] if context_par is not None: first_pars = [context_par] pars = first_pars + pars if not persist: cache = {} heading_cache = {} with shelve.open(macro_cache_file) as c, shelve.open( heading_cache_file ) as hc: # Basically we want the cache objects to be non-persistent, so we convert them to normal dicts # Find out better way if possible... for par in first_pars: key = str((par.get_id(), par.doc.get_version())) value = c.get(key) if value is not None: cache[key] = value value = hc.get(par.get_id()) if value is not None: heading_cache[par.get_id()] = value unloaded_pars = cls.get_unloaded_pars( pars, settings, cache, heading_cache, clear_cache ) else: with filelock.FileLock(f"/tmp/cache_lock_{doc_id}"): if clear_cache: try: os.remove(macro_cache_file + ".db") except FileNotFoundError: pass try: os.remove(heading_cache_file + ".db") except FileNotFoundError: pass with shelve.open(macro_cache_file) as cache, shelve.open( heading_cache_file ) as heading_cache: unloaded_pars = cls.get_unloaded_pars( pars, settings, cache, heading_cache, clear_cache ) for k, v in heading_cache.items(): heading_cache[k] = v changed_pars = [] if len(unloaded_pars) > 0: def deref_tr_par(p): """Required for getting the original par's attributes, so that for example "nonumber" class doesn't have to be repeated in translations. """ if not p.is_translation(): return p try: return p.get_referenced_pars()[0] except InvalidReferenceException as e: p.was_invalid = True p._set_html(get_error_html(e)) return p htmls = par_list_to_html_list( [deref_tr_par(par) for par, _, _, _, _ in unloaded_pars], settings=settings, view_ctx=view_ctx, auto_macros=( {"h": auto_macros["h"], "headings": hs} for _, _, auto_macros, hs, _ in unloaded_pars ), ) for (par, auto_macro_hash, _, _, old_html), h in zip(unloaded_pars, htmls): # h is not sanitized but old_html is, but HTML stays unchanged after sanitization most of the time # so they are comparable after stripping div. We want to avoid calling sanitize_html unnecessarily. if getattr(par, "was_invalid", False): continue if isinstance(h, bytes): h = h.decode() h = strip_div(h) if h != old_html: h = sanitize_html(h) if not par.from_preamble(): changed_pars.append(par) par.html_cache[auto_macro_hash] = h par._set_html(h, sanitized=True) if persist and not par.from_preamble(): par.__write() return changed_pars
[docs] @classmethod def get_unloaded_pars( cls, pars, settings, auto_macro_cache, heading_cache, clear_cache=False ): """Finds out which of the given paragraphs need to be preloaded again. :param pars: The list of paragraphs to be processed. :param settings: The settings for the document. :param auto_macro_cache: The cache object from which to retrieve and store the auto macro data. :param heading_cache: A cache object to store headings into. The key is paragraph id and value is a list of headings in that paragraph. :param clear_cache: Whether all caches should be refreshed. :return: A 5-tuple of the form: (paragraph, hash of the auto macro values, auto macros, so far used headings, old HTML). """ cumulative_headings = [] unloaded_pars = [] dyn = 0 macroinfo = settings.get_macroinfo(default_view_ctx) macros = macroinfo.get_macros() env = macroinfo.jinja_env settings_hash = settings.get_hash() for par in pars: if par.is_dynamic(): dyn += 1 continue if not clear_cache and par.html is not None: continue cached = par.html_cache try: auto_number_start = settings.auto_number_start() auto_macros = par.get_auto_macro_values( macros, env, auto_macro_cache, heading_cache, auto_number_start ) except RecursionError: raise TimDbException( "Infinite recursion detected in get_auto_macro_values; the document may be broken." ) auto_macro_hash = hashfunc(settings_hash + str(auto_macros)) par_headings = heading_cache.get(par.get_id()) if cumulative_headings: # Performance optimization: copy only if the set of headings changes if par_headings: all_headings_so_far = cumulative_headings[-1].copy() else: all_headings_so_far = cumulative_headings[-1] else: all_headings_so_far = defaultdict(int) cumulative_headings.append(all_headings_so_far) if par_headings is not None: for h in par_headings: all_headings_so_far[h] += 1 if not clear_cache and cached is not None: if type(cached) is str: # Compatibility old_html = cached else: cached_html = cached.get(auto_macro_hash) if cached_html is not None: par.html = cached_html continue else: try: old_html = next(iter(cached.values())) except StopIteration: old_html = None else: old_html = None tup = (par, auto_macro_hash, auto_macros, all_headings_so_far, old_html) par.html_cache = {} unloaded_pars.append(tup) return unloaded_pars
[docs] def has_class(self, class_name): """Returns whether this paragraph has the specified class.""" if classes := self.classes: return class_name in classes return False
[docs] def add_class(self, *classes: str): """Adds the specified class to this paragraph.""" for class_name in classes: if not self.has_class(class_name): curr_classes = self.classes if curr_classes is None: curr_classes = [] curr_classes.append(class_name) self.classes = curr_classes
[docs] def get_auto_macro_values( self, macros, env: TimSandboxedEnvironment, auto_macro_cache, heading_cache, auto_number_start, ): """Returns the auto macros values for the current paragraph. Auto macros include things like current heading/table/figure numbers. :param heading_cache: A cache object to store headings into. The key is paragraph id and value is a list of headings in that paragraph. :param macros: Macros to apply for the paragraph. :param auto_macro_cache: The cache object from which to retrieve and store the auto macro data. :param auto_number_start: Object of heading start numbers. :return: Auto macro values as a dict. :param env: Environment for macros. :return: A dict(str, dict(int,int)) containing the auto macro information. """ key = str((self.get_id(), self.doc.get_version())) cached = auto_macro_cache.get(key) if cached is not None: return cached prev_par: DocParagraph = self.doc.get_previous_par(self) if prev_par is None: prev_par_auto_values = {"h": auto_number_start} heading_cache[self.get_id()] = [] else: prev_par_auto_values = prev_par.get_auto_macro_values( macros, env, auto_macro_cache, heading_cache, auto_number_start ) # If the paragraph is a translation but it has not been translated (empty markdown), we use the md from the original. deref = None if prev_par is not None and prev_par.is_translation(): try: deref = prev_par.get_referenced_pars()[0] except InvalidReferenceException: # In case of an invalid reference, just skip this one. deref = None if ( prev_par is None or prev_par.is_dynamic() or prev_par.has_class("nonumber") or (deref and deref.has_class("nonumber")) ): auto_macro_cache[key] = prev_par_auto_values heading_cache[self.get_id()] = [] return prev_par_auto_values md_expanded = prev_par.md if not md_expanded and deref is not None: md_expanded = deref.md if not prev_par.get_nomacros(): # TODO: RND_SEED should we fill the rands also? md_expanded = expand_macros( md_expanded, macros, self.doc.get_settings(), env ) blocks = DocumentParser( md_expanded, options=DocumentParserOptions.break_on_empty_lines() ).get_blocks() deltas = copy(prev_par_auto_values["h"]) title_ids = [] for e in blocks: level = count_chars_from_beginning(e["md"], "#") if 0 < level < 7: title = e["md"][level:].strip() title_ids.append(title_to_id(title)) deltas[level] += 1 for i in range(level + 1, 7): deltas[i] = auto_number_start.get(i, 0) heading_cache[self.get_id()] = title_ids result = {"h": deltas} auto_macro_cache[key] = result return result
[docs] def sanitize_html(self): """Sanitizes the HTML for this paragraph. If the HTML has already been sanitized or the HTML has not been loaded, this method does nothing. """ if self.html_sanitized or not self.html: return new_html = sanitize_html(self.html) self._set_html(new_html, True)
def _set_html(self, new_html: str, sanitized: bool = False) -> str: """Sets the HTML for this paragraph. :param new_html: The new HTML. :param sanitized: Whether the HTML is sanitized. Default is False. :return: The HTML. """ self.html = new_html if self.prepared_par is not None: self.prepared_par.output = new_html self.html_sanitized = sanitized return self.html
[docs] def get_attr(self, attr_name: str, default_value: str | None = None) -> str | None: """Returns the value of the specified attribute. :param attr_name: The name of the attribute to get. :param default_value: The default value to return if the attribute does not exist. :return: The attribute value. """ return self.attrs.get(attr_name, default_value)
[docs] def set_markdown(self, new_md: str): """Sets markdown for this paragraph. :param new_md: The new markdown. """ self.md = new_md self._compute_hash()
def _compute_hash(self) -> None: self.hash = hashfunc(self.md, self.attrs) @property def classes(self) -> list[str] | None: return self.attrs.get("classes", None) @classes.setter def classes(self, classes: list[str] | None) -> None: # TODO: Class list should not be an attribute but its own list if classes is None: self.attrs.pop("classes", None) else: # noinspection PyTypeChecker self.attrs["classes"] = classes
[docs] def set_attr(self, attr_name: str, attr_val: str | None): """Sets the value of the specified attribute. :param attr_name: The name of the attribute to set. :param attr_val: The value for the attribute. """ if attr_val is None: self.attrs.pop(attr_name, None) else: self.attrs[attr_name] = attr_val self._cache_props() self._compute_hash()
[docs] def is_task(self): """Returns whether the paragraph is a task.""" return ( self.get_attr("taskId") is not None and self.get_attr("plugin") is not None )
[docs] def get_attrs(self) -> dict: return self.attrs
[docs] def get_base_path(self) -> str: """Returns the filesystem path for the versions of this paragraph.""" return self._get_base_path(self.doc, self.get_id())
[docs] def get_path(self) -> str: """Returns the filesystem path for this paragraph.""" return self._get_path(self.doc, self.id, self.hash)
def __write(self): file_name = self.get_path() does_exist = os.path.isfile(file_name) if not does_exist: base_path = self.get_base_path() if not os.path.exists(base_path): os.makedirs(base_path) with open(file_name, "w") as f: f.write(json.dumps(self.dict(include_html_cache=True)))
[docs] def set_latest(self): """Updates the 'current' symlink to point to this paragraph version.""" linkpath = self._get_path(self.doc, self.get_id(), "current") if linkpath == self.get_hash(): return if os.path.islink(linkpath) or os.path.isfile(linkpath): os.unlink(linkpath) os.symlink(self.get_hash(), linkpath)
[docs] def clone(self) -> DocParagraph: """Clones the paragraph. :return: The cloned paragraph. """ p = self return DocParagraph.create( attrs=p.attrs, doc=p.doc, html=p.html, md=p.md, par_hash=p.hash, par_id=p.id, )
[docs] def clear_cache(self) -> None: """Clears the HTML cache of this paragraph.""" self.html_cache = None
[docs] def save(self, add: bool = False) -> None: """Performs a save operation for this paragraph. This updates the document version and paragraph list appropriately. :param add: Whether to add (True) or modify an existing (False). """ # TODO: Possibly get rid of 'add' parameter altogether. if add: self.doc.add_paragraph_obj(self) else: self.doc.modify_paragraph_obj(self.get_id(), self)
[docs] def store(self): """Stores the paragraph to disk.""" self.__write() # Clear cached referenced paragraphs because this was modified self.ref_pars = {}
[docs] def is_reference(self) -> bool: """Returns whether this paragraph is a reference to some other paragraph.""" return self.__is_ref
[docs] def is_par_reference(self) -> bool: """Returns whether this paragraph is a reference to a single paragraph.""" return self.get_attr("rp") is not None
[docs] def is_area_reference(self) -> bool: """Returns whether this paragraph is a reference to an area.""" return self.get_attr("ra") is not None
[docs] def is_translation(self) -> bool: """Returns whether this paragraph is a translated paragraph.""" return self.get_attr("r") == "tr" and self.get_attr("rp") is not None
[docs] def get_referenced_pars( self, view_ctx: ViewContext | None = None ) -> list[DocParagraph]: cached = self.ref_pars.get(view_ctx) if cached is not None: return cached pars = [create_final_par(p, view_ctx) for p in self.get_referenced_pars_impl()] self.ref_pars[view_ctx] = pars return pars
[docs] def get_referenced_pars_impl( self, visited_pars: list[tuple[int, str]] | None = None ) -> list[DocParagraph]: """Returns the paragraphs that are referenced by this paragraph. The references are resolved recursively, i.e. if the referenced paragraphs are references themselves, they will also be resolved, and so on, until we get a list of non-reference paragraphs. :param visited_pars: A list of already visited paragraphs to prevent infinite recursion. :return: The list of resolved paragraphs. """ if visited_pars is None: visited_pars = [] par_doc_id = self.get_doc_id(), self.get_id() if par_doc_id in visited_pars: visited_pars.append(par_doc_id) raise InvalidReferenceException( f'Infinite referencing loop detected: {" -> ".join((f"{d}:{p}" for d, p in visited_pars))}' ) visited_pars.append(par_doc_id) ref_docid = None ref_doc = None attrs = self.attrs if "rd" in attrs: try: ref_docid = int(attrs["rd"]) except ValueError: raise InvalidReferenceException( f'Invalid reference document id: "{attrs["rd"]}"' ) else: ref_doc = ( self.doc.get_source_document() if not self.from_preamble() else self.from_preamble().document.get_source_document() ) if ref_doc is None: if ref_docid is None: raise InvalidReferenceException( "Source document for reference not specified." ) ref_doc = self.doc.get_ref_doc(ref_docid) if not ref_doc.exists(): raise InvalidReferenceException("The referenced document does not exist.") if self.is_par_reference(): try: par = ref_doc.get_paragraph(attrs["rp"]) par.prev_deref = self except TimDbException: raise InvalidReferenceException( "The referenced paragraph does not exist." ) if par.is_reference(): ref_pars = par.get_referenced_pars_impl(visited_pars=visited_pars) else: ref_pars = [par] elif self.is_area_reference(): if self.is_translation(): raise InvalidReferenceException( "A translated paragraph cannot be an area reference." ) section_pars = ref_doc.get_named_section(attrs["ra"]) ref_pars = [] for p in section_pars: p.prev_deref = self if p.is_reference(): ref_pars.extend( p.get_referenced_pars_impl(visited_pars=visited_pars) ) else: ref_pars.append(p) else: assert False return ref_pars
[docs] def is_dynamic(self) -> bool: """Returns whether this paragraph is a dynamic paragraph. A dynamic paragraph is a paragraph which is either * a plugin, * a reference which is not a translation, or * a setting. """ return ( self.is_plugin() or self.has_plugins() or (self.__is_ref and not self.is_translation()) or self.__is_setting )
[docs] def is_plugin(self) -> bool: """Returns whether this paragraph is a plugin.""" return bool(self.get_attr("plugin"))
[docs] def has_plugins(self) -> bool: """Returns whether this paragraph has inline plugins.""" return bool(self.get_attr("defaultplugin"))
[docs] def is_theme_style(self) -> bool: return self.get_attr("code_lang") in ("scss", "css")
[docs] def is_yaml(self) -> bool: """Returns whether this paragraph is YAML markup.""" return self.is_plugin() or self.is_setting()
[docs] def is_question(self) -> bool: """Returns whether this paragraph is a question paragraph.""" return self.is_plugin() and bool(self.get_attr("question"))
[docs] def is_setting(self) -> bool: """Returns whether this paragraph is a settings paragraph.""" return self.__is_setting
[docs] def from_preamble(self) -> DocInfo | None: """Returns the preamble document for this paragraph if the paragraph has been copied from a preamble.""" return getattr(self, "preamble_doc", None)
[docs] def set_id(self, par_id: str): """Sets the id for this paragraph. :param par_id: The new id for the paragraph. """ self.id = par_id
[docs] def is_citation(self): return self.get_attr("r") == "c"
[docs] def is_area(self): return ( self.get_attr("area") is not None or self.get_attr("area_end") is not None )
[docs] def has_dumbo_options(self): return bool( self.get_attr("math_type") or self.get_attr("math_preamble") or self.get_attr("input_format") or self.get_attr("smart_punct") )
[docs] def get_dumbo_options( self, base_opts: DumboOptions = DumboOptions.default() ) -> DumboOptions: return DumboOptions( math_type=MathType.from_string( self.get_attr("math_type") or base_opts.math_type ), math_preamble=self.get_attr("math_preamble") or base_opts.math_preamble, input_format=InputFormat.from_string(self.get_attr("input_format")) or base_opts.input_format, smart_punct=parse_bool(self.get_attr("smart_punct"), base_opts.smart_punct), )
[docs] def is_translation_out_of_date(self): if not self.ref_chain: return False last_ref = self.ref_chain.prev_deref reached_par = self.ref_chain return ( last_ref.is_translation() and not reached_par.is_setting() and reached_par.get_hash() != last_ref.get_attr("rt") )
[docs] def is_translation_unchecked(self): """ Checks whether or not the paragraph's translation has been checked by a human. :return: False if the paragraph is not a translation or it has been checked, true if it is not checked """ if not self.ref_chain: return False last_ref = self.ref_chain.prev_deref reached_par = self.ref_chain return ( last_ref.is_translation() and not reached_par.is_setting() and last_ref.get_attr("mt") is not None )
[docs]def is_real_id(par_id: str | None): """Returns whether the given paragraph id corresponds to some real paragraph instead of being None or a placeholder value ('HELP_PAR'). :param par_id: The paragraph id. :return: True if the given paragraph id corresponds to some real paragraph, False otherwise. """ return par_id is not None and par_id != "HELP_PAR"
[docs]def create_reference( doc: DocumentType, doc_id: int, par_id: str, translator: str | None = None, r: str | None = None, add_rd: bool = True, ) -> DocParagraph: """Creates a reference paragraph to a paragraph. :param par_id: Id of the original paragraph. :param doc_id: Id of the original document. :param doc: The Document object in which the reference paragraph will reside. :param r: The kind of the reference. :param add_rd: If True, sets the rd attribute for the reference paragraph. :param translator: The name of the machine translator set to mt on machine translation. :return: The created DocParagraph. """ par = DocParagraph.create(doc) par.set_attr("r", r) par.set_attr("rd", str(doc_id) if add_rd else None) par.set_attr("rp", par_id) par.set_attr("ra", None) par.set_attr("mt", translator) par._cache_props() return par
[docs]def create_final_par( reached_par: DocParagraph, view_ctx: ViewContext | None ) -> DocParagraph: """Creates the finalized dereferenced paragraph based on a chain of references.""" last_ref = reached_par.prev_deref if last_ref.is_translation() and last_ref.get_markdown(): md = last_ref.get_markdown() else: md = reached_par.get_markdown() first_ref = reached_par is_any_norm_reference = False ref_list = [] while True: ref_list.append(first_ref) if not first_ref.prev_deref: break first_ref = first_ref.prev_deref is_any_norm_reference = is_any_norm_reference or ( first_ref.is_reference() and not first_ref.is_translation() ) new_attrs = {} for r in reversed(ref_list): for k, v in r.get_attrs().items(): if k in SKIPPED_ATTRS: continue if isinstance(v, list): li = new_attrs.get(k) if not isinstance(li, list): li = [] new_attrs[k] = li li += v else: new_attrs[k] = v if all(p.is_setting() for p in ref_list): new_attrs["settings"] = "" final_par = DocParagraph.create( attrs=new_attrs, doc=reached_par.doc, md=md, par_hash=reached_par.get_hash(), par_id=reached_par.get_id(), ) # We need 2 different documents under final_par: # 1. what document to use for settings: "doc" attribute # 2. what document id to put in HTML's ref-doc-id (might not be same as settings): "ref_doc" attribute final_par.original = first_ref final_par.ref_doc = reached_par.doc final_par._cache_props() final_par.prepared_par = None if first_ref.from_preamble(): final_par.preamble_doc = first_ref.from_preamble() if first_ref.is_translation(): final_par.doc = first_ref.doc if not is_any_norm_reference: final_par.ref_doc = first_ref.doc.get_source_document() elif last_ref.is_translation(): final_par.doc = last_ref.doc final_par.ref_doc = last_ref.doc.get_source_document() final_par.ref_chain = reached_par if view_ctx: html = ( last_ref.get_html(view_ctx, no_persist=False) if last_ref.is_translation() else reached_par.get_html(view_ctx, no_persist=False) ) # if html is empty, use the source if html == "": html = reached_par.get_html(view_ctx, no_persist=False) final_par._set_html(html) return final_par
[docs]def get_heading_counts(ctx: DocParagraph): d = ctx.doc macro_cache_file = f"/tmp/tim_auto_macros_{d.doc_id}" ps = commonmark.Parser() with shelve.open(macro_cache_file) as cache: vals = cache.get(str((ctx.get_id(), d.get_version())), {}).get("h") return vals
[docs]def add_heading_numbers( s: str, ctx: DocParagraph, heading_format: dict, heading_ref_format: dict = None, jump_name: str = None, counters: AutoCounters = None, initial_heading_counts: dict[int, int] | None = None, ): d = ctx.doc macro_cache_file = f"/tmp/tim_auto_macros_{ctx.doc.doc_id}" # TODO: Cache sould be picked up only once and used as a paramter ps = commonmark.Parser() parsed = ps.parse(s) with shelve.open(macro_cache_file) as cache: vals = cache.get(str((ctx.get_id(), d.get_version())), {}).get("h") if not vals: return s lines = s.splitlines(keepends=False) curr: Node = parsed.first_child while curr: if curr.t == "heading": level = curr.level line_idx = curr.sourcepos[0][0] - 1 heading_line = lines[line_idx] heading_start = "#" * level # Pandoc's table syntax can conflict with CommonMark heading syntax, so we need an extra check. # This could be more accurate, but it's enough for now. if heading_line.startswith(heading_start + " "): line = heading_line[level + 1 :] if not line.endswith("{.unnumbered}"): # TODO: add heading counters to counter macros lines[line_idx] = ( heading_start + " " + format_heading( line, level, vals, heading_format, heading_ref_format, jump_name, counters, initial_counts=initial_heading_counts, ) ) if counters: counters.set_heading_vals(vals) curr = curr.nxt return "\n".join(lines)
[docs]def add_headings_to_counters( s: str, jump_name: str = None, counters: AutoCounters = None, ): if not counters: return s if not jump_name: return s ps = commonmark.Parser() parsed = ps.parse(s) lines = s.splitlines(keepends=False) curr: Node = parsed.first_child while curr: if curr.t == "heading": level = curr.level line_idx = curr.sourcepos[0][0] - 1 heading_line = lines[line_idx] heading_start = "#" * level if heading_line.startswith(heading_start + " "): line = heading_line[level + 1 :] line = line.replace("{.unnumbered}", "") if not line.strip(): continue counters.add_counter("chap", jump_name, "", line) curr = curr.nxt return s