Source code for timApp.answer.backup
from pathlib import Path
import filelock
from flask import current_app, Response
from timApp.answer.answer import Answer
from timApp.answer.exportedanswer import ExportedAnswer
from timApp.document.docentry import DocEntry
from timApp.timdb.sqa import db
from timApp.user.user import User
from timApp.user.usergroup import UserGroup
from timApp.user.usergroupmember import UserGroupMember, membership_current
from timApp.util.flask.responsehelper import to_json_str, ok_response
from timApp.util.logger import log_error
from timApp.util.secret import check_secret
[docs]def get_backup_answer_file() -> Path:
return (
Path(current_app.config["FILES_PATH"])
/ current_app.config["BACKUP_ANSWER_FILE"]
)
[docs]def save_answer_backup(answer: ExportedAnswer, secret: str) -> Response:
check_secret(secret, "BACKUP_ANSWER_RECEIVE_SECRET")
with filelock.FileLock(f"/tmp/answer_backup"):
with get_backup_answer_file().open("a") as f:
f.write(to_json_str(answer) + "\n")
return ok_response()
[docs]def send_answer_backup_if_enabled(a: Answer) -> None:
if current_app.config["BACKUP_ANSWER_SEND_SECRET"] is None:
return
from timApp.tim_celery import send_answer_backup
doc_id = a.parsed_task_id.doc_id
assert doc_id is not None
doc = DocEntry.find_by_id(doc_id)
assert doc is not None
num_users = len(a.users_all)
if num_users > 1:
log_error(
f"Multiple users not supported for answer backup (answer id {a.id} has {num_users} users)"
)
return
send_answer_backup.delay(
{
"email": a.users_all[0].email,
"content": a.content,
"valid": a.valid,
"points": a.points,
"time": a.answered_on.isoformat(),
"task": a.task_name,
"doc": doc.path,
"host": current_app.config["TIM_HOST"],
}
)
[docs]def sync_user_group_memberships_if_enabled(user: User) -> None:
if current_app.config["SYNC_USER_GROUPS_SEND_SECRET"] is None:
return
from timApp.tim_celery import sync_user_group_memberships
# Do a manual query to ensure there is no relationship cache in the middle
user_groups: list[str] = [
ugn
for ugn, in (
db.session.query(UserGroup.name)
.join(
UserGroupMember,
(UserGroup.id == UserGroupMember.usergroup_id) & membership_current,
)
.filter(UserGroupMember.user_id == user.id)
.all()
)
]
sync_user_group_memberships.delay(user.email, user_groups)