import json
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Sequence, Optional
import click
from flask.cli import AppGroup
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from timApp.admin.datetimetype import DateTimeType
from timApp.admin.timitemtype import TimDocumentType, TimItemType
from timApp.admin.util import commit_if_not_dry
from timApp.answer.answer import Answer, AnswerSaver
from timApp.answer.answer_models import UserAnswer, AnswerUpload
from timApp.answer.answers import valid_answers_query
from timApp.document.docinfo import DocInfo
from timApp.folder.folder import Folder
from timApp.item.block import Block
from timApp.item.item import Item
from timApp.plugin.taskid import TaskId
from timApp.upload.uploadedfile import PluginUpload
from timApp.user.user import User
from timApp.user.usergroup import UserGroup
from timApp.util.pdftools import (
is_pdf_producer_ghostscript,
compress_pdf,
CompressionError,
)
from timApp.velp.annotation_model import Annotation
from timApp.velp.velp_models import AnnotationComment
# Flask CLI command group named "answer"; the commands in this module are
# registered on it with @answer_cli.command().
answer_cli = AppGroup("answer")
@answer_cli.command()
@click.option("--dry-run/--no-dry-run", default=True)
def fix_double_c(dry_run: bool) -> None:
    """Fixes answers whose content has a "c" object nested inside another "c" object.

    Only answers saved after 2020-02-09 whose content starts with '{"c": {"c":' are inspected.
    The content of each matching answer is replaced with the inner object.
    Runs as a dry run by default.
    """
    cutoff = datetime(year=2020, month=2, day=9)
    is_candidate = (Answer.answered_on > cutoff) & Answer.content.startswith(
        '{"c": {"c":'
    )
    candidates: list[Answer] = (
        Answer.query.filter(is_candidate).order_by(Answer.id).all()
    )
    modified = 0
    for ans in candidates:
        outer = ans.content_as_json
        if not isinstance(outer, dict):
            continue
        inner = outer.get("c")
        # The SQL prefix match is only a coarse filter; verify the nesting on the parsed JSON.
        if not (isinstance(inner, dict) and "c" in inner):
            continue
        print(f"Modifying {ans.id} ({ans.task_id}, {ans.answered_on})")
        modified += 1
        if not dry_run:
            ans.content = json.dumps(inner)
    print(f"Total {modified}")
    commit_if_not_dry(dry_run)
@dataclass
class AnswerDeleteResult:
    """Per-model counts returned by the delete queries in delete_answers_with_ids."""

    # Count from the UserAnswer delete.
    useranswer: int
    # Count from the AnswerSaver delete.
    answersaver: int
    # Count from the Annotation delete.
    annotation: int
    # Count from the AnnotationComment delete.
    annotationcomment: int
    # Count from the Answer delete.
    answer: int
@answer_cli.command()
@click.argument("doc", type=TimDocumentType())
@click.option("--dry-run/--no-dry-run", default=True)
def clear_all(doc: DocInfo, dry_run: bool) -> None:
    """Deletes all answers of the given document.

    An answer belongs to the document when its task id starts with the document id followed by a dot.
    Runs as a dry run by default.
    """
    doc_answers = Answer.query.filter(Answer.task_id.startswith(f"{doc.id}."))
    answer_ids = doc_answers.with_entities(Answer.id).all()
    delete_answers_with_ids(answer_ids)
    click.echo(f"Total {len(answer_ids)}")
    commit_if_not_dry(dry_run)
@answer_cli.command()
@click.argument("doc", type=TimDocumentType())
@click.option("--dry-run/--no-dry-run", default=True)
@click.option("--task", "-t", multiple=True)
@click.option("--answer_from", "-af", type=click.DateTime(), default=None)
@click.option("--answer_to", "-at", type=click.DateTime(), default=None)
@click.option("--verbose/--no-verbose", "-v", default=False)
def clear(
    doc: DocInfo,
    dry_run: bool,
    task: list[str],
    answer_from: datetime | None,
    answer_to: datetime | None,
    verbose: bool,
) -> None:
    """Deletes the answers of the given tasks of a document.

    The deletion can be limited to answers saved between --answer_from and --answer_to (both inclusive).
    With --verbose, every answer is listed before it is deleted. Runs as a dry run by default.
    """
    tasks_to_delete = [f"{doc.id}.{t}" for t in task]
    q = Answer.query.filter(Answer.task_id.in_(tasks_to_delete))
    if answer_from:
        q = q.filter(Answer.answered_on >= answer_from)
    if answer_to:
        q = q.filter(Answer.answered_on <= answer_to)
    ids = q.with_entities(Answer.id).all()
    # Only the number of matched answers is reported, so the per-model counts
    # returned by delete_answers_with_ids are not kept.
    delete_answers_with_ids(ids, verbose)
    click.echo(f"Total {len(ids)}")
    commit_if_not_dry(dry_run)
def delete_answers_with_ids(
    ids: list[int], verbose: bool = False
) -> AnswerDeleteResult:
    """Deletes the answers with the given ids together with the rows that refer to them.

    Rows are deleted in this order: UserAnswer, AnswerSaver, AnnotationComment, Annotation, Answer.
    This function itself does not commit.

    :param ids: The ids of the answers to delete. Must be a list.
    :param verbose: Whether to echo a description of each answer before the answers are deleted.
    :return: The value returned by each of the delete queries.
    :raises TypeError: If ids is not a list.
    """
    if not isinstance(ids, list):
        raise TypeError("ids should be a list of answer ids")

    def purge(query) -> int:
        # All deletes in this function use the same session synchronization setting.
        return query.delete(synchronize_session=False)

    useranswers_deleted = purge(UserAnswer.query.filter(UserAnswer.answer_id.in_(ids)))
    savers_deleted = purge(AnswerSaver.query.filter(AnswerSaver.answer_id.in_(ids)))

    # Comments are selected through their annotations, so they have to be deleted
    # while the annotations still exist.
    annotation_query = Annotation.query.filter(Annotation.answer_id.in_(ids))
    annotation_ids = annotation_query.with_entities(Annotation.id)
    comments_deleted = purge(
        AnnotationComment.query.filter(
            AnnotationComment.annotation_id.in_(annotation_ids)
        )
    )
    annotations_deleted = purge(annotation_query)

    answer_query = Answer.query.filter(Answer.id.in_(ids))
    if verbose:
        descriptions = [
            f"taskid: {a.task_id}, points: {a.points}, answered_on: {a.answered_on}; saver: {a.saver}"
            for a in answer_query
        ]
        click.echo("\n".join(descriptions))
    answers_deleted = purge(answer_query)

    return AnswerDeleteResult(
        useranswer=useranswers_deleted,
        answersaver=savers_deleted,
        annotation=annotations_deleted,
        annotationcomment=comments_deleted,
        answer=answers_deleted,
    )
@answer_cli.command()
@click.argument("doc", type=TimDocumentType())
@click.option("--deadline", type=DateTimeType(), required=True)
@click.option("--group", required=True)
@click.option("--dry-run/--no-dry-run", default=True)
@click.option("--may-invalidate/--no-may-invalidate", default=False)
def revalidate(
    doc: DocInfo, deadline: datetime, group: str, dry_run: bool, may_invalidate: bool
) -> None:
    """Re-evaluates the validity of a group's answers in a document against a deadline.

    Invalid answers saved before the deadline are marked valid. Valid answers saved at or after the
    deadline are marked invalid only if --may-invalidate is given. Runs as a dry run by default.
    """
    rows: list[tuple[Answer, str]] = (
        Answer.query.filter(Answer.task_id.startswith(f"{doc.id}."))
        .join(User, Answer.users)
        .join(UserGroup, User.groups)
        .filter(UserGroup.name == group)
        .order_by(Answer.answered_on.desc())
        .with_entities(Answer, User.name)
        .all()
    )
    to_valid = 0
    to_invalid = 0
    for ans, username in rows:
        if ans.answered_on < deadline and not ans.valid:
            ans.valid = True
            to_valid += 1
            click.echo(
                f"Changing to valid: {username}, {ans.task_name}, {ans.answered_on}, {ans.points}"
            )
        elif ans.answered_on >= deadline and ans.valid and may_invalidate:
            ans.valid = False
            to_invalid += 1
            click.echo(
                f"Changing to invalid: {username}, {ans.task_name}, {ans.answered_on}, {ans.points}"
            )
    click.echo(f"Changing {to_valid} to valid, {to_invalid} to invalid.")
    click.echo(f"Total answers in document for group: {len(rows)}")
    commit_if_not_dry(dry_run)
@answer_cli.command()
@click.argument("doc", type=TimDocumentType())
@click.option("--limit", required=True, type=int)
@click.option("--to", required=True, type=int)
@click.option("--dry-run/--no-dry-run", default=True)
def truncate_large(doc: DocInfo, limit: int, to: int, dry_run: bool) -> None:
    """Truncates overly long answers of a document.

    Answers whose content is longer than --limit are shortened: the first of the fields "usercode",
    "c" and "userinput" that holds a text longer than --to is cut and a truncation note is appended
    to it. Runs as a dry run by default.
    """
    if limit < to:
        click.echo("limit must be >= to")
        sys.exit(1)
    q = Answer.query.filter(Answer.task_id.startswith(f"{doc.id}."))
    total = q.count()
    anss: list[Answer] = (
        q.filter(func.length(Answer.content) > limit)
        .options(joinedload(Answer.users_all))
        .all()
    )
    note = " (answer truncated)"
    try_keys = ["usercode", "c", "userinput"]
    # Number of original characters kept in front of the note. Clamped at zero so that
    # a target shorter than the note leaves only the note.
    keep = max(to - len(note), 0)
    truncated = 0
    for a in anss:
        if len(a.content) <= to:
            continue
        loaded = a.content_as_json
        if not isinstance(loaded, dict):
            continue
        for k in try_keys:
            c = loaded.get(k)
            # Only non-empty strings can be truncated. A key may also hold e.g. a nested
            # object, which cannot be sliced and joined with the note.
            if not c or not isinstance(c, str):
                continue
            if len(c) <= to:
                continue
            new_c = c[:keep] + note
            name = a.users_all[0].name if a.users_all else "(orphan)"
            print(
                f"Truncating: {a.task_id}, {name}, {a.answered_on}, length {len(a.content)}"
            )
            truncated += 1
            loaded[k] = new_c
            a.content = json.dumps(loaded)
            # Only the first matching field of an answer is truncated.
            break
    print(f"Truncating {truncated} answers (out of {total}).")
    commit_if_not_dry(dry_run)
@answer_cli.command()
@click.argument("item", type=TimItemType())
@click.option("--dry-run/--no-dry-run", default=True)
def compress_uploads(item: Item, dry_run: bool) -> None:
    """Compresses the PDF files uploaded as answers to a document or to the documents of a folder.

    Files reported as already processed, missing, empty or not actually PDF are skipped.
    With --dry-run (the default), the files that would be compressed are only listed.
    """
    for d in collect_docs(item):
        uploads: list[Block] = (
            Answer.query.filter(Answer.task_id.startswith(f"{d.id}."))
            .join(AnswerUpload)
            .join(Block)
            .with_entities(Block)
            .all()
        )
        for upload in uploads:
            # Uploads are recognized as PDFs by the extension in the block description.
            if not upload.description.lower().endswith(".pdf"):
                continue
            uf = PluginUpload(upload)
            if is_pdf_producer_ghostscript(uf):
                click.echo(
                    f"Skipping already processed PDF {uf.relative_filesystem_path}"
                )
                continue
            if dry_run:
                click.echo(f"Would compress PDF {uf.relative_filesystem_path}")
                continue
            try:
                old_size = uf.size
            except FileNotFoundError:
                click.echo(f"PDF {uf.relative_filesystem_path} not found.")
                continue
            if old_size == 0:
                click.echo(f"PDF {uf.relative_filesystem_path} has size 0; skipping.")
                continue
            if not uf.is_content_pdf:
                click.echo(
                    f"PDF {uf.relative_filesystem_path} content is not PDF; skipping."
                )
                continue
            # No newline here: the outcome is echoed on the same line.
            click.echo(f"Compressing PDF {uf.relative_filesystem_path}... ", nl=False)
            try:
                compress_pdf(uf)
            except CompressionError:
                click.echo(
                    f"Failed to compress PDF {uf.relative_filesystem_path}; it may be corrupted."
                )
                continue
            new_size = uf.size
            percent = round((old_size - new_size) / old_size * 100)
            click.echo(f"done, size: {old_size} -> {new_size} (reduced by {percent}%)")
def collect_docs(item: Item) -> Sequence[DocInfo]:
    """Returns the documents that the given item stands for.

    :param item: A folder or a document.
    :return: For a folder, the result of its get_all_documents call with subdirectories included;
        for a document, a list containing just that document.
    :raises Exception: If the item is neither a Folder nor a DocInfo.
    """
    if isinstance(item, Folder):
        return item.get_all_documents(include_subdirs=True)
    if isinstance(item, DocInfo):
        return [item]
    raise Exception("Unknown item type")
@dataclass
class DeleteResult:
    """Summary of one delete_old_answers run."""

    # Number of answers counted before deleting.
    total: int
    # Number of answers selected for deletion.
    deleted: int
    # Per-model counts returned by delete_answers_with_ids.
    adr: AnswerDeleteResult

    @property
    def remaining(self) -> int:
        """The number of counted answers that were not selected for deletion."""
        kept = self.total - self.deleted
        return kept
def delete_old_answers(d: DocInfo, tasks: list[str]) -> DeleteResult:
    """Deletes all but the newest answer of each user in the given tasks of a document.

    Only the answers returned by valid_answers_query are considered. The newest answer is the one
    with the highest id within each (task id, user) pair.

    :param d: The document whose tasks are processed.
    :param tasks: The names of the tasks.
    :return: The number of considered and deleted answers and the per-model delete counts.
    """
    task_ids = [TaskId(doc_id=d.id, task_name=t) for t in tasks]
    answers_with_users = valid_answers_query(task_ids).join(User, Answer.users)
    newest_ids = answers_with_users.group_by(Answer.task_id, User.id).with_entities(
        func.max(Answer.id)
    )
    outdated_ids = answers_with_users.filter(
        Answer.id.notin_(newest_ids)
    ).with_entities(Answer.id)
    # Both counts are taken before anything is deleted.
    total_count = answers_with_users.count()
    outdated_count = outdated_ids.count()
    delete_counts = delete_answers_with_ids(outdated_ids.all())
    return DeleteResult(total=total_count, deleted=outdated_count, adr=delete_counts)
@answer_cli.command()
@click.argument("item", type=TimItemType())
@click.option("--task", "-t", multiple=True)
@click.option("--dry-run/--no-dry-run", default=True)
def delete_old(item: Item, task: list[str], dry_run: bool) -> None:
    """Deletes all but the latest answer of each user from the specified tasks.

    The item can be a document or a folder; for a folder, all of its documents are processed.
    This is useful especially for deleting field history in documents where jsrunner is used a lot.
    """
    for doc in collect_docs(item):
        outcome = delete_old_answers(doc, task)
        click.echo(
            f"Deleting {outcome.deleted} of {outcome.total} answers from {doc.path}, remaining {outcome.remaining}. {outcome.adr}"
        )
    commit_if_not_dry(dry_run)