Source code for timApp.messaging.timMessage.routes

import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

from flask import Response
from isodate import datetime_isoformat
from sqlalchemy import tuple_
from sqlalchemy.orm import contains_eager

from timApp.auth.accesshelper import (
    verify_edit_access,
    verify_manage_access,
    verify_admin,
)
from timApp.auth.accesshelper import verify_logged_in
from timApp.auth.accesstype import AccessType
from timApp.auth.sessioninfo import (
    get_current_user_object,
    logged_in,
)
from timApp.document.create_item import create_document
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import DocInfo
from timApp.document.documents import import_document_from_file
from timApp.document.hide_names import is_hide_names
from timApp.document.translation.translation import Translation
from timApp.document.viewcontext import default_view_ctx
from timApp.folder.createopts import FolderCreationOptions
from timApp.folder.folder import Folder
from timApp.item.item import Item
from timApp.item.manage import TRASH_FOLDER_PATH
from timApp.messaging.messagelist.messagelist_models import (
    MessageListModel,
    MessageListTimMember,
)
from timApp.messaging.timMessage.internalmessage_models import (
    InternalMessage,
    DisplayType,
    InternalMessageDisplay,
    InternalMessageReadReceipt,
)
from timApp.timdb.sqa import db
from timApp.user.user import User
from timApp.user.usergroup import UserGroup
from timApp.user.usergroupmember import UserGroupMember
from timApp.util.flask.requesthelper import NotExist, RouteException
from timApp.util.flask.responsehelper import (
    ok_response,
    json_response,
    text_response,
    csv_string,
)
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.utils import (
    remove_path_special_chars,
    static_tim_doc,
    get_current_time,
)

timMessage = TypedBlueprint("timMessage", __name__, url_prefix="/timMessage")


[docs]@dataclass class MessageOptions: # Options regarding TIM messages messageChannel: bool important: bool isPrivate: bool archive: bool pageList: str readReceipt: bool reply: bool sender: str senderEmail: str repliesTo: int | None = None expires: datetime | None = None
[docs]@dataclass class ReplyOptions: archive: bool messageChannel: bool pageList: str recipient: str readReceipt: bool = True repliesTo: int | None = None
[docs]@dataclass class MessageBody: messageBody: str messageSubject: str recipients: list[str] | None = None
[docs]@dataclass class TimMessageData: id: int sender: str | None doc_path: str can_mark_as_read: bool can_reply: bool display_type: DisplayType message_body: str message_subject: str
[docs]@dataclass class TimMessageReadReceipt: message_id: int user_id: int marked_as_read_on: datetime can_mark_as_read: bool
[docs]@timMessage.get("/get") def get_global_messages() -> Response: """ Retrieve global messages return them in json format. :return: List of TIM messages to display """ return json_response(get_tim_messages_as_list())
[docs]@timMessage.get("/get/<int:item_id>") def get_tim_messages(item_id: int) -> Response: """ Retrieve messages displayed for current based on item id and return them in json format. :param item_id: Identifier for document or folder where message is displayed :return: List of TIM messages to display """ return json_response(get_tim_messages_as_list(item_id))
[docs]@timMessage.post("/expire/<int:message_doc_id>") def expire_tim_message(message_doc_id: int) -> Response: """ Expire a TIM message. :param message_doc_id: Document ID of the message to expire. :return: OK response if message was successfully expired. """ internal_message: InternalMessage | None = InternalMessage.query.filter_by( doc_id=message_doc_id ).first() if not internal_message: raise NotExist("Message not found") verify_manage_access(internal_message.block) internal_message.expires = get_current_time() db.session.commit() return ok_response()
[docs]def get_tim_messages_as_list(item_id: int | None = None) -> list[TimMessageData]: """ Retrieve messages displayed for current user based on item id and return them as a list. :param item_id: Identifier for document or folder where message is displayed. If None, global messages are returned. :return: List of TIM messages to display """ # TODO: Add logic for anon users to see and hide global messages if not logged_in(): return [] now = get_current_time() is_global = (InternalMessageDisplay.usergroup_id == None) & ( InternalMessageDisplay.display_doc_id == None ) is_user_specific = False can_see = (InternalMessageReadReceipt.marked_as_read_on == None) & ( (InternalMessage.expires == None) | (InternalMessage.expires > now) ) if item_id is not None: current_page_obj = DocEntry.find_by_id(item_id) if isinstance(current_page_obj, Translation): # Resolve to original file instead of translation file current_page_obj = current_page_obj.docentry if not current_page_obj: current_page_obj = Folder.get_by_id(item_id) if not current_page_obj: raise NotExist("No document or folder found") parent_paths = current_page_obj.parent_paths() # parent folders group_ids = get_current_user_object().group_ids is_user_specific = (InternalMessageDisplay.usergroup_id.in_(group_ids)) & ( (InternalMessageDisplay.display_doc_id == current_page_obj.id) | (tuple_(Folder.location, Folder.name).in_(parent_paths)) ) cur_user = get_current_user_object() q = ( InternalMessage.query.join(InternalMessageDisplay) .outerjoin(Folder, Folder.id == InternalMessageDisplay.display_doc_id) .outerjoin( InternalMessageReadReceipt, (InternalMessageReadReceipt.message_id == InternalMessage.id) # Do this in outer join because message can be seen if no receipt is found or receipt is not marked as read # With outer join, both cases are covered (marked_as_read_on becomes NULL) & (InternalMessageReadReceipt.user_id == cur_user.id), ) .options(contains_eager(InternalMessage.readreceipts)) .filter((is_global | is_user_specific) & can_see) ) messages: list[InternalMessage] = q.all() full_messages = [] for message in messages: document = DocEntry.find_by_id(message.doc_id) if not document: raise NotExist(f"No document or folder found for the message {message.id}") sender = document.owners[0].name if document.owners else None if document.path.startswith(TRASH_FOLDER_PATH): continue body = document.document.get_paragraph(message.par_id).get_html( default_view_ctx ) data = TimMessageData( id=message.id, sender=sender, doc_path=document.path, can_mark_as_read=message.can_mark_as_read, can_reply=message.reply, display_type=message.display_type, message_body=body, message_subject=document.title, ) full_messages.append(data) if message.readreceipts: # Note: previous contains_eager will force message.readreceipts to contain only receipt of current user for read_receipt in message.readreceipts: read_receipt.last_seen = now else: db.session.add( InternalMessageReadReceipt( message=message, user=cur_user, last_seen=now ) ) db.session.commit() return full_messages
[docs]@timMessage.get("/get_read_receipt/<int:doc_id>") def get_read_receipt(doc_id: int) -> Response: """ Retrieve read receipt object for the current user and message related to the given document id :param doc_id: Id of the message document :return: """ message = InternalMessage.query.filter_by(doc_id=doc_id).first() if not message: raise NotExist("No active messages for the document found") receipt = InternalMessageReadReceipt.get_for_user( get_current_user_object(), message ) if not receipt: return json_response({"receipt": None, "expires": message.expires}) receipt_data = TimMessageReadReceipt( message_id=message.id, user_id=receipt.user_id, marked_as_read_on=receipt.marked_as_read_on, can_mark_as_read=message.can_mark_as_read, ) return json_response({"receipt": receipt_data, "expires": message.expires})
# Regex pattern for url verification. URL_PATTERN = re.compile( r"https?://[a-z0-9.-]*/(show_slide|view|teacher|velp|answers|lecture|review|slide)/" )
[docs]@timMessage.post("/url_check") def check_urls(urls: str) -> Response: """ Checks if given URLS's exist in TIM and that user has right to post TIM message to them :param urls: Urls where user wishes to post TIM message :return: Shortened urls to show the user in the UI, or an error message """ url_list = list( filter(None, urls.splitlines()) ) # turn URL string into a list with empty values (new lines) removed valid_urls: list[str] = [] error_message: str = "" status_code: int for url in url_list: url = url.strip() # remove leading and trailing whitespaces if url.endswith("/"): url = url[:-1] hashtag_index = url.find("#") # remove anchors if hashtag_index != -1: url = url[:hashtag_index] if URL_PATTERN.search(url): # check if url matches the TIM urls' pattern shortened_url = URL_PATTERN.sub("", url) else: shortened_url = url document = DocEntry.find_by_path(shortened_url) # check if url exists in TIM if document is None: document = Folder.find_by_path(shortened_url) if document is None: error_message = url + " was not found in TIM" status_code = 404 break try: # check if user has permission to edit the url verify_edit_access(document) valid_urls.append(shortened_url) except Exception: error_message = "You don't have permission to post TIM message to " + url status_code = 401 if error_message: return json_response({"error": error_message}, status_code) else: valid_urls_string = "\n".join(valid_urls) # turn URL list into a string again return json_response({"shortened_urls": valid_urls_string}, 200)
[docs]@timMessage.post("/send") def send_tim_message(message: MessageBody, options: MessageOptions) -> Response: is_global = message.recipients is None if is_global: verify_admin() options.messageChannel = False options.isPrivate = True options.archive = False options.pageList = "" options.readReceipt = True options.reply = False options.sender = "" options.senderEmail = "" return send_message_or_reply(message, options)
[docs]def send_message_or_reply(message: MessageBody, options: MessageOptions) -> Response: """ Creates a new TIM message and saves it to database. :param options: Options related to the message :param message: Message subject, contents and sender :return: """ verify_logged_in() tim_message = InternalMessage( can_mark_as_read=options.readReceipt, reply=options.reply, expires=options.expires, replies_to=options.repliesTo, ) recipients = get_recipient_users(message.recipients) message_doc = create_tim_message(tim_message, options, message, recipients) db.session.add(tim_message) pages = get_display_pages(options.pageList.splitlines()) create_message_displays(tim_message, pages, recipients) db.session.commit() return json_response({"docPath": message_doc.path})
[docs]def create_tim_message( tim_message: InternalMessage, options: MessageOptions, message_body: MessageBody, message_viewers: list[UserGroup] | None = None, ) -> DocInfo: """ Creates a TIM document for the message to the TIM messages folder at TIM's root. :param tim_message: InternalMessage object :param options: Options related to the message :param message_body: Message subject, contents and list of recipients :param message_viewers: Groups that are allowed to view the message. If None, all recepients can. :return: The created Document object """ recipients = ( message_viewers if message_viewers is not None else get_recipient_users(message_body.recipients) ) is_global = message_body.recipients is None sender = get_current_user_object() message_folder_path = "messages/tim-messages" message_subject = message_body.messageSubject timestamp = datetime.now() # add timestamp to document path to make it unique message_path = remove_path_special_chars(f"{timestamp}-{message_subject}") check_messages_folder_path("messages", message_folder_path) if is_global: message_folder_path += "/global" message_doc = create_document( f"{message_folder_path}/{message_path}", message_subject, doc_owner=( UserGroup.get_admin_group() if is_global else sender.get_personal_group() ), ) if recipients: message_doc.block.add_rights(recipients, AccessType.view) elif is_global: message_doc.block.add_rights([UserGroup.get_anonymous_group()], AccessType.view) update_tim_msg_doc_settings( message_doc, sender if not is_global else None, message_body ) message_par = message_doc.document.add_paragraph(message_body.messageBody) message_doc.document.add_paragraph( "<manage-read-receipt></manage-read-receipt>", attrs={"allowangular": "true"} ) tim_message.block = message_doc.block tim_message.par_id = message_par.get_id() tim_message.display_type = ( DisplayType.STICKY if options.important else DisplayType.TOP_OF_PAGE ) return message_doc
[docs]@timMessage.post("/reply") def reply_to_tim_message(options: ReplyOptions, message: MessageBody) -> Response: message_options = MessageOptions( options.messageChannel, False, True, options.archive, options.pageList, options.readReceipt, False, get_current_user_object().name, get_current_user_object().email, options.repliesTo, ) if not message.recipients: raise RouteException("Reply requires a recipient") recipient = User.get_by_name(message.recipients.pop()) if recipient: recipient_email = recipient.email else: raise NotExist("Recipient not found") message = MessageBody( message.messageBody, message.messageSubject, [recipient_email] ) return send_message_or_reply(message, message_options)
[docs]@timMessage.post("/mark_as_read") def mark_as_read(message_id: int) -> Response: """ Marks given message as read in database. Expects that message receiver and marker are the same person. :param message_id: Id of given message :return: """ verify_logged_in() message = InternalMessage.query.filter_by(id=message_id).first() if not message: raise NotExist("Message not found by the ID") read_receipt = InternalMessageReadReceipt.get_for_user( get_current_user_object(), message ) u = get_current_user_object() if read_receipt is None: read_receipt = InternalMessageReadReceipt(user=u, message=message) db.session.add(read_receipt) read_receipt.user = u read_receipt.marked_as_read_on = get_current_time() db.session.commit() return ok_response()
[docs]@timMessage.post("/cancel_read_receipt") def cancel_read_receipt(message_id: int) -> Response: """ Removes read receipt date and the user who marked it from the database entry. :param message_id: Message identifier :return: """ verify_logged_in() receipt = InternalMessageReadReceipt.query.filter_by( user_id=get_current_user_object().id, message_id=message_id ).first() if not receipt: raise NotExist("No read receipt found for the message") receipt.marked_as_read_on = None db.session.commit() return ok_response()
[docs]class ReadReceiptFormat(Enum): CSV = "csv" TableFormQuery = "tableform-query"
[docs]@timMessage.get("/readReceipts") def get_read_receipts( message_doc: int, include_read: bool = False, include_unread: bool = False, separator: str = ";", receipt_format: ReadReceiptFormat = field( metadata={"by_value": True}, default=ReadReceiptFormat.CSV ), ) -> Response: verify_logged_in() doc = DocEntry.find_by_id(message_doc) if not doc: raise NotExist("No document found") verify_manage_access(doc) read_users = ( db.session.query( InternalMessageReadReceipt.user_id, InternalMessageReadReceipt.marked_as_read_on, InternalMessageReadReceipt.last_seen, ) .join(InternalMessage) .filter(InternalMessage.doc_id == doc.id) ) read_user_map: dict[int, datetime] = { user_id: read_time for user_id, read_time, _ in read_users if read_time } last_seen_user_map: dict[int, datetime] = { user_id: last_seen for user_id, _, last_seen in read_users if last_seen } all_recipients = ( User.query.join(UserGroupMember, User.active_memberships) .join( InternalMessageDisplay, InternalMessageDisplay.usergroup_id == UserGroupMember.usergroup_id, ) .join(InternalMessage) .filter(InternalMessage.doc_id == doc.id) ).all() if not all_recipients: if include_unread: raise RouteException( "For performance reasons, only read users can be shown for global messages" ) all_recipients = User.query.filter(User.id.in_(read_user_map.keys())).all() data = [["id", "email", "user_name", "real_name", "read_on", "last_seen"]] for i, u in enumerate(all_recipients): read_time = "" last_seen_time = "" if u.id in read_user_map: if not include_read: continue read_time = datetime_isoformat(read_user_map[u.id]) elif not include_unread: continue if u.id in last_seen_user_map: last_seen_time = datetime_isoformat(last_seen_user_map[u.id]) if not is_hide_names(): data.append([u.id, u.email, u.name, u.real_name, read_time, last_seen_time]) else: data.append( [ str(i), f"user_{i}@noreply", f"user{i}", f"User {i}", read_time, last_seen_time, ] ) if receipt_format == ReadReceiptFormat.TableFormQuery: return text_response("|".join([u[2] for u in data[1:]])) return text_response(csv_string(data, "excel", separator))
[docs]def get_recipient_users(recipients: list[str] | None) -> list[UserGroup]: """ Finds UserGroup objects of recipients based on their email :param recipients: list of recipients' emails :return: list of recipient UserGroups """ if not recipients: return [] users = set() for rcpt in recipients: if not rcpt: continue if user := User.get_by_email(rcpt): users.add(UserGroup.get_by_name(user.name)) if msg_list := MessageListModel.get_by_email(rcpt): q = UserGroup.query.join(MessageListTimMember).filter( (MessageListTimMember.message_list == msg_list) & (MessageListTimMember.membership_ended == None) ) ugs = q.all() users.update(ugs) return list(users)
[docs]def get_display_pages(pagelist: list[str]) -> list[Item]: """ Finds folders and documents based on their paths. :param pagelist: list of paths :return: list of folders and documents """ pages: list[Item] = [] for page in pagelist: folder = Folder.find_by_path(page) if folder: pages.append(folder) continue doc = DocEntry.find_by_path(page) if doc: pages.append(doc) return pages
[docs]def check_messages_folder_path( msg_folder_path: str, tim_msg_folder_path: str ) -> Folder: """ Checks if the /messages/tim-messages folder exists and if not, creates it. All users get view access to /messages folder and edit access to /messages/tim-messages folder so that documents for sent messages can be created. Also creates the preamble for message documents. :param msg_folder_path: path for /messages :param tim_msg_folder_path: path for /messages/tim-messages :return: /messages/tim-messages folder """ msg_folder = Folder.find_by_location(msg_folder_path, "messages") admin_group = UserGroup.get_admin_group() if not msg_folder: msg_folder = Folder.create( msg_folder_path, admin_group, title="Messages", creation_opts=FolderCreationOptions(apply_default_rights=True), ) msg_block = msg_folder.block if msg_block: msg_block.add_rights([UserGroup.get_logged_in_group()], AccessType.view) tim_msg_folder = Folder.find_by_location(tim_msg_folder_path, "tim-messages") if not tim_msg_folder: tim_msg_folder = Folder.create( tim_msg_folder_path, admin_group, title="TIM messages", creation_opts=FolderCreationOptions(apply_default_rights=True), ) tim_msg_block = tim_msg_folder.block if tim_msg_block: tim_msg_block.add_rights([UserGroup.get_logged_in_group()], AccessType.edit) tim_msg_preambles = Folder.find_by_location( f"{tim_msg_folder_path}/templates/preambles", "preambles" ) if not tim_msg_preambles: tim_msg_templates = Folder.create( f"{tim_msg_folder_path}/templates", admin_group, title="templates", creation_opts=FolderCreationOptions(apply_default_rights=True), ) tim_msg_preambles = Folder.create( f"{tim_msg_folder_path}/templates/preambles", admin_group, title="preambles", creation_opts=FolderCreationOptions(apply_default_rights=True), ) tim_msg_templates_block = tim_msg_templates.block if tim_msg_templates_block: tim_msg_templates_block.add_rights( [UserGroup.get_logged_in_group()], AccessType.view ) tim_msg_preambles_block = tim_msg_preambles.block if tim_msg_preambles_block: tim_msg_preambles_block.add_rights( [UserGroup.get_logged_in_group()], AccessType.view ) preamble_path = f"{tim_msg_folder_path}/templates/preambles/preamble" tim_msg_preamble = DocEntry.find_by_path(preamble_path) if not tim_msg_preamble: tim_msg_preamble = import_document_from_file( static_tim_doc("initial/tim_msg_preamble.md"), preamble_path, admin_group, title="preamble", ) tim_msg_preamble.block.add_rights( [UserGroup.get_logged_in_group()], AccessType.view ) return tim_msg_folder
[docs]def update_tim_msg_doc_settings( message_doc: DocInfo, sender: User | None, message_body: MessageBody ) -> None: """ Sets the message information into the preamble macros. :param message_doc: TIM message document :param sender: Sender user :param message_body: Message body :return: """ s = message_doc.document.get_settings().get_dict().get("macros", {}) s["subject"] = message_body.messageSubject if sender: s["sendername"] = sender.name s["senderemail"] = sender.email s["recipients"] = message_body.recipients message_doc.document.add_setting("macros", s)
[docs]def create_message_displays( msg: InternalMessage, pages: list[Item], recipients: list[UserGroup] ) -> None: """ Creates InternalMessageDisplay entries for all recipients and display pages. :param msg: Message :param pages: List of pages where message is displayed :param recipients: List of message recipients :return: """ if pages and recipients: for page in pages: for rcpt in recipients: display = InternalMessageDisplay( message=msg, usergroup=rcpt, display_block=page.block ) db.session.add(display) elif pages and not recipients: for page in pages: display = InternalMessageDisplay(message=msg, display_block=page.block) db.session.add(display) elif not pages and recipients: for rcpt in recipients: display = InternalMessageDisplay(message=msg, usergroup=rcpt) db.session.add(display) elif not pages and not recipients: display = InternalMessageDisplay(message=msg) db.session.add(display)