from dataclasses import dataclass, field
from datetime import date, datetime
from json import JSONDecodeError
from textwrap import dedent
from typing import Optional, Union, Any, Generator
import requests
from flask import Blueprint, current_app, request, Response
from marshmallow import validates, ValidationError
from marshmallow.utils import _Missing, missing
from sqlalchemy import any_, true
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
from webargs.flaskparser import use_args
from timApp.answer.routes import save_fields
from timApp.auth.accesshelper import get_doc_or_abort, AccessDenied
from timApp.auth.accesstype import AccessType
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.create_item import apply_template
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import DocInfo
from timApp.document.viewcontext import default_view_ctx
from timApp.item.block import Block, BlockType
from timApp.item.validation import (
ItemValidationRule,
validate_item_and_create_intermediate_folders,
validate_item,
)
from timApp.notification.send_email import send_email
from timApp.plugin.plugin import Plugin
from timApp.plugin.pluginexception import PluginException
from timApp.sisu.parse_display_name import (
SisuDisplayName,
parse_sisu_group_display_name,
)
from timApp.sisu.scimusergroup import ScimUserGroup
from timApp.tim_app import app, csrf
from timApp.timdb.sqa import db
from timApp.user.groups import (
validate_groupname,
update_group_doc_settings,
add_group_infofield_template,
verify_group_view_access,
)
from timApp.user.user import User
from timApp.user.usergroup import UserGroup, get_sisu_groups_by_filter
from timApp.util.flask.requesthelper import use_model, RouteException
from timApp.util.flask.responsehelper import json_response
from timApp.util.get_fields import (
get_fields_and_users,
MembershipFilter,
UserFieldObj,
RequestedGroups,
)
from timApp.util.logger import log_warning
from timApp.util.utils import (
remove_path_special_chars,
seq_to_str,
split_location,
get_current_time,
fin_timezone,
)
from tim_common.marshmallow_dataclass import class_schema
from tim_common.utils import Missing
sisu = Blueprint("sisu", __name__, url_prefix="/sisu")
[docs]@sisu.get("/getPotentialGroups")
def get_potential_groups_route() -> Response:
u = get_current_user_object()
result = get_potential_groups(u)
return json_response(
[
{
"id": g.id,
"name": g.name,
"external_id": g.external_id.external_id,
"display_name": g.display_name,
"doc": g.admin_doc.docentries[0] if g.admin_doc else None,
}
for g in result
]
)
# Possible role suffixes, excluding students. Order matters!
role_suffixes = [
"responsible-teachers",
"studysubgroup-teachers",
"teachers",
"administrative-persons",
]
[docs]def get_group_prefix(g: UserGroup) -> str | None:
"""Returns the prefix indicating which Sisu groups the users in this Sisu group shall have access to."""
eid = g.external_id.external_id
for s in role_suffixes:
if eid.endswith(f"-{s}"):
return eid[: -len(s)] + "%"
return None
[docs]def get_potential_groups(u: User, course_filter: str | None = None) -> list[UserGroup]:
"""Returns all the Sisu groups that the user shall have access to."""
sisu_group_memberships = (
u.groups_dyn.join(UserGroup).join(ScimUserGroup).with_entities(UserGroup).all()
)
ug_filter = true()
if not u.is_admin:
accessible_prefixes = [get_group_prefix(g) for g in sisu_group_memberships]
ug_filter = ug_filter & ScimUserGroup.external_id.like(
any_(accessible_prefixes)
)
if course_filter:
ug_filter = ug_filter & ScimUserGroup.external_id.startswith(
course_filter + "-"
)
gs = get_sisu_groups_by_filter(ug_filter)
return gs
[docs]@dataclass
class GroupCreateModel:
externalId: str
name: str | _Missing = missing
GroupCreateSchema = class_schema(GroupCreateModel)
[docs]def get_sisu_group_rights(g: UserGroup) -> list[UserGroup]:
group_names = []
if g.external_id.is_studysubgroup:
group_names.append(g.external_id.without_role + "teachers")
course_code = g.external_id.course_id
for r in role_suffixes:
group_names.append(course_code + "-" + r)
return get_sisu_groups_by_filter(ScimUserGroup.external_id.in_(group_names))
[docs]@sisu.post("/createGroupDocs")
@use_args(GroupCreateSchema(many=True), locations=("json",))
def create_groups_route(args: list[GroupCreateModel]) -> Response:
u = get_current_user_object()
# First, make sure user is eligible for access to all the requested groups.
allowed_groups = get_potential_groups(u)
allowed_external_ids = {g.external_id.external_id for g in allowed_groups}
requested_external_ids = {a.externalId for a in args}
not_allowed = requested_external_ids - allowed_external_ids
if not_allowed:
raise AccessDenied(
f"You don't have access to all the requested groups: {seq_to_str(sorted(list(not_allowed)))}"
)
# Now, create the admin documents for groups that don't yet exist.
# Rights to already existing documents need to be updated too.
name_map: dict[str, str | Missing] = {a.externalId: a.name for a in args}
group_map: dict[str, UserGroup] = {
g.external_id.external_id: g for g in allowed_groups
}
created = []
updated = []
admin_id = UserGroup.get_admin_group().id
for r in requested_external_ids:
g = group_map[r]
name_m = name_map[r]
if not name_m:
name = g.name
else:
name = name_m
if name.strip() == "":
continue
validate_groupname(name)
p = parse_sisu_group_display_name(g.display_name)
name_no_special = remove_path_special_chars(name)
if not p:
raise RouteException(
f"Failed to parse Sisu group display name: {g.display_name}"
)
expected_location = p.group_doc_root
if g.admin_doc:
doc = g.admin_doc.docentries[0]
doc.title = name
# In theory, the admin doc can have multiple aliases, so we'll only update the one in the official location.
for d in g.admin_doc.docentries:
location, short_name = split_location(d.path_without_lang)
if location != expected_location or short_name == name_no_special:
continue
new_path = f"{location}/{name_no_special}"
validate_item(
new_path,
BlockType.Document,
ItemValidationRule(check_write_perm=False, require_login=False),
)
d.name = new_path
updated.append(d)
doc.document.modifier_group_id = admin_id
else:
doc = create_sisu_document(
f"{expected_location}/{name_no_special}",
name,
owner_group=None,
)
doc.document.modifier_group_id = admin_id
apply_template(doc)
add_group_infofield_template(doc)
g.admin_doc = doc.block
created.append(doc)
docblock: Block = g.admin_doc
g.name = name
try:
db.session.flush()
except IntegrityError:
db.session.rollback()
raise RouteException(f"The group name '{name}' already exists.")
update_group_doc_settings(doc, name, extra_macros={"sisugroup": r})
groups = get_sisu_group_rights(g)
docblock.add_rights(groups, AccessType.owner)
db.session.commit()
return json_response(
{
"created": created,
"updated": updated,
}
)
[docs]def create_sisu_document(
item_path: str,
item_title: str,
owner_group: UserGroup | None = None,
) -> DocInfo:
validate_item_and_create_intermediate_folders(
item_path,
BlockType.Document,
owner_group,
validation_rule=ItemValidationRule(check_write_perm=False, require_login=False),
)
return DocEntry.create(item_path, owner_group, item_title)
[docs]def refresh_sisu_grouplist_doc(ug: UserGroup) -> None:
if not ug.external_id.is_student and not ug.external_id.is_studysubgroup:
gn = parse_sisu_group_display_name(ug.display_name)
assert gn is not None
sp = gn.sisugroups_doc_path
d = DocEntry.find_by_path(sp)
settings_to_set = {
"global_plugin_attrs": {
"all": {
"sisugroups": ug.external_id.course_id,
}
},
"macros": {
"course": gn.coursecode_and_time,
},
"preamble": "sisugroups",
}
if not d:
d = create_sisu_document(
sp, f"Sisu groups for course {gn.coursecode.upper()}", owner_group=ug
)
admin_id = UserGroup.get_admin_group().id
d.document.modifier_group_id = admin_id
d.document.set_settings(settings_to_set)
else:
d.block.add_rights([ug], AccessType.owner)
p1 = d.parent
p2 = p1.parent
p1.block.add_rights([ug], AccessType.owner) # type: ignore[union-attr]
p2.block.add_rights([ug], AccessType.owner) # type: ignore[union-attr]
p2.parent.block.add_rights([UserGroup.get_teachers_group()], AccessType.view) # type: ignore[union-attr]
# Update rights for already existing activated groups.
docs = d.parent.get_all_documents(
query_options=joinedload(DocEntry._block)
.joinedload(Block.managed_usergroup)
.joinedload(UserGroup.external_id),
)
for doc in docs:
if doc == d:
continue
group = doc.block.managed_usergroup
# Do some sanity checks for cases that may theoretically happen if someone manually moves documents
# in a wrong place.
if not group:
continue
if not group.external_id:
continue
if group.external_id.course_id != ug.external_id.course_id:
continue
doc.block.add_rights([ug], AccessType.owner)
s = d.document.get_settings()
g_attrs = s.global_plugin_attrs()
has_sisu_attr = False
valid_settings = False
if isinstance(g_attrs, dict):
a = g_attrs.get("all")
if isinstance(a, dict):
sisugroups = a.get("sisugroups")
if sisugroups == ug.external_id.course_id:
has_sisu_attr = True
valid_settings = isinstance(sisugroups, str)
if has_sisu_attr:
return
if not valid_settings:
d.document.set_settings(settings_to_set)
else:
for p in d.document.get_paragraphs():
if not p.is_plugin():
continue
try:
plug = Plugin.from_paragraph(p, default_view_ctx)
except PluginException:
continue
if plug.values.get("sisugroups") == ug.external_id.course_id:
return
d.document.modifier_group_id = UserGroup.get_admin_group().id
d.document.add_text(
f"""
# Sisu groups for course {gn.coursecode_and_time}
``` {{#table_extra plugin="tableForm"}}
sisugroups: {ug.external_id.course_id}
table: true
showInView: true
report: false
maxRows: 40em
realnames: true
buttonText:
autosave: true
cbColumn: true
nrColumn: true
filterRow: true
hide:
toolbar: 1
editorButtons: 1
```
"""
)
[docs]def send_course_group_mail(p: SisuDisplayName, u: User) -> None:
send_email(
u.email,
f"Kurssin {p.coursecode} Sisu-ryhmät on kopioitu TIMiin",
dedent(
f"""
Kurssin {p.coursecode} Sisussa olevat ryhmät on kopioitu TIMiin. Ne löytyvät dokumentista:
{current_app.config['TIM_HOST']}/view/{p.sisugroups_doc_path}
Dokumentissa on ohjeet ryhmien käyttämiseen TIMissä.
Tämä viesti tulee kaikille kurssin vastuuopettajille ja hallintohenkilöille.
"""
).strip(),
mail_from=app.config["NOREPLY_EMAIL"],
)
[docs]@dataclass
class PostGradesModel:
destCourse: str
docId: int
dryRun: bool
partial: bool
filterUsers: list[str] | None = None
includeUsers: MembershipFilter = field(
default=MembershipFilter.All, metadata={"by_value": True}
)
completionDate: datetime | None = None
groups: list[str] | None = None
[docs]def verify_sisu_assessments() -> None:
if not app.config["SISU_ASSESSMENTS_URL"]:
raise SisuError(app.config["SISU_ASSESSMENTS_DISABLED_MESSAGE"])
[docs]@sisu.post("/sendGrades")
@use_model(PostGradesModel)
def post_grades_route(m: PostGradesModel) -> Response:
verify_sisu_assessments()
result = json_response(
send_grades_to_sisu(
m.destCourse,
get_current_user_object(),
get_doc_or_abort(m.docId),
partial=m.partial,
dry_run=m.dryRun,
groups=m.groups,
filter_users=m.filterUsers,
completion_date=m.completionDate.astimezone(fin_timezone).date()
if m.completionDate
else None,
membership_filter=m.includeUsers,
)
)
if not m.dryRun:
db.session.commit()
return result
[docs]class IncorrectSettings(Exception):
pass
[docs]class SisuError(Exception):
pass
[docs]@dataclass
class PostAssessmentsErrorValue:
code: int
reason: str
# TODO: Temporary workaround to allow floats because Sisu API may sometimes erroneously return these.
credits: int | float | Missing = missing
gradeId: str | None = None
AssessmentErrors = dict[str, PostAssessmentsErrorValue]
[docs]@dataclass
class PostAssessmentsBody:
assessments: dict[int, AssessmentErrors]
[docs]@dataclass
class PostAssessmentsResponse:
body: PostAssessmentsBody | None = None
error: PostAssessmentsErrorValue | None = None
PostAssessmentsResponseSchema = class_schema(PostAssessmentsResponse)
[docs]@dataclass
class Assessment:
userName: str
gradeId: str
completionDate: str
completionCredits: int | None = None
privateComment: str | None = None
[docs] @validates("gradeId")
def validate_grade(self, value: str) -> None:
if value == "":
raise ValidationError("Cannot be empty")
if value not in ("0", "1", "2", "3", "4", "5", "HYV", "HYL", "HT", "TT"):
raise ValidationError(f'Cannot be "{value}"')
# if value == 'HYL':
# raise ValidationError('Sisu interface currently does not accept HYL grade')
[docs]def maybe_to_str(s: Any | None) -> str | None:
if s is None:
return s
return str(s)
[docs]@dataclass
class CandidateAssessment:
user: User
gradeId: Any
completionDate: Any
completionCredits: Any = None
privateComment: Any = None
sentGrade: Any = None
sentCredit: Any = None
[docs] def to_sisu_json(
self,
completion_date: str | None = None,
ensure_int_credit: bool = False,
) -> dict[str, str]:
result = {
"userName": self.user.name,
"gradeId": self.gradeId,
"completionDate": completion_date or self.completionDate,
}
if self.completionCredits:
c = self.completionCredits
if ensure_int_credit:
c = int(c)
result["completionCredits"] = c
if self.privateComment:
result["privateComment"] = self.privateComment
return result
@property
def is_fail_grade(self) -> bool:
return self.gradeId in ("HYL", "0")
@property
def is_passing_grade(self) -> bool:
return self.gradeId and not self.is_fail_grade
AssessmentSchema = class_schema(Assessment)
[docs]@csrf.exempt
@sisu.post("/assessments/<sisuid>")
def mock_assessments(sisuid: str) -> Response:
ok_names = {"us-1"}
j = request.get_json()
# Part of spec
assert j is not None and isinstance(j, dict)
assessments = j["assessments"]
partial = j["partial"]
return json_response(
{
"body": {
"assessments": {
str(i): {"userName": {"code": 40001, "reason": "Some reason."}}
for i, a in enumerate(assessments)
if a["userName"] not in ok_names
},
}
},
status_code=207 if partial else 400,
)
[docs]def call_sisu_assessments(sisu_id: str, json: dict[str, Any]) -> requests.Response:
verify_sisu_assessments()
url = f"{app.config['SISU_ASSESSMENTS_URL']}{sisu_id}"
return requests.post(
url,
json=json,
cert=app.config["SISU_CERT_PATH"],
)
[docs]def get_assessment_fields_to_save(
doc: DocInfo, c: CandidateAssessment
) -> dict[str, str]:
result = {
f"{doc.id}.completionDate": c.completionDate,
f"{doc.id}.sentGrade": c.gradeId,
}
if c.sentCredit is not None:
result[f"{doc.id}.sentCredit"] = c.sentCredit
return result
[docs]@dataclass
class AssessmentError:
message: str
assessment: CandidateAssessment
[docs]def send_grades_to_sisu(
sisu_id: str,
teacher: User,
doc: DocInfo,
partial: bool,
dry_run: bool,
completion_date: date | None,
filter_users: list[str] | None,
groups: list[str] | None,
membership_filter: MembershipFilter,
) -> dict[str, Any]:
assessments = get_sisu_assessments(
sisu_id, teacher, doc, groups, filter_users, membership_filter
)
if not completion_date:
completion_date = get_current_time().date()
users_to_update = {a.user.id for a in assessments if a.is_passing_grade}
completion_date_iso = completion_date.isoformat()
validation_errors = []
try:
AssessmentSchema(many=True).load(
[a.to_sisu_json(completion_date=completion_date_iso) for a in assessments]
)
except ValidationError as e:
msgs = e.messages
assert isinstance(msgs, dict)
# If completionCredits has a validation error, msgs will be like:
# {0: {'completionCredits': [['Invalid value.']]}}
# so we have to flatten the error list.
def flatten_error_list(err_list: Any) -> Any:
if (
isinstance(err_list, list)
and len(err_list) > 0
and isinstance(err_list[0], list)
):
return err_list[0]
return err_list
validation_errors = [
AssessmentError(
assessment=assessments[i],
message=", ".join(
x + ": " + ", ".join(flatten_error_list(y)) for x, y in a.items()
),
)
for i, a in msgs.items()
]
if not partial:
return {
"sent_assessments": [],
"default_selection": [],
"assessment_errors": validation_errors,
}
invalid_assessments_indices = {i for i in msgs.keys()}
assessments = [
a for i, a in enumerate(assessments) if i not in invalid_assessments_indices
]
# log_info(json.dumps(assessments, indent=4))
r = call_sisu_assessments(
sisu_id,
json={
"assessments": [
a.to_sisu_json(
completion_date=completion_date_iso,
ensure_int_credit=True,
)
for a in assessments
],
"partial": partial,
"dry_run": dry_run,
},
)
# log_info(json.dumps(r.json(), indent=4))
try:
pr: PostAssessmentsResponse = PostAssessmentsResponseSchema().load(r.json())
except JSONDecodeError:
log_warning(f"Sisu returned invalid JSON: {r.text}")
raise SisuError(
"Connection to Sisu is currently not working (Sisu gave an unexpected error)."
)
except ValidationError:
raise SisuError(f"Failed to validate Sisu JSON: {r.text}")
if pr.error:
raise SisuError(pr.error.reason)
assert pr.body is not None
invalid_assessments = {n for n in pr.body.assessments.keys()}
ok_assessments = [
a for i, a in enumerate(assessments) if i not in invalid_assessments
]
if not dry_run and r.status_code < 400:
for a in ok_assessments:
a.completionDate = completion_date_iso
a.sentGrade = a.gradeId
a.sentCredit = a.completionCredits
save_fields(
{
"savedata": [
{
"fields": get_assessment_fields_to_save(doc, a),
"user": a.user.id,
}
for a in ok_assessments
],
"allowMissing": True,
},
teacher,
current_doc=doc,
allow_non_teacher=False,
)
users_to_update = set()
errs = [
AssessmentError(
message="Sisu: " + ", ".join(list_reasons(v)),
assessment=assessments[k],
)
for k, v in pr.body.assessments.items()
]
all_errors = errs + validation_errors
error_users = {a.assessment.user.id for a in all_errors}
return {
"sent_assessments": ok_assessments if r.status_code < 400 else [],
"assessment_errors": all_errors,
"default_selection": sorted(list(users_to_update - error_users)),
}
[docs]def list_reasons(codes: AssessmentErrors) -> Generator[str, None, None]:
for k, v in codes.items():
if v.code == 40009 and v.gradeId and v.credits:
yield f"{v.reason} ({v.gradeId}, {v.credits} op)"
else:
yield v.reason
[docs]def get_sisu_assessments(
sisu_id: str,
teacher: User,
doc: DocInfo,
groups: list[str] | None,
filter_users: list[str] | None,
membership_filter: MembershipFilter,
) -> list[CandidateAssessment]:
teachers_group = UserGroup.get_teachers_group()
if teacher not in teachers_group.users:
raise AccessDenied("You are not a TIM teacher.")
pot_groups = get_potential_groups(teacher, course_filter=sisu_id)
if not any(
g.external_id.course_id == sisu_id
and (
g.external_id.is_responsible_teacher
or g.external_id.is_administrative_person
)
for g in pot_groups
):
raise AccessDenied(
f"You are neither a responsible teacher nor an administrative person of the course {sisu_id}."
)
if not teacher.has_teacher_access(doc):
raise AccessDenied("You do not have teacher access to the document.")
doc_settings = doc.document.get_settings()
if groups is None:
try:
groupsetting = doc_settings.group()
except ValueError as e:
raise IncorrectSettings(str(e)) from e
if not groupsetting:
raise IncorrectSettings(
'The document must have "group" setting that indicates the student group name.'
)
usergroups = [groupsetting]
else:
usergroups = groups
ugs = UserGroup.query.filter(UserGroup.name.in_(usergroups)).all()
requested = set(usergroups)
found = {ug.name for ug in ugs}
not_found_gs = requested - found
if not_found_gs:
raise IncorrectSettings(
f"Usergroup {seq_to_str(sorted(list(not_found_gs)))} not found."
)
for ug in ugs:
if not verify_group_view_access(ug, require=False):
raise AccessDenied(f'You do not have access to the group "{ug.name}".')
# The group doesn't have to be a Sisu group, but if it is, perform a couple of checks.
if ug.external_id:
if not ug.external_id.is_student:
raise IncorrectSettings(
f'The group "{ug.name}" is not a Sisu student group.'
)
if ug.external_id.course_id != sisu_id:
raise IncorrectSettings(
f'The associated course id "{ug.external_id.course_id}" '
f'of the group "{ug.name}" does not match the course setting "{sisu_id}".'
)
users, _, _, _ = get_fields_and_users(
["grade", "credit", "completionDate", "sentGrade", "sentCredit"],
RequestedGroups(ugs),
doc,
teacher,
default_view_ctx,
user_filter=User.name.in_(filter_users) if filter_users else None,
member_filter_type=membership_filter,
)
return [fields_to_assessment(r, doc) for r in users]
[docs]def fields_to_assessment(r: UserFieldObj, doc: DocInfo) -> CandidateAssessment:
fields = r["fields"]
grade = fields.get(f"{doc.id}.grade")
u = r["user"]
# TODO: Sisu accepts also 'privateComment' field.
result = CandidateAssessment(
gradeId=str(grade) if grade is not None else None,
user=u,
completionDate=fields.get(f"{doc.id}.completionDate"),
completionCredits=fields.get(f"{doc.id}.credit"),
sentGrade=fields.get(f"{doc.id}.sentGrade"),
sentCredit=fields.get(f"{doc.id}.sentCredit"),
)
return result