"""Routes for document view."""
import dataclasses
import html
import time
from difflib import context_diff
from typing import Union, Any, ValuesView, Generator
import attr
import filelock
import sass
from flask import current_app
from flask import flash
from flask import redirect
from flask import render_template, make_response, Response, stream_with_context
from flask import request
from flask import session
from markupsafe import Markup
from sqlalchemy.orm import joinedload, defaultload
from timApp.answer.answers import add_missing_users_from_group, get_points_by_rule
from timApp.auth.accesshelper import (
verify_view_access,
verify_teacher_access,
verify_seeanswers_access,
get_doc_or_abort,
verify_manage_access,
AccessDenied,
ItemLockedException,
)
from timApp.auth.auth_models import BlockAccess
from timApp.auth.get_user_rights_for_item import get_user_rights_for_item
from timApp.auth.sessioninfo import get_current_user_object, logged_in, save_last_page
from timApp.document.caching import check_doc_cache, set_doc_cache, refresh_doc_expire
from timApp.document.create_item import create_or_copy_item, create_citation_doc
from timApp.document.docentry import DocEntry, get_documents
from timApp.document.docinfo import DocInfo
from timApp.document.docparagraph import DocParagraph
from timApp.document.docrenderresult import DocRenderResult
from timApp.document.docsettings import DocSettings, get_minimal_visibility_settings
from timApp.document.document import (
get_index_from_html_list,
dereference_pars,
Document,
)
from timApp.document.docviewparams import DocViewParams, ViewModelSchema
from timApp.document.hide_names import is_hide_names, force_hide_names
from timApp.document.post_process import post_process_pars, should_auto_read
from timApp.document.preloadoption import PreloadOption
from timApp.document.usercontext import UserContext
from timApp.document.viewcontext import (
default_view_ctx,
ViewRoute,
ViewContext,
viewmode_templates,
DEFAULT_VIEWMODE_TEMPLATE,
)
from timApp.document.viewparams import ViewParams, ViewParamsSchema
from timApp.folder.folder import Folder
from timApp.folder.folder_view import try_return_folder
from timApp.item.block import BlockType, Block
from timApp.item.blockrelevance import BlockRelevance
from timApp.item.item import Item
from timApp.item.partitioning import (
get_piece_size_from_cookie,
decide_view_range,
get_doc_version_hash,
load_index,
INCLUDE_IN_PARTS_CLASS_NAME,
save_index,
partition_texts,
get_index_with_header_id,
get_document_areas,
RequestedViewRange,
IndexedViewRange,
)
from timApp.item.scoreboard import get_score_infos_if_enabled
from timApp.item.tag import GROUP_TAG_PREFIX
from timApp.item.validation import has_special_chars
from timApp.messaging.messagelist.messagelist_utils import (
MESSAGE_LIST_DOC_PREFIX,
MESSAGE_LIST_ARCHIVE_FOLDER_PREFIX,
)
from timApp.peerreview.peerreview_utils import (
generate_review_groups,
get_reviews_for_user,
check_review_grouping,
PeerReviewException,
is_peerreview_enabled,
)
from timApp.plugin.plugin import find_task_ids
from timApp.plugin.pluginControl import get_all_reqs
from timApp.readmark.readings import mark_all_read
from timApp.tim_app import app
from timApp.timdb.exceptions import PreambleException
from timApp.timdb.sqa import db
from timApp.user.groups import verify_group_view_access
from timApp.user.settings.style_utils import resolve_themes
from timApp.user.settings.styles import generate_style
from timApp.user.user import User, has_no_higher_right
from timApp.user.usergroup import (
UserGroup,
get_usergroup_eager_query,
UserGroupWithSisuInfo,
)
from timApp.user.users import get_rights_holders_all
from timApp.user.userutils import DeletedUserException
from timApp.util.flask.requesthelper import (
view_ctx_with_urlmacros,
RouteException,
NotExist,
)
from timApp.util.flask.responsehelper import add_no_cache_headers
from timApp.util.flask.responsehelper import (
json_response,
ok_response,
get_grid_modules,
)
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.timtiming import taketime
from timApp.util.utils import get_error_message, cache_folder_path
from timApp.util.utils import remove_path_special_chars, seq_to_str
from timApp.velp.velpgroups import set_default_velp_group_selected_and_visible
from tim_common.html_sanitize import sanitize_html
# Default relevance value. NOTE(review): not referenced in the visible part of this file;
# presumably used together with BlockRelevance elsewhere - confirm.
DEFAULT_RELEVANCE = 10
# Blueprint on which every route of this module is registered (no URL prefix).
view_page = TypedBlueprint(
"view_page",
__name__,
url_prefix="",
)
# A list of paragraphs together with the indexed range they were taken from.
DocumentSlice = tuple[list[DocParagraph], IndexedViewRange]
# Either kind of view range accepted by get_partial_document / get_document.
ViewRange = Union[RequestedViewRange, IndexedViewRange]
# Return type of the view helpers: a Response or a (body, status code) tuple.
FlaskViewResult = Union[Response, tuple[Any, int]]
def get_partial_document(doc: Document, view_range: ViewRange) -> DocumentSlice:
    """Return the paragraphs of ``doc`` selected by ``view_range``.

    :param doc: The document whose paragraphs are sliced.
    :param view_range: The requested range. Explicit indices take precedence;
        paragraph ids are only consulted when neither index is given.
    :return: The selected paragraphs and the index range that was actually used.
    """
    paragraphs = doc.get_paragraphs()
    total = len(paragraphs)

    # Handle common unrestricted case first.
    if view_range.is_full:
        return paragraphs, IndexedViewRange(b=0, e=total, par_count=total)

    begin = view_range.start_index
    end = view_range.end_index

    if begin is None and end is None:
        # No indices at all: try to locate the range by paragraph ids instead.
        # Only RequestedViewRange carries paragraph ids.
        has_ids = isinstance(view_range, RequestedViewRange)
        start_id = view_range.start_par_id if has_ids else None
        end_id = view_range.end_par_id if has_ids else None
        for position, paragraph in enumerate(paragraphs):
            if paragraph.get_id() == end_id:
                end = position
            if paragraph.get_id() == start_id:
                begin = position
            if begin is not None and end is not None:
                break

    if begin is None:
        begin = 0
    if end is None:
        # Without an end, fall back to the requested size or to the rest of the document.
        end = total if view_range.size is None else begin + view_range.size

    return paragraphs[begin:end], IndexedViewRange(b=begin, e=end, par_count=total)
def get_document(doc_info: DocInfo, view_range: ViewRange) -> DocumentSlice:
    """Return the slice of the document of ``doc_info`` described by ``view_range``.

    The document's preload option is set to ``PreloadOption.all`` before slicing.
    """
    document = doc_info.document
    document.preload_option = PreloadOption.all
    return get_partial_document(document, view_range)
@view_page.get("/show_slide/<path:doc_path>")
def show_slide(doc_path):
    """Serve ``doc_path`` through the ShowSlide view route."""
    return view(doc_path, route=ViewRoute.ShowSlide)
@view_page.get("/view/<path:doc_path>")
def view_document(doc_path):
    """Serve ``doc_path`` through the normal View route, with timing marks around it."""
    taketime("route view begin")
    response = view(doc_path, route=ViewRoute.View)
    taketime("route view end")
    return response
@view_page.get("/teacher/<path:doc_path>")
def teacher_view(doc_path):
    """Serve ``doc_path`` through the Teacher view route."""
    return view(doc_path, route=ViewRoute.Teacher)
@view_page.get("/velp/<path:doc_path>")
def velp_view(doc_path):
    """Serve ``doc_path`` through the Velp view route."""
    return view(doc_path, route=ViewRoute.Velp)
@view_page.get("/answers/<path:doc_path>")
def see_answers_view(doc_path):
    """Serve ``doc_path`` through the Answers view route."""
    return view(doc_path, route=ViewRoute.Answers)
@view_page.get("/lecture/<path:doc_path>")
def lecture_view(doc_path):
    """Serve ``doc_path`` through the Lecture view route."""
    return view(doc_path, route=ViewRoute.Lecture)
@view_page.get("/review/<path:doc_name>")
def review_view(doc_name):
    """Serve ``doc_name`` through the Review view route."""
    return view(doc_name, route=ViewRoute.Review)
@view_page.get("/slide/<path:doc_path>")
def slide_view(doc_path):
    """Serve ``doc_path`` through the Slide view route without rendering the document body."""
    return view(doc_path, route=ViewRoute.Slide, render_doc=False)
@view_page.get("/par_info/<int:doc_id>/<par_id>")
def par_info(doc_id, par_id):
    """Return the document item and the closest paragraph title for ``par_id`` as JSON.

    View access to the document is required.
    """
    doc_info = get_doc_or_abort(doc_id)
    verify_view_access(doc_info)
    # Load the personal user of every owner before the item is serialized.
    for owner in doc_info.owners:
        owner.load_personal_user()
    closest_title = doc_info.document.get_closest_paragraph_title(par_id)
    payload = {"item": doc_info, "par_name": closest_title}
    return json_response(payload)
@view_page.get("/docViewInfo/<path:doc_name>")
def doc_access_info(doc_name):
    """Report whether the current user can view ``doc_name`` and with which right.

    :raises NotExist: If no document is found for ``doc_name``.
    """
    doc_info = DocEntry.find_by_path(doc_name, fallback_to_id=True)
    if not doc_info:
        raise NotExist()
    try:
        view_access = verify_view_access(doc_info, require=False, check_duration=True)
    except ItemLockedException as locked:
        # A locked item still reports the right that is attached to the exception.
        view_access = locked.access
        can_access = False
    else:
        can_access = view_access is not None
    return json_response(
        {"can_access": can_access, "right": view_access}, date_conversion=True
    )
@attr.s(auto_attribs=True)
class ItemWithRights:
    """An item bundled with the access entries that are reported for it."""

    # The wrapped item.
    i: Item
    # Access entries serialized under the "grouprights" key.
    rights: list[BlockAccess]

    def to_json(self):
        """Return the item's JSON with an added ``grouprights`` entry."""
        payload = dict(self.i.to_json())
        payload["grouprights"] = self.rights
        return payload
@view_page.get("/getItems")
def items_route(
    folder: str | None = None,
    folder_id: int | None = None,
    recursive: bool = False,
    include_rights: bool = False,
):
    """List the items of a folder as JSON.

    :param folder: Path of the folder; takes precedence over ``folder_id``.
    :param folder_id: Id of the folder, used when ``folder`` is not given.
    :param recursive: Whether to list items recursively.
    :param include_rights: Whether to wrap each item with its rights holders.
    :raises RouteException: If neither ``folder`` nor ``folder_id`` is given.
    :raises NotExist: If the folder is not found.
    """
    if folder is None and folder_id is None:
        raise RouteException()
    if folder is not None:
        target = Folder.find_by_path(folder)
    else:
        target = Folder.get_by_id(folder_id)
    if not target:
        raise NotExist("Folder not found.")
    # The root folder is listed without an access check.
    if not target.is_root():
        verify_view_access(target)

    found = get_items(target.path, recurse=recursive)
    if not include_rights:
        return json_response(found)

    user = get_current_user_object()
    # Rights are only fetched for the items the user can manage.
    manageable_ids = [item.id for item in found if user.has_manage_access(item)]
    rights_by_id = get_rights_holders_all(manageable_ids, order_by=BlockAccess.type)
    with_rights = [ItemWithRights(item, rights_by_id[item.id]) for item in found]
    return json_response(with_rights)
@view_page.get("/view")
def index_page():
    """Render the index page listing the items of the root folder."""
    save_last_page()
    root_items = get_items("")
    root_folder = Folder.get_root()
    return render_template("index.jinja2", items=root_items, item=root_folder)
[docs]@view_page.get("/generateCache/<path:doc_path>")
def gen_cache(
doc_path: str,
same_for_all: bool = False,
force: bool = False,
print_diffs: bool = False,
group: str | None = None,
):
"""Pre-generates document cache for the users with non-expired rights.

Useful for exam documents to reduce server load at the beginning of the exam.
Manage access to the document is required, and the document must have caching enabled.

:param group: The usergroup for which to generate the cache. If omitted, the users are computed from the
currently active (or upcoming) rights.
:param print_diffs: Whether to output diff information about cache content. Each cache entry is compared with
the first cache entry.
:param doc_path: Path of the document for which to generate the cache.
:param same_for_all: Whether to use same cache for all users.
This speeds up cache generation significantly.
:param force: Whether to force cache generation even if the existing cache seems up-to-date.
:return: A streamed text/plain response with one status line per user, optionally followed by diffs.
"""
doc_info = DocEntry.find_by_path(doc_path, fallback_to_id=True)
if not doc_info:
raise NotExist("Document not found")
verify_manage_access(doc_info)
s = doc_info.document.get_settings()
if not s.is_cached():
raise RouteException("Document does not have caching enabled.")
if group:
# An explicit group was requested: use exactly its members.
ug = UserGroup.get_by_name(group)
if not ug:
raise RouteException("usergroup not found")
groups_that_need_access_check = {ug}
user_set = set(ug.users)
else:
# Compute users from the current rights.
accesses: ValuesView[BlockAccess] = doc_info.block.accesses.values()
group_ids = {a.usergroup_id for a in accesses if not a.expired}
users: list[tuple[User, UserGroup]] = (
User.query.join(UserGroup, User.groups)
.filter(UserGroup.id.in_(group_ids))
.with_entities(User, UserGroup)
.all()
)
# A user's own personal group is excluded from the access check.
groups_that_need_access_check = {
g for u, g in users if u.get_personal_group() != g
}
user_set = {u for u, _ in users}
# The caller must have view access to every group whose members are enumerated.
for g in groups_that_need_access_check:
verify_group_view_access(g)
# The cache is generated with the default view context and default view parameters.
view_ctx = default_view_ctx
m = DocViewParams()
vp = ViewParams()
users_uniq = list(sorted(user_set, key=lambda u: u.name))
total = len(users_uniq)
# Width used to right-align the running counter in the status lines.
digits = len(str(total))
# Make sure tags attribute is always loaded.
# Otherwise the "translations" variable (in doc_head.jinja2) will first not have "tags" and
# after encountering someone with manage access, it is loaded and all subsequent users will get it too.
# This wouldn't cause any user-facing bugs, but it makes it easier to compare cached HTMLs.
_ = doc_info.block.tags
# Yields (status line, render result) pairs; the result is None when caching was not allowed.
def generate() -> Generator[tuple[str, DocRenderResult | None], None, None]:
first_cache = None
for i, u in enumerate(users_uniq):
start = f"{i + 1:>{digits}}/{total} {u.name}: "
cr = check_doc_cache(doc_info, u, view_ctx, m, vp.nocache)
view_ctx_cached = dataclasses.replace(view_ctx, for_cache=True)
if cr.doc and not force:
yield f"{start}already cached\n", cr.doc
else:
# With same_for_all, only the first render is performed and then reused for the other users.
if first_cache is None or not same_for_all:
dr = render_doc_view(doc_info, m, view_ctx_cached, u, False)
first_cache = dr
else:
dr = first_cache
if dr.allowed_to_cache:
set_doc_cache(cr.key, dr)
yield f"{start}ok\n", dr
else:
yield f"{start}not allowed to cache (one or more plugins had errors)\n", None
# Streams the status lines while holding a per-document file lock.
def generate_with_lock():
try:
results = []
# timeout=0: if the lock cannot be acquired, filelock.Timeout is raised and reported below.
with filelock.FileLock(f"/tmp/generateCache_{doc_info.id}", timeout=0):
for r, cache_result in generate():
if cache_result:
results.append(cache_result)
yield r
if not print_diffs:
return
if not results:
return
yield "\n"
yield "---Start of diffs---\n"
# For checking cache correctness, print cache differences compared to the first cached result.
compare_head = results[0].head_html.splitlines(keepends=True)
compare_content = results[0].content_html.splitlines(keepends=True)
for r in results[1:]:
diff = context_diff(
compare_head, r.head_html.splitlines(keepends=True), n=0
)
yield "".join(diff)
diff = context_diff(
compare_content, r.content_html.splitlines(keepends=True), n=0
)
yield "".join(diff)
yield "-----------------\n"
yield "---End of diffs---\n"
except filelock.Timeout:
yield "Cache generation for this document is already in progress.\n"
# stream_with_context keeps the request context available while the generator runs.
return Response(stream_with_context(generate_with_lock()), mimetype="text/plain")
# Reference timestamp for show_time; initialised when the module is imported.
debug_time = time.time()


def show_time(s):
    """Print ``s`` with the seconds elapsed since the previous call, then reset the reference time."""
    global debug_time
    current = time.time()
    elapsed = current - debug_time
    print(s, elapsed)
    debug_time = current
def get_module_ids(js_paths: list[str]):
    """Yield a module id for each JavaScript path.

    Leading slashes are dropped and a trailing ``.js`` extension is removed.

    The extension is removed with ``removesuffix`` rather than ``rstrip(".js")``:
    ``rstrip`` treats its argument as a set of characters and would also eat
    trailing ``j``, ``s`` and ``.`` characters of the name itself
    (e.g. ``"utils.js"`` would become ``"util"``).

    :param js_paths: Paths of JavaScript files.
    :return: A generator of module ids, in the order of ``js_paths``.
    """
    for js_path in js_paths:
        yield js_path.lstrip("/").removesuffix(".js")
def goto_view(item_path, model: ViewParams) -> FlaskViewResult:
    """Render the "goto" page for ``item_path`` using the goto-related fields of ``model``."""
    page = render_template(
        "goto_view.jinja2",
        item_path=item_path,
        display_text=model.goto,
        wait_max=model.wait_max,
        direct_link_timer=model.direct_link_timer,
    )
    return make_response(page)
def view(item_path: str, route: ViewRoute, render_doc: bool = True) -> FlaskViewResult:
    """Serve the item at ``item_path`` for the given view route.

    :param item_path: Path (or id, as a fallback) of the document to show.
    :param route: The view route through which the document was requested.
    :param render_doc: Whether to render the document body; when false, only the
        surrounding view template is rendered.
    :return: The response: the rendered view, a redirect, the login page,
        or whatever the folder fallback returns when no document matches.
    :raises AccessDenied: If a logged-in user has no view access.
    :raises DeletedUserException: If the current user is deleted.
    """
    taketime("view begin", zero=True)
    m: DocViewParams = ViewModelSchema.load(request.args, unknown="EXCLUDE")
    vp: ViewParams = ViewParamsSchema.load(request.args, unknown="EXCLUDE")

    if vp.goto:
        return goto_view(item_path, vp)

    if has_special_chars(item_path):
        # Redirect to the cleaned-up path, keeping the query string intact.
        query = request.query_string.decode("utf8")
        suffix = f"?{query}" if query else ""
        return redirect(remove_path_special_chars(request.path) + suffix)

    save_last_page()

    load_opts = (
        defaultload(DocEntry._block)
        .defaultload(Block.accesses)
        .joinedload(BlockAccess.usergroup),
        joinedload(DocEntry.trs)
        # TODO: These joinedloads are for some reason very inefficient at least for certain documents.
        #  See https://gitlab.com/tim-jyu/tim/-/issues/2201. Needs more investigation.
        # .joinedload(Translation.docentry),
        # joinedload(DocEntry.trs).joinedload(Translation._block)
    )
    doc_info = DocEntry.find_by_path(
        item_path,
        fallback_to_id=True,
        docentry_load_opts=load_opts,
    )
    if doc_info is None:
        return try_return_folder(item_path)

    if m.hide_names is not None:
        session["hide_names"] = m.hide_names

    # Routes that need more than view access fall back to the plain view
    # (after view access itself has been verified).
    should_hide_names = False
    if route == ViewRoute.Teacher:
        if not verify_teacher_access(doc_info, require=False):
            verify_view_access(doc_info)
            return redirect(f"/view/{item_path}")
    elif route == ViewRoute.Answers or route == ViewRoute.Review:
        if route == ViewRoute.Answers:
            route_allowed = verify_seeanswers_access(doc_info, require=False)
        else:
            route_allowed = is_peerreview_enabled(doc_info)
        if not route_allowed:
            verify_view_access(doc_info)
            return redirect(f"/view/{item_path}")
        # Without teacher access, names are hidden on these routes.
        if not verify_teacher_access(doc_info, require=False):
            should_hide_names = True

    access = verify_view_access(doc_info, require=False, check_duration=True)
    if not access:
        if not logged_in():
            return render_login(doc_info.document)
        adm = doc_info.document.get_settings().access_denied_message()
        if adm:
            raise AccessDenied(adm)
        raise AccessDenied()

    if vp.login and not logged_in():
        return render_login(doc_info.document)

    current_user = get_current_user_object()
    ug = current_user.get_personal_group()
    if current_user.is_deleted:
        raise DeletedUserException()

    view_ctx = view_ctx_with_urlmacros(
        route, hide_names_requested=should_hide_names or is_hide_names()
    )

    result = None
    if render_doc:
        cr = check_doc_cache(doc_info, current_user, view_ctx, m, vp.nocache)
        if cr.doc:
            # Document reading was skipped during caching, mark it now
            auto_read = not cr.doc.hide_readmarks and should_auto_read(
                doc_info.document, [ug.id], current_user
            )
            if auto_read:
                mark_all_read(ug.id, doc_info.document)
                db.session.commit()
            refresh_doc_expire(cr.key)
            result = cr.doc
        else:
            result = render_doc_view(doc_info, m, view_ctx, current_user, vp.nocache)
            if result.allowed_to_cache:
                set_doc_cache(cr.key, result)

    # This is only used for optimizing database access so that we can close the db session
    # as early as possible.
    preload_personal_folder_and_breadcrumbs(current_user, doc_info)

    # TODO: Closing session here breaks is_attribute_loaded function.
    #  According to https://docs.sqlalchemy.org/en/13/errors.html#error-bhk3, it may not be good practice to close
    #  the session manually in the first place.

    template_name = viewmode_templates.get(route, DEFAULT_VIEWMODE_TEMPLATE) + ".jinja2"
    final_html = render_template(
        template_name,
        access=access,
        doc_content=result.content_html if result else "",
        doc_head=result.head_html if result else "",
        item=doc_info,
        route=view_ctx.route.value,
        override_theme=result.override_theme if result else None,
    )
    response = make_response(final_html)
    add_no_cache_headers(response)
    return response
def preload_personal_folder_and_breadcrumbs(current_user: User, doc_info: DocInfo):
    """Touch the personal folder and the parent chain so that they are loaded up front.

    The personal folder is only fetched for logged-in users.
    """
    if current_user.logged_in:
        current_user.get_personal_folder()
    # The value is not needed; only the attribute access matters.
    doc_info.parents_to_root_eager
def get_additional_angular_modules(doc_info: DocInfo) -> set[str]:
    """Collect the extra Angular module names needed by the document.

    Modules are chosen based on the document path prefix, on whether the document
    is a style document, and on the modules listed in the document settings.
    """
    doc_settings = doc_info.document.get_settings()
    path = doc_info.path
    modules: set[str] = set()
    if path.startswith(MESSAGE_LIST_DOC_PREFIX):
        modules.add("timMessageListManagement")
    if path.startswith(MESSAGE_LIST_ARCHIVE_FOLDER_PREFIX):
        modules.add("timArchive")
    if doc_settings.is_style_document():
        modules.add("stylePreview")
    modules.update(doc_settings.additional_angular_modules())
    return modules
[docs]def render_doc_view(
doc_info: DocInfo,
m: DocViewParams,
view_ctx: ViewContext,
current_user: User,
clear_cache: bool,
) -> DocRenderResult:
"""Render the head and content HTML of a document view.

:param doc_info: The document to render.
:param m: Document view parameters (range, preamble, group, lazy loading etc.).
:param view_ctx: The view context (route, hide names, whether rendering for cache).
:param current_user: The user for whom the document is rendered.
:param clear_cache: Passed to DocParagraph.preload_htmls for the paragraphs that are preloaded.
:return: The rendered head and content HTML with the caching/theme/readmark flags.
Caching is allowed only if the document settings enable it and no plugin had errors.
"""
# Check for incorrect group tags.
linked_groups = []
if current_user.has_manage_access(doc_info):
linked_groups, group_tags = get_linked_groups(doc_info)
if group_tags:
names = {ug.ug.name for ug in linked_groups}
missing = set(group_tags) - names
if missing:
flash(f"Document has incorrect group tags: {seq_to_str(list(missing))}")
# --- Decide which part of the document is shown ---
piece_size = get_piece_size_from_cookie(request)
areas = None
if piece_size:
areas = get_document_areas(doc_info)
r_view_range = RequestedViewRange(b=m.b, e=m.e, size=m.size)
load_preamble = m.preamble
view_range = None
if piece_size and r_view_range.is_full:
view_range = decide_view_range(doc_info, piece_size, areas=areas)
load_preamble = True  # If partitioning without URL-param, true is default.
index_cache_folder = cache_folder_path / "indexcache" / str(doc_info.id)
contents_have_changed = False
doc_hash = get_doc_version_hash(doc_info)
# db.session.close()
index = load_index(index_cache_folder / f"{doc_hash}.json")
if index is not None:
# If cached header is up to date, partition document here.
xs, view_range = get_document(doc_info, view_range or r_view_range)
else:
# Otherwise the partitioning is done after forming the index.
contents_have_changed = True
xs, _ = get_document(doc_info, RequestedViewRange(b=None, e=None, size=None))
_, view_range = get_document(doc_info, view_range or r_view_range)
doc = doc_info.document
hide_answers = m.noanswers
doc_settings = doc.get_settings()
# Used later to get partitioning with preambles included correct.
# Includes either only special class preambles, or all of them if b=0.
preamble_count = 0
preamble_pars = None
# Note the precedence: this reads as
# load_preamble or (starts_from_beginning and not preamble_included).
if (
load_preamble
or view_range.starts_from_beginning
and not doc.preamble_included  # e.g. document is being cached in which case the preamble is preloaded
):
try:
preamble_pars = doc.insert_preamble_pars(
[INCLUDE_IN_PARTS_CLASS_NAME]
if not view_range.starts_from_beginning
else None
)
except PreambleException as e:
# NOTE(review): the exception object itself is passed to flash() here, whereas the
# PeerReviewException handler further below passes str(e) - confirm whether this should be str(e).
flash(e)
else:
xs = preamble_pars + xs
preamble_count = len(preamble_pars)
# Preload htmls here to make dereferencing faster
DocParagraph.preload_htmls(xs, doc_settings, view_ctx, clear_cache)
src_doc = doc.get_source_document()
if src_doc is not None:
DocParagraph.preload_htmls(
src_doc.get_paragraphs(), src_doc.get_settings(), view_ctx, clear_cache
)
rights = get_user_rights_for_item(doc_info, current_user)
# The word list is only fetched for editors who have enabled it in their preferences.
word_list = (
doc_info.document.get_word_list()
if rights["editable"] and current_user.get_prefs().use_document_word_list
else []
)
# We need to deference paragraphs at this point already to get the correct task ids
xs = dereference_pars(xs, context_doc=doc, view_ctx=view_ctx)
# --- Task summary / user list ---
total_points = None
tasks_done = None
task_groups = None
show_task_info = False
breaklines = False
user_list = []
teacher_or_see_answers = view_ctx.route.teacher_or_see_answers
task_ids, plugin_count, no_accesses = find_task_ids(
xs,
view_ctx,
UserContext.from_one_user(current_user),
check_access=teacher_or_see_answers,
)
if teacher_or_see_answers and no_accesses:
flash(
"You do not have full access to the following tasks: "
+ ", ".join([t.doc_task for t in no_accesses])
)
points_sum_rule = doc_settings.point_sum_rule()
# Unless the rule counts all tasks, the number of rule groups is used as the task total.
if points_sum_rule and not points_sum_rule.count_all:
total_tasks = len(points_sum_rule.groups)
else:
total_tasks = len(task_ids)
if points_sum_rule and points_sum_rule.scoreboard_error:
flash(f"Error in point_sum_rule scoreboard: {points_sum_rule.scoreboard_error}")
usergroup = m.group
peer_review_start = doc_settings.peer_review_start()
peer_review_stop = doc_settings.peer_review_stop()
if teacher_or_see_answers:
user_list = None
ug = None
# The group from the URL takes precedence over the group in the document settings.
if usergroup is None:
try:
usergroup = doc_settings.group()
except ValueError:
flash("The setting 'group' must be a string.")
can_add_missing = True
if usergroup is not None:
ug = UserGroup.get_by_name(usergroup)
if not ug:
flash(f"User group {usergroup} not found")
else:
if not verify_group_view_access(ug, require=False, user=current_user):
if not ug.is_personal_group:
flash(f"You don't have access to group '{ug.name}'.")
ug = None
else:
can_add_missing = False
if ug:
user_list = [u.id for u in ug.users]
user_list = get_points_by_rule(points_sum_rule, task_ids, user_list)
if ug and can_add_missing:
user_list = add_missing_users_from_group(user_list, ug)
elif ug and not user_list and not can_add_missing:
flash(f"You don't have access to group '{ug.name}'.")
elif doc_settings.show_task_summary() and current_user.logged_in:
info = get_points_by_rule(
points_sum_rule, task_ids, [current_user.id], force_user=current_user
)
if info:
total_points = info[0]["total_points"]
tasks_done = info[0]["task_count"]
task_groups = info[0].get("groups")
breaklines = False
show_task_info = tasks_done > 0 or total_points != 0
if points_sum_rule:
breaklines = points_sum_rule.breaklines
show_task_info = show_task_info or points_sum_rule.force
no_question_auto_numbering = None
if view_ctx.route == ViewRoute.Lecture and current_user.has_edit_access(doc_info):
no_question_auto_numbering = doc_settings.auto_number_questions()
current_list_user: User | None = None
# teacher view sorts user by real name and selects the lowest - ensure first loaded answer matches the user
if user_list:
current_list_user = min(
user_list, key=lambda u: (u["user"].real_name or "").lower()
)["user"]
# --- Document stylesheet ---
raw_css = doc_settings.css()
if raw_css:
try:
compiled_sass = sass.compile(string=raw_css, output_style="compact")
except sass.CompileError as e:
doc_css = None
# The error text is HTML-escaped before it is wrapped in Markup.
flash(
Markup(
f"Document stylesheet has errors: <pre>{html.escape(str(e))}</pre>"
)
)
else:
doc_css = sanitize_html(
'<style type="text/css">' + compiled_sass + "</style>",
allow_styles=True,
)
else:
doc_css = None
# Custom backgrounds for slides
slide_background_url = None
slide_background_color = None
is_slide = view_ctx.route == ViewRoute.ShowSlide
if is_slide:
slide_background_url = doc_settings.get_slide_background_url()
slide_background_color = doc_settings.get_slide_background_color()
do_lazy = False
else:
# The URL parameter wins; otherwise the document setting is used, which defaults to lazy
# once the plugin count reaches the configured limit.
do_lazy = (
m.lazy
if m.lazy is not None
else doc_settings.lazy(
default=plugin_count >= current_app.config["PLUGIN_COUNT_LAZY_LIMIT"]
)
)
user_ctx = UserContext(
user=current_list_user or current_user,
logged_user=current_user,
)
post_process_result = post_process_pars(
doc,
xs,
user_ctx,
view_ctx,
sanitize=False,
do_lazy=do_lazy,
load_plugin_states=not hide_answers,
)
# --- Peer review ---
if view_ctx.route.is_review:
user_list = []
if is_peerreview_enabled(doc_info):
if not check_review_grouping(doc_info):
try:
if not r_view_range.is_full:
# peer_review pairing generation may be called when only a part of the document is requested,
# however we need to know answerers from every task in the document, so we generate full
# document here
# TODO: alternative approach (separate route, timer etc) for launching peer_review generation
full_document_for_review, _ = get_document(
doc_info, RequestedViewRange(b=None, e=None, size=None)
)
if preamble_pars:
full_document_for_review = (
preamble_pars + full_document_for_review
)
DocParagraph.preload_htmls(
full_document_for_review,
doc_settings,
view_ctx,
clear_cache,
)
full_document_for_review = dereference_pars(
full_document_for_review, context_doc=doc, view_ctx=view_ctx
)
full_document_for_review = post_process_pars(
doc,
full_document_for_review,
user_ctx,
view_ctx,
sanitize=False,
do_lazy=do_lazy,
load_plugin_states=not hide_answers,
)
generate_review_groups(
doc_info, full_document_for_review.plugins
)
else:
generate_review_groups(doc_info, post_process_result.plugins)
set_default_velp_group_selected_and_visible(doc_info)
except PeerReviewException as e:
flash(str(e))
reviews = get_reviews_for_user(doc_info, current_user)
for review in reviews:
user_list.append(review.reviewable_id)
user_list = get_points_by_rule(points_sum_rule, task_ids, user_list)
# --- Header index and partitioning ---
if index is None:
index = get_index_from_html_list(t.output for t in post_process_result.texts)
doc_hash = get_doc_version_hash(doc_info)
save_index(index, index_cache_folder / f"{doc_hash}.json")
# If index was in cache, partitioning will be done earlier.
if view_range.is_restricted and contents_have_changed:
post_process_result.texts = partition_texts(
post_process_result.texts, view_range, preamble_count
)
# Hide the names of everyone except the current user and the model answer user.
if force_hide_names(current_user, doc_info) or view_ctx.hide_names_requested:
model_u = User.get_model_answer_user()
model_u_id = model_u.id if model_u else None
for entry in user_list:
eid = entry["user"].id
if eid != current_user.id and eid != model_u_id:
entry["user"].hide_name = True
show_unpublished_bg = doc_info.block.is_unpublished() and not app.config["TESTING"]
taketime("view to render")
score_infos = get_score_infos_if_enabled(doc_info, doc_settings, user_ctx)
reqs = get_all_reqs()  # This is cached so only first time after restart takes time
taketime("reqs done")
doctemps = doc_settings.get("editor_templates")
if doctemps:
reqs["usertemps"] = doctemps
# --- JavaScript / Angular modules ---
if is_slide:
post_process_result.js_paths.append("tim/document/slide")
angular_module_names = []
if teacher_or_see_answers or view_ctx.route.is_review:
post_process_result.js_paths.append("angular-ui-grid")
angular_module_names += get_grid_modules()
post_process_result.js_paths += get_additional_angular_modules(doc_info)
taketime("before render")
# --- Navigation ranges for a partitioned document ---
nav_ranges = []
if view_range.is_restricted:
piece_size = get_piece_size_from_cookie(request) or 20
first_range = decide_view_range(
doc_info, preferred_set_size=piece_size, index=0, forwards=True, areas=areas
)
previous_range = decide_view_range(
doc_info,
preferred_set_size=piece_size,
index=view_range.start_index,
forwards=False,
areas=areas,
)
next_range = decide_view_range(
doc_info,
preferred_set_size=piece_size,
index=view_range.end_index,
forwards=True,
areas=areas,
)
last_range = decide_view_range(
doc_info,
preferred_set_size=piece_size,
index=len(doc_info.document.get_paragraphs()),
forwards=False,
areas=areas,
)
# TODO: Find out if it's better to raise an error when any of these is None.
if first_range and previous_range and next_range and last_range:
nav_ranges = [
first_range.to_json("First"),
previous_range.to_json("Previous"),
next_range.to_json("Next"),
last_range.to_json("Last"),
]
# Marking as read is skipped when rendering for the cache.
if post_process_result.should_mark_all_read and not view_ctx.for_cache:
# TODO: Support multiple logged in users without using globals.
#  On the other hand, should_mark_all_read is used only in exam mode,
#  so we know there's only one user.
for group_id in [current_user.get_personal_group().id]:
mark_all_read(group_id, doc)
db.session.commit()
# --- Themes ---
exam_mode = is_exam_mode(doc_settings, rights)
document_themes = doc_settings.themes()
if exam_mode:
# Exam mode themes come first; dict.fromkeys drops duplicates while keeping the order.
document_themes = list(
dict.fromkeys(doc_settings.exam_mode_themes() + document_themes)
)
override_theme = None
document_themes_final = []
for theme in document_themes:
# A theme may be given as "<view route>:<theme>", which limits it to that route.
parts = theme.split(":", 1)
if len(parts) == 2:
view_route, theme = parts
if not theme:
continue
if view_route and view_ctx.route.value != view_route:
continue
document_themes_final.append(theme)
if document_themes_final:
document_theme_docs = resolve_themes(document_themes_final)
# If the user themes are not overridden, they are merged with document themes
user_themes = current_user.get_prefs().theme_docs()
if user_themes and not doc_settings.override_user_themes():
document_theme_docs = list(
(
{d.id: d for d in document_theme_docs}
| {d.id: d for d in user_themes}
).values()
)
theme_style, theme_hash = generate_style(document_theme_docs)
override_theme = f"{theme_style}?{theme_hash}"
hide_readmarks = doc_settings.hide_readmarks()
# --- Template rendering ---
templates_to_render = (
["slide_head.jinja2", "slide_content.jinja2"]
if is_slide
else ["doc_head.jinja2", "doc_content.jinja2"]
)
tmpl_params = dict(
hide_links=should_hide_links(doc_settings, rights),
hide_top_buttons=should_hide_top_buttons(doc_settings, rights),
pars_only=m.pars_only or should_hide_paragraphs(doc_settings, rights),
hide_sidemenu=should_hide_sidemenu(doc_settings, rights),
show_unpublished_bg=show_unpublished_bg,
exam_mode=exam_mode,
rights=rights,
route=view_ctx.route.value,
edit_mode=(m.edit if current_user.has_edit_access(doc_info) else None),
item=doc_info,
text=post_process_result.texts,
headers=index,
plugin_users=user_list,
version=doc.get_version(),
js=post_process_result.js_paths,
cssFiles=post_process_result.css_paths,
jsMods=angular_module_names,
doc_css=doc_css,
start_index=view_range.start_index,
group=usergroup,
translations=[
tr.to_json(curr_user=current_user) for tr in doc_info.translations
],
reqs=reqs,
no_browser=hide_answers,
no_question_auto_numbering=no_question_auto_numbering,
live_updates=doc_settings.live_updates(),
slide_background_url=slide_background_url,
slide_background_color=slide_background_color,
score_infos=score_infos,
# TODO: Unify "task summary" and "scoreboard" features somehow.
task_info={
"total_points": total_points,
"tasks_done": tasks_done,
"total_tasks": total_tasks,
"show": show_task_info,
"groups": task_groups,
"breaklines": breaklines,
},
peer_review_start=peer_review_start,
peer_review_stop=peer_review_stop,
doc_settings=doc_settings,
word_list=word_list,
memo_minutes=doc_settings.memo_minutes(),
linked_groups=linked_groups,
current_view_range=view_range,
nav_ranges=nav_ranges,
should_mark_all_read=post_process_result.should_mark_all_read,
hide_readmarks=hide_readmarks,
override_theme=override_theme,
current_list_user=current_list_user,
)
# db.session.close()
# Both templates (head first, then content) are rendered with the same parameters.
head, content = (
render_template("partials/" + tmpl, **tmpl_params)
for tmpl in templates_to_render
)
# db.session.close()
return DocRenderResult(
head_html=head,
content_html=content,
allowed_to_cache=doc_settings.is_cached()
and not post_process_result.has_plugin_errors,
override_theme=override_theme,
hide_readmarks=doc_settings.hide_readmarks(),
)
def render_login(item: Document | None) -> FlaskViewResult:
    """Render the login page with status 403 and remember where the user came from.

    The request URL and the ``anchor`` query parameter are stored in the session.

    :param item: The document whose minimal visibility settings are passed to the page, if any.
    """
    view_settings = get_minimal_visibility_settings(item)
    anchor = request.args.get("anchor", "")
    session["came_from"] = request.url
    session["anchor"] = anchor
    page = render_template(
        "loginpage.jinja2",
        came_from=request.full_path,
        anchor=anchor,
        view_settings=view_settings,
    )
    return page, 403
def get_items(folder: str, recurse=False):
    """Return the folders and documents under ``folder`` for the current user.

    Folders that the user can view come first, followed by the documents;
    both groups are sorted case-insensitively by title.
    """

    def lowercase_title(entry):
        return entry.title.lower()

    user = get_current_user_object()
    documents = get_documents(
        search_recursively=recurse, filter_folder=folder, filter_user=user
    )
    documents.sort(key=lowercase_title)
    folders = Folder.get_all_in_path(root_path=folder, recurse=recurse)
    folders.sort(key=lowercase_title)
    viewable_folders = [entry for entry in folders if user.has_view_access(entry)]
    return viewable_folders + documents
def get_linked_groups(i: Item) -> tuple[list[UserGroupWithSisuInfo], list[str]]:
    """Return the user groups referred to by the group tags of the item.

    :param i: The item whose tags are inspected.
    :return: The existing groups whose name matches a group tag, and the
        group names taken from the tags.
    """
    group_tags = [
        tag.get_group_name()
        for tag in i.block.tags
        if tag.name.startswith(GROUP_TAG_PREFIX)
    ]
    # No group tags: skip the query altogether.
    if not group_tags:
        return [], group_tags
    matching = (
        get_usergroup_eager_query().filter(UserGroup.name.in_(group_tags)).all()
    )
    return [UserGroupWithSisuInfo(group) for group in matching], group_tags
@view_page.get("/items/linkedGroups/<int:item_id>")
def get_linked_groups_route(item_id: int):
    """Return the groups linked to the document as JSON; teacher access is required."""
    doc_info = get_doc_or_abort(item_id)
    verify_teacher_access(doc_info)
    groups, _ = get_linked_groups(doc_info)
    return json_response(groups)
def should_hide_links(settings: DocSettings, rights: dict):
    """
    Evaluate the document's ``hide_links`` setting against the user's rights.

    :param settings: Document settings.
    :param rights: Rights of the user for the document.
    :return: Result of has_no_higher_right for the ``hide_links`` setting.
    """
    hide_links_setting = settings.hide_links()
    return has_no_higher_right(hide_links_setting, rights)
def should_hide_paragraphs(settings: DocSettings, rights: dict):
    """
    Evaluate the document's ``pars_only`` setting against the user's rights.

    :param settings: Document settings.
    :param rights: Rights of the user for the document.
    :return: Result of has_no_higher_right for the ``pars_only`` setting.
    """
    pars_only_setting = settings.pars_only()
    return has_no_higher_right(pars_only_setting, rights)
def is_exam_mode(settings: DocSettings, rights: dict):
    """
    Evaluate the document's ``exam_mode`` setting against the user's rights.

    :param settings: Document settings.
    :param rights: Rights of the user for the document.
    :return: Result of has_no_higher_right for the ``exam_mode`` setting.
    """
    exam_mode_setting = settings.exam_mode()
    return has_no_higher_right(exam_mode_setting, rights)
@view_page.get("/getParDiff/<int:doc_id>/<int:major>/<int:minor>")
def check_updated_pars(doc_id, major, minor):
    """
    Return the paragraph-wise diff between a document version and the current document.

    View access to the document is verified first. Each diff entry that has
    content gets that content replaced by rendered paragraph HTML together
    with the JS and CSS paths produced by post-processing.

    :param doc_id: Document id.
    :param major: Major number of the version to compare from.
    :param minor: Minor number of the version to compare from.
    :return: JSON response with keys ``diff``, ``version`` and ``live``.
    """
    doc_info = get_doc_or_abort(doc_id)
    verify_view_access(doc_info)

    document = doc_info.document
    doc_settings = document.get_settings()
    live = doc_settings.live_updates(0)  # this cost 1-3 ms.

    # TODO: take this from somewhere that it is possible to admin to change it by a route
    global_live = 2
    if global_live == 0:
        # To stop all live updates
        live = 0
    elif 0 < live < global_live:
        # A positive per-document value is raised to the global value.
        live = global_live

    ctx = default_view_ctx
    old_version = document.get_doc_version((major, minor))
    # TODO cache this, about <5 ms
    diffs = list(old_version.parwise_diff(document, ctx))

    user = get_current_user_object()
    # about 30-40 ms  # TODO: this is the slowest part
    rights = get_user_rights_for_item(doc_info, user)

    for entry in diffs:  # about < 1 ms
        changed_pars = entry.get("content")
        if not changed_pars:
            continue
        processed = post_process_pars(
            document,
            changed_pars,
            UserContext.from_one_user(user),
            ctx,
        )
        rendered = render_template(
            "partials/paragraphs.jinja2",
            text=processed.texts,
            rights=rights,
            preview=False,
            hide_readmarks=doc_settings.hide_readmarks(),
        )
        entry["content"] = {
            "texts": rendered,
            "js": processed.js_paths,
            "css": processed.css_paths,
        }

    payload = {"diff": diffs, "version": document.get_version(), "live": live}
    return json_response(payload)
@view_page.get("/manage")
@view_page.get("/slide")
@view_page.get("/teacher")
@view_page.get("/answers")
@view_page.get("/review")
@view_page.get("/lecture")
def index_redirect():
    """
    Redirect the bare view route URLs registered above to ``/view``.

    :return: Redirect response.
    """
    target = "/view"
    return redirect(target)
@view_page.post("/createItem")
def create_item_route(
    item_path: str,
    item_type: str,
    item_title: str,
    cite: int | None = None,
    copy: int | None = None,
    template: str | None = None,
    use_template: bool = True,
):
    """
    Create a new item and return it as JSON.

    :param item_path: Path of the item to create.
    :param item_type: Type of the item; forwarded to create_item_direct.
    :param item_title: Title of the item.
    :param cite: Id of the document to cite, if any.
    :param copy: Id of the item to copy, if any.
    :param template: Name of the template, if any.
    :param use_template: Whether a template is used.
    :return: JSON response containing the created item.
    :raises AccessDenied: If the ALLOW_CREATE_DOCUMENTS config value is falsy
                          and the current user is not an admin.
    """
    creation_allowed = (
        app.config["ALLOW_CREATE_DOCUMENTS"] or get_current_user_object().is_admin
    )
    if not creation_allowed:
        raise AccessDenied("Creating items is disabled.")

    created = create_item_direct(
        item_path,
        item_type,
        item_title,
        cite,
        copy,
        template,
        use_template,
    )
    return json_response(created)
def create_item_direct(
    item_path: str,
    item_type: str,
    item_title: str,
    cite: int | None = None,
    copy: int | None = None,
    template: str | None = None,
    use_template: bool = True,
):
    """
    Create an item and commit the database session.

    A citation document is created when ``cite`` is truthy. Otherwise the item
    is created or copied as a document if ``item_type`` is ``"document"`` and
    as a folder for any other value.

    :param item_path: Path of the item to create.
    :param item_type: ``"document"`` for a document; anything else means folder.
    :param item_title: Title of the item.
    :param cite: Id of the document to cite, if any.
    :param copy: Id of the item to copy, if any.
    :param template: Name of the template, if any.
    :param use_template: Whether a template is used; None is treated as True.
    :return: The created item.
    """
    use_template = True if use_template is None else use_template

    if cite:
        created = create_citation_doc(cite, item_path, item_title)
    else:
        block_type = (
            BlockType.Document if item_type == "document" else BlockType.Folder
        )
        created = create_or_copy_item(
            item_path,
            block_type,
            item_title,
            copy,
            template,
            use_template,
        )
    db.session.commit()
    return created
@view_page.get("/items/<int:item_id>")
def get_item(item_id: int):
    """
    Return an item as JSON after verifying view access to it.

    :param item_id: Item id.
    :return: JSON response containing the item.
    :raises NotExist: If no item is found with the given id.
    """
    item = Item.find_by_id(item_id)
    if item:
        verify_view_access(item)
        return json_response(item)
    raise NotExist("Item not found")
@view_page.post("/items/relevance/set/<int:item_id>")
def set_blockrelevance(item_id: int, value: int):
    """
    Add block relevance or edit if it already exists for the block.

    Manage access to the item is verified first.

    :param item_id: Item id.
    :param value: The relevance value.
    :return: Ok response.
    :raises NotExist: If no item is found with the given id.
    :raises RouteException: If deleting the old relevance or saving the new
                            one fails; the original error is chained as the cause.
    """
    i = Item.find_by_id(item_id)
    if not i:
        raise NotExist("Item not found")
    verify_manage_access(i)

    # If block has existing relevance, delete it before adding the new one.
    existing = i.relevance
    if existing:
        try:
            db.session.delete(existing)
        except Exception as e:
            db.session.rollback()
            # Chain explicitly so the underlying error stays attached as __cause__.
            raise RouteException(
                f"Changing block relevance failed: {get_error_message(e)}"
            ) from e

    blockrelevance = BlockRelevance(relevance=value)
    try:
        i.block.relevance = blockrelevance
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        raise RouteException(
            f"Setting block relevance failed: {get_error_message(e)}: {str(e)}"
        ) from e
    return ok_response()
@view_page.get("/items/relevance/reset/<int:item_id>")
def reset_blockrelevance(item_id: int):
    """
    Reset (delete) block relevance.

    Manage access to the item is verified first. Nothing is deleted when the
    item has no relevance set.

    :param item_id: Item id.
    :return: Ok response.
    :raises NotExist: If no item is found with the given id.
    :raises RouteException: If deleting the relevance fails; the original
                            error is chained as the cause.
    """
    i = Item.find_by_id(item_id)
    if not i:
        raise NotExist("Item not found")
    verify_manage_access(i)

    blockrelevance = i.relevance
    if blockrelevance:
        try:
            db.session.delete(blockrelevance)
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            # Chain explicitly so the underlying error stays attached as __cause__.
            raise RouteException(
                f"Resetting block relevance failed: {get_error_message(e)}"
            ) from e
    return ok_response()
@view_page.get("/items/relevance/get/<int:item_id>")
def get_relevance_route(item_id: int):
    """
    Return the relevance of an item, of its nearest parent that has one, or the default.

    View access to the item is verified first. Parents are checked in the
    order given by ``parents_to_root`` with the root excluded.

    :param item_id: Item id.
    :return: JSON response with the relevance object and the flags ``default``
             (no relevance was found) and ``inherited`` (taken from a parent).
    :raises NotExist: If no item is found with the given id.
    """
    item = Item.find_by_id(item_id)
    if not item:
        raise NotExist("Item not found")
    verify_view_access(item)

    def respond(relevance, *, default: bool, inherited: bool):
        # Single place that fixes the shape of the response payload.
        return json_response(
            {"relevance": relevance, "default": default, "inherited": inherited}
        )

    # The item's own relevance takes precedence.
    own_relevance = item.relevance
    if own_relevance:
        return respond(own_relevance, default=False, inherited=False)

    # Otherwise use the first parent that has a relevance set.
    for ancestor in item.parents_to_root(include_root=False):
        ancestor_relevance = ancestor.relevance
        if ancestor_relevance:
            return respond(ancestor_relevance, default=False, inherited=True)

    # Neither the item nor its parents have a relevance: fall back to the default.
    fallback = {"block_id": item_id, "relevance": DEFAULT_RELEVANCE}
    return respond(fallback, default=True, inherited=False)
def get_document_relevance(i: DocInfo) -> int:
    """
    Return the relevance value of a document, of its nearest parent that has one, or the default.

    :param i: Document.
    :return: The document's own relevance value if set; otherwise the value of
             the first parent (root excluded) with a relevance; otherwise
             DEFAULT_RELEVANCE.
    """
    own = i.relevance
    if own:
        return own.relevance

    # Parents are only queried when the document has no relevance of its own.
    inherited = next(
        (
            parent.relevance
            for parent in i.parents_to_root(include_root=False)
            if parent.relevance
        ),
        None,
    )
    return inherited.relevance if inherited else DEFAULT_RELEVANCE
@view_page.get("/viewrange/unset/piecesize")
def unset_piece_size():
    """
    Overwrite the piece size cookie ``r`` with ``-1`` and an ``expires`` value of 0.

    :return: Empty response carrying the cookie.
    """
    response = make_response()
    cookie_options = {
        "key": "r",
        "value": "-1",
        "expires": 0,
        "samesite": "None",
        "secure": app.config["SESSION_COOKIE_SECURE"],
    }
    response.set_cookie(**cookie_options)
    return response
@view_page.post("/viewrange/set/piecesize")
def set_piece_size(pieceSize: int):
    """
    Add cookie for user defined view range (if isn't set, doc won't be partitioned).

    :param pieceSize: Piece size to store; must be at least 1.
    :return: Empty response carrying the ``r`` cookie.
    :raises RouteException: If the piece size is missing, zero or negative.
    """
    is_valid = bool(pieceSize) and pieceSize >= 1
    if not is_valid:
        raise RouteException("Invalid piece size")

    response = make_response()
    response.set_cookie(
        key="r",
        value=str(pieceSize),
        samesite="None",
        secure=app.config["SESSION_COOKIE_SECURE"],
    )
    return response
@view_page.get("/viewrange/get/<int:doc_id>/<int:index>/<int:forwards>")
def get_viewrange(doc_id: int, index: int, forwards: int):
    """
    Return the view range decided for a document from the piece size stored in the request cookie.

    View access to the document is verified before the range is computed.

    :param doc_id: Document id.
    :param index: Index passed to decide_view_range.
    :param forwards: Treated as "forwards" when greater than 0.
    :return: JSON response containing the view range.
    :raises RouteException: If no piece size is found in the request cookie.
    """
    taketime("route view begin")
    piece_size = get_piece_size_from_cookie(request)
    if not piece_size:
        raise RouteException("Piece size not found!")

    doc_info = get_doc_or_abort(doc_id)
    verify_view_access(doc_info)

    go_forwards = forwards > 0
    chosen_range = decide_view_range(doc_info, piece_size, index, forwards=go_forwards)
    return json_response(chosen_range)