# Source code for timApp.auth.accesshelper

import ipaddress
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from pathlib import Path

from flask import flash, current_app
from flask import request, g
from sqlalchemy import inspect

from timApp.auth.accesstype import AccessType
from timApp.auth.auth_models import BlockAccess
from timApp.auth.session.util import (
    SessionExpired,
    session_has_access,
)
from timApp.auth.sessioninfo import (
    logged_in,
    get_other_users_as_list,
    get_current_user_group,
    get_current_user_object,
)
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import DocInfo
from timApp.document.docparagraph import DocParagraph
from timApp.document.document import Document, dereference_pars
from timApp.document.usercontext import UserContext
from timApp.document.viewcontext import ViewContext, OriginInfo
from timApp.folder.folder import Folder
from timApp.item.block import BlockType, Block
from timApp.item.item import Item
from timApp.notification.send_email import send_email
from timApp.plugin.plugin import (
    Plugin,
    find_plugin_from_document,
    maybe_get_plugin_from_par,
)
from timApp.plugin.pluginexception import PluginException
from timApp.plugin.taskid import TaskId, TaskIdAccess
from timApp.timdb.exceptions import TimDbException
from timApp.timdb.sqa import db
from timApp.user.user import ItemOrBlock, User
from timApp.user.usergroup import UserGroup
from timApp.user.userutils import grant_access
from timApp.util.flask.requesthelper import get_option, RouteException, NotExist
from timApp.util.logger import log_warning
from timApp.util.utils import get_current_time


def get_doc_or_abort(doc_id: int, msg: str | None = None) -> DocInfo:
    """Look up a document by its id or fail with NotExist.

    :param doc_id: The id passed to DocEntry.find_by_id.
    :param msg: Optional error message; "Document not found" is used when it is falsy.
    :return: The document that was found.
    :raises NotExist: If the lookup returned nothing.
    """
    found = DocEntry.find_by_id(doc_id)
    if found:
        return found
    raise NotExist(msg or "Document not found")
def get_item_or_abort(item_id: int):
    """Look up an item by its id or fail with NotExist.

    :param item_id: The id passed to Item.find_by_id.
    :return: The item that was found.
    :raises NotExist: If the lookup returned nothing.
    """
    found = Item.find_by_id(item_id)
    if found:
        return found
    raise NotExist("Item not found")
def get_folder_or_abort(folder_id: int):
    """Look up a folder by its id or fail with NotExist.

    :param folder_id: The id passed to Folder.get_by_id.
    :return: The folder that was found.
    :raises NotExist: If the lookup returned nothing.
    """
    found = Folder.get_by_id(folder_id)
    if found:
        return found
    raise NotExist("Folder not found")
def verify_admin(require: bool = True, user: User | None = None) -> bool:
    """Check whether the given (or current) user passes check_admin_access.

    :param require: If True, a failed check raises instead of returning False.
    :param user: The user to check; forwarded as-is to check_admin_access.
    :return: True if the admin check passed, False if it failed and require is False.
    :raises AccessDenied: If the admin check failed and require is True.
    """
    if check_admin_access(user=user):
        return True
    if require:
        raise AccessDenied("This action requires administrative rights.")
    return False
def verify_admin_no_ret(require=True):
    """Run verify_admin for the current user and discard its result.

    :param require: Forwarded to verify_admin.
    """
    verify_admin(require=require)
def verify_edit_access(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
):
    """Verify edit access to the given item or block for the current user.

    All options are forwarded unchanged to verify_access with AccessType.edit.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
    )
    return verify_access(b, AccessType.edit, **options)
def verify_manage_access(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
):
    """Verify manage access to the given item or block for the current user.

    All options are forwarded unchanged to verify_access with AccessType.manage.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
    )
    return verify_access(b, AccessType.manage, **options)
def verify_access(
    b: ItemOrBlock,
    access_type: AccessType,
    require: bool = True,
    message: str | None = None,
    check_duration=False,
    check_parents=False,
    grace_period=timedelta(seconds=0),
    user: User | None = None,
):
    """Verify that a user has the given type of access to an item or block.

    :param b: The item or block to check.
    :param access_type: The access type to look for.
    :param require: Forwarded to abort_if_not_access_and_required.
    :param message: Forwarded to abort_if_not_access_and_required.
    :param check_duration: Forwarded to abort_if_not_access_and_required.
    :param check_parents: If True and the block itself gives no access, the
        parents of the block are tried in order and the first hit is used.
    :param grace_period: Forwarded to User.has_access.
    :param user: The user to check; the current user is used when this is falsy.
    :return: The result of abort_if_not_access_and_required.
    """
    checked_user = user or get_current_user_object()
    granted = checked_user.has_access(b, access_type, grace_period)
    if check_parents and not granted:
        # Only uploaded files and images have a parent so far.
        for parent in b.parents:
            parent_access = checked_user.has_access(parent, access_type, grace_period)
            if parent_access:
                granted = parent_access
                break
    return abort_if_not_access_and_required(
        granted,
        checked_user,
        b,
        access_type,
        require,
        message,
        check_duration=check_duration,
    )
def check_inherited_right(
    u: User,
    b: ItemOrBlock,
    access_type: AccessType,
    grace_period: timedelta,
) -> BlockAccess | None:
    """Check a right on the parent of a document configured to inherit folder rights.

    Only DocInfo objects and Blocks of document type are considered. The parent
    is consulted only if the document's path_without_lang is listed in the
    INHERIT_FOLDER_RIGHTS_DOCS config value.

    :param u: The user whose access is checked.
    :param b: The item or block to check.
    :param access_type: The access type to look for.
    :param grace_period: Forwarded to User.has_access.
    :return: The result of u.has_access on b.parent, or None if inheritance does not apply.
    """
    if isinstance(b, DocInfo):
        doc = b
    elif isinstance(b, Block) and b.type_id == BlockType.Document.value:
        doc = DocEntry.find_by_id(b.id)
    else:
        return None
    if not doc:
        return None
    if doc.path_without_lang not in current_app.config["INHERIT_FOLDER_RIGHTS_DOCS"]:
        return None
    return u.has_access(b.parent, access_type, grace_period)
def get_inherited_right_blocks(b: ItemOrBlock) -> list[int]:
    """Return the ids of blocks from which the given block inherits rights.

    The result is non-empty only for a DocInfo (or a Block of document type)
    whose path_without_lang is listed in the INHERIT_FOLDER_RIGHTS_DOCS config
    value; in that case it contains the id of b.parent.

    Note that the elements are block ids (ints), not Block objects.

    :param b: The item or block to inspect.
    :return: A list with the parent's id, or an empty list.
    """
    inherited_right_docs = current_app.config["INHERIT_FOLDER_RIGHTS_DOCS"]
    if not inherited_right_docs:
        return []
    is_docinfo = isinstance(b, DocInfo)
    if is_docinfo or (isinstance(b, Block) and b.type_id == BlockType.Document.value):
        doc = b if is_docinfo else DocEntry.find_by_id(b.id)
        if not doc:
            return []
        if doc.path_without_lang in inherited_right_docs:
            return [b.parent.id]
    return []
def verify_view_access(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
    user=None,
):
    """Verify view access to the given item or block.

    All options are forwarded unchanged to verify_access with AccessType.view.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
        user=user,
    )
    return verify_access(b, AccessType.view, **options)
def verify_teacher_access(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
    user=None,
):
    """Verify teacher access to the given item or block.

    All options are forwarded unchanged to verify_access with AccessType.teacher.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
        user=user,
    )
    return verify_access(b, AccessType.teacher, **options)
def verify_copy_access(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
):
    """Verify copy access to the given item or block for the current user.

    All options are forwarded unchanged to verify_access with AccessType.copy.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
    )
    return verify_access(b, AccessType.copy, **options)
def verify_seeanswers_access(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
    user=None,
):
    """Verify see-answers access to the given item or block.

    All options are forwarded unchanged to verify_access with AccessType.see_answers.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
        user=user,
    )
    return verify_access(b, AccessType.see_answers, **options)
class ItemLockedException(Exception):
    """Raised (in the /view route) when a user tries to access an item to which
    they have a duration access that either has not begun yet or has already
    expired.

    The constructor arguments are stored as the attributes ``access``, ``msg``
    and ``next_doc``.
    """

    def __init__(
        self,
        access: BlockAccess,
        msg: str | None = None,
        next_doc: DocInfo | None = None,
    ):
        self.access, self.msg, self.next_doc = access, msg, next_doc
def abort_if_not_access_and_required(
    access_obj: BlockAccess | None,
    user: User,
    block: ItemOrBlock,
    access_type: AccessType,
    require=True,
    message=None,
    check_duration=False,
) -> BlockAccess | None:
    """Return the given access if there is one; otherwise fail or try to unlock a duration access.

    A truthy access_obj is returned unchanged. Otherwise:

    1. SessionExpired is raised if session_has_access(block, user) is false.
    2. If check_duration is set, a BlockAccess row of the requested type is looked up
       for the block (and for the ids from get_inherited_right_blocks): first for the
       current user's group, then for any of the current user's non-expired groups.
       A group match is copied into a new, not yet persisted personal BlockAccess.
       If a row is found, the function either unlocks it (when the "unlock" request
       option is set and the row is unlockable) or raises ItemLockedException.
    3. If nothing above returned or raised, AccessDenied is raised when require is
       true; otherwise None is returned.

    NOTE(review): the duration lookup uses get_current_user_group() and
    get_current_user_object(), not the ``user`` argument - confirm this is intended.

    :param access_obj: An already resolved access, or a falsy value if there is none.
    :param user: The user passed to session_has_access.
    :param block: The item or block being accessed.
    :param access_type: The access type that was requested.
    :param require: Whether a missing access raises AccessDenied.
    :param message: Custom AccessDenied message; a default text is used when falsy.
    :param check_duration: Whether to look for (and possibly unlock) a duration access.
    :return: The access object, or None if there is no access and require is false.
    :raises SessionExpired: If session_has_access is false.
    :raises ItemLockedException: If a duration access exists but is not currently usable.
    :raises AccessDenied: If there is no access and require is true.
    """
    if access_obj:
        return access_obj
    if not session_has_access(block, user):
        raise SessionExpired()
    block_ids = [block.id, *get_inherited_right_blocks(block)]
    if check_duration:
        # First look for a right given to the current user's own group.
        ba = (
            BlockAccess.query.filter(BlockAccess.block_id.in_(block_ids))
            .filter_by(
                type=access_type.value,
                usergroup_id=get_current_user_group(),
            )
            .first()
        )
        if ba is None:
            # Fall back to a right given to any non-expired group of the current user.
            ba_group: BlockAccess = (
                BlockAccess.query.filter(BlockAccess.block_id.in_(block_ids))
                .filter_by(type=access_type.value)
                .filter(
                    BlockAccess.usergroup_id.in_(
                        get_current_user_object()
                        .get_groups(include_expired=False)
                        .with_entities(UserGroup.id)
                    )
                )
                .first()
            )
            if ba_group is not None:
                # Copy the group right into a new BlockAccess for the current user's group.
                ba = BlockAccess(
                    block_id=ba_group.block_id,
                    type=ba_group.type,
                    usergroup_id=get_current_user_group(),
                    accessible_from=ba_group.accessible_from,
                    accessible_to=ba_group.accessible_to,
                    duration=ba_group.duration,
                    duration_from=ba_group.duration_from,
                    duration_to=ba_group.duration_to,
                    require_confirm=ba_group.require_confirm,
                )
        if ba is not None:
            unlock = get_option(request, "unlock", False)
            if unlock and ba.unlockable:
                # Start the access period now.
                ba.accessible_from = get_current_time()
                ba.accessible_to = ba.accessible_from + ba.duration_now

                # if this is a group duration, it means we created a personal BlockAccess instance above, so we
                # need to add it
                if inspect(ba).transient:
                    db.session.add(ba)
                db.session.commit()  # TODO ensure nothing else gets committed than the above
                if isinstance(block, Item):
                    # If unlock targets are configured for this item's path, queue an unlock operation for them.
                    targets = current_app.config["DIST_RIGHTS_UNLOCK_TARGETS"]
                    curr_targets = targets.get(block.path)
                    if curr_targets:
                        from timApp.tim_celery import send_unlock_op

                        send_unlock_op.delay(
                            get_current_user_object().email, curr_targets
                        )
                flash("Item was unlocked successfully.")
                if ba.accessible_from < ba.accessible_to:
                    return ba
                else:
                    raise ItemLockedException(ba)
            else:
                # Chaining: If the right to this document has expired, check if there is a document that should
                # get auto-confirmed.
                if ba.expired:
                    msg, next_doc = maybe_auto_confirm(block)
                else:
                    msg, next_doc = None, None
                raise ItemLockedException(ba, msg, next_doc)
    if require:
        raise AccessDenied(
            message or "Sorry, you don't have permission to use this resource."
        )
    return None
def maybe_auto_confirm(block: ItemOrBlock):
    """Try to confirm the current user's view access to the document named by
    the ``auto_confirm`` setting of the given document.

    Nothing is done unless block is a DocInfo whose auto_confirm setting is a
    string. The target is looked up first with the language id of the block
    appended (if there is one) and then by the plain path. The target must list
    one of the block's alias paths in its allow_self_confirm_from setting.
    Problems are reported with flash() instead of being raised.

    :param block: The item or block whose settings are inspected.
    :return: A tuple (message, target document) after a successful confirmation,
        otherwise (None, None).
    """
    nothing = (None, None)
    if not isinstance(block, DocInfo):
        return nothing
    settings = block.document.get_settings()
    confirm_path = settings.auto_confirm()
    if not isinstance(confirm_path, str):
        return nothing

    # Prefer the language-specific path of the target when the source has a language id.
    target = None
    if block.lang_id:
        target = DocEntry.find_by_path(f"{confirm_path}/{block.lang_id}")
    if not target:
        target = DocEntry.find_by_path(confirm_path)
    if not target:
        flash("auto_confirm document does not exist")
        return nothing

    # The setting may be a single path or a list; non-string list entries are ignored.
    allow_from = target.document.get_settings().allow_self_confirm_from()
    if isinstance(allow_from, str):
        allowed = {allow_from}
    elif isinstance(allow_from, list):
        allowed = {entry for entry in allow_from if isinstance(entry, str)}
    else:
        allowed = set()

    own_paths = {alias.path for alias in block.aliases}
    if not (allowed & own_paths):
        flash("Document is not authorized to auto-confirm rights")
        return nothing

    try:
        view_access = get_single_view_access(target, allow_group=True)
    except AccessDenied as e:
        flash("Cannot get access to target document: " + str(e))
        return nothing

    message = settings.expire_next_doc_message()
    view_access.do_confirm()
    db.session.commit()
    return message, target
def has_view_access(b: ItemOrBlock):
    """Return the current user's view access to the given item or block.

    A right found by check_inherited_right (with no grace period) takes
    precedence; otherwise the user's own has_view_access result is returned.
    """
    current = get_current_user_object()
    inherited = check_inherited_right(
        current, b, AccessType.view, grace_period=timedelta(seconds=0)
    )
    if inherited:
        return inherited
    return current.has_view_access(b)
def has_edit_access(b: ItemOrBlock):
    """Return the result of has_edit_access(b) for the current user."""
    current = get_current_user_object()
    return current.has_edit_access(b)
def has_comment_right(b: ItemOrBlock):
    """Return the current user's view access to b, or None when nobody is logged in."""
    if not logged_in():
        return None
    return has_view_access(b)
def has_read_marking_right(b: ItemOrBlock):
    """Return the current user's view access to b, or None when nobody is logged in."""
    if not logged_in():
        return None
    return has_view_access(b)
def has_teacher_access(b: ItemOrBlock):
    """Return the result of has_teacher_access(b) for the current user."""
    current = get_current_user_object()
    return current.has_teacher_access(b)
def has_manage_access(b: ItemOrBlock):
    """Return the result of has_manage_access(b) for the current user."""
    current = get_current_user_object()
    return current.has_manage_access(b)
def has_seeanswers_access(b: ItemOrBlock):
    """Return the result of has_seeanswers_access(b) for the current user."""
    current = get_current_user_object()
    return current.has_seeanswers_access(b)
def has_ownership(b: ItemOrBlock):
    """Return the result of has_ownership(b) for the current user."""
    current = get_current_user_object()
    return current.has_ownership(b)
def check_admin_access(block_id=None, user=None) -> BlockAccess | None:
    """Build an owner-type BlockAccess for an admin user.

    :param block_id: The block id to put into the returned BlockAccess.
    :param user: The user to check; the current user is used when this is None.
    :return: A new BlockAccess of owner type for the user's personal group,
        accessible from datetime.min (UTC), or None if the user is not an admin.
    """
    curr_user = get_current_user_object() if user is None else user
    if not curr_user.is_admin:
        return None
    return BlockAccess(
        block_id=block_id,
        accessible_from=datetime.min.replace(tzinfo=timezone.utc),
        type=AccessType.owner.value,
        usergroup_id=curr_user.get_personal_group().id,
    )
def verify_logged_in() -> None:
    """Ensure that logged_in() is true.

    :raises AccessDenied: If nobody is logged in.
    """
    if logged_in():
        return
    raise AccessDenied("You have to be logged in to perform this action.")
def verify_ownership(
    b: ItemOrBlock,
    require=True,
    message=None,
    check_duration=False,
    check_parents=False,
):
    """Verify owner access to the given item or block for the current user.

    All options are forwarded unchanged to verify_access with AccessType.owner.

    :return: Whatever verify_access returns.
    """
    options = dict(
        require=require,
        message=message,
        check_duration=check_duration,
        check_parents=check_parents,
    )
    return verify_access(b, AccessType.owner, **options)
def verify_read_marking_right(b: ItemOrBlock):
    """Ensure that has_read_marking_right(b) is truthy.

    :raises AccessDenied: If the right is missing.
    """
    if has_read_marking_right(b):
        return
    raise AccessDenied()
def verify_comment_right(b: ItemOrBlock):
    """Ensure that has_comment_right(b) is truthy.

    :raises AccessDenied: If the right is missing.
    """
    if has_comment_right(b):
        return
    raise AccessDenied()
def get_plugin_from_request(
    doc: Document,
    task_id: TaskId,
    u: UserContext,
    view_ctx: ViewContext,
    answernr: int | None = None,
) -> tuple[Document, Plugin]:
    """Find the plugin for the given task, taking the origin stored in the view context into account.

    The plugin is first located in ``doc``. If the view context has no complete origin
    (document id and paragraph id), that plugin is returned as long as ``doc`` contains
    its paragraph. Otherwise the origin paragraph is dereferenced, and if one of the
    resulting paragraphs has the same id as the plugin's paragraph, the plugin is
    rebuilt from that paragraph.

    :param doc: The document of the task; its id must equal task_id.doc_id.
    :param task_id: The task to look for.
    :param u: The user context passed on to the plugin lookup functions.
    :param view_ctx: The view context; its ``origin`` is read here.
    :param answernr: If not None, assigned to ``answer_nr`` of the matching dereferenced paragraph.
    :return: A tuple of a document and the plugin.
    :raises RouteException: If there is no origin and doc does not contain the plugin paragraph.
    :raises PluginException: If the origin paragraph cannot be fetched.
    """
    assert doc.doc_id == task_id.doc_id
    orig_info = view_ctx.origin
    orig_doc_id, orig_par_id = (
        (orig_info.doc_id, orig_info.par_id) if orig_info else (None, None)
    )
    plug = find_plugin_from_document(doc, task_id, u, view_ctx)
    par_id = plug.par.get_id()
    # Without a complete origin, the plugin found in doc is used directly.
    if orig_doc_id is None or orig_par_id is None:
        if not doc.has_paragraph(par_id):
            raise RouteException("Plugin not found")
        return doc, plug
    # Reuse the given Document object when the origin is the same document.
    if orig_doc_id != doc.doc_id:
        orig_doc = Document(orig_doc_id)
    else:
        orig_doc = doc
    orig_doc.insert_preamble_pars()
    try:
        orig_par = orig_doc.get_paragraph(orig_par_id)
    except TimDbException:
        raise PluginException(f"Plugin paragraph not found: {orig_par_id}")
    pars = dereference_pars([orig_par], context_doc=orig_doc, view_ctx=view_ctx)
    # The origin document is returned only when it is not an original translation
    # and the origin paragraph is a translation; otherwise doc is returned.
    ctx_doc = (
        orig_doc
        if (
            not orig_doc.get_docinfo().is_original_translation
            and orig_par.is_translation()
        )
        else doc
    )
    for p in pars:
        if p.get_id() == par_id:
            if answernr is not None:
                p.answer_nr = answernr
            return ctx_doc, maybe_get_plugin_from_par(p, task_id, u, view_ctx)
    # No dereferenced paragraph matched; fall back to the plugin found in doc.
    return doc, plug
def get_origin_from_request() -> OriginInfo | None:
    """Read the origin (referencing document and paragraph) from the current request.

    The values are taken from the ``ref_from`` object of the JSON body (keys
    ``docId`` and ``par``); when a key is absent, the request options
    ``ref_from_doc_id`` and ``ref_from_par_id`` are used instead.

    :return: An OriginInfo if both the document id and the paragraph id are
        available, otherwise None.
    """
    payload = request.get_json(silent=True) or {}
    ref_from = payload.get("ref_from") or {}

    fallback_doc_id = get_option(request, "ref_from_doc_id", default=None, cast=int)
    doc_id = ref_from.get("docId", fallback_doc_id)
    fallback_par_id = get_option(request, "ref_from_par_id", default=None)
    par_id = ref_from.get("par", fallback_par_id)

    if doc_id is None or par_id is None:
        return None
    return OriginInfo(doc_id=doc_id, par_id=par_id)
@dataclass
class TaskAccessVerification:
    """Outcome of a task access check: the plugin, the access and validity flags."""

    plugin: Plugin
    access: BlockAccess
    # True if grace period is allowed and the current time is within the grace period.
    is_expired: bool
    # user has access, but any possible answers are deemed invalid
    is_invalid: bool = False
    invalidate_reason: str | None = None
def verify_task_access(
    d: DocInfo,
    task_id: TaskId,
    access_type: AccessType,
    required_task_access_level: TaskIdAccess,
    context_user: UserContext,
    view_ctx: ViewContext,
    allow_grace_period: bool = False,
    answernr: int | None = None,
) -> TaskAccessVerification:
    """Verify that the logged user of the context may access the given task.

    :param d: The document of the task; its id must equal task_id.doc_id.
    :param task_id: The task to check.
    :param access_type: The access type required for the document.
    :param required_task_access_level: The access level needed for the task itself.
    :param context_user: The user context; the checks are made for its logged_user.
    :param view_ctx: The view context used when locating the plugin.
    :param allow_grace_period: If True and there is no regular access, the check is
        retried with the answer grace period from the document settings.
    :param answernr: Forwarded to get_plugin_from_request.
    :return: The plugin and access together with expiry/validity information.
    :raises AccessDenied: If there is no access, or if a read-only task would be
        written by a user without teacher access.
    """
    assert d.id == task_id.doc_id
    doc, found_plugin = get_plugin_from_request(
        d.document, task_id, context_user, view_ctx, answernr
    )
    access = verify_access(
        doc.get_docinfo(), access_type, require=False, user=context_user.logged_user
    )
    is_expired = False
    if not access:
        if not allow_grace_period:
            raise AccessDenied(f"No access for task {d.id}.{task_id.task_name}")
        # Retry with the grace period. The same user as in the first check must be
        # used; otherwise verify_access would fall back to the current session user.
        access = verify_access(
            doc.get_docinfo(),
            access_type,
            grace_period=doc.get_settings().answer_grace_period(),
            user=context_user.logged_user,
        )
        is_expired = True

    ctx_user_teacher_access = context_user.logged_user.has_teacher_access(
        doc.get_docinfo()
    )
    if (
        found_plugin.task_id.access_specifier == TaskIdAccess.ReadOnly
        and required_task_access_level == TaskIdAccess.ReadWrite
        and not ctx_user_teacher_access
    ):
        raise AccessDenied(
            f"This task/field {task_id.task_name} is readonly and thus only writable for teachers."
        )

    is_invalid = False
    invalidate_reason = None
    # For timed plugins, writes by users without teacher access are marked invalid
    # when the user's access end time is missing or already passed.
    if (
        found_plugin.is_timed()
        and not ctx_user_teacher_access
        and required_task_access_level == TaskIdAccess.ReadWrite
    ):
        found_plugin.set_access_end_for_user(user=context_user.logged_user)
        if found_plugin.access_end_for_user:
            if found_plugin.access_end_for_user < get_current_time():
                is_invalid = True
                invalidate_reason = "Your access to this task has expired."
        else:
            is_invalid = True
            invalidate_reason = "You haven't started this task yet."
    return TaskAccessVerification(
        plugin=found_plugin,
        access=access,
        is_expired=is_expired,
        is_invalid=is_invalid,
        invalidate_reason=invalidate_reason,
    )
def grant_access_to_session_users(i: ItemOrBlock):
    """Grant manage access to the given item for the personal group of every
    user returned by get_other_users_as_list().

    :param i: The item or block to which access is granted.
    """
    for session_user in get_other_users_as_list():
        user_id = int(session_user["id"])
        personal_group = User.get_by_id(user_id).get_personal_group()
        grant_access(personal_group, i, AccessType.manage)
def reset_request_access_cache():
    """Remove the access-related attributes from flask's ``g`` if they are set."""
    cached_attrs = (
        "manageable",
        "viewable",
        "teachable",
        "see_answers",
        "owned",
        "editable",
    )
    for attr_name in cached_attrs:
        del_attr_if_exists(g, attr_name)
def del_attr_if_exists(obj, attr_name: str):
    """Delete the named attribute from obj; do nothing if obj does not have it.

    :param obj: The object to modify.
    :param attr_name: The name of the attribute to delete.
    """
    if not hasattr(obj, attr_name):
        return
    delattr(obj, attr_name)
def can_see_par_source(u: User, p: DocParagraph):
    """Tell whether the user may see the source of the given paragraph.

    Copy access to the paragraph's document always suffices. With only view
    access, the source is visible unless the paragraph is a plugin or has plugins.

    :param u: The user to check.
    :param p: The paragraph whose source would be shown.
    :return: True if the source may be shown, False otherwise.
    """
    docinfo = p.doc.get_docinfo()
    if u.has_copy_access(docinfo):
        return True
    if not u.has_view_access(docinfo):
        return False
    return not (p.is_plugin() or p.has_plugins())
class AccessDenied(Exception):
    """Raised by the access checks in this module when access is refused."""
def get_single_view_access(i: Item, allow_group: bool = False) -> BlockAccess:
    """Return the current user's single, non-expired personal view access to an item.

    :param i: The item whose accesses are inspected.
    :param allow_group: If True and the user has no personal access, every view
        access of the item that belongs to the logged-in group or to one of the
        user's groups is granted to the user personally (copying accessible_to
        and the duration fields) and then considered.
    :return: The one access that was found.
    :raises AccessDenied: If there is no access, more than one access, the access
        is not of view type, or it has expired.
    """
    current = get_current_user_object()
    found: list[BlockAccess] = (
        current.get_personal_group().accesses.filter_by(block_id=i.id).all()
    )
    if allow_group and not found:
        logged_in_group = UserGroup.get_logged_in_group()
        own_group_ids = {
            gid for (gid,) in current.groups_dyn.with_entities(UserGroup.id).all()
        }
        for (group_id, access_type_value), group_access in i.block.accesses.items():
            if not (group_id == logged_in_group.id or group_id in own_group_ids):
                continue
            if access_type_value != AccessType.view.value:
                continue
            found.append(
                current.grant_access(
                    i,
                    AccessType.view,
                    accessible_to=group_access.accessible_to,
                    duration_from=group_access.duration_from,
                    duration_to=group_access.duration_to,
                    duration=group_access.duration,
                )
            )
    if not found:
        raise AccessDenied(f"No access found for {i.path}.")
    if len(found) > 1:
        raise AccessDenied(f"Multiple accesses found for {i.path}.")
    (only_access,) = found
    if only_access.access_type != AccessType.view:
        raise AccessDenied(
            f"Access type is {only_access.access_type.name} instead of view in {i.path}."
        )
    if only_access.expired:
        raise AccessDenied(f"Access is already expired for {i.path}.")
    return only_access
def is_allowed_ip() -> bool:
    """Tell whether the address of the current request is inside the IP allowlist.

    The allowlist is read from the IP_BLOCK_ALLOWLIST config value. A value of
    None means that every address is allowed.

    :return: True if there is no allowlist or the request address belongs to one
        of its networks, False otherwise.
    """
    ip_allowlist = current_app.config["IP_BLOCK_ALLOWLIST"]
    if ip_allowlist is None:
        return True
    if not ip_allowlist:
        # An empty allowlist matches nothing; the address need not be parsed at all.
        return False
    # Parse the address once instead of once per network.
    client_ip = ipaddress.ip_address(request.remote_addr)
    return any(client_ip in network for network in ip_allowlist)
def is_blocked_ip() -> bool:
    """Tell whether the address of the current request is a line in the IP blocklist file.

    :return: True if some line of the file returned by get_ipblocklist_path()
        equals the request address; False otherwise or if the file does not exist.
    """
    client_ip = request.remote_addr
    blocklist_file = get_ipblocklist_path()
    try:
        contents = blocklist_file.read_text()
    except FileNotFoundError:
        return False
    return client_ip in contents.splitlines()
def get_ipblocklist_path() -> Path:
    """Return the path of the ``ipblocklist`` file under the FILES_PATH config directory."""
    files_root = Path(current_app.config["FILES_PATH"])
    return files_root / "ipblocklist"
def verify_ip_ok(user: User | None, msg: str = "IPNotAllowed"):
    """Check the address of the current request against the IP allowlist.

    Admin users and requests from allowed addresses pass silently. For other
    requests a warning is logged, and an email is sent to the ERROR_EMAIL address
    unless the address is already in the blocklist. The request is blocked if
    the address is in the blocklist or IP_BLOCK_LOG_ONLY is not set.

    :param user: The user making the request, or None for an anonymous request.
    :param msg: The message of the raised exception.
    :raises AccessDenied: If the request is blocked.
    """
    if user and user.is_admin:
        return
    if is_allowed_ip():
        return

    username = user.name if user else "Anonymous"
    cfg = current_app.config
    ip_block_log_only = cfg["IP_BLOCK_LOG_ONLY"]
    is_in_blocklist = is_blocked_ip()
    should_block = is_in_blocklist or not ip_block_log_only
    if should_block:
        blocked_or_allowed = "blocked"
        msg_end = "Request was blocked."
    else:
        blocked_or_allowed = "allowed"
        msg_end = "Request was allowed."

    summary = f"IP {request.remote_addr} outside allowlist ({username}) - {blocked_or_allowed}"
    log_warning(summary)

    reply_tos = [cfg["ERROR_EMAIL"]]
    if user and user.email:
        reply_tos.append(user.email)
    if not is_in_blocklist:
        # NOTE(review): the message literal below is reproduced exactly as it appears
        # in the reviewed source; line breaks inside it may have been lost - confirm.
        send_email(
            rcpt=cfg["ERROR_EMAIL"],
            subject=f'{cfg["TIM_HOST"]}: {summary}',
            mail_from=cfg["WUFF_EMAIL"],
            reply_to=",".join(reply_tos),
            msg=f""" IP {request.remote_addr} was outside allowlist. URL: {request.url} User: {username} {msg_end} """.strip(),
        )
    if should_block:
        raise AccessDenied(msg)