from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Literal, Callable
import filelock
from flask import render_template_string, Response, current_app
from timApp.answer.backup import sync_user_group_memberships_if_enabled
from timApp.answer.routes import save_fields, FieldSaveRequest, FieldSaveUserEntry
from timApp.auth.accesshelper import verify_logged_in, verify_view_access
from timApp.auth.accesstype import AccessType
from timApp.auth.auth_models import BlockAccess
from timApp.auth.session.util import distribute_session_verification
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import DocInfo
from timApp.document.editing.globalparid import GlobalParId
from timApp.document.usercontext import UserContext
from timApp.document.viewcontext import ViewRoute, ViewContext
from timApp.item.distribute_rights import (
RightOp,
ConfirmOp,
QuitOp,
UnlockOp,
ChangeTimeOp,
register_right_impl,
UndoConfirmOp,
UndoQuitOp,
)
from timApp.item.manage import (
TimeOpt,
verify_permission_edit_access,
PermissionEditModel,
add_perm,
log_right,
remove_perm,
)
from timApp.plugin.plugin import Plugin
from timApp.timdb.sqa import db
from timApp.user.groups import verify_group_edit_access
from timApp.user.user import User
from timApp.user.usergroup import UserGroup
from timApp.user.usergroupmember import membership_current, UserGroupMember
from timApp.util.flask.requesthelper import (
NotExist,
RouteException,
view_ctx_with_urlmacros,
)
from timApp.util.flask.responsehelper import json_response, ok_response
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.get_fields import get_fields_and_users, RequestedGroups
from timApp.util.logger import log_warning, log_info
from timApp.util.utils import get_current_time
from tim_common.markupmodels import GenericMarkupModel
from tim_common.marshmallow_dataclass import class_schema
from tim_common.pluginserver_flask import (
GenericHtmlModel,
PluginReqs,
register_html_routes,
EditorTab,
)
from tim_common.utils import DurationSchema
user_select_plugin = TypedBlueprint("userSelect", __name__, url_prefix="/userSelect")
[docs]@dataclass
class PermissionActionBase:
doc_path: str
type: AccessType
[docs]@dataclass
class AddPermission(PermissionActionBase):
time: TimeOpt
confirm: bool = False
[docs]@dataclass
class RemovePermission(PermissionActionBase):
pass
[docs]@dataclass
class ConfirmPermission(PermissionActionBase):
pass
[docs]@dataclass
class ChangePermissionTime(PermissionActionBase):
minutes: float
[docs]@dataclass
class SetTaskValueAction:
taskId: str
value: str
[docs]@dataclass
class DistributeRightAction:
operation: Literal["confirm", "quit", "unlock", "changetime", "undoquit"]
target: str | list[str]
timestamp: datetime | None = None
minutes: float = 0.0
@property
def timestamp_or_now(self) -> datetime:
return self.timestamp or get_current_time()
RIGHT_TO_OP: dict[str, Callable[[DistributeRightAction, str], RightOp]] = {
"confirm": lambda r, usr: ConfirmOp(
type="confirm",
email=usr,
timestamp=r.timestamp_or_now,
),
"quit": lambda r, usr: QuitOp(
type="quit",
email=usr,
timestamp=r.timestamp_or_now,
),
"undoquit": lambda r, usr: UndoQuitOp(
type="undoquit",
email=usr,
timestamp=r.timestamp_or_now,
),
"unlock": lambda r, usr: UnlockOp(
type="unlock",
email=usr,
timestamp=r.timestamp_or_now,
),
"changetime": lambda r, usr: ChangeTimeOp(
type="changetime",
email=usr,
secs=int(r.minutes * 60),
timestamp=r.timestamp_or_now,
),
}
[docs]@dataclass
class ChangeGroupAction:
changeTo: str
allGroups: list[str]
verify: bool = False
@property
def all_groups(self) -> list[str]:
return self.allGroups + [self.changeTo]
[docs]@dataclass
class ActionCollection:
addPermission: list[AddPermission] = field(default_factory=list)
confirmPermission: list[ConfirmPermission] = field(default_factory=list)
removePermission: list[RemovePermission] = field(default_factory=list)
changePermissionTime: list[ChangePermissionTime] = field(default_factory=list)
distributeRight: list[DistributeRightAction] = field(default_factory=list)
setValue: list[SetTaskValueAction] = field(default_factory=list)
addToGroups: list[str] = field(default_factory=list)
removeFromGroups: list[str] = field(default_factory=list)
changeGroup: ChangeGroupAction | None = None
verifyRemoteSessions: list[str] = field(default_factory=list)
invalidateRemoteSessions: list[str] = field(default_factory=list)
[docs]@dataclass
class ScannerOptions:
applyOnMatch: bool = False
continuousMatch: bool = False
enabled: bool = False
scanInterval: float = 1.5
waitBetweenScans: float = 0.0
beepOnSuccess: bool = False
beepOnFailure: bool = False
parameterSeparator: str | None = "#"
[docs]@dataclass
class TextOptions:
apply: str | None = None
cancel: str | None = None
success: str | None = None
undone: str | None = None
undo: str | None = None
undoWarning: str | None = None
verifyReasons: dict[str, str] = field(default_factory=dict)
[docs]@dataclass
class UserSelectMarkupModel(GenericMarkupModel):
allowUndo: bool = False
preFetch: bool = False
inputMinLength: int = 3
autoSearchDelay: float = 0.0
selectOnce: bool = False
maxMatches: int = 10
scanner: ScannerOptions = field(default_factory=ScannerOptions)
groups: list[str] = field(default_factory=list)
fields: list[str] = field(default_factory=list)
actions: ActionCollection | None = None
text: TextOptions = field(default_factory=TextOptions)
displayFields: list[str] = field(default_factory=lambda: ["username", "realname"])
sortBy: list[str] = field(default_factory=list)
UserSelectMarkupModelSchema = class_schema(
UserSelectMarkupModel, base_schema=DurationSchema
)
[docs]@dataclass
class UserSelectStateModel:
pass
[docs]@dataclass
class UserSelectHtmlModel(
GenericHtmlModel[UserSelectInputModel, UserSelectMarkupModel, UserSelectStateModel]
):
[docs] def get_component_html_name(self) -> str:
return "user-selector"
[docs] def get_static_html(self) -> str:
return render_template_string(
"""
<div>User selector</div>
"""
)
[docs]def reqs_handler() -> PluginReqs:
template = """
``` {#user_select plugin="userSelect" nocache="true"}
groups: # Groups from which the users can be searched and selected
- groupname
fields: # Fields to use when searching. Username, full name and email are always included.
- somefields
displayFields: # What fields to show in search results. Any fields defined in "fields" are allowed.
- realname # Full name
- username # Username
- useremail # Email address
autoSearchDelay: 0 # Wait this amount of seconds before searching for a user. 0 = OFF
maxMatches: 10 # If more than one user is found, how many are shown at max.
inputMinLength: 3 # How many characters must be given before searching.
selectOnce: false # If true, hide other users when selecting one.
allowUndo: false # Can the action be undone. Undoing is not supported by all actions.
preFetch: false # If true, all users are prefetched. This makes initial load longer but searches are faster.
scanner: # Camera scanner options
enabled: false # Show the scanner button
parameterSeparator: "#" # String to separate the user query from the search parameter when scanning. If null, no separation is done.
applyOnMatch: false # If true and only one user is found from scanning, apply actions on them without verifying
continuousMatch: false # If true, scanner is not disabled after a successful scan
waitBetweenScans: 0 # If continuousMatch is true, how many seconds to wait before restarting the scanner
beepOnSuccess: false # Play a "beep" sound on successful scan.
beepOnFailure: true # Play a "beep" sound if scan was successful but no matching users were found.
scanInterval: 1.5 # Time interval between scan attempts in seconds. Lower value means faster scans but higher energy usage.
actions: # Actions to apply for the selected user
#addPermission: # Add permissions to documents
# - doc_path: some/doc/path # Target document path
# type: view # Permission type. Allowed values: view, edit, teacher, manage, see_answers, owner, copy
# time: # Duration of the permission
# type: always # Permission type. Allowed values: always, range, duration
# duration: P30M # Duration in ISO 8601 format (for "duration" type)
# to: 2021-04-23T18:00:00.000Z # When the permission ends
# from: 2021-04-23T16:00:00.000Z # When the permission starts
# durationTo: 2021-04-23T18:00:00.000Z # When the duration permission can be asked for
# durationFrom: 2021-04-23T16:00:00.000Z # When the duration permission ends
# confirm: false # Does the permission require extra confirmation?
#confirmPermission: # Confirms user's permission to a document
# - doc_path: some/doc/path # Target document path
# type: view # Permission type. Allowed values: view, edit, teacher, manage, see_answers, owner, copy
#removePermission: # Remove permissions from documents
# - doc_path: some/doc/path # Document from which to remove the permission
# type: view # Type of permission to remove: view, edit, teacher, manage, see_answers, owner, copy
#changePermissionTime: # Change duration of an active user permission
# - doc_path: some/doc/path # Document from which to edit the permission time
# type: view # Type of permission to change: view, edit, teacher, manage, see_answers, owner, copy
# minutes: 10 # By how many minutes to adjust the permission. Positive values add time, negative remove.
#setValue: # Set value to a field or task
# - taskId: 1.sometask # Task or field to which to set the value
# value: somevalue # Value to set. Can be a macro.
#addToGroups: # Add the user to the groups
# - somegroups
#changeGroup: # Change the user's group
# changeTo: somegroup # Group to change to. The user will become a member of this group.
# allGroups: # All groups to check. User is removed from other all groups that are not the same as changeTo.
# - somegroup
# verify: false # Show a verification message if user is already a member of a group in allGroups.
#removeFromGroups: # Remove the user from the groups. This will do a soft delete (i.e. add removal date)
# - somegroups
#verifyRemoteSessions: # Verify sessions for remote targets. Use DIST_RIGHTS_HOSTS to specify the actual hosts.
# - target1
#invalidateRemoteSessions: # Invalidate sessions for remote targets. Use DIST_RIGHTS_HOSTS to specify the actual hosts.
# - target1
#text: # UI texts
# apply: Apply permissions
# cancel: Cancel
# success: Gave permission to {realname}.
# undone: Undone permissions from {realname} ({HETU}).
```
"""
editor_tabs: list[EditorTab] = [
{
"text": "Plugins",
"items": [
{
"text": "UserSelect",
"items": [
{
"data": template.strip(),
"text": "User selector",
"expl": "Search users from a group and apply actions to them",
}
],
},
],
},
]
return {
"js": ["userSelect"],
"multihtml": True,
"editor_tabs": editor_tabs,
}
[docs]def log_user_select(msg: str) -> None:
if not current_app.config["LOG_USER_SELECT_ACTIONS"]:
return
log_info(f"USER_SELECT: {msg}")
[docs]def get_plugin_markup(
task_id: str | None, par: GlobalParId | None
) -> tuple[UserSelectMarkupModel, DocInfo, User, ViewContext]:
verify_logged_in()
user = get_current_user_object()
user_ctx = UserContext.from_one_user(user)
view_ctx = view_ctx_with_urlmacros(ViewRoute.Unknown)
if task_id:
plugin, doc = Plugin.from_task_id(task_id, user_ctx, view_ctx)
elif par:
plugin, doc = Plugin.from_global_par(par, user_ctx, view_ctx)
else:
raise RouteException("Either task_id or par must be specified")
model: UserSelectMarkupModel = UserSelectMarkupModelSchema().load(plugin.values)
return model, doc, user, view_ctx
[docs]@user_select_plugin.get("/fetchUsers")
def fetch_users(
task_id: str | None = None,
doc_id: int | None = None,
par_id: str | None = None,
) -> Response:
model, doc, user, view_ctx = get_plugin_markup(
task_id, GlobalParId(doc_id, par_id) if doc_id and par_id else None
)
field_data, _, field_names, _ = get_fields_and_users(
model.fields, RequestedGroups.from_name_list(model.groups), doc, user, view_ctx
)
return json_response(
{
"users": [
{"user": field_obj["user"], "fields": field_obj["fields"]}
for field_obj in field_data
],
"fieldNames": field_names,
}
)
[docs]def match_query(query_words: list[str], keywords: list[str]) -> bool:
kw = set(keywords)
for qw in query_words:
found = next((k for k in kw if qw in k), None)
if found is None:
return False
kw.remove(found)
return True
[docs]@user_select_plugin.post("/search")
def search_users(
search_strings: list[str],
task_id: str | None = None,
par: GlobalParId | None = None,
) -> Response:
model, doc, user, view_ctx = get_plugin_markup(task_id, par)
verify_view_access(doc)
field_data, _, field_names, _ = get_fields_and_users(
model.fields, RequestedGroups.from_name_list(model.groups), doc, user, view_ctx
)
# If query contains spaces, split into sub-queries that all must match
# In each subquery, match by longest word first to ensure best match
search_query_words = [
sorted(s.lower().split(), key=lambda s: len(s), reverse=True)
for s in search_strings
]
matched_field_data = []
for field_obj in field_data:
fields = field_obj["fields"]
usr = field_obj["user"]
values_to_check: list[str | float | None | None] = [
usr.name,
usr.real_name,
usr.email,
*fields.values(),
]
for field_val in values_to_check:
if not field_val:
continue
val = (
(field_val if isinstance(field_val, str) else str(field_val))
.lower()
.split()
)
if next((qws for qws in search_query_words if match_query(qws, val)), None):
matched_field_data.append(field_obj)
break
match_count = len(matched_field_data)
if match_count > model.maxMatches:
matched_field_data = matched_field_data[0 : model.maxMatches]
return json_response(
{
"matches": [
{"user": field_obj["user"], "fields": field_obj["fields"]}
for field_obj in matched_field_data
],
"allMatchCount": match_count,
"fieldNames": field_names,
}
)
[docs]def has_distribution_moderation_access(doc: DocInfo) -> bool:
allowed_docs = current_app.config.get("DIST_RIGHTS_MODERATION_DOCS", [])
return doc.path in allowed_docs
[docs]def get_plugin_info(
username: str, task_id: str | None = None, par: GlobalParId | None = None
) -> tuple[UserSelectMarkupModel, User, UserGroup, User, DocInfo]:
model, doc, _, _ = get_plugin_markup(task_id, par)
# Ensure user actually has access to document with the plugin
verify_view_access(doc)
cur_user = get_current_user_object()
user_group = UserGroup.get_by_name(username)
user_acc = User.get_by_name(user_group.name)
assert user_acc is not None
if not user_group:
raise RouteException(f"Cannot find user {username}")
if not model.actions:
return model, cur_user, user_group, user_acc, doc
can_distribute_rights = has_distribution_moderation_access(doc)
if model.actions.distributeRight and not can_distribute_rights:
raise RouteException("distributeRight is not allowed in this document")
if model.actions.verifyRemoteSessions and not can_distribute_rights:
raise RouteException("verifyRemoteSessions is not allowed in this document")
if model.actions.invalidateRemoteSessions and not can_distribute_rights:
raise RouteException("invalidateRemoteSessions is not allowed in this document")
return model, cur_user, user_group, user_acc, doc
[docs]def undo_dist_right_actions(
user_acc: User, dist_rights: list[DistributeRightAction]
) -> list[str]:
# TODO: Implement undoing for local permissions
undoable_dists = [
dist for dist in dist_rights if dist.operation in ("confirm", "quit")
]
errors = []
for distribute in undoable_dists:
if distribute.operation == "confirm":
undo_op: UndoConfirmOp | UndoQuitOp | ChangeTimeOp = UndoConfirmOp(
type="undoconfirm",
email=user_acc.email,
timestamp=distribute.timestamp_or_now,
)
elif distribute.operation == "quit":
undo_op = UndoQuitOp(
type="undoquit",
email=user_acc.email,
timestamp=distribute.timestamp_or_now,
)
elif distribute.operation == "changetime":
undo_op = ChangeTimeOp(
type="changetime",
email=user_acc.email,
timestamp=distribute.timestamp_or_now,
secs=-int(distribute.minutes * 60),
)
else:
continue
errors.extend(register_right_impl(undo_op, distribute.target))
return errors
[docs]def undo_field_actions(
cur_user: User, user_acc: User, set_value: list[SetTaskValueAction]
) -> None:
fields_to_save = {set_val.taskId: "" for set_val in set_value}
if fields_to_save:
# Reuse existing helper for answer route to save field values quickly
save_fields(
FieldSaveRequest(
savedata=[FieldSaveUserEntry(user=user_acc.id, fields=fields_to_save)]
),
cur_user,
allow_non_teacher=False,
)
[docs]def get_groups(
cur_user: User, add: list[str], remove: list[str], change_all_groups: list[str]
) -> tuple[list[UserGroup], list[UserGroup], list[UserGroup]]:
add_groups: list[UserGroup] = UserGroup.query.filter(UserGroup.name.in_(add)).all()
remove_groups: list[UserGroup] = UserGroup.query.filter(
UserGroup.name.in_(remove)
).all()
change_all_groups_ugs: list[UserGroup] = UserGroup.query.filter(
UserGroup.name.in_(change_all_groups)
).all()
all_groups: dict[str, UserGroup] = {
ug.name: ug for ug in (add_groups + remove_groups + change_all_groups_ugs)
}
for ug in all_groups.values():
if ug.is_sisu:
raise RouteException(
"Modifying Sisu groups with user selector is not allowed to prevent mistakes"
)
verify_group_edit_access(ug, cur_user)
return add_groups, remove_groups, change_all_groups_ugs
# It can be useful to offset the time a little to ensure any checks for expired memberships can pass
group_expired_offset = timedelta(seconds=1)
[docs]def undo_group_actions(
user_acc: User,
cur_user: User,
add: list[str],
remove: list[str],
change: ChangeGroupAction | None,
) -> bool:
add_groups, remove_groups, change_all_groups = get_groups(
cur_user, add, remove, change.all_groups if change else []
)
# We cannot safely add user back to a group because we don't know if the user was removed from it
changed = False
for ug in add_groups + change_all_groups:
membership = ug.current_memberships.get(user_acc.id, None)
if membership:
changed = True
membership.set_expired(time_offset=group_expired_offset)
return changed
[docs]@user_select_plugin.post("/undo")
def undo(
username: str,
task_id: str | None = None,
par: GlobalParId | None = None,
param: str | None = None, # TODO: Use
) -> Response:
model, cur_user, user_group, user_acc, doc = get_plugin_info(username, task_id, par)
# No permissions to undo
if not model.actions:
return json_response({"distributionErrors": []})
log_user_select(
f"[{cur_user.name}] undo on {user_acc.name} in {doc.path} (param = {param})"
)
groups = set(model.actions.addToGroups) | set(model.actions.removeFromGroups)
if model.actions.changeGroup:
groups |= set(model.actions.changeGroup.all_groups)
locks = [
filelock.FileLock(f"/tmp/userselect_groupaction_{group}.lock")
for group in groups
]
for lock in locks:
lock.acquire()
try:
# Undo the group actions before dist rights because dist rights can might depend on the group
# Moreover, undoing is generally a soft action, so we can always manually restore the group safely
changed = undo_group_actions(
user_acc,
cur_user,
model.actions.addToGroups,
model.actions.removeFromGroups,
model.actions.changeGroup,
)
# Flush so that right distribution is handled correctly
db.session.flush()
if changed:
sync_user_group_memberships_if_enabled(user_acc)
errors = undo_dist_right_actions(user_acc, model.actions.distributeRight)
# If there are errors undoing, don't reset the fields because it may have been caused by a race condition
if errors:
db.session.rollback()
return json_response({"distributionErrors": errors})
undo_field_actions(cur_user, user_acc, model.actions.setValue)
errors += apply_verify_session(
"verify", user_acc, param, model.actions.invalidateRemoteSessions
)
errors += apply_verify_session(
"invalidate", user_acc, param, model.actions.verifyRemoteSessions
)
db.session.commit()
finally:
for lock in locks:
lock.release()
return json_response({"distributionErrors": errors})
[docs]def apply_permission_actions(
user_group: UserGroup,
add: list[AddPermission],
remove: list[RemovePermission],
confirm: list[ConfirmPermission],
change_time: list[ChangePermissionTime],
) -> list[str]:
doc_entries = {}
update_messages = []
permission_actions: list[PermissionActionBase] = [
*add,
*remove,
*confirm,
*change_time,
]
# Verify first that all documents can be accessed and permissions edited + cache doc entries
for perm in permission_actions:
if perm.doc_path in doc_entries:
continue
doc_entry = DocEntry.find_by_path(
perm.doc_path, fallback_to_id=True, try_translation=False
)
if not doc_entry:
raise NotExist(f"Can't find document {perm.doc_path}")
verify_permission_edit_access(doc_entry, perm.type)
doc_entries[perm.doc_path] = doc_entry
for to_add in add:
doc_entry = doc_entries[to_add.doc_path]
# Don't throw if we try to remove a permission from ourselves, just ignore it
accs = add_perm(
PermissionEditModel(
to_add.type, to_add.time, [user_group.name], to_add.confirm
),
doc_entry,
replace_active_duration=False,
)
if accs:
update_messages.append(
f"added {accs[0].info_str} for {user_group.name} in {doc_entry.path}"
)
for to_remove in remove:
doc_entry = doc_entries[to_remove.doc_path]
a = remove_perm(user_group, doc_entry.block, to_remove.type)
if a:
update_messages.append(
f"removed {a.info_str} for {user_group.name} in {doc_entry.path}"
)
for to_confirm in confirm:
doc_entry = doc_entries[to_confirm.doc_path]
ba_confirm: BlockAccess | None = BlockAccess.query.filter_by(
type=to_confirm.type.value,
block_id=doc_entry.block.id,
usergroup_id=user_group.id,
).first()
if ba_confirm and ba_confirm.require_confirm:
ba_confirm.do_confirm()
update_messages.append(
f"confirmed {ba_confirm.info_str} for {user_group.name} in {doc_entry.path}"
)
for to_change in change_time:
doc_entry = doc_entries[to_change.doc_path]
ba_change: BlockAccess | None = BlockAccess.query.filter_by(
type=to_change.type.value,
block_id=doc_entry.block.id,
usergroup_id=user_group.id,
).first()
if ba_change and ba_change.accessible_to is not None:
ba_change.accessible_to += timedelta(minutes=to_change.minutes)
update_messages.append(
f"adjusted {ba_change.info_str} for {user_group.name} in {doc_entry.path} by {to_change.minutes} minutes"
)
return update_messages
[docs]def apply_field_actions(
cur_user: User, user_acc: User, set_values: list[SetTaskValueAction]
) -> None:
fields_to_save = {set_val.taskId: set_val.value for set_val in set_values}
if fields_to_save:
# Reuse existing helper for answer route to save field values quickly
save_fields(
FieldSaveRequest(
savedata=[FieldSaveUserEntry(user=user_acc.id, fields=fields_to_save)]
),
cur_user,
allow_non_teacher=False,
)
[docs]def apply_dist_right_actions(
user_acc: User, dist_right: list[DistributeRightAction]
) -> list[str]:
errors = []
for distribute in dist_right:
convert = RIGHT_TO_OP[distribute.operation]
right_op = convert(distribute, user_acc.email)
apply_errors = register_right_impl(right_op, distribute.target)
if isinstance(right_op, QuitOp):
# Ignore failing to undo twice. It is an error but it's not strictly an issue for UserSelect
# However, do this only for QuitOp to prevent other issues like trying to confirm users who has already quit
# TODO: Don't depend on string matching to filter out the error
apply_errors = [
e for e in apply_errors if "Cannot register a non-UndoQuitOp" not in e
]
errors.extend(apply_errors)
return errors
[docs]def apply_verify_session(
action: str, user_acc: User, session_id: str | None, targets: list[str]
) -> list[str]:
return distribute_session_verification(action, user_acc.name, session_id, targets)
[docs]def apply_group_actions(
user_acc: User,
cur_user: User,
add: list[str],
remove: list[str],
change_to: ChangeGroupAction | None,
) -> bool:
add_groups, remove_groups, _ = get_groups(cur_user, add, remove, [])
changed = False
for ug in add_groups:
user_acc.add_to_group(ug, cur_user)
changed = True
def expire_membership(ugg: UserGroup) -> None:
nonlocal changed
m = user_acc.memberships_dyn.filter(
membership_current & (UserGroupMember.group == ugg)
).first()
if m:
m.set_expired(time_offset=group_expired_offset)
changed = True
for ug in remove_groups:
expire_membership(ug)
if change_to:
all_groups_set = set(change_to.allGroups)
change_to_group = UserGroup.get_by_name(change_to.changeTo)
if change_to_group:
user_acc.add_to_group(change_to_group, cur_user)
changed = True
for ug in user_acc.groups:
if ug.name != change_to.changeTo and ug.name in all_groups_set:
expire_membership(ug)
return changed
[docs]class NeedsVerifyReasons(Enum):
CHANGE_GROUP_BELONGS = "changeGroupBelongs"
CHANGE_GROUP_ALREADY_MEMBER = "changeGroupAlreadyMember"
[docs]@user_select_plugin.post("/needsVerify")
def needs_verify(username: str, par: GlobalParId) -> Response:
model, cur_user, user_group, user_acc, _ = get_plugin_info(username, None, par)
if not model.actions:
return json_response({"needsVerify": False, "reasons": []})
verify_reasons = []
if model.actions.changeGroup and model.actions.changeGroup.verify:
membership_groups: set[str] = {g.name for g in user_acc.groups}
if model.actions.changeGroup.changeTo in membership_groups:
verify_reasons.append(NeedsVerifyReasons.CHANGE_GROUP_ALREADY_MEMBER)
# No need to add the other reason because "already member" is more important
elif any(
True for g in model.actions.changeGroup.allGroups if g in membership_groups
):
verify_reasons.append(NeedsVerifyReasons.CHANGE_GROUP_BELONGS)
return json_response(
{"needsVerify": len(verify_reasons) > 0, "reasons": verify_reasons}
)
[docs]@user_select_plugin.post("/apply")
def apply(
username: str,
task_id: str | None = None,
par: GlobalParId | None = None,
param: str | None = None, # TODO: Use
) -> Response:
model, cur_user, user_group, user_acc, doc = get_plugin_info(username, task_id, par)
# No permissions to apply, simply return
if not model.actions:
return ok_response()
log_user_select(
f"[{cur_user.name}] apply on {user_acc.name} in {doc.path} (param = {param})"
)
all_groups = set(model.actions.addToGroups) | set(model.actions.removeFromGroups)
if model.actions.changeGroup:
all_groups |= set(model.actions.changeGroup.all_groups)
locks = [
filelock.FileLock(f"/tmp/userselect_groupaction_{group}.lock")
for group in all_groups
]
for lock in locks:
lock.acquire()
try:
changed = apply_group_actions(
user_acc,
cur_user,
model.actions.addToGroups,
model.actions.removeFromGroups,
model.actions.changeGroup,
)
db.session.flush()
if changed:
sync_user_group_memberships_if_enabled(user_acc)
apply_field_actions(cur_user, user_acc, model.actions.setValue)
right_dist_errors = apply_dist_right_actions(
user_acc, model.actions.distributeRight
)
session_verification_errors = apply_verify_session(
"verify", user_acc, param, model.actions.verifyRemoteSessions
)
session_invalidation_errors = apply_verify_session(
"invalidate", user_acc, param, model.actions.invalidateRemoteSessions
)
update_messages = apply_permission_actions(
user_group,
model.actions.addPermission,
model.actions.removePermission,
model.actions.confirmPermission,
model.actions.changePermissionTime,
)
db.session.commit()
finally:
for lock in locks:
lock.release()
for msg in update_messages:
log_right(msg)
for error in right_dist_errors:
log_warning(
f"RIGHT_DIST: problem distributing rights for user {user_acc.email}: {error}"
)
for error in session_verification_errors:
log_warning(
f"SESSION_VERIFICATION: problem verifying session for user {user_acc.email}: {error}"
)
for error in session_invalidation_errors:
log_warning(
f"SESSION_VERIFICATION: problem invalidating session for user {user_acc.email}: {error}"
)
all_errors = (
right_dist_errors + session_verification_errors + session_invalidation_errors
)
# Better throw an error here. This should prompt the user to at least try again
# Unlike with undoing, it's better to get the user to reapply the rights or properly fix them
# Moreover, this should encourage the user to report the problem with distribution ASAP
if all_errors:
raise RouteException("\n".join([f"* {error}" for error in all_errors]))
return ok_response()
register_html_routes(
user_select_plugin,
class_schema(UserSelectHtmlModel, base_schema=DurationSchema),
reqs_handler,
)