"""Helpers for saving answers, querying them and computing users' points from them."""
import hashlib
import json
from collections import defaultdict, OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import (
Iterable,
Any,
Generator,
TypeVar,
SupportsRound,
DefaultDict,
TypedDict,
ItemsView,
)
from bs4 import UnicodeDammit
from flask import current_app
from sqlalchemy import func, Numeric, Float, true
from sqlalchemy.dialects.postgresql import aggregate_order_by
from sqlalchemy.orm import selectinload, defaultload, Query, joinedload, contains_eager
from timApp.answer.answer import Answer
from timApp.answer.answer_models import AnswerTag, UserAnswer
from timApp.answer.pointsumrule import PointSumRule, PointType, Group
from timApp.document.viewcontext import OriginInfo
from timApp.plugin.plugintype import PluginType, PluginTypeLazy, PluginTypeBase
from timApp.plugin.taskid import TaskId
from timApp.timdb.sqa import db
from timApp.upload.upload import get_pluginupload
from timApp.user.user import Consent, User
from timApp.user.usergroup import UserGroup
from timApp.util.answerutil import (
task_ids_to_strlist,
AnswerPeriodOptions,
)
from timApp.util.logger import log_warning
from timApp.velp.annotation_model import Annotation
@dataclass
class ExistingAnswersInfo:
    """Summary of the answers that already exist for a task and a set of users."""

    # The first answer of the "latest answers" query (ordered by descending
    # answer id), or None when there is no such answer.
    latest_answer: Answer | None
    # Number of answers matched by the same query.
    count: int
def get_latest_answers_query(task_id: TaskId, users: list[User]) -> Query:
    """Build a query for the existing answers to a task, highest answer id first.

    For a global task every answer to the task is returned and ``users`` is
    not consulted. Otherwise the result is limited to answers to which every
    user in ``users`` is attached.

    :param task_id: The task whose answers are queried.
    :param users: The users who must all be answerers of a returned answer.
    :return: A query of Answer objects ordered by descending id.
    """
    newest_first = Answer.id.desc()
    if task_id.is_global:
        return Answer.query.filter_by(task_id=task_id.doc_task).order_by(newest_first)

    user_ids = sorted(u.id for u in users)
    # Only rows of the given users survive the filter, so the aggregated
    # (ordered) id array equals ``user_ids`` exactly when all of them are
    # attached to the answer.
    shared_answer_ids = (
        Answer.query.filter_by(task_id=task_id.doc_task)
        .join(User, Answer.users)
        .filter(User.id.in_(user_ids))
        .group_by(Answer.id)
        .with_entities(Answer.id)
        .having(func.array_agg(aggregate_order_by(User.id, User.id)) == user_ids)
        .subquery()
    )
    return Answer.query.filter(Answer.id.in_(shared_answer_ids)).order_by(newest_first)
def get_latest_valid_answers_query(task_id: TaskId, users: list[User]) -> Query:
    """Build a query for each given user's latest valid answer to a task.

    "Latest" means the valid answer with the highest id among the answers the
    user is attached to.

    :param task_id: The task whose answers are queried.
    :param users: The users whose answers are looked up.
    :return: A query yielding Answer objects.
    """
    # One row per user: the highest valid answer id of that user for the task.
    latest_per_user = (
        Answer.query.filter_by(task_id=task_id.doc_task, valid=True)
        .join(User, Answer.users)
        .filter(User.id.in_([u.id for u in users]))
        .group_by(User.id)
        .with_entities(func.max(Answer.id).label("aid"), User.id.label("uid"))
        .subquery()
    )
    return Answer.query.join(
        latest_per_user, Answer.id == latest_per_user.c.aid
    ).with_entities(Answer)
def is_redundant_answer(
    content: str,
    existing_answers: ExistingAnswersInfo,
    ptype: PluginTypeBase | None,
    valid: bool,
) -> bool:
    """Tell whether saving ``content`` as a new answer would add nothing new.

    An answer is redundant when

    * the latest existing answer has the same content and the same validity, or
    * there are no existing answers, the plugin type is ``rbfield`` and the
      JSON content is an object whose ``"c"`` value is the string ``"0"``.

    :param content: The JSON-encoded content of the candidate answer.
    :param existing_answers: Information about the answers that already exist.
    :param ptype: The plugin type of the task, if known.
    :param valid: Validity of the candidate answer.
    :return: True if the candidate answer is redundant.
    """
    latest = existing_answers.latest_answer
    if latest and latest.content == content and latest.valid == valid:
        return True
    if existing_answers.count != 0 or not ptype or ptype.get_type() != "rbfield":
        return False
    parsed = json.loads(content)
    # Content that is not a JSON object, or has no "c" key, is simply not the
    # special rbfield case; do not fail with TypeError/KeyError on it.
    return isinstance(parsed, dict) and parsed.get("c") == "0"
def save_answer(
    users: list[User],
    task_id: TaskId,
    content: Any,
    points: float | None,
    tags: list[str] | None = None,
    valid: bool = True,
    points_given_by: int | None = None,
    force_save: bool = False,
    saver: User | None = None,
    plugintype: PluginTypeLazy | None = None,
    max_content_len: int | None = None,
    origin: OriginInfo | None = None,
) -> Answer | None:
    """Saves an answer to the database.

    If the answer is redundant (see ``is_redundant_answer``) and saving is not
    forced, no new answer is created; instead the points and the points
    modifier of the latest existing answer (if any) are updated.

    :param max_content_len: Maximum length for answer content.
    :param plugintype: The plugin type.
    :param saver: Who saved the answer.
    :param points_given_by: The usergroup id who gave the points, or None if they were given by a plugin.
    :param tags: Tags for the answer.
    :param valid: Whether the answer is considered valid (e.g. sent before deadline, etc.)
    :param users: The users to which the answer belongs.
    :param task_id: The id of the task.
    :param content: The content of the answer.
    :param points: Points for the task.
    :param force_save: Whether to force to save the answer even if the latest existing answer has the same content.
    :param origin: If known, the document from which the answer was sent.
    :return: The new answer, or None if the answer was redundant and not saved.
    :raises TooLargeAnswerException: If the JSON-encoded content is longer than ``max_content_len``.
    """
    content_str = json.dumps(content)
    content_len = len(content_str)
    if max_content_len and content_len > max_content_len:
        log_warning(f"Task {task_id.doc_task}: too large answer ({content_len})")
        raise TooLargeAnswerException(
            f"Answer is too large (size is {content_len} but maximum is {max_content_len})."
        )
    if tags is None:
        tags = []

    answerinfo = get_existing_answers_info(users, task_id)
    # Check ``force_save`` first so a forced save never depends on the
    # outcome (or a failure) of the redundancy check.
    if not force_save and is_redundant_answer(
        content_str, answerinfo, plugintype, valid
    ):
        latest = answerinfo.latest_answer
        if latest:
            # Same content as before: only refresh the points information.
            latest.points = points
            latest.last_points_modifier = points_given_by
        return None

    a = Answer(
        task_id=task_id.doc_task,
        content=content_str,
        points=points,
        valid=valid,
        last_points_modifier=points_given_by,
        origin_doc_id=origin.doc_id if origin else None,
        plugin_type=plugintype.resolve() if plugintype else None,
    )
    db.session.add(a)
    for u in users:
        a.users.append(u)
    for tag in tags:
        a.tags.append(AnswerTag(tag=tag))
    if saver:
        a.saver = saver
    db.session.flush()
    return a
class AgeOptions(Enum):
    """Which answers of a user to a task ``get_all_answers`` selects."""

    # The answer with the lowest id per task and user.
    MIN = "min"
    # The answer with the highest id per task and user.
    MAX = "max"
    # Every answer; no grouping per task and user.
    ALL = "all"
class ValidityOptions(Enum):
    """Validity filter applied by ``get_all_answer_initial_query``."""

    # No filtering by validity.
    ALL = "all"
    # Only answers with valid=True.
    VALID = "1"
    # Only answers with valid=False.
    INVALID = "0"
class NameOptions(Enum):
    """How ``get_all_answers`` identifies the answering user in its output."""

    # The user's name only.
    USERNAME = "username"
    # In text output the real name is prepended to the header as well.
    BOTH = "both"
    # A generated "user_<running number>" name instead of the user's name.
    ANON = "anonymous"
    # A generated "user_<salted hash>" name when a salt is given; otherwise
    # the same running-number name as ANON.
    PSEUDO = "pseudonym"
class SortOptions(Enum):
    """Ordering of the rows returned by ``get_all_answers``."""

    # Order by user name, then task id, then answer time.
    USERNAME = "username"
    # Order by task id, then user name, then answer time.
    TASK = "task"
class AnswerPrintOptions(Enum):
    """What ``get_all_answers`` includes in each text-format result entry."""

    # Header line followed by the answer.
    ALL = "all"
    # The answer only (preceded by a line feed).
    ANSWERS = "answers"
    # The answer only, without the preceding line feed.
    ANSWERS_NO_LINE = "answersnoline"
    # The header line only.
    HEADER = "header"
    # "name;task;answer" with newlines in the answer escaped as "\n".
    KORPPI = "korppi"
@dataclass
class AllAnswersOptions(AnswerPeriodOptions):
    """Options controlling which answers ``get_all_answers`` returns and how.

    The ``metadata`` entries (``by_value``, ``data_key``) are not read in this
    module; presumably they are consumed by the layer that (de)serializes this
    dataclass -- verify against the caller.
    """

    # Not read by get_all_answers itself.
    group: str | None = None
    # Which answer(s) per task and user to include.
    age: AgeOptions = field(default=AgeOptions.MAX, metadata={"by_value": True})
    # Validity filter for the answers.
    valid: ValidityOptions = field(
        default=ValidityOptions.VALID, metadata={"by_value": True}
    )
    # How the user is identified in the output.
    name: NameOptions = field(default=NameOptions.BOTH, metadata={"by_value": True})
    # Ordering of the output.
    sort: SortOptions = field(default=SortOptions.TASK, metadata={"by_value": True})
    # Output format (text lines or JSON-like dicts).
    format: FormatOptions = field(
        default=FormatOptions.TEXT, metadata={"by_value": True}
    )
    # When set, the query is additionally filtered with consent=<value>.
    consent: Consent | None = field(
        default=None, metadata={"by_value": True, "data_key": "consentOpt"}
    )
    # Which parts of each entry to print in text format.
    print: AnswerPrintOptions = field(
        default=AnswerPrintOptions.ALL, metadata={"by_value": True}
    )
    # Salt for pseudonym hashes; without it pseudonyms are running numbers.
    salt: str | None = None
    # Length argument given to shake_256.hexdigest() for pseudonym hashes.
    salt_len: int = field(default=32, metadata={"data_key": "saltLen"})
[docs]def get_all_answers(
task_ids: list[TaskId],
options: AllAnswersOptions,
) -> list[str] | list[dict]:
"""Gets all answers to the specified tasks.
:param task_ids: The ids of the tasks to get answers for.
:param options: The options for getting and printing the answers.
"""
print_header = options.print in (AnswerPrintOptions.ALL, AnswerPrintOptions.HEADER)
print_answers = options.print in (
AnswerPrintOptions.ALL,
AnswerPrintOptions.ANSWERS,
AnswerPrintOptions.ANSWERS_NO_LINE,
)
anon_name = options.name in (NameOptions.ANON, NameOptions.PSEUDO)
if options.period_from is None or options.period_to is None:
raise ValueError("Answer period must be specified.")
q = get_all_answer_initial_query(
options.period_from, options.period_to, task_ids, options.valid
)
q = q.options(defaultload(Answer.users).lazyload(User.groups))
if options.consent is not None:
q = q.filter_by(consent=options.consent)
match options.age:
case AgeOptions.MIN:
minmax = func.min(Answer.id).label("minmax")
counts = func.count(Answer.answered_on).label("count")
case AgeOptions.MAX:
minmax = func.max(Answer.id).label("minmax")
counts = func.count(Answer.answered_on).label("count")
case AgeOptions.ALL:
minmax = Answer.id.label("minmax")
counts = Answer.valid.label("count")
q = q.add_columns(minmax, counts)
if options.age != AgeOptions.ALL:
q = q.group_by(Answer.task_id, User.id)
q = q.with_entities(minmax, counts)
sub = q.subquery()
q = Answer.query.join(sub, Answer.id == sub.c.minmax).join(User, Answer.users)
q = q.outerjoin(PluginType).options(contains_eager(Answer.plugin_type))
match options.sort:
case SortOptions.USERNAME:
q = q.order_by(User.name, Answer.task_id, Answer.answered_on)
case SortOptions.TASK:
q = q.order_by(Answer.task_id, User.name, Answer.answered_on)
q = q.with_entities(Answer, User, sub.c.count)
result = []
result_json = []
lf = "\n"
if options.print == AnswerPrintOptions.ANSWERS_NO_LINE:
lf = ""
qq: Iterable[tuple[Answer, User, int]] = q
cnt = 0
hidden_user_names: dict[str, str] = {}
hashes = set()
def hasher(x: int) -> str:
nonlocal cnt
cnt += 1
return str(cnt)
if options.name == NameOptions.PSEUDO and options.salt is not None:
def hasher(x: int) -> str:
assert options.salt is not None
shake = hashlib.shake_256()
shake.update(options.salt.encode("utf-8"))
shake.update(str(x).encode("utf-8"))
hash_result = shake.hexdigest(options.salt_len)
if hash_result in hashes:
return f"{hash_result}-DUPLICATE"
hashes.add(hash_result)
return hash_result
for a, u, n in qq:
points = str(a.points)
if points == "None":
points = ""
name = u.name
ns = str(int(n)) # n may be a boolean, so convert to int (0/1) first
if anon_name:
name = hidden_user_names.get(u.name, None)
if not name:
name = f"user_{hasher(u.id)}"
hidden_user_names[u.name] = name
line = json.loads(a.content)
header = "; ".join(
[
name,
str(a.origin_doc_id),
a.task_id,
a.plugin_type.type if a.plugin_type else "",
str(a.answered_on),
ns,
points,
]
)
answ = json.dumps(line, ensure_ascii=False)
if isinstance(line, dict): # maybe csPlugin?
files = line.get("uploadedFiles")
if isinstance(files, list):
if len(files) == 1:
p = files[0]["path"]
prefix = "/uploads/"
if p.startswith(prefix):
p = p[len(prefix) :]
mt, pu = get_pluginupload(p)
if mt == "text/plain":
try:
answ = pu.data.decode()
except UnicodeDecodeError:
answ = UnicodeDammit(pu.data).unicode_markup
else:
answ = "ERROR: Uploaded file is binary; cannot show content."
else:
answ = f"ERROR: There are more than 1 file uploads ({len(files)}) in this answer; cannot show content."
elif "usercode" in line:
answ = str(line.get("usercode", "-"))
else:
if "points" in line: # empty csPlugin answer
answ = ""
match options.format:
case FormatOptions.TEXT:
res = ""
if options.name == NameOptions.BOTH:
header = str(u.real_name) + "; " + header
if print_header:
res = header
if print_answers:
res += lf + answ
if options.print == AnswerPrintOptions.KORPPI:
res = name + ";"
taskid = a.task_id
i = taskid.find(".")
if i >= 0:
taskid = taskid[i + 1 :]
res += taskid + ";" + answ.replace("\n", "\\n")
result.append(res)
case FormatOptions.JSON:
user_data = u if not anon_name else name
result_json.append(
dict(user=user_data, answer=a, count=int(n), resolved_content=answ)
)
if options.format == FormatOptions.TEXT:
return result
else:
return result_json
[docs]def get_all_answer_initial_query(
period_from: datetime,
period_to: datetime,
task_ids: list[TaskId],
valid: ValidityOptions,
) -> Query:
q = Answer.query.filter(
(period_from <= Answer.answered_on) & (Answer.answered_on < period_to)
).filter(Answer.task_id.in_(task_ids_to_strlist(task_ids)))
match valid:
case ValidityOptions.ALL:
pass
case ValidityOptions.INVALID:
q = q.filter_by(valid=False)
case ValidityOptions.VALID:
q = q.filter_by(valid=True)
q = q.join(User, Answer.users)
return q
def get_existing_answers_info(
    users: list[User], task_id: TaskId
) -> ExistingAnswersInfo:
    """Fetch the latest existing answer and the number of existing answers.

    :param users: The users whose answers are looked up.
    :param task_id: The task whose answers are looked up.
    :return: The latest answer (None if there is none) and the answer count.
    """
    q = get_latest_answers_query(task_id, users)
    latest = q.first()
    # No first row means the query matches nothing, so skip the COUNT query.
    count = q.count() if latest is not None else 0
    return ExistingAnswersInfo(latest_answer=latest, count=count)
# Names of the basic point and count tallies. The same names are used as keys
# in the UserTaskEntry and UserPoints dicts defined in this file.
basic_tally_fields = [
    "total_points",
    "velp_points",
    "task_points",
    "task_count",
    "velped_task_count",
]
def valid_answers_query(task_ids: list[TaskId]) -> Query:
    """Return a query of the valid answers to the given tasks.

    :param task_ids: The tasks whose answers are included.
    :return: A query of Answer objects filtered with ``valid_taskid_filter``.
    """
    return Answer.query.filter(valid_taskid_filter(task_ids))
def valid_taskid_filter(task_ids: list[TaskId]) -> Any:
    """Build the filter condition "answer is valid and belongs to one of the tasks".

    The return value is a boolean SQL expression meant to be passed to
    ``Query.filter``; it is not a query itself.

    :param task_ids: The tasks whose answers should match.
    :return: The filter expression.
    """
    task_id_strs = task_ids_to_strlist(task_ids)
    # ``== True`` is deliberate: the comparison operator is what produces the
    # SQL expression here, so it must not be rewritten as ``is True``.
    return Answer.task_id.in_(task_id_strs) & (Answer.valid == True)  # noqa: E712
class UserTaskEntry(TypedDict):
    """One result row of ``get_users_for_tasks``."""

    user: User
    # Number of counted tasks in the row.
    task_count: int
    # Number of counted tasks whose answer has annotations.
    velped_task_count: int
    # Sum of task and velp points; None only when both are None.
    total_points: float | None
    task_points: float | None
    velp_points: float | None
    # NOTE(review): get_users_for_tasks adds this key only when called with
    # group_by_user=False.
    task_id: str
def get_users_for_tasks(
    task_ids: list[TaskId],
    user_ids: list[int] | None = None,
    group_by_user: bool = True,
    group_by_doc: bool = False,
    answer_filter: Any | None = None,
) -> list[UserTaskEntry]:
    """Compute point and task tallies for the users who have answered the tasks.

    Only each user's latest (highest id) valid answer per task is counted.
    Velp points are the summed points of that answer's annotations whose
    ``valid_until`` is None.

    :param task_ids: The tasks to consider. An empty list gives an empty result.
    :param user_ids: If given, only these users are included.
    :param group_by_user: If False, the rows are additionally grouped by task
        and each row gets a ``task_id`` entry.
    :param group_by_doc: If True, the rows are additionally grouped by the
        document id parsed from the task id and each row gets a ``doc_id`` entry.
    :param answer_filter: Optional additional filter for answers.
    :return: One dict per result row, ordered by the user's real name.
    """
    if not task_ids:
        return []

    # Velp points per answer.
    a3 = (
        Annotation.query.filter_by(valid_until=None)
        .group_by(Annotation.answer_id)
        .with_entities(
            Annotation.answer_id.label("annotation_answer_id"),
            func.sum(Annotation.points).label("velp_points"),
        )
        .subquery()
    )
    # Task points per answer.
    a2 = Answer.query.with_entities(Answer.id, Answer.points).subquery()
    if answer_filter is None:
        answer_filter = true()
    # Latest valid answer per user and task.
    a1 = (
        valid_answers_query(task_ids)
        .filter(answer_filter)
        .join(UserAnswer, UserAnswer.answer_id == Answer.id)
        .group_by(UserAnswer.user_id, Answer.task_id)
        .with_entities(
            Answer.task_id,
            UserAnswer.user_id.label("uid"),
            func.max(Answer.id).label("aid"),
        )
        .subquery()
    )
    tmp = (
        db.session.query(a1, a2, a3)
        .join(a2, a1.c.aid == a2.c.id)
        .outerjoin(a3, a3.c.annotation_answer_id == a1.c.aid)
        .subquery()
    )
    main = User.query.join(UserAnswer, UserAnswer.user_id == User.id).join(
        tmp, (tmp.c.aid == UserAnswer.answer_id) & (User.id == tmp.c.uid)
    )

    group_by_cols = []
    cols = []
    if not group_by_user:
        min_task_id = func.min(tmp.c.task_id).label("task_id")
        group_by_cols.append(tmp.c.task_id)
        cols.append(min_task_id)
    # NOTE(review): treated as independent of ``group_by_user``; confirm the
    # intended nesting against the upstream file.
    if group_by_doc:
        doc_id = func.substring(tmp.c.task_id, r"(\d+)\..+").label("doc_id")
        group_by_cols.append(doc_id)
        cols.append(doc_id)
    if user_ids is not None:
        main = main.filter(User.id.in_(user_ids))
    if current_app.config["LOAD_STUDENT_IDS_IN_TEACHER"]:
        main = main.options(joinedload("uniquecodes"))
    main = main.group_by(User.id, *group_by_cols)

    # prevents error:
    # column "usergroup_1.id" must appear in the GROUP BY clause or be used in an aggregate function
    main = main.options(selectinload(User.groups))

    task_sum = (
        func.round(func.sum(tmp.c.points).cast(Numeric), 4)
        .cast(Float)
        .label("task_points")
    )
    velp_sum = (
        func.round(func.sum(tmp.c.velp_points).cast(Numeric), 4)
        .cast(Float)
        .label("velp_points")
    )
    main = main.with_entities(
        User,
        func.count(tmp.c.task_id).label("task_count"),
        task_sum,
        velp_sum,
        # TODO: The following does not work because PostgreSQL evaluates a+b==null if a==null or b==null
        #  We want a+b to be null only if BOTH are null. For now, the summing is done in Python.
        # func.round((task_sum + velp_sum).cast(Numeric), 4).cast(Float).label('total_points'),
        func.count(tmp.c.annotation_answer_id).label("velped_task_count"),
        *cols,
    ).order_by(User.real_name)

    result: list[UserTaskEntry] = []
    for row in main:
        entry = row._asdict()
        entry["user"] = entry.pop("User")
        task = entry["task_points"]
        velp = entry["velp_points"]
        # The total is None only if both parts are None.
        if task is not None and velp is not None:
            total = task + velp
        elif task is not None:
            total = task
        else:
            total = velp
        entry["total_points"] = total
        result.append(entry)
    return result
# Type variable for values that support round(); used by sum_and_round and
# round_if_not_none.
T = TypeVar("T", bound=SupportsRound[Any])
def sum_and_round(generator: Iterable[T], digits: int = 2) -> T | None:
    """Sum the given values and round the sum.

    :param generator: The values to sum; any iterable is accepted.
    :param digits: Number of decimals to round to.
    :return: The rounded sum, or None if there are no values.
    """
    values = list(generator)
    if not values:
        # Distinguish "nothing to sum" from a sum of zero.
        return None
    return round(sum(values), digits)  # type: ignore
def round_if_not_none(num: T | None, digits: int = 2) -> T | None:
    """Round a value, passing None through unchanged.

    :param num: The value to round, or None.
    :param digits: Number of decimals to round to.
    :return: The rounded value, or None if ``num`` is None.
    """
    return None if num is None else round(num, digits)
[docs]class SumFields(TypedDict):
task_sum: float | None
velp_sum: float | None
total_sum: float | None
class UserPointGroup(SumFields):
    """Point sums of one user in one rule group, with the tasks behind them."""

    # The user's task entries that belong to the group.
    tasks: list[UserTaskEntry]
    # Number of the group's tasks with velped_task_count > 0.
    velped_task_count: int
class UserPointInfo(SumFields):
    """Point sums of one user over all rule groups."""

    # Per-group sums keyed by group name.
    groups: DefaultDict[str, UserPointGroup]
class ResultGroup(SumFields):
    """Per-group entry of a flattened points result row."""

    # Group text formatted from the group's ``expl`` template.
    text: str
    # The group's ``link`` setting.
    link: bool
    # Link text of the group, falling back to the rule's link text.
    linktext: str
class UserPoints(TypedDict):
    """One row of the result of ``flatten_points_result``: a user's points."""

    total_points: float | None
    task_points: float | None
    velp_points: float | None
    task_count: int
    velped_task_count: int
    # Per-group results keyed by group name.
    groups: dict[str, ResultGroup]
    user: User
def get_points_by_rule(
    rule: PointSumRule | None,
    task_ids: list[TaskId],
    user_ids: list[int] | None = None,
    answer_filter: Any | None = None,
    force_user: User | None = None,
) -> (
    list[UserPoints] | list[UserTaskEntry]
):  # TODO: Would be better to return always same kind of list.
    """Computes the point sum from given tasks according to the given point rule.

    Without a rule the plain per-user tallies of ``get_users_for_tasks`` are
    returned. With a rule, each user's task entries are distributed into the
    rule's groups, sums are computed per group (clamped to the group's
    min/max points) and then combined per user.

    :param force_user: Whether to force at least one result user if the result would be empty otherwise.
    :param answer_filter: Optional additional filter for answers.
    :param rule: The points rule.
    :param task_ids: The list of task ids to consider.
    :param user_ids: The list of users for which to compute the sum.
    :return: The computed result.
    """
    if not rule:
        return get_users_for_tasks(task_ids, user_ids, answer_filter=answer_filter)
    tasks_users = get_users_for_tasks(
        task_ids, user_ids, group_by_user=False, answer_filter=answer_filter
    )
    result: DefaultDict[int, UserPointInfo] = defaultdict(
        lambda: {
            "groups": defaultdict(
                lambda: {
                    "tasks": [],
                    "task_sum": None,
                    "velp_sum": None,
                    "total_sum": None,
                    "velped_task_count": 0,
                }
            ),
            "task_sum": None,
            "velp_sum": None,
            "total_sum": None,
        }
    )
    task_counts: dict[int, int] = {}
    user_map = {}
    if not tasks_users and rule.force and force_user:
        # Nobody has answered: add empty entries so the forced user still
        # gets a result row.
        for t in task_ids:
            tasks_users.append(
                UserTaskEntry(
                    user=force_user,
                    task_count=0,
                    velped_task_count=0,
                    velp_points=None,
                    total_points=None,
                    task_points=None,
                    task_id=t.doc_task,
                )
            )
    rule_groups = dict(rule.groups)

    # Distribute the task entries into the groups of each user.
    for tu in tasks_users:
        u = tu["user"]
        uid = u.id
        user_map[uid] = u
        if rule.count_all:  # TODO: would this info already been somewhere cheaper?
            task_counts[uid] = task_counts.get(uid, 0) + 1
        group_names = list(rule.find_groups(tu["task_id"]))
        if not group_names and rule.include_groupless:
            # A task outside every group forms a group of its own, named
            # after the task.
            gname = TaskId.parse(tu["task_id"]).task_name
            group_names = [gname]
            rule_groups[gname] = Group(gname, gname)
        for grp in group_names:
            result[uid]["groups"][grp]["tasks"].append(tu)

    for user_id, task_groups in result.items():
        groups = task_groups["groups"]
        groupsums = []
        for groupname, group in groups.items():
            task_sum = None
            velp_sum = None
            gr = rule_groups[groupname]
            if PointType.task in gr.point_types:
                task_sum = sum_and_round(
                    t["task_points"]
                    for t in group["tasks"]
                    if t["task_points"] is not None
                )
            if PointType.velp in gr.point_types:
                velp_sum = sum_and_round(
                    t["velp_points"]
                    for t in group["tasks"]
                    if t["velp_points"] is not None
                )
                # NOTE(review): assumed to belong to the velp branch (the
                # group's default count is 0); confirm against upstream.
                group["velped_task_count"] = sum(
                    1 for t in group["tasks"] if t["velped_task_count"] > 0
                )
            # The total is None only if both parts are None.
            if task_sum is not None and velp_sum is not None:
                total_sum: float | None = task_sum + velp_sum
            elif task_sum is not None:
                total_sum = task_sum
            elif velp_sum is not None:
                total_sum = velp_sum
            else:
                total_sum = None
            # Clamp the total to the limits of the group.
            total_sum = (
                min(max(total_sum, gr.min_points), gr.max_points)
                if total_sum is not None
                else None
            )
            group["task_sum"] = task_sum
            group["velp_sum"] = velp_sum
            group["total_sum"] = total_sum
            groupsums.append((task_sum, velp_sum, total_sum))
        # The first key element keeps None totals from being compared with
        # numbers: None totals sort first ascending and last when reversed.
        groupsums = sorted(
            groupsums,
            reverse=rule.count_type == "best",
            key=lambda x: (x[2] is not None, x[2]),
        )
        if rule.total:
            s = groupsums[
                0
            ]  # TODO: find indices from total, total is a list of names to count
            task_groups["task_sum"] = round_if_not_none(s[0])
            task_groups["velp_sum"] = round_if_not_none(s[1])
            task_groups["total_sum"] = round_if_not_none(s[2])
        else:
            counted = groupsums[0 : rule.count_amount]
            task_groups["task_sum"] = sum_and_round(
                s[0] for s in counted if s[0] is not None
            )
            task_groups["velp_sum"] = sum_and_round(
                s[1] for s in counted if s[1] is not None
            )
            task_groups["total_sum"] = sum_and_round(
                s[2] for s in counted if s[2] is not None
            )
    return flatten_points_result(rule, rule_groups, result, task_counts, user_map)
def flatten_points_result(
    rule: PointSumRule,
    rule_groups: dict[str, Group],
    result: DefaultDict[int, UserPointInfo],
    task_counts: dict[int, int],
    user_map: dict[int, User],
) -> list[UserPoints]:
    """Convert the per-user point sums into a flat list of result rows.

    :param rule: The points rule (its ``hide``, ``sort``, ``count_all`` and
        ``linktext`` settings are used).
    :param rule_groups: The groups of the rule keyed by group name.
    :param result: The computed sums keyed by user id.
    :param task_counts: Number of tasks per user id; used if ``rule.count_all``.
    :param user_map: The users keyed by user id.
    :return: One row per user in ``result``. Each row has an entry for every
        non-hidden group of the rule, also for groups the user has no points in.
    """
    hide_list = rule.hide
    # The group order is the same for every user, so resolve it only once.
    if rule.sort:
        rulegroups: list[tuple[str, Group]] = sorted(rule_groups.items())
    else:
        rulegroups = list(rule_groups.items())

    result_list = []
    for user_id, task_groups in result.items():
        user_groups = task_groups["groups"]
        if rule.count_all:
            task_count = task_counts.get(user_id, 0)
        else:
            # Number of groups that contain at least one counted task.
            task_count = sum(
                1
                for t in user_groups.values()
                if sum(x["task_count"] for x in t["tasks"]) > 0
            )
        row: UserPoints = {
            "total_points": task_groups["total_sum"],
            "task_points": task_groups["task_sum"],
            "velp_points": task_groups["velp_sum"],
            "task_count": task_count,
            "velped_task_count": sum(
                1 for t in user_groups.values() if t["velped_task_count"] > 0
            ),
            "groups": OrderedDict(),
            "user": user_map[user_id],
        }
        for groupname, rg in rulegroups:
            if hide_list and groupname in hide_list:
                continue
            gr = user_groups.get(groupname)
            if gr:
                task_sum = gr["task_sum"]
                velp_sum = gr["velp_sum"]
                total_sum = gr["total_sum"]
            else:
                task_sum = None
                velp_sum = None
                total_sum = None
            # Best effort: fall back to defaults if the group lacks the link
            # settings, but let KeyboardInterrupt/SystemExit propagate.
            try:
                linktext = rg.linktext or rule.linktext
                link = rg.link
            except Exception:
                linktext = ""
                link = False
            # Best effort: a broken ``expl`` template must not break the result.
            try:
                text = rg.expl.format(
                    groupname,
                    float(total_sum if total_sum is not None else 0),
                    float(task_sum if task_sum is not None else 0),
                    float(velp_sum if velp_sum is not None else 0),
                )
            except Exception:
                text = groupname + ": " + str(total_sum)
            row["groups"][groupname] = {
                "task_sum": task_sum,
                "velp_sum": velp_sum,
                "total_sum": total_sum,
                "text": text,
                "link": link,
                "linktext": linktext,
            }
        result_list.append(row)
    return result_list
def add_missing_users_from_group(result: list, usergroup: UserGroup) -> list:
    """Append an empty row for each group member who has no row in the result.

    The rows are appended in the order of ``usergroup.users`` so that the
    output is deterministic.

    :param result: The result rows (dicts with a ``user`` key); modified in place.
    :param usergroup: The group whose members must all appear in the result.
    :return: The same ``result`` list.
    """
    seen_users = {row["user"] for row in result}
    for user in usergroup.users:
        if user in seen_users:
            continue
        seen_users.add(user)
        result.append(
            {
                "task_count": 0,
                "task_points": None,
                "velp_points": 0.0,
                "total_points": None,
                "velped_task_count": 0,
                "user": user,
                "id": user.id,
                "name": user.name,
                "real_name": user.real_name,
                "email": user.email,
            }
        )
    # Example of a row:
    # {'task_count': 1, 'task_points': None, 'velp_points': 0.0, 'total_points': None, 'velped_task_count': 0,
    #  'user': <User 6>, 'id': 6, 'name': 'testiuser@testi.fi', 'real_name': 'Testi User', 'email': 'testiuser@testi.fi'}
    return result