Source code for timApp.messaging.messagelist.messagelist_utils

import itertools
import re
from dataclasses import dataclass, field
from datetime import datetime
from email.utils import parsedate_to_datetime
from enum import Enum
from re import Match
from typing import Iterator
from urllib.error import HTTPError
from urllib.parse import SplitResult, parse_qs, urlsplit

from mailmanclient import MailingList
from sqlalchemy.orm import load_only

from timApp.auth.accesshelper import has_manage_access, AccessDenied
from timApp.auth.accesstype import AccessType
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.create_item import create_document, apply_template
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import DocInfo
from timApp.folder.folder import Folder
from timApp.item.block import Block
from timApp.item.validation import ItemValidationRule
from timApp.messaging.messagelist.emaillist import (
    get_email_list_by_name,
    set_notify_owner_on_list_change,
    set_email_list_unsubscription_policy,
    set_email_list_subject_prefix,
    set_email_list_only_text,
    set_email_list_allow_nonmember,
    set_email_list_allow_attachments,
    set_email_list_default_reply_type,
    add_email,
    get_email_list_member,
    remove_email_list_membership,
    set_email_list_member_send_status,
    set_email_list_member_delivery_status,
    set_email_list_description,
    set_email_list_info,
    log_mailman,
)
from timApp.messaging.messagelist.listinfo import (
    ArchiveType,
    ListInfo,
    ReplyToListChanges,
)
from timApp.messaging.messagelist.messagelist_models import (
    MessageListModel,
    Channel,
    MessageListTimMember,
    MessageListExternalMember,
    MessageListMember,
)
from timApp.timdb.sqa import db
from timApp.user.groups import verify_groupadmin
from timApp.user.user import User
from timApp.user.usergroup import UserGroup
from timApp.util.flask.requesthelper import RouteException
from timApp.util.logger import log_warning
from timApp.util.utils import remove_path_special_chars, get_current_time


[docs]def verify_can_create_lists() -> None: curr_user = get_current_user_object() res = verify_groupadmin(False, curr_user) or curr_user.is_sisu_teacher if not res: raise AccessDenied("This action requires permission to create message lists")
[docs]def verify_messagelist_name_requirements(name_candidate: str) -> None: """Checks name requirements specific for email list. If at any point a name requirement check fails, then an exception is raised an carried to the client. If all name requirements are met, then succeed silently. :param name_candidate: Name to check against naming rules. """ # There might become a time when we also check here if name is some message list specific reserved name. We # haven't got a source of those reserved names, not including names that already exists, so no check at this time. verify_name_rules(name_candidate) verify_name_availability(name_candidate)
[docs]def verify_name_availability(name_candidate: str) -> None: """Check if a message list with a given name already exists. :param name_candidate: The name to be checked if it already exists. """ if MessageListModel.name_exists(name_candidate): raise RouteException(f"Message list with name {name_candidate} already exists.")
# Regular expression patters used for name rule verification. They are kept here, so they are not re-compiled at # every name rule verification. The explanation of the rules is at their usage in verify_name_rules function. START_WITH_LOWERCASE_PATTER = re.compile(r"^[a-z]") SEQUENTIAL_DOTS_PATTERN = re.compile(r"\.\.+") # A Name cannot have allowed characters. This set of characters is an import from Korppi's character # limitations for email list names, and can probably be expanded in the future if desired. # lowercase letters a - z # digits 0 - 9 # dot '.' # hyphen '-' # underscore '_' # The pattern is a negation of the actual rules. PROHIBITED_CHARACTERS_PATTERN = re.compile(r"[^a-z0-9.\-_]") REQUIRED_DIGIT_PATTERN = re.compile(r"\d")
[docs]class NameRequirements(Enum): NAME_LENGTH_BOUNDED = 0 START_WITH_LOWERCASE = 1 NO_SEQUENTIAL_DOTS = 2 NO_TRAILING_DOTS = 3 NO_FORBIDDEN_CHARS = 4 MIN_ONE_DIGIT = 5
[docs]def check_name_rules(name_candidate: str) -> Iterator[NameRequirements]: """Check if name candidate complies with naming rules. :param name_candidate: What name we are checking against the rules. :return: A generator that returns violated name rules. """ # Be careful when checking regex rules. Some rules allow a pattern to exist, while prohibiting others. Some # rules prohibit something, but allow other things to exist. If the explanation for a rule is different than # the regex, the explanation is more likely to be correct. # Name is within length boundaries. lower_bound = 5 upper_bound = 36 if not (lower_bound <= len(name_candidate) <= upper_bound): yield NameRequirements.NAME_LENGTH_BOUNDED # Name has to start with a lowercase letter. if not START_WITH_LOWERCASE_PATTER.search(name_candidate): yield NameRequirements.START_WITH_LOWERCASE # Name cannot have multiple dots in sequence. if SEQUENTIAL_DOTS_PATTERN.search(name_candidate): yield NameRequirements.NO_SEQUENTIAL_DOTS # Name cannot end in a dot. if name_candidate.endswith("."): yield NameRequirements.NO_TRAILING_DOTS if PROHIBITED_CHARACTERS_PATTERN.search(name_candidate): yield NameRequirements.NO_FORBIDDEN_CHARS # Name has to include at least one digit. if not REQUIRED_DIGIT_PATTERN.search(name_candidate): yield NameRequirements.MIN_ONE_DIGIT
[docs]def verify_name_rules(name_candidate: str) -> None: """Check if name candidate complies with naming rules. The function raises a RouteException if naming rule is violated. If this function doesn't raise an exception, then the name candidate follows naming rules. :param name_candidate: What name we are checking against the rules. """ res = next(check_name_rules(name_candidate), None) if res: raise RouteException(res.name)
[docs]@dataclass class EmailAndDisplayName: """Wrapper for parsed email messages containing sender/receiver email and display name.""" email: str name: str
[docs] def to_json(self) -> dict[str, str]: res = {"email": self.email} if self.name: res["name"] = self.name return res
[docs]@dataclass class BaseMessage: """A unified datastructure for messages TIM handles.""" # Meta information about where this message belongs to and where its from. Mandatory values for all messages. message_list_name: str message_channel: Channel = field( metadata={"by_value": True} ) # Where the message came from. # Header information. Mandatory values for all messages. sender: EmailAndDisplayName recipients: list[EmailAndDisplayName] subject: str # Message body. Mandatory value for all messages. message_body: str # Email specific attributes. domain: str | None = None reply_to: EmailAndDisplayName | None = None # Timestamp for the message is a mandatory value. If the message comes from an outside source, it should already # have a time stamp. The default value is mostly for messages that would be generated inside TIM. It can also be # set for messages which for some reason don't already have any form of timestamp present. timestamp: datetime = get_current_time()
# Path prefixes for documents and folders. MESSAGE_LIST_DOC_PREFIX = "messagelists" MESSAGE_LIST_ARCHIVE_FOLDER_PREFIX = "archives"
[docs]def create_archive_doc_with_permission( archive_subject: str, archive_doc_path: str, message_list: MessageListModel, message: BaseMessage, ) -> DocEntry: """Create archive document with permissions matching the message list's archive policy. :param archive_subject: The subject of the archive document. :param archive_doc_path: The path where the archive document should be created. :param message_list: The message list where the message belongs. :param message: The message about to be archived. :return: The archive document. """ # Gather owners of the archive document. message_owners: list[UserGroup] = [] message_sender = User.get_by_email(message.sender.email) # List owners get a default ownership for the messages on a list. This covers the archive policy of SECRET. message_owners.extend(get_message_list_owners(message_list)) # Sender will always be able to see their message if message_sender: message_owners.append(message_sender.get_personal_group()) # Who gets to see a message in the archives. message_viewers: list[UserGroup] = [] # Gather additional permissions to the archive doc. # The meanings of different archive settings are listed with ArchiveType class. if message_list.archive_policy is ArchiveType.PUBLIC: message_viewers.append(UserGroup.get_anonymous_group()) elif message_list.archive_policy is ArchiveType.UNLISTED: message_viewers.append(UserGroup.get_logged_in_group()) elif message_list.archive_policy is ArchiveType.GROUPONLY: message_viewers.extend([m.user_group for m in message_list.get_tim_members()]) # Otherwise it's secret => no one but list owners and sender can see # List owner will always be at least one of the message owners archive_doc = DocEntry.create( title=archive_subject, path=archive_doc_path, owner_group=message_owners[0] ) # Add the rest of the message owners. if len(message_owners) > 1: archive_doc.block.add_rights(message_owners[1:], AccessType.owner) # Add view rights. archive_doc.block.add_rights(message_viewers, AccessType.view) return archive_doc
# Based on https://mathiasbynens.be/demo/url-regex with minor edits # This is one of the simplest patterns and it matches all cases correctly except for some special cases url_pattern = r"(https?|ftp)://[^\s/$.?#].[^\s]*" md_url_pattern = re.compile( rf"(\[([^]]*)\]\(({url_pattern})\))|({url_pattern})", re.IGNORECASE )
[docs]def message_body_to_md(body: str) -> str: """ Converts mail body into markdown. Importantly, the function * adds extra spacing for quotes * adds explicit newline to non-paragraph breaks * makes links clickable * cleans up Outlook safelinks :param body: Original message body. :return: Markdown-converted message body. """ result: list[str] = [] body_lines = body.splitlines(False) code_block = None quote_level = 0 def fix_url(url: SplitResult) -> str: real_url = None # Outlook safelink if "safelinks.protection.outlook.com" in url.netloc: qs = parse_qs(url.query) real_url = qs.get("url", [""])[0] real_url = real_url or url.geturl() extra = "" # Usually URLs don't end with a dot, so it's reasonable to move it outside the link if real_url.endswith("."): real_url = real_url[:-1] extra = "." return f"<{real_url}>{extra}" if not code_block else real_url def handle_md_url(m: Match) -> str: md_url, raw_url = m.group(1), m.group(5) if raw_url: return fix_url(urlsplit(raw_url)) else: return f"[{m.group(2)}]({fix_url(urlsplit(m.group(3)))})" def append_line(line_str: str = "") -> None: result.append((quote_level * ">") + line_str) def count_prefix_char(s: str, prefix_char: str) -> int: res = 0 for c in s: if c.isspace(): continue if c == prefix_char: res += 1 else: break return res def strip_quotes(s: str) -> str: for _ in range(quote_level): index = next((ci for ci, c in enumerate(s) if c == ">"), None) if index is not None: s = s[index + 1 :] return s def is_list_start(s: str) -> bool: return s.startswith("-") or s.startswith("*") for i, line in enumerate(body_lines): # plaintext boundary if it's present, simply ignore since we'd rather save just the plaintext mail if line == "--- mail_boundary ---": break line = md_url_pattern.sub(handle_md_url, line) prev = body_lines[i - 1] if i > 0 else "" cur_quote_level = count_prefix_char(line, ">") prev_quote_level = count_prefix_char(prev, ">") # Strip quotes after computing line's quote level line = strip_quotes(line) cur = line.strip() prev = prev.strip() # Headers are not common in emails, so it's better to just paste them verbatim if cur.startswith("#"): line = line.replace("#", "\\#") # Code block start/end if cur.startswith("```"): if not code_block: code_block_end = count_prefix_char(cur, "`") code_block = cur[:code_block_end] elif cur.startswith(code_block): code_block = None append_line(line) continue # Code block -> handle verbatim if code_block: append_line(line) continue # Quote level mismatch => quote level is changed, append newline and change quote level if cur_quote_level != prev_quote_level: # If we go deeper, add newline on current level if cur_quote_level > prev_quote_level and prev: append_line() quote_level = cur_quote_level # If we return from quote, add newline on new level if cur_quote_level < prev_quote_level and cur: append_line() # Reset line and current values since we changed quote level line = strip_quotes(line) cur = line.strip() prev = "" cur_is_list_start = is_list_start(cur) # If the current line starts a list and prev line is not empty, # add a newline => forces a new paragraph for a list in markdown if cur_is_list_start and prev: append_line() # Previous and current lines are non-empty lists => force newline on previous line if not cur_is_list_start and prev and cur: result[-1] += " " append_line(line) # Close the opened code block if code_block: append_line(code_block) return "\n".join(result)
[docs]def check_archives_folder_exists(message_list: MessageListModel) -> Folder | None: """ Ensures archive folder exists for the given list if the list is archived. :param message_list: Message list to check :return: The archive folder for the list if the list should be archived. Otherwise None. """ if message_list.archive_policy is ArchiveType.NONE: return None archive_folder_path = f"{MESSAGE_LIST_ARCHIVE_FOLDER_PREFIX}/{remove_path_special_chars(message_list.name)}" archive_folder = Folder.find_by_path(archive_folder_path) if archive_folder is None: owners = get_message_list_owners(message_list) archive_folder = Folder.create( archive_folder_path, owner_groups=owners, title=f"{message_list.name}" ) return archive_folder
[docs]def archive_message(message_list: MessageListModel, message: BaseMessage) -> None: """Archive a message for a message list. :param message_list: The message list where the archived message belongs. :param message: The message being archived. """ archive_folder = check_archives_folder_exists(message_list) # Don't archive if there is nothing to archive to (e.g. list's archives are disabled) if not archive_folder: return # Don't save spam if message.subject.lower().startswith("[spam]"): return message_doc_subject = message.subject if message_list.subject_prefix: message_doc_subject = message_doc_subject.removeprefix( message_list.subject_prefix ) message_doc_name = message_doc_subject.replace("/", "-") archive_doc_path = remove_path_special_chars( f"{archive_folder.path}/{message_doc_name}-" f"{get_current_time().strftime('%Y-%m-%d %H:%M:%S')}" ) archive_doc = create_archive_doc_with_permission( message.subject, archive_doc_path, message_list, message ) archive_doc.document.add_setting( "macros", { "message": { "sender": message.sender.to_json(), "recipients": [r.to_json() for r in message.recipients], "date": message.timestamp.strftime("%Y-%m-%d %H:%M:%S"), } }, ) # Set header information for archived message. archive_doc.document.add_paragraph( "<tim-archive-header message='%%message|tojson%%'></tim-archive-header>", attrs={"allowangular": "true"}, ) # Set message body for archived message. # TODO: Check message list's only_text flag. archive_doc.document.add_paragraph( message_body_to_md(message.message_body), attrs={"taskId": "message-body"} ) archive_doc.document.add_paragraph( "<tim-archive-footer message='%%message|tojson%%'></tim-archive-footer>", attrs={"allowangular": "true"}, ) db.session.commit()
[docs]def parse_mailman_message(original: dict, msg_list: MessageListModel) -> BaseMessage: """Modify an email message sent from Mailman to TIM's universal message format. :param original: An email message sent from Mailman. :param msg_list: The message list where original is meant to go. :return: A BaseMessage object corresponding the original email message. """ # original message is of form specified in https://pypi.org/project/mail-parser/ visible_recipients: list[EmailAndDisplayName] = [] maybe_to_addresses = parse_mailman_message_address(original, "to") if maybe_to_addresses is not None: visible_recipients.extend(maybe_to_addresses) maybe_cc_addresses = parse_mailman_message_address(original, "cc") if maybe_cc_addresses is not None: visible_recipients.extend(maybe_cc_addresses) sender: EmailAndDisplayName | None = None maybe_from_address = parse_mailman_message_address(original, "from") if maybe_from_address is not None: # Expect only one sender. sender = maybe_from_address[0] if sender is None: # If no sender is found on a message, we don't archive the message. raise RouteException("No sender found in the message.") message_subject = original.get("subject", "No subject") message_body = original.get("body", "") message = BaseMessage( message_list_name=msg_list.name, domain=msg_list.email_list_domain, message_channel=Channel.EMAIL_LIST, # Header information sender=sender, recipients=visible_recipients, subject=message_subject, # Message body message_body=message_body, ) # Try parsing the rest of email specific fields. if "reply_to" in original: message.reply_to = original["reply_to"] if "date" in original: try: # At first we except RFC5322 format Date header. message.timestamp = parsedate_to_datetime(original["date"]) except (TypeError, ValueError): # Being here means that the date field is not in RFC5322 format. Testing has shown that ISO8601 format is # then a likely candidate format for Date header. Try parsing that format. try: message.timestamp = datetime.fromisoformat(original["date"]) except ValueError: # Being here means that the date field was none of tried formats after all. We'll log the format the # date was in so that it can be fixed. log_warning( f"Function parse_mailman_message has encountered a Date header format it cannot handle. The " f"date is of format {original['date']}. Please handle this at earliest convenience." ) return message
[docs]def parse_mailman_message_address( original: dict, header: str ) -> list[EmailAndDisplayName] | None: """Parse (potentially existing) fields 'from' 'to', 'cc', or 'bcc' from a dict representing Mailman's email message. The fields are in lists, with individual list indicies being lists themselves of the form ['Display Name', 'email@domain.fi'] :param original: Original message. :param header: One of "from", "to", "cc" or "bcc". :return: Return None if the header is not one of "from", "to", "cc" or "bcc". Otherwise return a list of EmailAndDisplayName objects. """ if header not in ["from", "to", "cc", "bcc"]: return None email_name_pairs: list[EmailAndDisplayName] = [] if header in original: for email_name_pair in original[header]: new_email_name_pair = EmailAndDisplayName( email=email_name_pair[1], name=email_name_pair[0] ) email_name_pairs.append(new_email_name_pair) return email_name_pairs
[docs]def get_message_list_owners(mlist: MessageListModel) -> list[UserGroup]: """Get the owners of a message list. :param mlist: The message list we want to know the owners. :return: A list of owners, as their personal user group. """ manage_doc_block = Block.query.filter_by(id=mlist.manage_doc_id).one() return manage_doc_block.owners
[docs]def create_management_doc( msg_list_model: MessageListModel, list_options: ListInfo ) -> DocInfo: """Create management doc for a new message list. :param msg_list_model: The message list the management document is created for. :param list_options: Options for creating the management document. :return: Newly created management document. """ doc = create_document( f"/{MESSAGE_LIST_DOC_PREFIX}/{remove_path_special_chars(list_options.name)}", list_options.name, validation_rule=ItemValidationRule(check_write_perm=False), parent_owner=UserGroup.get_admin_group(), ) apply_template(doc) s = doc.document.get_settings().get_dict().get("macros", {}) s["messagelist"] = list_options.name doc.document.add_setting("macros", s) # Set the management doc for the message list. msg_list_model.manage_doc_id = doc.id return doc
[docs]def new_list(list_options: ListInfo) -> tuple[DocInfo, MessageListModel]: """Adds a new message list into the database and creates the list's management doc. :param list_options: The list information for creating a new message list. Used to carry list's name and archive policy. :return: The management document of the message list. :return: The message list db model. """ msg_list = MessageListModel(name=list_options.name, archive=list_options.archive) db.session.add(msg_list) doc_info = create_management_doc(msg_list, list_options) check_archives_folder_exists(msg_list) return doc_info, msg_list
[docs]def set_message_list_notify_owner_on_change( message_list: MessageListModel, notify_owners_on_list_change_flag: bool | None ) -> None: """Set the notify list owner on list change flag for a list, and update necessary channels with this information. If the message list has an email list as a message channel, this will set the equilavent flag on the email list. :param message_list: The message list where the flag is being set. :param notify_owners_on_list_change_flag: An optional boolean flag. If True, then changes on the message list sends notifications to list owners. If False, notifications won't be sent. If None, nothing is set. """ if ( notify_owners_on_list_change_flag is None or message_list.notify_owner_on_change == notify_owners_on_list_change_flag ): return message_list.notify_owner_on_change = notify_owners_on_list_change_flag if message_list.email_list_domain: # Email lists have their own flag for notifying list owners for list changes. email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_notify_owner_on_list_change(email_list, message_list.notify_owner_on_change)
[docs]def set_message_list_member_can_unsubscribe( message_list: MessageListModel, can_unsubscribe_flag: bool | None ) -> None: """Set the list member's free unsubscription flag, and propagate that setting to channels that have own handling of unsubscription. If the message list has an email list as a message channel, this will set the equilavent flag on the email list. :param message_list: Message list where the flag is being set. :param can_unsubscribe_flag: An optional boolean flag. For True, the member can unsubscribe on their own. For False, then the member can't unsubscribe from the list on their own. If None, then the current value is kept. """ if ( can_unsubscribe_flag is None or message_list.can_unsubscribe == can_unsubscribe_flag ): return message_list.can_unsubscribe = can_unsubscribe_flag if message_list.email_list_domain: # Email list's have their own settings for unsubscription. email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_unsubscription_policy(email_list, can_unsubscribe_flag)
[docs]def set_message_list_subject_prefix( message_list: MessageListModel, subject_prefix: str | None ) -> None: """Set the message list's subject prefix. If the message list has an email list as a message list, then set the subject prefix there also. Sets one extra space automatically to offset prefix from the actual title. :param message_list: The message list where the subject prefix is being set. :param subject_prefix: The prefix set for messages that go through the list. If None, then the current value is kept. """ if subject_prefix is None or message_list.subject_prefix == subject_prefix: return # Add an extra space, if there is none. if not subject_prefix.endswith(" "): subject_prefix = f"{subject_prefix} " message_list.subject_prefix = subject_prefix if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_subject_prefix(email_list, subject_prefix)
[docs]def set_message_list_tim_users_can_join( message_list: MessageListModel, can_join_flag: bool | None ) -> None: """Set the flag controlling if TIM users can directly join this list. Because the behaviour that is controlled by the can_join_flag applies to TIM users, there is no message channel specific handling. :param message_list: Message list where the flag is being set. :param can_join_flag: An optional boolean flag. If True, then TIM users can directly join this list, no moderation needed. If False, then TIM users can't direclty join the message list. If None, the current value is kept. """ if can_join_flag is None or message_list.tim_user_can_join == can_join_flag: return message_list.tim_user_can_join = can_join_flag
[docs]def set_message_list_default_send_right( message_list: MessageListModel, default_send_right_flag: bool | None ) -> None: """Set the default message list new member send right flag. :param message_list: The message list where the flag is set. :param default_send_right_flag: An optional boolean flag. For True, new members on the list get default send right. For False, new members don't get a send right. For None, the current value is kept. """ if ( default_send_right_flag is None or message_list.default_send_right == default_send_right_flag ): return message_list.default_send_right = default_send_right_flag
[docs]def set_message_list_default_delivery_right( message_list: MessageListModel, default_delivery_right_flag: bool | None ) -> None: """Set the message list new member default delivery right. :param message_list: The message list where the flag is set. :param default_delivery_right_flag: An optional boolean flag. For True, new members on the list get default delivery right. For False, new members don't automatically get a delivery right. For None, the current value is kept. """ if ( default_delivery_right_flag is None or message_list.default_delivery_right == default_delivery_right_flag ): return message_list.default_delivery_right = default_delivery_right_flag
[docs]def set_message_list_only_text( message_list: MessageListModel, only_text: bool | None ) -> None: """Set the flag controlling if message list is to accept text-only messages. :param message_list: The message list where the flag is to be set. :param only_text: An optional boolean flag. For True, the message list is set to text-only mode. For False, the message list accepts HTML-based messages. For None, the current value is kept. """ if only_text is None or message_list.only_text == only_text: return message_list.only_text = only_text if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_only_text(email_list, only_text)
[docs]def set_message_list_non_member_message_pass( message_list: MessageListModel, non_member_message_pass_flag: bool | None ) -> None: """Set message list's non member message pass flag. :param message_list: The message list where the flag is set. :param non_member_message_pass_flag: An optional boolean flag. For True, sources outside the list can send messages to this list. If False, messages form sources outside the list will be hold for moderation. For None, the current value is kept. """ if ( non_member_message_pass_flag is None or message_list.non_member_message_pass == non_member_message_pass_flag ): return message_list.non_member_message_pass = non_member_message_pass_flag if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_allow_nonmember(email_list, non_member_message_pass_flag)
[docs]def set_message_list_allow_attachments( message_list: MessageListModel, allow_attachments_flag: bool | None ) -> None: """Set the flag controlling if a message list accepts messages with attachments. :param message_list: The message list where the flag is to be set. :param allow_attachments_flag: An optional boolean flag. For True, the list will allow a pre-determined set of attachments. For False, no attachments are allowed. For None, the current value is kept. """ if ( allow_attachments_flag is None or message_list.allow_attachments == allow_attachments_flag ): return message_list.allow_attachments = allow_attachments_flag if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_allow_attachments(email_list, allow_attachments_flag)
[docs]def set_message_list_default_reply_type( message_list: MessageListModel, default_reply_type: ReplyToListChanges | None ) -> None: """Set a value controlling how replies to a message list are steered. The reply type is analogous to email lists' operation of "Reply-To munging". Reply-To munging is a process where messages sent to list may be subject to having their Reply-To header changed from what the sender of the message initially used. This is mainly used (and sometimes abused) to steer conversation from announce-only lists (which don't accept posts from anyone except few select individuals) to separate discussion lists. :param message_list: The message list where the value is to be set. :param default_reply_type: An optional enumeration. For value NOCHANGES the user is completely left the control how to respond to messages sent from the list. For value ADDLIST the replies will be primarily steered towards the message list. For None, the current value is kept. """ if ( default_reply_type is None or message_list.default_reply_type == default_reply_type ): return message_list.default_reply_type = default_reply_type if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_default_reply_type(email_list, default_reply_type)
[docs]def add_new_message_list_group( msg_list: MessageListModel, ug: UserGroup, send_right: bool, delivery_right: bool, em_list: MailingList | None, ) -> None: """Add new (user) group to a message list. For groups, checks that the adder has at least manage rights to group's admin doc. Performs a duplicate check for memberships. A duplicate member will not be added again to the list. The process of re-activating a removed member of a list is different. For re-activating an already existing member, use set_message_list_member_removed_status function. This is a direct add, meaning member's membership_verified attribute is set in this function. Use other means to invite members. :param msg_list: The message list where the group will be added. :param ug: The user group being added to a message list. :param send_right: Send right for user groups members, that will be added to the message list individually. :param delivery_right: Delivery right for user groups members, that will be added to the message list individually. :param em_list: An optional email list. If given, then all the members of the user group will also be subscribed to the email list. """ # Check right to a group. Right checking is not required for personal groups, only user groups. if not ug.is_personal_group and not has_manage_access(ug.admin_doc): return # Check for membership duplicates. member = msg_list.find_member(username=ug.name, email=None) if member and not member.membership_ended: return # Add the user group as a member to the message list. new_group_member = MessageListTimMember( message_list_id=msg_list.id, group_id=ug.id, delivery_right=delivery_right, send_right=send_right, membership_verified=get_current_time(), ) db.session.add(new_group_member) # Add group's individual members to message channels. if em_list is not None: for user in ug.users: # TODO: Search for a set of emails and a primary email here when users' additional emails are implemented. user_email = ( user.email ) # In the future, we can search for a set of emails and a primary email here. add_email( em_list, user_email, email_owner_pre_confirmation=True, real_name=user.real_name, send_right=send_right, delivery_right=delivery_right, )
[docs]def add_message_list_external_email_member( msg_list: MessageListModel, external_email: str, send_right: bool, delivery_right: bool, em_list: MailingList, display_name: str | None, ) -> None: """Add external member to a message list. External members at this moment only support external members to email lists. :param msg_list: Message list where the member is to be added. :param external_email: The email address of an external member to be added to the message list. :param send_right: The send right to the list by the new member. :param delivery_right: The delivery right to the list by the new member. :param em_list: The email list where this external member will be also added, because at this time external members only make sense for an email list. :param display_name: Optional name associated with the external member. """ # Check for duplicate members. if msg_list.find_member(username=None, email=external_email): return new_member = MessageListExternalMember( email_address=external_email, display_name=display_name, delivery_right=delivery_right, send_right=send_right, message_list_id=msg_list.id, ) db.session.add(new_member) add_email( em_list, external_email, email_owner_pre_confirmation=True, real_name=display_name, send_right=send_right, delivery_right=delivery_right, )
[docs]def sync_message_list_on_add(user: User, new_group: UserGroup) -> None: """On adding a user to a new group, sync the user to user group's message lists. :param user: The user that was added to the new_group. :param new_group: The new group that the user was added to. """ # TODO: This might become a bottle neck, as adding to group is often done in a loop and every sync is a potential # call to different message channels (now just Mailman). In order to rid ourselves of that, we might need to # revamp the syncing. A solution might be a call to (Mailman's) server (sidelining mailmanclient-library) with a # batch of users we want to add with necessary information, and then let the server handle adding in a loop # locally. # Get all the message lists for the user group. for group_tim_member in new_group.messagelist_membership: group_message_list: MessageListModel = group_tim_member.message_list # Propagate the adding on message list's message channels. if group_message_list.email_list_domain: # TODO: Find user's contact info for emails and add them accordingly. email_list = get_email_list_by_name( group_message_list.name, group_message_list.email_list_domain ) add_email( email_list, user.email, True, user.real_name, group_tim_member.member.send_right, group_tim_member.member.delivery_right, )
[docs]def sync_message_list_on_expire(user: User, old_group: UserGroup) -> None: """On removing a user from a user group, remove the user from all the message lists that watch the group. :param user: The user who was removed from the user group. :param old_group: The group where the user was removed from. """ # TODO: This might become a bottle neck, as removing from group is often done in a loop and every sync is a # potential call to different message channels (now just Mailman). In order to rid ourselves of that, # we might need to revamp the syncing. A solution might be a call to (Mailman's) server (sidelining # mailmanclient-library) with a batch of users we want to add with necessary information, and then let the # server handle removing in a loop locally. # Get all the message lists for the user group. for group_tim_member in old_group.messagelist_membership: group_message_list: MessageListModel = group_tim_member.message_list # Propagate the deletion on message list's message channels. if group_message_list.email_list_domain: # TODO: Find user's contact info for emails and remove them accordingly. email_list = get_email_list_by_name( group_message_list.name, group_message_list.email_list_domain ) email_list_member = get_email_list_member(email_list, user.email) remove_email_list_membership(email_list_member)
[docs]def set_message_list_member_removed_status( member: MessageListMember, removed: datetime | None, email_list: MailingList | None, ) -> None: """Set the message list member's membership removed status. :param member: The member who's membership status is being set. :param removed: Member's date of removal from the message list. If None, then the member is an active member on the list. :param email_list: An email list belonging to the message list. If None, the message list does not have an email list. """ if (member.membership_ended is None and removed is None) or ( member.membership_ended and removed ): return member.remove(removed) # Remove members from email list or return them there. if email_list: if member.is_group(): ug = member.tim_member.user_group ug_members = ug.users for ug_member in ug_members: mlist_member = get_email_list_member(email_list, ug_member.email) if removed: remove_email_list_membership(mlist_member) else: # Re-set the member's send and delivery rights on the email list. set_email_list_member_send_status(mlist_member, member.send_right) set_email_list_member_delivery_status( mlist_member, member.delivery_right ) elif member.is_personal_user(): # Make changes to member's status on the email list. mlist_member = get_email_list_member(email_list, member.get_email()) # If there is an email list and the member is removed, do a soft removal on the email list. if removed: remove_email_list_membership(mlist_member) else: # Re-set the member's send and delivery rights on the email list. set_email_list_member_send_status(mlist_member, member.send_right) set_email_list_member_delivery_status( mlist_member, member.delivery_right )
[docs]def set_member_send_delivery( member: MessageListMember, send: bool, delivery: bool, email_list: MailingList | None = None, ) -> None: """Set message list member's send and delivery rights. :param member: Member who's rights are being set. :param send: Member's new send right. :param delivery: Member's new delivery right. :param email_list: If the message list has email list as one of its message channels, set the send and delivery rights there also. :return: None. """ # Send right if member.send_right != send: member.send_right = send if email_list: if member.is_personal_user(): mlist_member = get_email_list_member(email_list, member.get_email()) set_email_list_member_send_status(mlist_member, send) elif member.is_group(): # For group, set the delivery status for its members on the email list. ug = member.tim_member.user_group ug_members = ug.users # ug.current_memberships for ug_member in ug_members: # user = ug_member.personal_user email_list_member = get_email_list_member( email_list, ug_member.email ) set_email_list_member_send_status(email_list_member, send) # Delivery right. if member.delivery_right != delivery: member.delivery_right = delivery if email_list: # If message list has an email list associated with it, set delivery rights there. if member.is_personal_user(): mlist_member = get_email_list_member(email_list, member.get_email()) set_email_list_member_delivery_status(mlist_member, delivery) elif member.is_group(): # For group, set the delivery status for its members on the email list. ug = member.tim_member.user_group ug_members = ug.users # ug.current_memberships for ug_member in ug_members: # user = ug_member.personal_user email_list_member = get_email_list_member( email_list, ug_member.email ) set_email_list_member_delivery_status(email_list_member, delivery)
[docs]def set_message_list_description( message_list: MessageListModel, description: str | None ) -> None: """Set a (short) description to a message list and its associated message channels. :param message_list: The message list where the description is set. :param description: The new description. If None, keep the current value. """ if description is None or message_list.description == description: return message_list.description = description if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_description(email_list, description)
[docs]def set_message_list_info(message_list: MessageListModel, info: str | None) -> None: """Set a long description (called 'info' on Mailman) to a message list and its associated message channels. :param message_list: The message list where the (long) description is set. :param info: The new long description. If None, keep the current value. """ if info is None or message_list.info == info: return message_list.info = info if message_list.email_list_domain: email_list = get_email_list_by_name( message_list.name, message_list.email_list_domain ) set_email_list_info(email_list, info)
[docs]@dataclass class UserGroupDiff: add_user_ids: list[int] remove_user_ids: list[int]
[docs]def sync_usergroup_messagelist_members( diffs: dict[int, UserGroupDiff], permanent_delete: bool = False ) -> None: user_ids = set( itertools.chain.from_iterable( [*u.add_user_ids, *u.remove_user_ids] for u in diffs.values() ) ) if not user_ids: return user_query = User.query.filter(User.id.in_(user_ids)).options( load_only(User.id, User.email, User.real_name) ) users = {user.id: user for user in user_query} try: for ug_id, diff in diffs.items(): ug_memberships = MessageListTimMember.query.filter_by(group_id=ug_id).all() for group_tim_member in ug_memberships: group_message_list: MessageListModel = group_tim_member.message_list if group_message_list.email_list_domain: email_list = get_email_list_by_name( group_message_list.name, group_message_list.email_list_domain ) for add_id in diff.add_user_ids: user = users[add_id] add_email( email_list, user.email, True, user.real_name, group_tim_member.member.send_right, group_tim_member.member.delivery_right, ) for remove_id in diff.remove_user_ids: user = users[remove_id] remove_email_list_membership( get_email_list_member(email_list, user.email), permanent_delete, ) except HTTPError as e: log_mailman(e, "Failed to sync usergroups")