import itertools
import json
import re
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, unique
from typing import Optional, DefaultDict, TypedDict, Any, Union
import attr
import dateutil.parser
from isodate import datetime_isoformat
from marshmallow import missing
from sqlalchemy import func, true
from sqlalchemy.orm import lazyload, joinedload
from timApp.answer.answer import Answer
from timApp.answer.answers import (
get_points_by_rule,
basic_tally_fields,
valid_answers_query,
)
from timApp.auth.accesshelper import get_doc_or_abort, AccessDenied
from timApp.document.docinfo import DocInfo
from timApp.document.usercontext import UserContext
from timApp.document.viewcontext import ViewContext
from timApp.plugin.plugin import find_task_ids, CachedPluginFinder
from timApp.plugin.pluginexception import PluginException
from timApp.plugin.taskid import TaskId
from timApp.user.groups import verify_group_view_access
from timApp.user.user import User, get_membership_end, get_membership_added
from timApp.user.usergroup import UserGroup
from timApp.util.flask.requesthelper import RouteException
from timApp.util.utils import widen_fields, get_alias, seq_to_str, fin_timezone
ALL_ANSWERED_WILDCARD = "*"
[docs]def chunks(l: list, n: int):
for i in range(0, len(l), n):
yield l[i : i + n]
tallyfield_re = re.compile(
r"tally:((?P<doc>\d+)\.)?(?P<field>[a-zA-Z0-9öäåÖÄÅ_-]+)(\[ *(?P<ds>[^\[\],]*) *, *(?P<de>[^\[\],]*) *\])?"
)
[docs]@attr.s(auto_attribs=True)
class TallyField:
"""In Jsrunner, represents the "tally:" type of field."""
field: str
doc_id: int | None
datetime_start: datetime | None
datetime_end: datetime | None
default_doc: DocInfo
@property
def effective_doc_id(self):
return self.doc_id or self.default_doc.id
@property
def grouping_key(self):
return f"{self.effective_doc_id}{self.datetime_start}{self.datetime_end}"
@property
def doc_and_field(self):
return f"{self.effective_doc_id}.{self.field}"
[docs] @staticmethod
def try_parse(s: str, default_doc: DocInfo) -> Optional["TallyField"]:
m = tallyfield_re.fullmatch(s)
if not m:
return None
try:
gds = m.group("ds")
gde = m.group("de")
ds, de = (
dateutil.parser.parse(gds) if gds else None,
dateutil.parser.parse(gde) if gde else None,
)
except (ValueError, OverflowError):
return None
if ds and ds.tzinfo is None:
ds = fin_timezone.localize(ds)
if de and de.tzinfo is None:
de = fin_timezone.localize(de)
doc = m.group("doc")
return TallyField(
field=m.group("field"),
datetime_start=ds,
datetime_end=de,
doc_id=int(doc) if doc else None,
default_doc=default_doc,
)
[docs]@unique
class MembershipFilter(Enum):
All = "all"
Current = "current"
Deleted = "deleted"
member_filter_relation_map = {
MembershipFilter.All: User.groups_dyn,
MembershipFilter.Current: User.groups,
MembershipFilter.Deleted: User.groups_inactive,
}
FieldValue = Union[str, float, None]
UserFields = dict[str, FieldValue]
[docs]class UserFieldObj(TypedDict):
user: User
fields: UserFields
styles: Any
[docs]@dataclass
class RequestedGroups:
groups: list[UserGroup]
include_all_answered: bool = False
[docs] @staticmethod
def from_name_list(group_names: list[str]):
return RequestedGroups(
groups=UserGroup.query.filter(UserGroup.name.in_(group_names)).all(),
include_all_answered=ALL_ANSWERED_WILDCARD in group_names,
)
[docs]class GetFieldsAccess(Enum):
RequireTeacher = 0
AllowMaybeNonTeacher = 1
AllowAlwaysNonTeacher = 2
[docs] @staticmethod
def from_bool(b: bool):
return (
GetFieldsAccess.AllowMaybeNonTeacher
if b
else GetFieldsAccess.RequireTeacher
)
[docs]def get_fields_and_users(
u_fields: list[str],
requested_groups: RequestedGroups,
d: DocInfo,
current_user: User,
view_ctx: ViewContext,
autoalias: bool = False,
add_missing_fields: bool = False,
access_option: GetFieldsAccess = GetFieldsAccess.RequireTeacher,
member_filter_type: MembershipFilter = MembershipFilter.Current,
user_filter=None,
) -> tuple[list[UserFieldObj], dict[str, str], list[str], list[UserGroup] | None]:
"""
Return fielddata, aliases, field_names
:param view_ctx: The view context.
:param user_filter: Additional filter to use.
:param member_filter_type: Whether to use all, current or deleted users in groups.
:param u_fields: list of fields to be used
:param requested_groups: requested user groups to be used
:param d: default document
:param current_user: current users, check his rights to fields
:param autoalias: if true, give automatically from d1 same as would be from d1 = d1
:param add_missing_fields: return estimated field even if it wasn't given previously
:param access_option: option specifying who is allowed to access fields (non-teachers, teachers)
:return: fielddata, aliases, field_names
"""
allow_non_teacher = False
if access_option == GetFieldsAccess.AllowAlwaysNonTeacher:
allow_non_teacher = True
elif access_option == GetFieldsAccess.AllowMaybeNonTeacher:
allow_non_teacher = not requested_groups.include_all_answered
needs_group_access_check = UserGroup.get_teachers_group() not in current_user.groups
ugroups = []
for group in requested_groups.groups:
if needs_group_access_check and group.name != current_user.name:
if not verify_group_view_access(group, current_user, require=False):
# raise AccessDenied(f'Missing view access for group {group.name}')
continue # TODO: study how to give just warning from missing access, extra return string?
ugroups.append(group)
if (
not ugroups and not requested_groups.include_all_answered
): # if no access, give at least own group
ugroups.append(current_user.get_personal_group())
groups = ugroups
task_ids = []
task_id_map = defaultdict(list)
alias_map = {}
jsrunner_alias_map = {}
doc_map = {}
try:
u_fields = widen_fields(u_fields)
except Exception as e:
raise RouteException(f"Problem with field names: {u_fields}\n{e}")
tasks_without_fields = []
tally_fields: list[tuple[TallyField, str | None]] = []
tasks_with_count_field = []
for field in u_fields:
try:
t, a, *rest = field.split("=")
except ValueError:
a = None
t, rest = field, None
if autoalias:
a = get_alias(t)
t = t.strip()
if a:
a = a.strip()
if rest:
raise RouteException(f"Invalid alias: {field}")
if a == "":
raise RouteException(f"Alias cannot be empty: {field}")
try:
task_id = TaskId.parse(
t,
require_doc_id=False,
allow_block_hint=False,
allow_type=False,
allow_custom_field=True,
)
except PluginException as e:
tally_field = TallyField.try_parse(t, d)
if not tally_field:
if t.startswith("tally:"):
raise RouteException(f"Invalid tally field format: {t}")
else:
raise RouteException(str(e))
else:
did = tally_field.effective_doc_id
tally_fields.append((tally_field, a))
alias_map_value = tally_field.doc_and_field
else:
if task_id.field is None:
tasks_without_fields.append(task_id)
elif task_id.field == "count":
tasks_with_count_field.append(task_id)
task_ids.append(task_id)
if not task_id.doc_id:
task_id.doc_id = d.id
task_id_map[task_id.doc_task].append(task_id)
did = task_id.doc_id
alias_map_value = task_id.extended_or_doc_task
if a:
alias_map[alias_map_value] = a
if a in jsrunner_alias_map:
raise RouteException(f"Duplicate alias {a} in fields attribute")
jsrunner_alias_map[a] = alias_map_value
if did in doc_map:
continue
dib = get_doc_or_abort(did, f"Document {did} not found")
if not (current_user.has_teacher_access(dib) or allow_non_teacher):
raise AccessDenied(f"Missing teacher access for document {dib.id}")
elif dib.document.get_settings().get(
"need_view_for_answers", False
) and not current_user.has_view_access(dib):
raise AccessDenied("Sorry, you don't have permission to use this resource.")
doc_map[did] = dib
cpf = CachedPluginFinder(
doc_map=doc_map,
curr_user=UserContext.from_one_user(current_user),
view_ctx=view_ctx,
)
if add_missing_fields:
for task in tasks_without_fields:
plug = cpf.find(task)
if plug:
task.field = plug.get_content_field_name()
else:
task.field = "c"
try:
alias_map[task.doc_task_with_field] = alias_map[task.doc_task]
jsrunner_alias_map[alias_map[task.doc_task]] = task.doc_task_with_field
del alias_map[task.doc_task]
except KeyError:
pass
group_id_set = {ug.id for ug in groups}
group_filter = UserGroup.id.in_([ug.id for ug in groups])
if user_filter is not None:
group_filter = group_filter & user_filter
join_relation = member_filter_relation_map[member_filter_type]
tally_field_values = get_tally_field_values(
d,
doc_map,
group_filter if not requested_groups.include_all_answered else None,
join_relation,
tally_fields,
view_ctx,
UserContext.from_one_user(current_user),
)
sub = []
# For some reason, with 7 or more fields, executing the following query is very slow in PostgreSQL 9.5.
# That's why we split the list of task ids in chunks of size 6 and merge the results.
# TODO: Investigate if this is still true for PostgreSQL 11.
not_global_taskids = [t for t in task_ids if not t.is_global]
for task_chunk in chunks(not_global_taskids, 6):
q = valid_answers_query(task_chunk).join(User, Answer.users)
if not requested_groups.include_all_answered:
q = q.join(UserGroup, join_relation).filter(group_filter)
elif user_filter is not None:
# Ensure user filter gets applied even if group filter is skipped in include_all_answered
q = q.filter(user_filter)
sub += (
q.group_by(Answer.task_id, User.id)
.with_entities(func.max(Answer.id), User.id)
.all()
)
aid_uid_map = defaultdict(list)
user_ids = set()
for aid, uid in sub:
aid_uid_map[aid].append(uid)
user_ids.add(uid)
q1 = User.query.join(UserGroup, join_relation).filter(group_filter)
if requested_groups.include_all_answered:
# if no group filter is given, attempt to get users that have valid answers only using the user
# ids from previous query
id_filter = User.id.in_(user_ids)
# Ensure that user filter gets applied even if group filter was None
if user_filter is not None:
id_filter = id_filter & user_filter
q2 = User.query.filter(id_filter)
q = q1.union(q2)
else:
q = q1
q = q.with_entities(User).order_by(User.id).options(lazyload(User.groups))
if member_filter_type != MembershipFilter.Current:
q = q.options(joinedload(User.memberships))
users: list[User] = q.all()
user_map = {}
for u in users:
user_map[u.id] = u
global_taskids = [t for t in task_ids if t.is_global]
global_answer_ids = (
valid_answers_query(global_taskids)
.group_by(Answer.task_id)
.with_entities(func.max(Answer.id))
.all()
)
answs = Answer.query.filter(
Answer.id.in_(itertools.chain((aid for aid, _ in sub), global_answer_ids))
).all()
answers_with_users: list[tuple[int, Answer | None]] = []
for a in answs:
uids = aid_uid_map.get(a.id)
if uids is not None:
for uid in uids:
answers_with_users.append((uid, a))
else:
# This is a global task, so add the answer for all users.
for uid in user_map.keys():
answers_with_users.append((uid, a))
missing_users = {u.id for u in users} - {uid for uid, _ in answers_with_users}
for mu in missing_users:
answers_with_users.append((mu, None))
answers_with_users.sort(key=lambda au: au[0])
counts: dict[int, dict[str, int]] = {}
if tasks_with_count_field:
for u in users:
counts[u.id] = {}
cnt = func.count(Answer.id).label("cnt")
answer_counts = (
Answer.query.filter(
Answer.task_id.in_([tid.doc_task for tid in tasks_with_count_field])
)
.join(User, Answer.users)
.filter(User.id.in_([u.id for u in users]))
.group_by(User.id, Answer.task_id)
.with_entities(User.id, Answer.task_id, cnt)
.all()
)
for (uid, taskid, count) in answer_counts:
counts[uid][taskid] = count
last_user = None
user_tasks = None
user_fieldstyles = None
user_index = -1
res: list[UserFieldObj] = []
for uid, a in answers_with_users:
if last_user != uid:
user_index += 1
tally_values = tally_field_values.get(uid)
user_tasks = {}
if tally_values:
user_tasks = user_tasks | {a: v for v, a in tally_values}
if tasks_with_count_field:
user_tasks = user_tasks | {
alias_map.get(
tid.extended_or_doc_task, tid.extended_or_doc_task
): counts.get(uid).get(tid.doc_task, 0)
for tid in tasks_with_count_field
}
user_fieldstyles = {}
user = users[user_index]
assert user.id == uid
obj = {"user": user, "fields": user_tasks, "styles": user_fieldstyles}
res.append(obj)
m_add = get_membership_added(user, group_id_set)
m_end = (
get_membership_end(user, group_id_set)
if member_filter_type != MembershipFilter.Current
else None
)
obj["groupinfo"] = {
"membership_add": time.mktime(m_add.timetuple()) if m_add else None,
"membership_end": time.mktime(m_end.timetuple()) if m_end else None,
}
last_user = uid
if not a:
continue
for task in task_id_map[a.task_id]:
value = None
style = None
if a:
json_str = a.content
p = json.loads(json_str)
if isinstance(p, dict):
style = p.get("styles")
if task.field == "points":
value = a.points
elif task.field == "datetime":
value = time.mktime(a.answered_on.timetuple())
elif task.field == "isodatetime":
value = datetime_isoformat(a.answered_on)
elif task.field == "ALL":
value = p
elif task.field == "count":
continue
else:
if task.field:
try:
value = p.get(task.field)
except:
value = json.dumps(p)
else:
plug = cpf.find(task)
if plug:
content_field = plug.get_content_field_name()
else:
content_field = "c"
if isinstance(p, dict):
value = p.get(content_field)
else:
value = p
user_tasks[
alias_map.get(task.extended_or_doc_task, task.extended_or_doc_task)
] = value
user_fieldstyles[
alias_map.get(task.extended_or_doc_task, task.extended_or_doc_task)
] = style
return (
res,
jsrunner_alias_map,
[
alias_map.get(ts.extended_or_doc_task, ts.extended_or_doc_task)
for ts in task_ids
],
groups,
)
[docs]def get_tally_field_values(
d: DocInfo,
doc_map: dict[int, DocInfo],
group_filter,
join_relation,
tally_fields: list[tuple[TallyField, str | None]],
view_ctx: ViewContext,
user_ctx: UserContext,
):
tally_field_values: DefaultDict[int, list[tuple[float, str]]] = defaultdict(list)
task_id_cache = {}
field_groups = itertools.groupby(tally_fields, key=lambda f: f[0].grouping_key)
for _, x in field_groups:
fs = list(x)
g = fs[0][0]
doc = doc_map[g.doc_id].document if g.doc_id else d.document
tids = task_id_cache.get(doc.doc_id)
if tids is None:
doc.insert_preamble_pars()
pars = doc.get_dereferenced_paragraphs(view_ctx)
tids = find_task_ids(pars, view_ctx, user_ctx, check_access=False)[0]
task_id_cache[doc.doc_id] = tids
ans_filter = true()
if g.datetime_start:
ans_filter = ans_filter & (Answer.answered_on >= g.datetime_start)
if g.datetime_end:
ans_filter = ans_filter & (Answer.answered_on < g.datetime_end)
psr = doc.get_settings().point_sum_rule()
pts = get_points_by_rule(
rule=psr,
task_ids=tids,
user_ids=User.query.join(UserGroup, join_relation)
.filter(group_filter)
.with_entities(User.id)
.subquery()
if group_filter is not None
else None,
answer_filter=ans_filter,
)
known_tally_fields = list(
itertools.chain(basic_tally_fields, psr.get_groups(tids) if psr else [])
)
for field, _ in fs:
if field.field not in known_tally_fields:
raise RouteException(
f"Unknown tally field: {field.field}. "
f"Valid tally fields are: {seq_to_str(known_tally_fields)}."
)
for r in pts:
u = r["user"]
groups = r.get("groups", None)
for field, alias in fs:
# The value can be None if the user has not done any tasks with points, so we use another sentinel.
value = r.get(field.field, missing)
if value is missing:
value = groups[
field.field
] # The group should exist because the field was validated above.
value = value["total_sum"]
tally_field_values[u.id].append((value, alias or field.doc_and_field))
return tally_field_values