from __future__ import annotations
import json
import os
import shutil
from datetime import datetime
from difflib import SequenceMatcher
from pathlib import Path
from tempfile import mkstemp
from time import time
from typing import Iterable, Generator
from typing import TYPE_CHECKING
from filelock import FileLock
from lxml import etree, html
from timApp.document.changelog import Changelog
from timApp.document.changelogentry import ChangelogEntry
from timApp.document.docparagraph import DocParagraph
from timApp.document.docsettings import DocSettings, resolve_settings_for_pars
from timApp.document.documentparser import DocumentParser
from timApp.document.documentparseroptions import DocumentParserOptions
from timApp.document.documentwriter import DocumentWriter
from timApp.document.editing.documenteditresult import DocumentEditResult
from timApp.document.exceptions import DocExistsError, ValidationException
from timApp.document.preloadoption import PreloadOption
from timApp.document.validationresult import ValidationResult
from timApp.document.version import Version
from timApp.document.viewcontext import ViewContext, default_view_ctx
from timApp.document.yamlblock import YamlBlock
from timApp.timdb.exceptions import (
TimDbException,
PreambleException,
InvalidReferenceException,
)
from timApp.timtypes import DocInfoType
from timApp.util.utils import get_error_html, trim_markdown, cache_folder_path
from tim_common.html_sanitize import presanitize_html_body
if TYPE_CHECKING:
from timApp.document.docinfo import DocInfo
def get_duplicate_id_msg(conflicting_ids):
return f'Duplicate paragraph id(s): {", ".join(conflicting_ids)}'
def par_list_to_text(sect: list[DocParagraph], export_hashes=False):
return DocumentWriter(
[par.dict() for par in sect], export_hashes=export_hashes
).get_text()
class Document:
def __init__(
self,
doc_id: int,
modifier_group_id: int | None = 0,
preload_option: PreloadOption = PreloadOption.none,
):
self.doc_id = doc_id
self.modifier_group_id = modifier_group_id
self.version = None
self.user = None
self.preload_option = preload_option
# Used to cache paragraphs in memory on request so the pars don't have to be read from disk in every for loop
self.par_cache: list[DocParagraph] | None = None
# List of par ids - it is much faster to load only ids and sometimes full pars are not needed
self.par_ids: list[str] | None = None
# List of corresponding hashes
self.par_hashes: list[str] | None = None
# Whether par_cache is incomplete - this is the case when insert_temporary_pars is called with PreloadOption.none
self.is_incomplete_cache: bool = False
# Whether the document exists on disk.
self.__exists: bool | None = None
# Cache for the original document.
self.source_doc: Document | None = None
# Cache for document settings.
self.settings_cache: DocSettings | None = None
# The corresponding DocInfo object.
self.docinfo: DocInfoType = None
# Cache for own settings; see get_own_settings
self.own_settings = None
# Whether preamble has been loaded
self.preamble_included = False
# Cache for documents that are referenced by this document
self.ref_doc_cache: dict[int, Document] = {}
# Cache for single paragraphs
self.single_par_cache: dict[str, DocParagraph] = {}
# Used for accessing previous/next paragraphs quickly based on id
self.par_map = None
# List of preamble pars if they have been inserted
self.preamble_pars = None
@property
def id(self):
return self.doc_id
    @classmethod
def get_documents_dir(cls) -> Path:
from timApp.timdb.dbaccess import get_files_path
return get_files_path() / "docs"
    def get_doc_dir(self):
return self.get_documents_dir() / str(self.doc_id)
def __repr__(self):
return f"Document(id={self.doc_id})"
def __iter__(self) -> DocParagraphIter | CacheIterator:
if self.par_cache is None:
return DocParagraphIter(self)
else:
return CacheIterator(self.par_cache.__iter__())
@classmethod
    def __get_largest_file_number(cls, path: Path, default: int | None = None) -> int | None:
if not path.exists():
return default
largest = -1
for name in os.listdir(path):
try:
largest = max(largest, int(name))
except ValueError:
pass
return largest if largest > -1 else default
    @classmethod
def version_exists(cls, doc_id: int, doc_ver: Version) -> bool:
"""Checks if a document version exists.
:param doc_id: Document id.
:param doc_ver: Document version.
:return: Boolean.
"""
return (
cls.get_documents_dir() / str(doc_id) / str(doc_ver[0]) / str(doc_ver[1])
).is_file()
def __update_par_map(self):
self.par_map = {}
        for i, curr_p in enumerate(self.par_cache):
            prev_p = self.par_cache[i - 1] if i > 0 else None
            next_p = self.par_cache[i + 1] if i + 1 < len(self.par_cache) else None
            self.par_map[curr_p.get_id()] = {"p": prev_p, "n": next_p, "c": curr_p}
self.par_ids = [par.get_id() for par in self.par_cache]
self.par_hashes = [par.get_hash() for par in self.par_cache]
if not self.is_incomplete_cache:
self.single_par_cache.update({p.get_id(): p for p in self.par_cache})
    def load_pars(self):
"""Loads the paragraphs from disk to memory so that subsequent iterations for the Document are faster."""
self.par_cache = [par for par in self]
self.__update_par_map()
    def ensure_pars_loaded(self):
if self.par_map is None:
self.load_pars()
    def get_previous_par(
self, par: DocParagraph, get_last_if_no_prev=False
) -> DocParagraph | None:
return self.get_previous_par_by_id(par.get_id(), get_last_if_no_prev)
    def get_previous_par_by_id(
self, par_id: str, get_last_if_no_prev=False
) -> DocParagraph | None:
if self.preload_option == PreloadOption.all:
self.ensure_pars_loaded()
        elif self.par_map is None:
            self.ensure_par_ids_loaded()
            try:
                i = self.par_ids.index(par_id) - 1
            except ValueError:
                return (
                    self.get_paragraph(self.par_ids[-1])
                    if self.par_ids and get_last_if_no_prev
                    else None
                )
            return (
                self.get_paragraph(self.par_ids[i])
                if i >= 0 or get_last_if_no_prev
                else None
            )
prev = self.par_map.get(par_id)
result = None
if prev:
result = prev["p"]
        if result is None and get_last_if_no_prev:
            result = self.par_cache[-1] if self.par_cache else None
return result
    def get_pars_till(self, par):
pars = []
i = self.__iter__()
try:
while True:
p = next(i)
pars.append(p)
if par.get_id() == p.get_id():
break
except StopIteration:
pass
        # TODO: improve this
        # 'i' might be a CacheIterator or a DocParagraphIter depending on whether the pars were cached
try:
i.close()
except AttributeError:
pass
return pars
    def add_setting(self, key: str, value) -> None:
pars = list(self.get_settings_pars())
if not pars:
current_settings = {}
else:
current_settings = DocSettings.from_paragraph(pars[-1]).get_dict()
current_settings[key] = value
self.set_settings(current_settings)
    def get_settings_pars(self) -> Generator[DocParagraph, None, None]:
self.ensure_par_ids_loaded()
for p_id in self.get_par_ids(no_preamble=True):
curr = self.get_paragraph(p_id)
if curr.is_setting():
yield curr
else:
break
    def set_settings(self, settings: dict | YamlBlock, force_new_par: bool = False):
first_par = None
self.ensure_par_ids_loaded()
if self.par_ids:
first_par = self.get_paragraph(self.par_ids[0])
last_settings_par = None
settings_pars = list(self.get_settings_pars())
if settings_pars:
last_settings_par = settings_pars[-1]
if not isinstance(settings, YamlBlock):
assert isinstance(settings, dict)
settings = YamlBlock(values=settings)
new_par = DocSettings(self, settings).to_paragraph()
if first_par is None:
self.add_paragraph_obj(new_par)
else:
if last_settings_par is None:
self.insert_paragraph_obj(new_par, insert_before_id=first_par.get_id())
else:
if not last_settings_par.is_reference() and not force_new_par:
self.modify_paragraph_obj(last_settings_par.get_id(), new_par)
else:
self.insert_paragraph_obj(
new_par, insert_after_id=last_settings_par.get_id()
)
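    # Example (a sketch, not part of the API): writing a settings block into an
    # existing document. `d` is assumed to be a Document instance; the key is
    # illustrative.
    #
    #     d.set_settings({"example_key": "example_value"})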
    def get_tasks(self) -> Generator[DocParagraph, None, None]:
for p in self.get_dereferenced_paragraphs(default_view_ctx):
if p.is_task():
yield p
    def get_lock(self) -> FileLock:
return FileLock(f"/tmp/doc_{self.doc_id}_lock")
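    # Example (sketch): the per-document file lock serializes concurrent edits.
    # `d` is assumed to be a Document instance.
    #
    #     with d.get_lock():
    #         d.add_paragraph("New text")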
    def get_own_settings(self) -> YamlBlock:
"""Returns the settings for this document excluding any preamble documents."""
if self.own_settings is None:
self.ensure_par_ids_loaded()
self.own_settings = resolve_settings_for_pars(self.get_settings_pars())
return self.own_settings
    def get_settings(self) -> DocSettings:
cached = self.settings_cache
if cached:
return cached
settings_block = self.get_own_settings()
final_settings = YamlBlock()
preambles = self.get_docinfo().get_preamble_docs()
for p in preambles:
final_settings = final_settings.merge_with(
resolve_settings_for_pars(p.document.get_settings_pars())
)
final_settings = final_settings.merge_with(settings_block)
settings = DocSettings(self, settings_dict=final_settings)
self.settings_cache = settings
return settings
    def create(self, ignore_exists: bool = False):
path = self.get_doc_dir()
if not path.exists():
path.mkdir(exist_ok=True, parents=True)
self.__exists = None
elif not ignore_exists:
raise DocExistsError(self.doc_id)
    def exists(self) -> bool:
if self.__exists is None:
self.__exists = self.get_doc_dir().exists()
return self.__exists
    def export_markdown(
self,
export_hashes: bool = False,
export_ids: bool = True,
export_settings: bool = True,
with_tl: bool = False,
) -> str:
pars = [par for par in self if not par.is_setting() or export_settings]
if with_tl:
return "\n".join(
[par.get_exported_markdown(export_ids=export_ids) for par in pars]
)
return DocumentWriter(
[par.dict() for par in pars],
export_hashes=export_hashes,
export_ids=export_ids,
).get_text()
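    # Example (sketch, `d` is a Document): exporting plain markdown without
    # paragraph ids, e.g. for copying the content elsewhere.
    #
    #     md = d.export_markdown(export_ids=False)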
    def export_raw_data(self):
"""Exports the raw JSON data of paragraphs. Useful for debugging."""
return [par.dict() for par in self]
    def export_section(
self, par_id_start: str | None, par_id_end: str | None, export_hashes=False
) -> str:
sect = self.get_section(par_id_start, par_id_end)
return par_list_to_text(sect, export_hashes)
    def get_section(
self, par_id_start: str | None, par_id_end: str | None
) -> list[DocParagraph]:
if par_id_start is None and par_id_end is None:
return []
if par_id_start is None or par_id_end is None:
raise TimDbException("Either of par_id_start and par_id_end was None")
all_pars = [par for par in self]
all_par_ids = [par.get_id() for par in all_pars]
try:
start_index = all_par_ids.index(par_id_start)
except ValueError:
return self._raise_not_found(par_id_start)
try:
end_index = all_par_ids.index(par_id_end)
except ValueError:
return self._raise_not_found(par_id_end)
if end_index < start_index:
start_index, end_index = end_index, start_index
return all_pars[start_index : end_index + 1]
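    # Example (sketch, `d` is a Document): extracting the slice between two
    # known paragraph ids (the ids below are made up). The order of the ids
    # does not matter because the indices are swapped when needed.
    #
    #     pars = d.get_section("parIdStart", "parIdEnd")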
    def text_to_paragraphs(
self, text: str, break_on_elements: bool
) -> tuple[list[DocParagraph], ValidationResult]:
options = DocumentParserOptions()
options.break_on_code_block = break_on_elements
options.break_on_header = break_on_elements
options.break_on_normal = break_on_elements
dp = DocumentParser(text, options)
dp.add_missing_attributes()
vr = dp.validate_structure()
vr.raise_if_has_critical_issues()
blocks = [
DocParagraph.create(
doc=self,
md=trim_markdown(par["md"]),
attrs=par.get("attrs"),
par_id=par["id"],
)
for par in dp.get_blocks()
]
return blocks, vr
    @classmethod
def remove(cls, doc_id: int, ignore_exists=False):
"""Removes the whole document.
:param doc_id: Document id to remove.
:return:
"""
d = Document(doc_id)
if d.exists():
shutil.rmtree(d.get_doc_dir())
elif not ignore_exists:
raise DocExistsError(doc_id)
    def get_version(self) -> Version:
"""Gets the latest version of the document as a major-minor tuple.
        :return: Latest version, or (0, 0) if there isn't one yet.
"""
if self.version is not None:
return self.version
basedir = self.get_doc_dir()
major = self.__get_largest_file_number(basedir, default=0)
minor = (
0
if major < 1
else self.__get_largest_file_number(basedir / str(major), default=0)
)
self.version = major, minor
return major, minor
    def get_id_version(self) -> tuple[int, int, int]:
major, minor = self.get_version()
return self.doc_id, major, minor
    def get_doc_version(self, version=None) -> Document:
from timApp.document.documentversion import DocumentVersion
return DocumentVersion(
doc_id=self.doc_id,
doc_ver=version if version else self.get_version(),
modifier_group_id=self.modifier_group_id,
)
    def get_version_path(self, ver: Version | None = None) -> Path:
version = self.get_version() if ver is None else ver
return (
self.get_documents_dir()
/ str(self.doc_id)
/ str(version[0])
/ str(version[1])
)
    def get_refs_dir(self, ver: Version | None = None) -> Path:
version = self.get_version() if ver is None else ver
return (
cache_folder_path
/ "refs"
/ str(self.doc_id)
/ str(version[0])
/ str(version[1])
)
    def get_reflist_filename(self, ver: Version | None = None) -> Path:
return self.get_refs_dir(ver) / "reflist_to"
    def getlogfilename(self) -> Path:
return self.get_doc_dir() / "changelog"
def __write_changelog(
self, ver: Version, operation: str, par_id: str, op_params: dict | None = None
):
logname = self.getlogfilename()
src = logname.open("r") if logname.exists() else None
destfd, tmpname = mkstemp()
dest = os.fdopen(destfd, "w")
ts = time()
timestamp = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
entry = {
"group_id": self.modifier_group_id,
"par_id": par_id,
"op": operation,
"op_params": op_params,
"ver": ver,
"time": timestamp,
}
dest.write(json.dumps(entry))
dest.write("\n")
while src:
line = src.readline()
if line:
dest.write(line)
else:
src.close()
src = None
dest.close()
shutil.copyfile(tmpname, logname)
os.unlink(tmpname)
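    # Each changelog line is a single JSON object, newest entry first. An
    # illustrative line (all values made up):
    #
    #     {"group_id": 1, "par_id": "aB3dE5fG7hI9", "op": "Added",
    #      "op_params": null, "ver": [2, 0], "time": "2024-01-01 12:00:00"}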
def __increment_version(
self, op: str, par_id: str, increment_major: bool, op_params: dict | None = None
) -> Version:
ver_exists = True
ver = self.get_version()
old_ver = None
while ver_exists:
old_ver = ver
ver = (
(old_ver[0] + 1, 0) if increment_major else (old_ver[0], old_ver[1] + 1)
)
ver_exists = (self.get_version_path(ver)).is_file()
if increment_major:
(self.get_documents_dir() / str(self.doc_id) / str(ver[0])).mkdir()
if old_ver[0] > 0:
shutil.copyfile(self.get_version_path(old_ver), self.get_version_path(ver))
else:
with self.get_version_path(ver).open("w"):
pass
self.__write_changelog(ver, op, par_id, op_params)
self.version = ver
self.par_cache = None
self.par_map = None
self.par_ids = None
self.par_hashes = None
self.source_doc = None
        self.settings_cache = None
self.own_settings = None
self.single_par_cache = {}
self.ref_doc_cache = {}
return ver
def __update_metadata(
self, pars: list[DocParagraph], old_ver: Version, new_ver: Version
):
if old_ver == new_ver:
raise TimDbException("__update_metadata called with old_ver == new_ver")
new_reflist_file = self.get_reflist_filename(new_ver)
reflist = self.get_referenced_document_ids(old_ver)
for p in pars:
if p.is_reference():
try:
referenced_pars = p.get_referenced_pars()
except TimDbException:
pass
else:
for par in referenced_pars:
try:
reflist.add(int(par.get_doc_id()))
except (ValueError, TypeError):
pass
self.__save_reflist(new_reflist_file, reflist)
    def raise_if_not_exist(self, par_id: str):
if not self.has_paragraph(par_id):
self._raise_not_found(par_id)
def _raise_not_found(self, par_id: str):
raise TimDbException(self.get_par_not_found_msg(par_id))
    def get_par_not_found_msg(self, par_id: str):
return f"Document {self.doc_id}: Paragraph not found: {par_id}"
    def has_paragraph(self, par_id: str) -> bool:
"""Checks if the document has the given paragraph.
:param par_id: The paragraph id.
:return: Boolean.
"""
self.ensure_par_ids_loaded()
return par_id in self.par_ids
    def get_paragraph(self, par_id: str) -> DocParagraph:
if self.preload_option == PreloadOption.all:
self.ensure_pars_loaded()
try:
return self.par_map[par_id]["c"]
except KeyError:
return self._raise_not_found(par_id)
cached = self.single_par_cache.get(par_id)
if cached:
return cached
self.ensure_par_ids_loaded()
try:
idx = self.par_ids.index(par_id)
except ValueError:
return self._raise_not_found(par_id)
fetched = DocParagraph.get(self, self.par_ids[idx], self.par_hashes[idx])
self.single_par_cache[par_id] = fetched
return fetched
    def add_text(self, text: str) -> list[DocParagraph]:
"""Converts the given text to (possibly) multiple paragraphs and adds them to the document."""
pars, _ = self.text_to_paragraphs(text, False)
old_ver = self.get_version()
result = [self.add_paragraph_obj(p, update_meta=False) for p in pars]
new_ver = self.get_version()
self.__update_metadata(result, old_ver, new_ver)
return result
    def add_paragraph_obj(self, p: DocParagraph, update_meta=True) -> DocParagraph:
"""Appends a new paragraph into the document.
:param update_meta: Whether to update metadata.
:param p: Paragraph to be added.
        :return: The same paragraph object.
"""
assert p.doc.doc_id == self.doc_id
p.store()
p.set_latest()
old_ver = self.get_version()
new_ver = self.__increment_version("Added", p.get_id(), increment_major=True)
old_path = self.get_version_path(old_ver)
new_path = self.get_version_path(new_ver)
if old_path.exists():
shutil.copyfile(old_path, new_path)
with new_path.open("a") as f:
f.write(p.get_id() + "/" + p.get_hash())
f.write("\n")
if update_meta:
self.__update_metadata([p], old_ver, new_ver)
return p
    def add_paragraph(
self, text: str, par_id: str | None = None, attrs: dict | None = None
) -> DocParagraph:
"""Appends a new paragraph into the document.
:param par_id: The id of the paragraph or None if it should be autogenerated.
:param attrs: The attributes for the paragraph.
:param text: New paragraph text.
:return: The new paragraph object.
"""
p = DocParagraph.create(doc=self, par_id=par_id, md=text, attrs=attrs)
return self.add_paragraph_obj(p)
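    # Example (sketch, `d` is a Document): appending a paragraph; the attribute
    # name is illustrative. Appending bumps the document's major version.
    #
    #     par = d.add_paragraph("# Heading", attrs={"classes": ["example"]})
    #     print(par.get_id(), d.get_version())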
    def delete_paragraph(self, par_id: str):
"""Removes a paragraph from the document.
:param par_id: Paragraph id to remove.
"""
self.raise_if_not_exist(par_id)
old_ver = self.get_version()
new_ver = self.__increment_version("Deleted", par_id, increment_major=True)
self.__update_metadata([], old_ver, new_ver)
with self.get_version_path(old_ver).open("r") as f_src:
with self.get_version_path(new_ver).open("w") as f:
while True:
line = f_src.readline()
if not line:
return
                    if not line.startswith(par_id):
                        f.write(line)
    def insert_paragraph(
self,
text: str,
insert_before_id: str | None = None,
insert_after_id: str | None = None,
attrs: dict | None = None,
par_id: str | None = None,
) -> DocParagraph:
"""Inserts a paragraph before a given paragraph id.
:param par_id: The id of the new paragraph or None if it should be autogenerated.
:param attrs: The attributes for the paragraph.
:param text: New paragraph text.
:param insert_before_id: Id of the paragraph to insert before, or None if last.
:param insert_after_id: Id of the paragraph to insert after, or None if first.
:return: The inserted paragraph object.
"""
p = DocParagraph.create(doc=self, par_id=par_id, md=text, attrs=attrs)
return self.insert_paragraph_obj(
p, insert_before_id=insert_before_id, insert_after_id=insert_after_id
)
    def insert_paragraph_obj(
self,
p: DocParagraph,
insert_before_id: str | None = None,
insert_after_id: str | None = None,
) -> DocParagraph:
if not insert_before_id and not insert_after_id:
return self.add_paragraph_obj(p)
if "HELP_PAR" in (insert_after_id, insert_before_id):
return self.add_paragraph_obj(p)
p.store()
p.set_latest()
old_ver = self.get_version()
new_ver = self.__increment_version(
"Inserted",
p.get_id(),
increment_major=True,
op_params={"before_id": insert_before_id}
if insert_before_id
else {"after_id": insert_after_id},
)
new_line = p.get_id() + "/" + p.get_hash() + "\n"
with self.get_version_path(old_ver).open("r") as f_src, self.get_version_path(
new_ver
).open("w") as f:
while True:
line = f_src.readline()
if not line:
break
if insert_before_id and line.startswith(insert_before_id):
f.write(new_line)
f.write(line)
if insert_after_id and line.startswith(insert_after_id):
f.write(new_line)
self.__update_metadata([p], old_ver, new_ver)
return p
    def modify_paragraph(
self, par_id: str, new_text: str, new_attrs: dict | None = None
) -> DocParagraph:
"""Modifies the text of the given paragraph.
:param par_id: Paragraph id.
:param new_text: New text.
:param new_attrs: New attributes.
:return: The new paragraph object.
"""
if new_attrs is None:
new_attrs = self.get_paragraph(par_id).get_attrs()
p = DocParagraph.create(md=new_text, doc=self, par_id=par_id, attrs=new_attrs)
return self.modify_paragraph_obj(par_id, p)
    def modify_paragraph_obj(self, par_id: str, p: DocParagraph) -> DocParagraph:
if not self.has_paragraph(par_id):
raise KeyError(
f"No paragraph {par_id} in document {self.doc_id} version {self.get_version()}"
)
p_src = DocParagraph.get_latest(self, par_id)
p.set_id(par_id)
new_hash = p.get_hash()
p.store()
p.set_latest()
old_ver = self.get_version()
old_hash = p_src.get_hash()
if p.is_same_as(p_src):
return p
new_ver = self.__increment_version(
"Modified",
par_id,
increment_major=False,
op_params={"old_hash": old_hash, "new_hash": new_hash},
)
old_line_start = f"{par_id}/"
old_line_legacy = f"{par_id}\n"
new_line = f"{par_id}/{new_hash}\n"
with self.get_version_path(old_ver).open("r") as f_src, self.get_version_path(
new_ver
).open("w") as f:
while True:
line = f_src.readline()
if not line:
break
if line.startswith(old_line_start) or line == old_line_legacy:
f.write(new_line)
else:
f.write(line)
self.__update_metadata([p], old_ver, new_ver)
return p
    def parwise_diff(self, other_doc: Document, view_ctx: ViewContext | None = None):
if self.get_version() == other_doc.get_version():
return
old_pars = self.get_paragraphs()
old_ids = [par.get_id() for par in old_pars]
new_pars = other_doc.get_paragraphs()
new_ids = [par.get_id() for par in new_pars]
s = SequenceMatcher(None, old_ids, new_ids)
opcodes = s.get_opcodes()
if view_ctx:
DocParagraph.preload_htmls(
old_pars, self.get_settings(), view_ctx, persist=False
)
DocParagraph.preload_htmls(
new_pars, other_doc.get_settings(), view_ctx, persist=False
)
for tag, i1, i2, j1, j2 in opcodes:
if tag == "insert":
yield {
"type": tag,
"after_id": old_ids[i2 - 1] if i2 > 0 else None,
"content": new_pars[j1:j2],
}
if tag == "replace":
yield {
"type": tag,
"start_id": old_ids[i1],
"end_id": old_ids[i2] if i2 < len(old_ids) else None,
"content": new_pars[j1:j2],
}
if tag == "delete":
yield {
"type": tag,
"start_id": old_ids[i1],
"end_id": old_ids[i2] if i2 < len(old_ids) else None,
}
if tag == "equal":
for old, new in zip(old_pars[i1:i2], new_pars[j1:j2]):
if not old.is_same_as(new):
yield {"type": "change", "id": old.get_id(), "content": [new]}
# Skip references because they have not been dereferenced and no HTML is available.
elif (
view_ctx
and not old.is_reference()
and not old.is_same_as_html(new, view_ctx)
):
yield {"type": "change", "id": old.get_id(), "content": [new]}
    def update_section(
self, text: str, par_id_first: str, par_id_last: str
) -> tuple[str, str, DocumentEditResult]:
"""Updates a section of the document.
:param text: The text of the section.
:param par_id_first: The id of the paragraph that denotes the start of the section.
:param par_id_last: The id of the paragraph that denotes the end of the section.
"""
dp = DocumentParser(text)
dp.add_missing_attributes()
vr = dp.validate_structure()
vr.raise_if_has_critical_issues()
new_pars = dp.get_blocks()
new_par_id_set = {par["id"] for par in new_pars}
all_pars = [par for par in self]
all_par_ids = [par.get_id() for par in all_pars]
start_index, end_index = all_par_ids.index(par_id_first), all_par_ids.index(
par_id_last
)
old_pars = all_pars[start_index : end_index + 1]
other_par_ids = all_par_ids[:]
del other_par_ids[start_index : end_index + 1]
intersection = new_par_id_set & set(other_par_ids)
if intersection:
raise TimDbException("Duplicate id(s): " + str(intersection))
return self._perform_update(
new_pars,
old_pars,
last_par_id=all_par_ids[end_index + 1]
if end_index + 1 < len(all_par_ids)
else None,
)
    def update(
self, text: str, original: str, strict_validation=True, regenerate_ids=False
) -> tuple[str, str, DocumentEditResult]:
"""Replaces the document's contents with the specified text.
:param text: The new text for the document.
:param original: The original text for the document.
:param strict_validation: Whether to use stricter validation rules for areas etc.
:param regenerate_ids: If True, paragraph IDs are regenerated for all blocks.
"""
dp = DocumentParser(text)
dp.add_missing_attributes(force_new_ids=regenerate_ids)
vr = dp.validate_structure()
if strict_validation:
vr.raise_if_has_any_issues()
else:
vr.raise_if_has_critical_issues()
new_pars = dp.get_blocks()
# If the original document has validation errors, it probably means the document export routine has a bug.
dp_orig = DocumentParser(original)
dp_orig.add_missing_attributes()
vr = dp_orig.validate_structure()
try:
vr.raise_if_has_critical_issues()
except ValidationException as e:
raise ValidationException(
"The original document contained a syntax error. "
"This is probably a TIM bug; please report it. "
f"Additional information: {e}"
)
blocks = dp_orig.get_blocks()
new_ids = {p["id"] for p in new_pars} - {p["id"] for p in blocks}
conflicting_ids = new_ids & set(self.get_par_ids())
if conflicting_ids:
raise ValidationException(get_duplicate_id_msg(conflicting_ids))
old_pars = [DocParagraph.from_dict(doc=self, d=d) for d in blocks]
return self._perform_update(new_pars, old_pars)
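    # Example (sketch, `d` is a Document): appending text via a full update.
    # Passing the current export as `original` lets unchanged paragraphs keep
    # their ids.
    #
    #     original = d.export_markdown()
    #     first_id, last_id, result = d.update(original + "\nNew paragraph", original)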
def _perform_update(
self, new_pars: list[dict], old_pars: list[DocParagraph], last_par_id=None
) -> tuple[str, str, DocumentEditResult] | tuple[None, None, DocumentEditResult]:
old_ids = [par.get_id() for par in old_pars]
new_ids = [par["id"] for par in new_pars]
s = SequenceMatcher(None, old_ids, new_ids)
opcodes = s.get_opcodes()
result = DocumentEditResult()
# Do delete operations first to avoid duplicate ids
for tag, i1, i2, j1, j2 in [
opcode for opcode in opcodes if opcode[0] in ["delete", "replace"]
]:
for par, par_id in zip(old_pars[i1:i2], old_ids[i1:i2]):
self.delete_paragraph(par_id)
result.deleted.append(par)
for tag, i1, i2, j1, j2 in opcodes:
if tag == "replace":
for par in new_pars[j1:j2]:
before_i = self.find_insert_index(i2, old_ids)
inserted = self.insert_paragraph(
par["md"],
attrs=par.get("attrs"),
par_id=par["id"],
insert_before_id=old_ids[before_i]
if before_i < len(old_ids)
else last_par_id,
)
result.added.append(inserted)
elif tag == "insert":
for par in new_pars[j1:j2]:
before_i = self.find_insert_index(i2, old_ids)
inserted = self.insert_paragraph(
par["md"],
attrs=par.get("attrs"),
par_id=par["id"],
insert_before_id=old_ids[before_i]
if before_i < len(old_ids)
else last_par_id,
)
result.added.append(inserted)
elif tag == "equal":
for idx, (new_par, old_par) in enumerate(
zip(new_pars[j1:j2], old_pars[i1:i2])
):
if (
new_par["t"] != old_par.get_hash()
or new_par.get("attrs", {}) != old_par.get_attrs()
):
if self.has_paragraph(old_par.get_id()):
self.modify_paragraph(
old_par.get_id(),
new_par["md"],
new_attrs=new_par.get("attrs"),
)
result.changed.append(old_par)
else:
before_i = self.find_insert_index(j1 + idx, new_ids)
inserted = self.insert_paragraph(
new_par["md"],
attrs=new_par.get("attrs"),
par_id=new_par["id"],
insert_before_id=old_ids[before_i]
if before_i < len(old_ids)
else last_par_id,
)
result.added.append(inserted)
if not new_ids:
return None, None, result
return new_ids[0], new_ids[-1], result
    def find_insert_index(self, i2, old_ids):
before_i = i2
while before_i < len(old_ids) and not self.has_paragraph(old_ids[before_i]):
before_i += 1
return before_i
    def get_index(self, view_ctx: ViewContext) -> list[tuple]:
pars = [par for par in DocParagraphIter(self)]
DocParagraph.preload_htmls(pars, self.get_settings(), view_ctx)
pars = dereference_pars(pars, context_doc=self, view_ctx=view_ctx)
# Skip plugins
html_list = [
par.get_html(view_ctx, no_persist=False)
for par in pars
if not par.is_dynamic()
]
return get_index_from_html_list(html_list)
    def get_changelog(self, max_entries: int = 100) -> Changelog:
log = Changelog()
logname = self.getlogfilename()
if not logname.is_file():
return Changelog()
lc = max_entries
with logname.open("r") as f:
while lc != 0:
line = f.readline()
if not line:
break
try:
entry = json.loads(line)
log.append(ChangelogEntry(**entry))
except ValueError:
print(f"doc id {self.doc_id}: malformed log line: {line}")
lc -= 1
return log
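    # Example (sketch, `d` is a Document): reading the ten most recent
    # operations; see the Changelog class for the entry API.
    #
    #     log = d.get_changelog(max_entries=10)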
    def delete_section(self, area_start, area_end) -> DocumentEditResult:
result = DocumentEditResult()
for par in self.get_section(area_start, area_end):
self.delete_paragraph(par.get_id())
result.deleted.append(par)
return result
    def get_named_section(self, section_name: str) -> list[DocParagraph]:
if self.preload_option == PreloadOption.all:
self.ensure_pars_loaded()
start_found = False
end_found = False
pars = []
with self.__iter__() as i:
for par in i:
if par.get_attr("area") == section_name:
start_found = True
if start_found:
pars.append(par)
if par.get_attr("area_end") == section_name:
end_found = True
break
if not start_found or not end_found:
raise InvalidReferenceException("Area not found: " + section_name)
return pars
    def named_section_exists(self, section_name: str) -> bool:
with self.__iter__() as i:
for par in i:
if par.get_attr("area") == section_name:
return True
return False
    def calculate_referenced_document_ids(self, ver: Version | None = None) -> set[int]:
"""Gets all the document ids that are referenced from this document recursively.
:return: The set of the document ids.
"""
refs = set()
source = self
if ver is not None:
from timApp.document.documentversion import DocumentVersion
source = DocumentVersion(self.doc_id, ver)
source.docinfo = self.docinfo
for p in source:
if p.is_reference():
try:
referenced_pars = p.get_referenced_pars()
except TimDbException:
pass
else:
for par in referenced_pars:
try:
refs.add(int(par.get_doc_id()))
except (ValueError, TypeError):
pass
return refs
def __load_reflist(self, reflist_name: Path) -> set[int]:
with reflist_name.open("r") as reffile:
return set(json.loads(reffile.read()))
def __save_reflist(self, reflist_name: Path, reflist: set[int]):
        folder: Path = reflist_name.parent
        folder.mkdir(exist_ok=True, parents=True)
with reflist_name.open("w") as reffile:
reffile.write(json.dumps(list(reflist)))
    def get_referenced_document_ids(self, ver: Version | None = None) -> set[int]:
reflist_name = self.get_reflist_filename(ver)
if reflist_name.is_file():
reflist = self.__load_reflist(reflist_name)
else:
reflist = self.calculate_referenced_document_ids(ver)
self.__save_reflist(reflist_name, reflist)
return reflist
    def get_paragraphs(self, include_preamble=False) -> list[DocParagraph]:
self.ensure_pars_loaded()
if include_preamble and not self.preamble_included:
            # Make sure the settings have been cached before preamble inclusion.
            # Otherwise, getting settings after preamble inclusion will not work properly.
self.get_settings()
self.insert_preamble_pars()
return self.par_cache
    def get_dereferenced_paragraphs(self, view_ctx: ViewContext) -> list[DocParagraph]:
return dereference_pars(
self.get_paragraphs(), context_doc=self, view_ctx=view_ctx
)
    def get_closest_paragraph_title(self, par_id: str | None):
last_title = None
with self.__iter__() as it:
for par in it:
title = par.get_title()
if title is not None:
last_title = title
if par.get_id() == par_id:
return last_title
return None
    def get_latest_version(self):
from timApp.document.documentversion import DocumentVersion
return DocumentVersion(
self.doc_id, self.get_version(), self.modifier_group_id, self.preload_option
)
    def get_docinfo(self) -> DocInfo:
if self.docinfo is None:
from timApp.document.docentry import DocEntry
self.docinfo = DocEntry.find_by_id(self.doc_id)
return self.docinfo
    def get_source_document(self) -> Document | None:
if self.source_doc is None:
docinfo = self.get_docinfo()
if docinfo.is_original_translation:
# We can't call get_settings method here because of potential infinite recursion.
# We therefore require that the source_document is always in the first settings paragraph of the
# document. This should be true for citation docs.
first_setting_par = next(self.get_settings_pars(), None)
if not first_setting_par:
return None
try:
settings = DocSettings.from_paragraph(first_setting_par)
except TimDbException:
return None
src_docid = settings.get_source_document()
self.source_doc = (
Document(src_docid, preload_option=self.preload_option)
if src_docid is not None
else None
)
else:
self.source_doc = docinfo.src_doc.document
self.ref_doc_cache[self.source_doc.doc_id] = self.source_doc
return self.source_doc
    def get_last_par(self):
pars = [par for par in self]
return pars[-1] if pars else None
    def get_par_ids(self, no_preamble=False):
self.ensure_par_ids_loaded()
if self.preamble_included and no_preamble:
return self.par_ids[len(self.preamble_pars) :]
else:
return self.par_ids
    def ensure_par_ids_loaded(self) -> None:
if self.par_ids is None or self.is_incomplete_cache:
self._load_par_ids()
def _load_par_ids(self):
self.par_ids = []
self.par_hashes = []
if not self.get_version_path().exists():
return
with self.get_version_path().open("r", encoding="UTF-8") as f:
while True:
line = f.readline()
if not line:
break
if len(line) > 14:
# Line contains both par_id and t
par_id, t = line.replace("\n", "").split("/")
else:
par_id, t = line.replace("\n", ""), None
self.par_ids.append(par_id)
self.par_hashes.append(t)
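    # A version file lists one paragraph per line in document order, either as
    # "<par_id>/<hash>" or, in legacy files, as a bare "<par_id>". An
    # illustrative line (values made up):
    #
    #     aB3dE5fG7hI9/MHQxQn3pVekN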
    def insert_preamble_pars(self, class_names: list[str] | None = None):
"""
Add preamble pars.
        :param class_names: Optionally include only pars that have any of the listed classes.
:return: Preamble pars.
"""
if self.preamble_included:
return self.preamble_pars
self.ensure_pars_loaded()
# We must clone the preamble pars because they may be used in the context of multiple documents.
# See the test test_preamble_ref.
if not class_names:
pars = [p.clone() for p in self.get_docinfo().get_preamble_pars()]
else:
            # Get pars that have any of the filter class names.
pars = [
p.clone()
for p in self.get_docinfo().get_preamble_pars_with_class(class_names)
]
current_ids = set(self.par_ids)
preamble_ids = {p.get_id() for p in pars}
if len(pars) != len(preamble_ids):
raise PreambleException(
"The paragraphs in preamble documents must have distinct ids among themselves."
)
isect = current_ids & preamble_ids
if isect:
raise PreambleException(
"The paragraphs in the main document must "
f"have distinct ids from the preamble documents. Conflicting ids: {isect}"
)
for p in pars:
p.preamble_doc = p.doc.get_docinfo()
p.doc = self
self.preamble_pars = pars
self.par_cache = pars + self.par_cache
self.__update_par_map()
self.preamble_included = True
return pars
    def insert_temporary_pars(self, pars, context_par):
if self.preload_option == PreloadOption.all:
self.ensure_pars_loaded()
if context_par is None:
self.par_cache = pars + self.par_cache
else:
i = 0
for i, par in enumerate(self.par_cache):
if par.get_id() == context_par.get_id():
break
self.par_cache = (
self.par_cache[: i + 1] + pars + self.par_cache[i + 1 :]
)
else:
if context_par is None:
self.par_cache = pars
else:
self.par_cache = [context_par] + pars
self.is_incomplete_cache = True
self.__update_par_map()
    def clear_mem_cache(self):
self.par_cache = None
self.par_map = None
self.version = None
self.par_ids = None
self.par_hashes = None
self.source_doc = None
        self.settings_cache = None
self.ref_doc_cache = {}
self.single_par_cache = {}
    def get_ref_doc(self, ref_docid: int):
cached = self.ref_doc_cache.get(ref_docid)
if not cached:
cached = Document(ref_docid, preload_option=self.preload_option)
if not cached.exists():
raise InvalidReferenceException(
"The referenced document does not exist."
)
# It is allowed to reference things in preamble.
cached.insert_preamble_pars()
self.ref_doc_cache[ref_docid] = cached
return cached
    def validate(self) -> ValidationResult:
return DocumentParser(self.export_markdown()).validate_structure()
    def get_word_list(self) -> list[str]:
set_of_words = set()
for p in self:
if p.is_reference() and not p.is_translation():
continue
md = p.get_markdown()
parts = md.split()
for part in parts:
if part.isalnum():
set_of_words.add(part)
return list(set_of_words)
def add_index_entry(index_table, current_headers, header):
level = int(header.tag[1:])
current = {"id": header.get("id"), "text": header.text_content(), "level": level}
if level == 1:
if current_headers is not None:
index_table.append(current_headers)
current_headers = (current, [])
elif current_headers is not None:
current_headers[1].append(current)
return current_headers
class CacheIterator:
def __init__(self, i):
self.i = i
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __iter__(self):
return self.i
def __next__(self) -> DocParagraph:
return self.i.__next__()
class DocParagraphIter:
def __init__(self, doc: Document):
self.doc = doc
self.next_index = 0
name = doc.get_version_path(doc.get_version())
self.f = name.open("r") if name.is_file() else None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __iter__(self):
return self
def __next__(self) -> DocParagraph:
if not self.f:
raise StopIteration
while True:
line = self.f.readline()
if not line:
self.close()
raise StopIteration
if line != "\n":
if len(line) > 14:
# Line contains both par_id and t
par_id, t = line.rstrip("\n").split("/")
cached = self.doc.single_par_cache.get(par_id)
if cached:
return cached
fetched = DocParagraph.get(self.doc, par_id, t)
self.doc.single_par_cache[par_id] = fetched
return fetched
else:
# Line contains just par_id, use the latest t
return DocParagraph.get_latest(self.doc, line.rstrip("\n"))
    def close(self):
if self.f:
self.f.close()
self.f = None
def get_index_from_html_list(html_table) -> list[tuple]:
index = []
current_headers = None
for htmlstr in html_table:
try:
index_entry = html.fragment_fromstring(
presanitize_html_body(htmlstr), create_parent=True
)
except etree.XMLSyntaxError:
continue
if index_entry.tag == "div":
for header in index_entry.iter("h1", "h2", "h3"):
current_headers = add_index_entry(index, current_headers, header)
elif index_entry.tag.startswith("h"):
current_headers = add_index_entry(index, current_headers, index_entry)
if current_headers is not None:
index.append(current_headers)
return index
def dereference_pars(
pars: Iterable[DocParagraph], context_doc: Document, view_ctx: ViewContext | None
) -> list[DocParagraph]:
"""Resolves references in the given paragraphs.
:param view_ctx:
:param pars: The DocParagraphs to be processed.
:param context_doc: The document being processing.
"""
new_pars = []
src_doc = context_doc.get_source_document()
for par in pars:
if par.is_reference():
try:
new_pars += par.get_referenced_pars(view_ctx)
except TimDbException as e:
err_par = DocParagraph.create(
par.doc, par_id=par.get_id(), md="", html=get_error_html(e)
)
new_pars.append(err_par)
else:
            # If all of the following are true:
#
# * we are processing a translated document
# * the document has a preamble that has at least one plugin
# * the preamble does not have a translation
# * the current paragraph has not already been dereferenced earlier
#
# then, in order to make the answers go under the plugin at the original document,
# we have to "lie" that the paragraph has been dereferenced.
# This case is tested in test_plugin_in_preamble.
if src_doc is not None and par.original is None:
p = par.from_preamble()
if p and p.document.get_source_document() is None:
par.original = par
par.ref_doc = src_doc
new_pars.append(par)
return new_pars
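# Example (a sketch): a typical rendering path dereferences the paragraphs
# first so that reference paragraphs are replaced by their resolved targets.
#
#     doc = Document(doc_id)
#     pars = dereference_pars(
#         doc.get_paragraphs(), context_doc=doc, view_ctx=default_view_ctx
#     )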