import re
import traceback
from dataclasses import field, dataclass
from functools import cached_property
from typing import Optional, Any, Generator
from flask import Blueprint, request, current_app, Response
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import aliased
from webargs.flaskparser import use_args
from timApp.admin.user_cli import do_merge_users, do_soft_delete
from timApp.messaging.messagelist.emaillist import update_mailing_list_address
from timApp.messaging.messagelist.messagelist_utils import (
sync_message_list_on_add,
sync_message_list_on_expire,
)
from timApp.sisu.parse_display_name import (
parse_sisu_group_display_name,
SisuDisplayName,
)
from timApp.sisu.scimusergroup import ScimUserGroup, external_id_re
from timApp.sisu.sisu import refresh_sisu_grouplist_doc, send_course_group_mail
from timApp.tim_app import csrf
from timApp.timdb.sqa import db
from timApp.user.scimentity import get_meta
from timApp.user.user import (
User,
UserOrigin,
last_name_to_first,
SCIM_USER_NAME,
UserInfo,
)
from timApp.user.usercontact import ContactOrigin
from timApp.user.usergroup import (
UserGroup,
tim_group_to_scim,
SISU_GROUP_PREFIX,
DELETED_GROUP_PREFIX,
)
from timApp.user.usergroupmember import UserGroupMember, membership_current
from timApp.util.flask.requesthelper import load_data_from_req, JSONException
from timApp.util.flask.responsehelper import json_response
from timApp.util.logger import log_warning, log_info
from tim_common.marshmallow_dataclass import class_schema
scim = Blueprint("scim", __name__, url_prefix="/scim")
UNPROCESSABLE_ENTITY = 422
[docs]@dataclass(frozen=True)
class SCIMNameModel:
familyName: str
givenName: str
middleName: str | None = None
[docs] def derive_full_name(self, last_name_first: bool) -> str:
if last_name_first:
full = f"{self.familyName} {self.givenName}"
if self.middleName:
full += f" {self.middleName}"
return full
else:
if self.middleName:
return f"{self.givenName} {self.middleName} {self.familyName}"
else:
return f"{self.givenName} {self.familyName}"
[docs]@dataclass(frozen=True)
class SCIMMemberModel:
value: str
name: SCIMNameModel
display: str
email: str
workEmail: str | None = None
ref: str | None = field(metadata={"data_key": "$ref"}, default=None)
type: str | None = None
@cached_property
def primary_email(self) -> str:
return self.workEmail or self.email
@cached_property
def emails(self) -> list[str]:
return [s for s in (self.workEmail, self.email) if s]
@cached_property
def has_active_email(self) -> bool:
"""Return True if user has any active emails, False otherwise.
..note::
An active email is one that messages can be sent to.
Right now Sisu SCIM sets a "nobody+username" email if the user has no valid active emails.
These emails should not be set as primary unless the user has no other primary emails at that moment.
:return: True if the email is valid, False otherwise.
"""
return not self.primary_email.startswith("nobody+")
[docs]@dataclass(frozen=True)
class SCIMCommonModel:
externalId: str
displayName: str
[docs]@dataclass(frozen=True)
class SCIMEmailModel:
value: str
type: str | None = None
primary: bool = True
[docs]@dataclass(frozen=True)
class SCIMUserModel(SCIMCommonModel):
userName: str
emails: list[SCIMEmailModel]
SCIMUserModelSchema = class_schema(SCIMUserModel)
[docs]@dataclass(frozen=True)
class SCIMGroupModel(SCIMCommonModel):
members: list[SCIMMemberModel]
id: str | None = None
schemas: list[str] | None = None
SCIMGroupModelSchema = class_schema(SCIMGroupModel)
[docs]@dataclass(frozen=True)
class SCIMException(Exception):
code: int
msg: str
headers: dict[str, str] | None = None
[docs]@scim.errorhandler(SCIMException)
def item_locked(error: Exception) -> Response:
assert isinstance(error, SCIMException)
log_warning(error.msg)
return handle_error_msg_code(error.code, error.msg, error.headers)
[docs]def handle_error(error: Any) -> Response:
return handle_error_msg_code(error.code, error.description)
[docs]def handle_error_msg_code(
code: int, msg: str, headers: dict[str, str] | None = None
) -> Response:
return json_response(
scim_error_json(code, msg),
status_code=code,
headers=headers,
)
scim.errorhandler(UNPROCESSABLE_ENTITY)(handle_error)
[docs]def scim_error_json(code: int, msg: str) -> dict:
return {
"detail": msg,
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"status": str(code),
}
[docs]@scim.before_request
def check_auth() -> None:
ip = request.remote_addr
if ip != current_app.config.get("SCIM_ALLOWED_IP"):
raise SCIMException(403, f"IP not allowed: {ip}")
expected_username = current_app.config.get("SCIM_USERNAME")
expected_password = current_app.config.get("SCIM_PASSWORD")
if not expected_username or not expected_password:
raise SCIMException(403, "SCIM username or password not configured.")
headers = {"WWW-Authenticate": 'Basic realm="Authentication required"'}
auth = request.authorization
if not auth:
raise SCIMException(
401, "This action requires authentication.", headers=headers
)
if auth.username == expected_username and auth.password == expected_password:
pass
else:
raise SCIMException(401, "Incorrect username or password.", headers=headers)
[docs]@dataclass
class GetGroupsModel:
filter: str
GetGroupsModelSchema = class_schema(GetGroupsModel)
[docs]def get_scim_id(ug: UserGroup) -> str:
return tim_group_to_scim(ug.name)
filter_re = re.compile("externalId sw (.+)")
[docs]def scim_group_to_tim(sisu_group: str) -> str:
return f"{SISU_GROUP_PREFIX}{sisu_group}"
[docs]@scim.get("/Groups")
@use_args(GetGroupsModelSchema())
def get_groups(args: GetGroupsModel) -> Response:
m = filter_re.fullmatch(args.filter)
if not m:
raise SCIMException(422, "Unsupported filter")
groups = (
ScimUserGroup.query.filter(
ScimUserGroup.external_id.startswith(scim_group_to_tim(m.group(1)))
)
.join(UserGroup)
.with_entities(UserGroup)
.all()
)
def gen_groups() -> Generator[dict, None, None]:
for g in groups: # type: UserGroup
yield {
"id": g.scim_id,
"externalId": g.scim_id,
"meta": get_meta(g),
}
return json_response(
{
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"totalResults": len(groups),
"Resources": list(gen_groups()),
}
)
[docs]def derive_scim_group_name(s: SCIMGroupModel) -> str:
x = parse_sisu_group_display_name_or_error(s)
if x.period:
return f"{x.coursecode.lower()}-{x.year[2:]}{x.period.lower()}-{x.desc_slug}"
else:
return f"{x.coursecode.lower()}-{x.year[2:]}{x.month}{x.day}-{x.desc_slug}"
[docs]@csrf.exempt
@scim.post("/Groups")
@use_args(SCIMGroupModelSchema(), locations=("json",))
def post_group(args: SCIMGroupModel) -> Response:
log_info(f"/Groups externalId: {args.externalId}")
gname = scim_group_to_tim(args.externalId)
ug = try_get_group_by_scim(args.externalId)
if ug:
msg = f"Group already exists: {gname}"
log_warning(msg)
log_warning(str(args))
raise SCIMException(409, msg)
deleted_group = UserGroup.get_by_name(f"{DELETED_GROUP_PREFIX}{args.externalId}")
derived_name = derive_scim_group_name(args)
if deleted_group:
log_info(f"Restoring deleted group: {derived_name}")
ug = deleted_group
ug.name = disambiguate_name(derived_name)
else:
derived_name = disambiguate_name(derived_name)
ug = UserGroup(name=derived_name, display_name=args.displayName)
db.session.add(ug)
update_users(ug, args)
db.session.commit()
return json_response(group_scim(ug), status_code=201)
[docs]def disambiguate_name(derived_name: str) -> str:
if UserGroup.get_by_name(derived_name):
disambiguator = 1
while UserGroup.get_by_name(f"{derived_name}-{disambiguator}"):
disambiguator += 1
derived_name = f"{derived_name}-{disambiguator}"
# raise SCIMException(409, f'The group name "{derived_name}" '
# f'derived from display name "{args.displayName}" already exists.')
return derived_name
[docs]@scim.get("/Groups/<group_id>")
def get_group(group_id: str) -> Response:
ug = get_group_by_scim(group_id)
return json_response(group_scim(ug))
[docs]@csrf.exempt
@scim.put("/Groups/<group_id>")
def put_group(group_id: str) -> Response:
try:
ug = get_group_by_scim(group_id)
try:
d = load_data_from_req(SCIMGroupModelSchema)
except JSONException as e:
raise SCIMException(422, e.description)
update_users(ug, d)
db.session.commit()
return json_response(group_scim(ug))
except Exception:
log_warning(traceback.format_exc())
raise
[docs]@csrf.exempt
@scim.delete("/Groups/<group_id>")
def delete_group(group_id: str) -> Response:
ug = get_group_by_scim(group_id)
ug.name = f"{DELETED_GROUP_PREFIX}{ug.external_id.external_id}"
db.session.delete(ug.external_id)
db.session.commit()
return Response(status=204)
[docs]@scim.get("/Users/<user_id>")
def get_user(user_id: str) -> Response:
u = User.get_by_name(user_id)
if not u:
raise SCIMException(404, "User not found.")
return json_response(u.get_scim_data())
[docs]@csrf.exempt
@scim.put("/Users/<user_id>")
def put_user(user_id: str) -> Response:
u = User.get_by_name(user_id)
if not u:
raise SCIMException(404, "User not found.")
try:
um: SCIMUserModel = load_data_from_req(SCIMUserModelSchema)
except JSONException as e:
raise SCIMException(422, e.description)
u.real_name = last_name_to_first(um.displayName)
if um.emails:
emails = [
e.value for e in sorted(um.emails, key=lambda em: 0 if em.primary else 1)
]
u.set_emails(emails, ContactOrigin.Sisu, can_update_primary=True)
db.session.commit()
return json_response(u.get_scim_data())
email_error_re = re.compile(r"Key \(email\)=\((?P<email>[^()]+)\) already exists.")
[docs]def update_users(ug: UserGroup, args: SCIMGroupModel) -> None:
external_id = args.externalId
if not ug.external_id:
if not external_id_re.fullmatch(external_id):
raise SCIMException(
422,
f'Unexpected externalId format: "{external_id}" (displayName: "{args.displayName}")',
)
ug.external_id = ScimUserGroup(external_id=external_id)
else:
if ug.external_id.external_id != args.externalId:
raise SCIMException(422, "externalId unexpectedly changed")
current_usernames = {u.value for u in args.members}
removed_user_names = {u.name for u in ug.users} - current_usernames
expired_memberships: list[UserGroupMember] = list(
get_scim_memberships(ug)
.filter(User.name.in_(removed_user_names))
.with_entities(UserGroupMember)
)
for ms in expired_memberships:
ms.set_expired(sync_mailing_lists=False)
p = parse_sisu_group_display_name_or_error(args)
ug.display_name = args.displayName
emails = [m.primary_email for m in args.members if m.primary_email is not None]
unique_emails = set(emails)
if len(emails) != len(unique_emails):
raise SCIMException(422, f"The users do not have distinct emails.")
unique_usernames = {m.value for m in args.members}
if len(args.members) != len(unique_usernames):
raise SCIMException(422, f"The users do not have distinct usernames.")
added_users = set()
scimuser = User.get_scimuser()
existing_accounts: list[User] = User.query.filter(
User.name.in_(current_usernames) | User.email.in_(emails)
).all()
existing_accounts_dict: dict[str, User] = {u.name: u for u in existing_accounts}
existing_accounts_by_email_dict: dict[str, User] = {
u.email: u for u in existing_accounts
}
email_updates = []
with db.session.no_autoflush:
for u in args.members:
expected_name = u.name.derive_full_name(last_name_first=True)
consistent = (
u.display.endswith(" " + u.name.familyName)
# There are some edge cases that prevent this condition from working, so it has been disabled.
# and set(expected_name.split(' ')[1:]) == set(u.display.split(' ')[:-1])
)
if not consistent:
raise SCIMException(
422,
f"The display attribute '{u.display}' is inconsistent with the name attributes: "
f"given='{u.name.givenName}', middle='{u.name.middleName}', family='{u.name.familyName}'.",
)
name_to_use = expected_name
user = existing_accounts_dict.get(u.value)
if user:
if u.primary_email is not None:
user_email = existing_accounts_by_email_dict.get(u.primary_email)
if user_email and user != user_email:
if not user_email.is_email_user:
raise SCIMException(
422,
f"Users {user.name} and {user_email.name} were not automatically merged because neither was an email user.",
)
log_warning(f"Merging users {user.name} and {user_email.name}")
# Unlike in manual merging, we always merge the users because emails are automatically
# verified by Sisu
do_merge_users(user, user_email, force=True)
do_soft_delete(user_email)
db.session.flush()
user.update_info(
UserInfo(
username=u.value,
full_name=name_to_use,
last_name=u.name.familyName,
given_name=u.name.givenName,
),
)
prev_email = user.email
user.set_emails(
u.emails,
ContactOrigin.Sisu,
can_update_primary=u.has_active_email,
notify_message_lists=False, # Notified in bulk below
)
email_updates.append((prev_email, user.email))
else:
user = existing_accounts_by_email_dict.get(u.primary_email)
if user:
if not user.is_email_user:
raise SCIMException(
422,
f"Key (email)=({user.email}) already exists. Conflicting username is: {u.value}",
)
user.update_info(
UserInfo(
username=u.value,
full_name=name_to_use,
last_name=u.name.familyName,
given_name=u.name.givenName,
),
)
prev_email = user.email
user.set_emails(
u.emails,
ContactOrigin.Sisu,
can_update_primary=u.has_active_email,
notify_message_lists=False, # Notified in bulk below
)
email_updates.append((prev_email, user.email))
else:
user, _ = User.create_with_group(
UserInfo(
username=u.value,
full_name=name_to_use,
origin=UserOrigin.Sisu,
last_name=u.name.familyName,
given_name=u.name.givenName,
)
)
user.set_emails(
u.emails,
ContactOrigin.Sisu,
can_update_primary=u.has_active_email,
notify_message_lists=False, # New user, no need to notify
)
added = user.add_to_group(ug, added_by=scimuser, sync_mailing_lists=False)
if added:
added_users.add(user)
try:
db.session.flush()
except IntegrityError as e:
db.session.rollback()
return raise_conflict_error(args, e)
refresh_sisu_grouplist_doc(ug)
# Sync info with mailing lists after all information got successfully updated
for old, new in email_updates:
update_mailing_list_address(old, new)
for added_user in added_users:
sync_message_list_on_add(added_user, ug)
for expired_membership in expired_memberships:
sync_message_list_on_expire(expired_membership.user, expired_membership.group)
# Possibly just checking is_responsible_teacher could be enough.
if (
ug.external_id.is_responsible_teacher and not ug.external_id.is_studysubgroup
) or ug.external_id.is_administrative_person:
tg = UserGroup.get_teachers_group()
for u in added_users:
if tg not in u.groups:
u.groups.append(tg)
send_course_group_mail(p, u)
[docs]def parse_sisu_group_display_name_or_error(args: SCIMGroupModel) -> SisuDisplayName:
p = parse_sisu_group_display_name(args.displayName)
if not p:
raise SCIMException(
422,
f'Unexpected displayName format: "{args.displayName}" (externalId: "{args.externalId}")',
)
return p
[docs]def raise_conflict_error(args: SCIMGroupModel, e: IntegrityError) -> None:
msg = e.orig.diag.message_detail
m = email_error_re.fullmatch(msg)
if m:
em = m.group("email")
member = None
for x in args.members:
if x.primary_email == em:
member = x
break
assert member is not None
msg += " Conflicting username is: " + member.value
raise SCIMException(422, msg) from e
[docs]def is_manually_added(u: User) -> bool:
"""It is possible to add user manually to SCIM groups.
For now we assume that any email user is such.
"""
return u.is_email_user
# Required in group_scim because we need to join User table twice.
user_adder = aliased(User)
[docs]def get_scim_memberships(ug: UserGroup) -> Any:
return (
ug.memberships.join(user_adder, UserGroupMember.adder)
.join(User, UserGroupMember.user)
.filter(membership_current & (user_adder.name == SCIM_USER_NAME))
)
[docs]def group_scim(ug: UserGroup) -> dict:
def members() -> Generator[dict, None, None]:
db.session.expire(ug)
for u in get_scim_memberships(ug).with_entities(User):
yield {
"value": u.scim_id,
"$ref": u.scim_location,
"display": u.scim_display_name,
}
return {
**ug.get_scim_data(),
"members": list(members()),
}
[docs]def try_get_group_by_scim(group_id: str) -> UserGroup | None:
try:
ug = (
ScimUserGroup.query.filter_by(external_id=scim_group_to_tim(group_id))
.join(UserGroup)
.with_entities(UserGroup)
.first()
)
except ValueError:
raise SCIMException(404, f"Group {group_id} not found")
return ug
[docs]def get_group_by_scim(group_id: str) -> UserGroup:
ug = try_get_group_by_scim(group_id)
if not ug:
raise SCIMException(404, f"Group {group_id} not found")
return ug