from dataclasses import dataclass
from operator import attrgetter
from typing import Any, Optional
from flask import Response
from timApp.auth.accesshelper import (
verify_admin,
check_admin_access,
AccessDenied,
)
from timApp.auth.accesstype import AccessType
from timApp.auth.auth_models import BlockAccess
from timApp.auth.sessioninfo import (
get_current_user_object,
get_current_user_group_object,
)
from timApp.document.create_item import apply_template, create_document
from timApp.document.docinfo import DocInfo
from timApp.item.validation import ItemValidationRule
from timApp.timdb.sqa import db
from timApp.user.special_group_names import (
SPECIAL_GROUPS,
PRIVILEGED_GROUPS,
SPECIAL_USERNAMES,
)
from timApp.user.user import User, view_access_set, edit_access_set
from timApp.user.usergroup import UserGroup
from timApp.util.flask.requesthelper import load_data_from_req, RouteException, NotExist
from timApp.util.flask.responsehelper import json_response
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.utils import remove_path_special_chars, get_current_time
from tim_common.marshmallow_dataclass import class_schema
groups = TypedBlueprint("groups", __name__, url_prefix="/groups")
USER_NOT_FOUND = "User not found"
[docs]def verify_groupadmin(
require: bool = True,
user: User | None = None,
action: str | None = None,
msg: str | None = None,
):
curr_user = user
if curr_user is None:
curr_user = get_current_user_object()
if not check_admin_access(user=user):
if not UserGroup.get_groupadmin_group() in curr_user.groups:
if require:
msg = msg or "This action requires group administrator rights."
if action:
msg = action + ": " + msg
raise AccessDenied(msg)
else:
return False
return True
[docs]def get_uid_gid(group_name, usernames) -> tuple[UserGroup, list[User]]:
users = User.query.filter(User.name.in_(usernames)).all()
group = UserGroup.query.filter_by(name=group_name).first()
raise_group_not_found_if_none(group_name, group)
return group, users
[docs]@groups.get("/getOrgs")
def get_organizations() -> Response:
return json_response(UserGroup.get_organizations())
[docs]@groups.get("/show/<group_name>")
def show_members(group_name: str) -> Response:
ug = get_group_or_abort(group_name)
verify_group_view_access(ug)
return json_response(sorted(list(ug.users), key=attrgetter("id")))
[docs]@groups.get("/usergroups/<username>")
def show_usergroups(username: str) -> Response:
verify_admin()
u = User.get_by_name(username)
if not u:
raise NotExist(USER_NOT_FOUND)
return json_response(
u.get_groups(include_special=False).order_by(UserGroup.name).all()
)
[docs]@groups.get("/belongs/<username>/<group_name>")
def belongs(username: str, group_name: str) -> Response:
ug = get_group_or_abort(group_name)
verify_group_view_access(ug)
u = User.get_by_name(username)
if not u:
raise NotExist(USER_NOT_FOUND)
return json_response({"status": ug in u.groups})
[docs]def get_group_or_abort(group_name: str):
ug = UserGroup.get_by_name(group_name)
raise_group_not_found_if_none(group_name, ug)
return ug
[docs]def raise_group_not_found_if_none(group_name: str, ug: UserGroup | None):
if not ug:
raise RouteException(f'User group "{group_name}" not found')
[docs]@groups.get("/create/<path:group_path>")
def create_group(group_path: str) -> Response:
"""Route for creating a user group.
The name of user group has the following restrictions:
1. The name must have at least one digit.
2. The name must have at least one alphabetic character.
3. The name must NOT have any non-alphanumeric characters, with the exception that spaces are allowed.
These restrictions are needed in order to distinguish manually-created groups from personal user groups.
Personal user group names are either
1. email addresses (containing '@' character), or
2. lowercase ASCII strings (Korppi users) with length being in range [2,8].
"""
_, doc = do_create_group(group_path)
db.session.commit()
return json_response(doc)
[docs]def do_create_group(group_path: str) -> tuple[UserGroup, DocInfo]:
group_path = group_path.strip("/ ")
# The name of the user group is separated from the path.
# Does not check whether a name or a path is missing.
group_name = group_path.split("/")[-1]
if UserGroup.get_by_name(group_name):
raise RouteException("User group already exists.")
verify_groupadmin(action=f"Creating group {group_name}")
validate_groupname(group_name)
# To support legacy code:
# The group administrator has always writing permission to the groups' root folder.
# Creating a new user group into the root folder named groups is always allowed.
# Elsewhere, the current user must have group administrator rights.
creating_subdirectory = "/" in group_path
parent_owner = (
get_current_user_group_object()
if creating_subdirectory
else UserGroup.get_admin_group()
)
doc = create_document(
f"groups/{remove_path_special_chars(group_path)}",
group_name,
validation_rule=ItemValidationRule(check_write_perm=creating_subdirectory),
parent_owner=parent_owner,
)
apply_template(doc)
update_group_doc_settings(doc, group_name)
add_group_infofield_template(doc)
u = UserGroup.create(group_name)
u.admin_doc = doc.block
f = doc.parent
if len(f.block.accesses) == 1:
logged_group = UserGroup.get_logged_in_group()
f.block.accesses[(logged_group.id, AccessType.view.value)] = BlockAccess(
usergroup_id=logged_group.id,
type=AccessType.view.value,
accessible_from=get_current_time(),
)
return u, doc
[docs]def add_group_infofield_template(doc: DocInfo):
text = """
## Omia kenttiƤ {defaultplugin="textfield" readonly="view" .hidden-print}
{#info autosave: true #}
"""
doc.document.add_text(text)
[docs]def update_group_doc_settings(
doc: DocInfo, group_name: str, extra_macros: dict[str, Any] = None
):
s = doc.document.get_settings().get_dict().get("macros", {})
s["group"] = group_name
s["fields"] = ["info"]
s["maxRows"] = "40em" # max rows for group list
if extra_macros:
s.update(extra_macros)
doc.document.add_setting("macros", s)
[docs]def validate_groupname(group_name: str):
has_digits = False
has_letters = False
has_upper_letters = False
has_non_alnum = False
for c in group_name:
has_digits = has_digits or c.isdigit()
has_letters = has_letters or c.isalpha()
has_upper_letters = has_upper_letters or c.isupper()
has_non_alnum = has_non_alnum or not (c.isalnum() or c.isspace() or c in "-_")
if not has_digits or not has_letters or has_non_alnum or has_upper_letters:
raise RouteException(
'User group must contain at least one digit and one letter and must not have uppercase or special chars: "'
+ group_name
+ '"'
)
[docs]def verify_group_access(ug: UserGroup, access_set, u=None, require=True):
if ug.name in PRIVILEGED_GROUPS:
return verify_admin(require=require, user=u)
if not u:
u = get_current_user_object()
if u.get_personal_group() == ug:
return True
b = ug.admin_doc
no_access_msg = f"No access for group {ug.name}"
if not b:
return verify_groupadmin(require=require, user=u, msg=no_access_msg)
else:
if not u.has_some_access(b, access_set):
return verify_groupadmin(require=require, user=u, msg=no_access_msg)
return True
[docs]def verify_group_edit_access(ug: UserGroup, user: User | None = None, require=True):
if ug.name in SPECIAL_GROUPS:
raise RouteException(f"Cannot edit special group: {ug.name}")
if User.get_by_name(ug.name):
raise RouteException(f"Cannot edit personal group: {ug.name}")
if ug.name.startswith("cumulative:") or ug.name.startswith("deleted:"):
raise RouteException(f"Cannot edit special Sisu group: {ug.name}")
return verify_group_access(ug, edit_access_set, user, require=require)
[docs]def verify_group_view_access(ug: UserGroup, user=None, require=True):
return verify_group_access(ug, view_access_set, user, require=require)
[docs]def get_member_infos(group_name: str, usernames: list[str]):
usernames = get_usernames(usernames)
group, users = get_uid_gid(group_name, usernames)
verify_group_edit_access(group)
existing_usernames = {u.name for u in users}
existing_ids = {u.id for u in group.users}
not_exist = [name for name in usernames if name not in existing_usernames]
return existing_ids, group, not_exist, usernames, users
[docs]@dataclass
class NamesModel:
names: list[str]
NamesModelSchema = class_schema(NamesModel)
[docs]@groups.post("/addmember/<group_name>")
def add_member(group_name: str) -> Response:
nm: NamesModel = load_data_from_req(NamesModelSchema)
existing_ids, group, not_exist, usernames, users = get_member_infos(
group_name, nm.names
)
if set(nm.names) & SPECIAL_USERNAMES:
raise RouteException("Cannot add special users.")
already_exists = {u.name for u in group.users} & set(usernames)
added = []
curr = get_current_user_object()
for u in users:
if u.id not in existing_ids:
u.add_to_group(group, added_by=curr)
added.append(u.name)
db.session.commit()
return json_response(
{
"already_belongs": sorted(list(already_exists)),
"added": sorted(added),
"not_exist": sorted(not_exist),
}
)
[docs]@groups.post("/removemember/<group_name>")
def remove_member(group_name: str) -> Response:
nm: NamesModel = load_data_from_req(NamesModelSchema)
existing_ids, group, not_exist, usernames, users = get_member_infos(
group_name, nm.names
)
removed = []
does_not_belong = []
ensure_manually_added = group.is_sisu
su = User.get_scimuser()
for u in users:
if u.id not in existing_ids:
does_not_belong.append(u.name)
continue
if ensure_manually_added and group.current_memberships[u.id].adder == su:
raise RouteException(
"Cannot remove not-manually-added users from Sisu groups."
)
group.current_memberships[u.id].set_expired()
removed.append(u.name)
db.session.commit()
return json_response(
{
"removed": sorted(removed),
"does_not_belong": sorted(does_not_belong),
"not_exist": sorted(not_exist),
}
)
[docs]def get_usernames(usernames: list[str]):
usernames = list({name.strip() for name in usernames})
usernames.sort()
return usernames