Source code for timApp.messaging.messagelist.routes

from dataclasses import dataclass
from datetime import datetime
from typing import Any

from flask import Response, render_template
from sqlalchemy.orm import load_only

from timApp.auth.accesshelper import (
    verify_logged_in,
    has_manage_access,
    get_doc_or_abort,
    verify_manage_access,
    verify_view_access,
)
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import move_document, DocInfo
from timApp.document.viewcontext import default_view_ctx
from timApp.folder.folder import Folder
from timApp.item.manage import get_trash_folder
from timApp.messaging.messagelist.emaillist import (
    create_new_email_list,
    delete_email_list,
    verify_emaillist_name_requirements,
    get_domain_names,
    verify_mailman_connection,
)
from timApp.messaging.messagelist.emaillist import get_email_list_by_name
from timApp.messaging.messagelist.listinfo import (
    ListInfo,
    MemberInfo,
    GroupAndMembers,
    ArchiveType,
)
from timApp.messaging.messagelist.messagelist_models import MessageListModel
from timApp.messaging.messagelist.messagelist_utils import (
    verify_messagelist_name_requirements,
    new_list,
    set_message_list_notify_owner_on_change,
    set_message_list_member_can_unsubscribe,
    set_message_list_subject_prefix,
    set_message_list_tim_users_can_join,
    set_message_list_default_send_right,
    set_message_list_default_delivery_right,
    set_message_list_only_text,
    set_message_list_non_member_message_pass,
    set_message_list_allow_attachments,
    set_message_list_default_reply_type,
    add_new_message_list_group,
    add_message_list_external_email_member,
    set_message_list_member_removed_status,
    set_member_send_delivery,
    set_message_list_description,
    set_message_list_info,
    check_name_rules,
    verify_can_create_lists,
    MESSAGE_LIST_ARCHIVE_FOLDER_PREFIX,
)
from timApp.timdb.sqa import db
from timApp.user.usergroup import UserGroup
from timApp.util.flask.requesthelper import RouteException, NotExist
from timApp.util.flask.responsehelper import json_response, ok_response
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.logger import log_error
from timApp.util.utils import is_valid_email, get_current_time, title_to_id

messagelist = TypedBlueprint("messagelist", __name__, url_prefix="/messagelist")


[docs]@messagelist.post("/checkname") def check_name(name: str) -> Response: verify_logged_in() verify_can_create_lists() rule_fails = check_name_rules(name) exists = MessageListModel.name_exists(name) return json_response({"rule_fails": list(rule_fails), "exists": exists})
[docs]@messagelist.post("/createlist") def create_list(options: ListInfo) -> Response: """Handles creating a new message list. :param options All options necessary for establishing a new message list. :return: A Response with the list's management doc included. This way the creator can re-directed to the list's management page directly. """ verify_logged_in() verify_can_create_lists() # Current user is set as the default owner. owner = get_current_user_object() # Options object is given directly to new_list, so we don't want to use temporary variable for stripped name. options.name = options.name.strip() test_name(options.name) manage_doc, message_list = new_list(options) if options.domain: verify_mailman_connection() create_new_email_list(options, owner) # Add the domain to a message list only after the email list has been created. This way if the list creation # fails, we have indication that the list does not have an email list attached to it. message_list.email_list_domain = options.domain set_message_list_subject_prefix(message_list, f"[{message_list.name}]") db.session.commit() return json_response(manage_doc)
[docs]def test_name(name_candidate: str) -> None: """Check new message list's name candidate's name. The name has to meet naming rules, it has to be not already be in use and it cannot be a reserved name. If the function retuns control to its caller, then name is viable to use for a message list. If at some point the name is not viable, then an exception is raised. :param name_candidate: The name candidate to check. """ normalized_name = name_candidate.strip() name, sep, domain = normalized_name.partition("@") verify_messagelist_name_requirements(name) if sep: # If character '@' is found, we check email list specific name requirements. verify_mailman_connection() verify_emaillist_name_requirements(name, domain)
[docs]@messagelist.get("/domains") def domains() -> Response: """Send possible domains for a client, if such exists. :return: If domains are configured, return them as an array. """ verify_mailman_connection() possible_domains: list[str] = get_domain_names() return json_response(possible_domains)
[docs]@messagelist.delete("/deletelist") def delete_list(listname: str, permanent: bool) -> Response: """Delete message and associated message channels. :param listname: The list to be deleted. :param permanent: A boolean flag indicating if the deletion is meant to be permanent. :return: A string describing how the operation went. """ # Check access rights. verify_logged_in() message_list = MessageListModel.from_name(listname) if not has_manage_access(message_list.block): raise RouteException( "You need at least a manage access to the list in order to do this action." ) # The amount of docentries a message list's block relationship refers to should be one. If not, something is # terribly wrong. if len(message_list.block.docentries) > 1: log_error( f"Message list '{listname}' has multiple docentries to its block relationship." ) raise RouteException( "Can't perform deletion at this time. The problem has been logged for admins." ) # Perform deletion. if permanent: # If the deletion is (more) permanent, move the admin doc to bin. manage_doc = get_doc_or_abort(message_list.manage_doc_id) trash_folder: Folder = get_trash_folder() move_document(manage_doc, trash_folder) # Set the db entry as removed message_list.removed = get_current_time() if message_list.email_list_domain: # A message list has a domain, so we are also looking to delete an email list. verify_mailman_connection() email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) delete_email_list(email_list, permanent_deletion=permanent) db.session.commit() return ok_response()
[docs]@messagelist.get("/getlist/<listname>") def get_list(listname: str) -> Response: """Get the information for a message list. :param listname: Name of the list :return: ListOptions with the list's information. """ if not listname: raise NotExist("No message list specified") verify_logged_in() msg_list = MessageListModel.from_name(listname) verify_manage_access(msg_list.block) return json_response(msg_list.to_info())
[docs]@messagelist.post("/save") def save_list_options(options: ListInfo) -> Response: """Save message list's options. :param options: The options to be saved. :return: OK response. """ # Check access rights. verify_logged_in() message_list = MessageListModel.from_name(options.name) if not has_manage_access(message_list.block): raise RouteException( "You need at least a manange access to the list in order to do this action." ) if message_list.archive_policy != options.archive: # TODO: If message list changes its archive policy, the members on the list need to be notified. Insert # messaging here. message_list.archive = options.archive set_message_list_description(message_list, options.list_description) set_message_list_info(message_list, options.list_info) set_message_list_notify_owner_on_change( message_list, options.notify_owners_on_list_change ) set_message_list_member_can_unsubscribe( message_list, options.members_can_unsubscribe ) set_message_list_subject_prefix(message_list, options.list_subject_prefix) set_message_list_only_text(message_list, options.only_text) set_message_list_allow_attachments(message_list, options.allow_attachments) set_message_list_tim_users_can_join(message_list, options.tim_users_can_join) set_message_list_non_member_message_pass( message_list, options.non_member_message_pass ) set_message_list_default_send_right(message_list, options.default_send_right) set_message_list_default_delivery_right( message_list, options.default_delivery_right ) set_message_list_default_reply_type(message_list, options.default_reply_type) db.session.commit() return ok_response()
[docs]@messagelist.post("/savemembers") def save_members(listname: str, members: list[MemberInfo]) -> Response: """Save the state of existing list members, e.g. send and delivery rights. :param listname: The name of the message list where the members will be saved. :param members: The members to be saved. :return: Response for the client. The Response is a simple ok_response(). """ # Check access rights. verify_logged_in() message_list = MessageListModel.from_name(listname) if not has_manage_access(message_list.block): raise RouteException( "You need at least a manage access to the list to do this action." ) email_list = None if message_list.email_list_domain: # If there is a domain configured for a list and the Mailman connection is not configured, we can't continue # at this time. verify_mailman_connection() email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) for member in members: db_member = message_list.find_member(member.username, member.email) # This if mostly guards against type errors, but what if we legitimely can't find them? They are given from # the db in the first place. Is there a reasonable way to communicate this state? if db_member: set_member_send_delivery( db_member, member.sendRight, member.deliveryRight, email_list=email_list ) set_message_list_member_removed_status( db_member, member.removed, email_list=email_list ) db.session.commit() return ok_response()
[docs]def parse_external_member(external_member_candidate: str) -> list[str] | None: """Parse the information of an external member. There are two supported ways to give external members. The user can write user.userington@domain.fi User Userington or User Userington <user.userington@domain.fi> :param external_member_candidate: A string represeting the external member. :return: Return a list of the form [email, name_part_1, name_part_2, ...] if parsing was successful. Otherwise return None. """ # Split the name candidate to a list for processing. words = external_member_candidate.strip().split(" ") # Check if the first word is the email. if is_valid_email(words[0]): return words # If the first word was not an email, then check if the email is at the last word, in angle brackets. open_bracket_index = words[-1].find("<") closing_bracket_index = words[-1].find(">") if open_bracket_index != -1 and closing_bracket_index != -1: # Remove angle brackets around the email. email = words[-1].strip("<").strip(">") if is_valid_email(email): return_list = [email] return_list.extend(words[0:-1]) return return_list # If we are here, then no applicable version of external member's information was given. return None
[docs]@messagelist.post("/addmember") def add_member( member_candidates: list[str], msg_list: str, send_right: bool, delivery_right: bool ) -> Response: """Add new members to a message list. :param member_candidates: Names of member candidates. :param msg_list: The message list where we are trying to add new members. :param send_right: The send right on a list for all the member candidates. :param delivery_right: The delivery right on a list for all the member candidates. :return: OK response. """ # Check access right. verify_logged_in() message_list = MessageListModel.from_name(msg_list) if not has_manage_access(message_list.block): raise RouteException( "You need at least a manage access to the list to do this action." ) # TODO: Implement checking whether or not users are just added to a list (like they are now) or they are invited # to a list (requires link generation and other things). em_list = None if message_list.email_list_domain is not None: verify_mailman_connection() em_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) for member_candidate in member_candidates: # For user groups and individual users. ug = UserGroup.get_by_name(member_candidate.strip()) if ug is not None: # The name belongs to a user group. add_new_message_list_group( message_list, ug, send_right, delivery_right, em_list ) # For external members. # If member candidate is not a user, or a user group, then we assume an external member. Add external members. external_member = parse_external_member(member_candidate) if external_member and em_list: if len(external_member) == 1: # There is no display name given for external member. dname = None else: # Construct an optional display name by joining all the other words given on the line that are not # the email address. Remove possible white space between the email address and the first part of a # display name. Other white space within the name is left as is. dname = " ".join(external_member[1:]).lstrip() add_message_list_external_email_member( message_list, external_member[0], send_right, delivery_right, em_list, display_name=dname, ) db.session.commit() return ok_response()
[docs]@messagelist.get("/getmembers/<list_name>") def get_members(list_name: str) -> Response: """Get members belonging to a certain list. :param list_name: The list where we are querying all the members. :return: All the members of a list. """ verify_logged_in() msg_list = MessageListModel.from_name(list_name) if not has_manage_access(msg_list.block): raise RouteException("You are not authorized to see the members of this list.") list_members = msg_list.members return json_response(list_members)
[docs]@messagelist.get("/getgroupmembers/<list_name>") def get_group_members(list_name: str) -> Response: """View function for getting members of groups that are on a message list. :param list_name: Message list. :return: All members of groups associated in a message list as a list of GroupAndMembers objects. """ from timApp.document.hide_names import is_hide_names hide_names = is_hide_names() # Check rights. verify_logged_in() message_list = MessageListModel.from_name(list_name) if not has_manage_access(message_list.block): raise RouteException( "Only an owner of this list can see the members of this group." ) # Get group. groups = [member for member in message_list.members if member.is_group()] # At this point we assume we have a user that is a TIM user group. groups_and_members = [] for group in groups: user_group: UserGroup = group.user_group # Create a MemberInfo object for every current user in the group. As these are current members of the user # group, removed is None. group_members = [ MemberInfo( name=user.real_name if not hide_names else f"User {user.id}", username=user.name if not hide_names else f"user{user.id}", email=user.email if not hide_names else "user@noreply", sendRight=group.send_right, deliveryRight=group.delivery_right, removed=None, ) for user in user_group.users ] gm = GroupAndMembers(groupName=user_group.name, members=group_members) groups_and_members.append(gm) return json_response(groups_and_members)
[docs]@messagelist.get("/archive/siblings/<int:message_doc_id>") def get_sibling_archive_messages(message_doc_id: int) -> Response: message_doc = DocEntry.find_by_id(message_doc_id) if not message_doc: raise RouteException("No document found") verify_view_access(message_doc) # Only allow jumping to archive messages that the user can view docs = message_doc.parent.get_all_documents( query_options=load_only(DocEntry.name, DocEntry.id), filter_user=get_current_user_object(), ) prev_doc = None next_doc = None for doc in docs: if doc.id == message_doc.id: continue if doc.id < message_doc.id and (not prev_doc or prev_doc.id < doc.id): prev_doc = doc if message_doc.id < doc.id and (not next_doc or doc.id < next_doc.id): next_doc = doc def to_json(d: DocInfo | None) -> dict[str, Any] | None: return ( { "title": d.title, "path": d.path, } if d else None ) return json_response({"next": to_json(next_doc), "prev": to_json(prev_doc)})
[docs]@dataclass(slots=True) class ArchivedMessage: anchor: str title: str date: str recipients: list[str] sender: str body: str
[docs]@messagelist.get("/archive/export/<list_name>") def export_archive(list_name: str) -> Response | str: """Export the archive as a simple HTML file.""" message_list = MessageListModel.from_name(list_name) if not has_manage_access(message_list.block): raise RouteException( "You need at least a manage access to the list to do this action." ) if message_list.archive == ArchiveType.NONE: raise RouteException("This list does not have an archive.") # Get the archive messages. archive_folder = f"{MESSAGE_LIST_ARCHIVE_FOLDER_PREFIX}/{list_name}" folder = Folder.find_by_path(archive_folder) if not folder: raise NotExist("No archive folder found, the archive is likely empty") docs = folder.get_all_documents(filter_user=get_current_user_object()) docs = sorted(docs, key=lambda d: d.id, reverse=True) messages: list[ArchivedMessage] = [] anchor_counters: dict[str, int] = {} for doc in docs: settings = doc.document.get_settings() if not settings: continue macros = settings.get_macroinfo(default_view_ctx).get_macros() if not (message_macro := macros.get("message", {})) or not isinstance( message_macro, dict ): continue message_body = next( (p for p in doc.document if p.get_auto_id() == "message-body"), None ) if not message_body: continue res = message_body.get_html(default_view_ctx) message_date = message_macro.get("date") recipients = message_macro.get("recipients", []) sender = message_macro.get("sender", {}) def format_recipient(recipient: dict) -> str: name = recipient.get("name") email = recipient.get("email") if not name: return f"<{email}>" return f"{name} <{email}>" anchor = title_to_id(doc.title) if anchor in anchor_counters: anchor_counters[anchor] += 1 anchor += f"-{anchor_counters[anchor]}" else: anchor_counters[anchor] = 0 messages.append( ArchivedMessage( anchor=anchor, title=doc.title, date=datetime.strptime(message_date, "%Y-%m-%d %H:%M:%S").strftime( "%Y-%m-%d %H:%M:%S UTC+0" ) if message_date else "No date", recipients=[format_recipient(r) for r in recipients], sender=format_recipient(sender), body=res, ) ) now = get_current_time() return render_template( "messagelist/archive_export.jinja2", messages=messages, now=now, message_list=message_list, )