"""
Routes related to tags.
"""
from dataclasses import dataclass, field
from datetime import datetime
from flask import request, Response
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import UnmappedInstanceError, FlushError # type: ignore
from timApp.auth.accesshelper import (
verify_view_access,
verify_manage_access,
check_admin_access,
)
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.docentry import DocEntry, get_documents
from timApp.document.docinfo import DocInfo
from timApp.item.block import Block
from timApp.item.tag import Tag, TagType, GROUP_TAG_PREFIX
from timApp.timdb.sqa import db
from timApp.user.groups import verify_group_view_access
from timApp.user.special_group_names import TEACHERS_GROUPNAME
from timApp.user.usergroup import UserGroup
from timApp.util.flask.requesthelper import (
get_option,
RouteException,
NotExist,
)
from timApp.util.flask.responsehelper import ok_response, json_response
from timApp.util.flask.typedblueprint import TypedBlueprint
# Blueprint the tag routes of this module are registered on; it is created
# with the "/tags" URL prefix.
tags_blueprint = TypedBlueprint("tags", __name__, url_prefix="/tags")
@dataclass
class TagInfo:
    """
    Plain description of a tag, used as the parameter type of the tag routes.

    Call :meth:`to_tag` to build the corresponding :class:`Tag` object.
    """

    # Name of the tag.
    name: str
    # Type of the tag.
    # NOTE(review): the "by_value" metadata presumably makes the request
    # parser read the enum by its value - confirm against the parser.
    type: TagType = field(metadata={"by_value": True})
    # Optional expiration time of the tag.
    expires: datetime | None = None
    # Optional block id; it is not copied into the Tag built by to_tag.
    block_id: int | None = None

    def to_tag(self) -> Tag:
        """
        Builds a new Tag from this object.

        :return: A Tag with the same name, type and expiration time.
        """
        tag_fields = {
            "name": self.name,
            "type": self.type,
            "expires": self.expires,
        }
        return Tag(**tag_fields)
@tags_blueprint.post("/add/<path:doc>")
def add_tag(doc: str, tags: list[TagInfo]) -> Response:
    """
    Adds the given tags to a document and commits the change.

    :param doc: Path of the target document.
    :param tags: Tags to add.
    :return: Tag adding success response.
    :raises NotExist: If no document is found at the path.
    :raises RouteException: Raised by commit_and_ok if the commit fails.
    """
    if not (target_doc := DocEntry.find_by_path(doc)):
        raise NotExist()
    # Access is verified before anything is changed.
    verify_manage_access(target_doc)
    add_tags(target_doc, tags)
    return commit_and_ok()
def commit_and_ok() -> Response:
    """
    Commits the current database session and returns an ok response.

    :return: The ok response when the commit succeeds.
    :raises RouteException: If the commit fails with IntegrityError or
     FlushError; the session is rolled back first.
    """
    try:
        db.session.commit()
    except (IntegrityError, FlushError):
        # Undo the failed transaction before reporting the error.
        db.session.rollback()
        raise RouteException("Tag name is already in use.")
    else:
        return ok_response()
def check_tag_access(tag: Tag, check_group: bool = True) -> None:
    """
    Checks whether the current user is allowed to make changes to the tag.

    Tags whose type is not TagType.Regular require membership in the
    teachers group or admin access. If the tag has a group name and
    check_group is set, the user group must exist and pass
    verify_group_view_access.

    :param tag: The tag to check.
    :param check_group: Whether to check group tag right. Should be false
     when deleting a tag.
    :raises RouteException: If a non-regular tag is managed without teacher
     or admin rights.
    :raises NotExist: If the user group named by the tag is not found.
    """
    is_special_tag = tag.type != TagType.Regular
    if is_special_tag:
        teachers = UserGroup.get_by_name(TEACHERS_GROUPNAME)
        is_teacher = teachers in get_current_user_object().groups
        # Admin access is consulted only when the user is not a teacher.
        if not (is_teacher or check_admin_access()):
            raise RouteException(
                f"Managing this tag requires admin or {TEACHERS_GROUPNAME} rights."
            )

    group_name = tag.get_group_name()
    if not (group_name and check_group):
        return
    group = UserGroup.get_by_name(group_name)
    if not group:
        raise NotExist(f'Usergroup "{group_name}" not found.')
    verify_group_view_access(group)
@tags_blueprint.post("/edit/<path:doc>")
def edit_tag(doc: str, old_tag: TagInfo, new_tag: TagInfo) -> Response:
    """
    Replaces a tag-document entry in the database with new one.

    The old tag is looked up by document id, name and type. Access is
    checked for both the old and the new tag before anything is changed.

    :param doc: The target document.
    :param old_tag: Tag to remove
    :param new_tag: Tag to replace old tag with
    :return: Edit success response.
    :raises NotExist: If no document is found at the path.
    :raises RouteException: If the old tag is not found or the change cannot
     be committed.
    """
    d = DocEntry.find_by_path(doc)
    if not d:
        raise NotExist()
    verify_manage_access(d)

    # If commas in the name, use first part.
    new_tag_name = new_tag.name.split(",", 1)[0]
    new_tag_obj = Tag(name=new_tag_name, expires=new_tag.expires, type=new_tag.type)
    old_tag_obj = Tag.query.filter_by(
        block_id=d.id, name=old_tag.name, type=old_tag.type
    ).first()
    if not old_tag_obj:
        raise RouteException("Tag to edit not found.")
    check_tag_access(old_tag_obj)
    check_tag_access(new_tag_obj)

    try:
        db.session.delete(old_tag_obj)
        d.block.tags.append(new_tag_obj)
        db.session.commit()
    except (IntegrityError, FlushError) as e:
        # Roll back the failed transaction (as commit_and_ok does) so the
        # session is not left in a failed state for later use.
        db.session.rollback()
        raise RouteException(
            "Tag editing failed! New tag name may already be in use"
        ) from e
    return ok_response()
@tags_blueprint.post("/remove/<path:doc>")
def remove_tag(doc: str, tag: TagInfo) -> Response:
    """
    Removes a tag-document entry from the database.

    The tag is looked up by document id, name and type.

    :param doc: The target document.
    :param tag: Tag to remove
    :return: Removal success response.
    :raises NotExist: If no document is found at the path.
    :raises RouteException: If the tag is not found or the removal cannot be
     committed.
    """
    d = DocEntry.find_by_path(doc)
    if not d:
        raise NotExist()
    verify_manage_access(d)

    tag_obj = Tag.query.filter_by(block_id=d.id, name=tag.name, type=tag.type).first()
    if not tag_obj:
        raise RouteException("Tag not found.")
    # NOTE(review): check_tag_access documents that check_group should be
    # false when deleting a tag, but the default (True) is used here -
    # confirm which one is intended.
    check_tag_access(tag_obj)

    try:
        db.session.delete(tag_obj)
        db.session.commit()
    except (IntegrityError, UnmappedInstanceError) as e:
        # Roll back the failed transaction (as commit_and_ok does) so the
        # session is not left in a failed state for later use.
        db.session.rollback()
        raise RouteException("Tag removal failed.") from e
    return ok_response()
@tags_blueprint.get("/getDocs")
def get_tagged_documents() -> Response:
    """
    Gets a list of documents that have a certain tag.

    Request options:

    - name: The tag name to search for (defaults to an empty string).
    - exact_search: Match the whole tag name instead of a part of it.
    - case_sensitive: Whether the name comparison is case sensitive.
    - list_doc_tags: Also load the tags of the found documents.

    The partial search skips tags whose expiration time has passed.
    NOTE(review): the exact search does not filter expired tags - confirm
    that the difference is intended.

    :return: Response containing list of documents with the searched tag.
    """
    searched_name = request.args.get("name", "")
    exact_search = get_option(request, "exact_search", default=False, cast=bool)
    list_doc_tags = get_option(request, "list_doc_tags", default=False, cast=bool)
    case_sensitive = get_option(request, "case_sensitive", default=False, cast=bool)

    if exact_search:
        if case_sensitive:
            tag_condition = Tag.name == searched_name
        else:
            tag_condition = func.lower(Tag.name) == func.lower(searched_name)
    else:
        # Surround the name with wildcards to match it anywhere in the tag name.
        pattern = f"%{searched_name}%"
        if case_sensitive:
            name_matches = Tag.name.like(pattern)
        else:
            name_matches = Tag.name.ilike(pattern)
        # "== None" is deliberate: it is a column comparison that builds an
        # SQL expression, not a Python identity test.
        still_valid = (Tag.expires > datetime.now()) | (Tag.expires == None)  # noqa: E711
        tag_condition = name_matches & still_valid

    # Documents are selected by the block ids of the matching tags.
    matching_block_ids = Tag.query.filter(tag_condition).with_entities(Tag.block_id)
    custom_filter = DocEntry.id.in_(matching_block_ids)

    query_options = (
        joinedload(DocEntry._block).joinedload(Block.tags) if list_doc_tags else None
    )
    found_docs = get_documents(
        filter_user=get_current_user_object(),
        custom_filter=custom_filter,
        query_options=query_options,
    )
    return json_response(found_docs, date_conversion=True)
@tags_blueprint.get("/getDoc/<int:doc_id>")
def get_tagged_document_by_id(doc_id: int) -> Response:
    """
    Gets a document and its tags by id.

    The document is fetched with get_documents, filtered by the current user.

    :param doc_id: Searched document id.
    :return: A DocEntry with tags.
    :raises NotExist: If get_documents returns nothing for the id.
    """
    only_this_doc = DocEntry.id.in_([doc_id])
    load_tags = joinedload(DocEntry._block).joinedload(Block.tags)
    matches = get_documents(
        filter_user=get_current_user_object(),
        custom_filter=only_this_doc,
        query_options=load_tags,
    )
    if not matches:
        raise NotExist("Document not found or not accessible!")
    first_match = matches[0]
    return json_response(first_match, date_conversion=True)