import json
import time
from dataclasses import dataclass, field
from datetime import timedelta
from random import randrange
from typing import Optional
import dateutil.parser
from flask import Blueprint, render_template, g
from flask import Response
from flask import current_app
from flask import request
from flask import session
from sqlalchemy import func
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import StaleDataError
from timApp.auth.accesshelper import (
verify_ownership,
get_doc_or_abort,
has_ownership,
verify_edit_access,
)
from timApp.auth.login import log_in_as_anonymous
from timApp.auth.sessioninfo import (
get_current_user_id,
logged_in,
get_current_user_object,
)
from timApp.document.docentry import DocEntry
from timApp.document.randutils import hashfunc
from timApp.errorhandlers import suppress_wuff
from timApp.lecture.askedjson import get_asked_json_by_hash, AskedJson
from timApp.lecture.askedquestion import (
AskedQuestion,
get_asked_question,
user_activity_lock,
)
from timApp.lecture.lecture import Lecture
from timApp.lecture.lectureanswer import LectureAnswer, get_totals
from timApp.lecture.lectureutils import (
is_lecturer_of,
verify_is_lecturer,
get_current_lecture_info,
)
from timApp.lecture.message import Message
from timApp.lecture.question import Question
from timApp.lecture.question_utils import (
calculate_points_from_json_answer,
create_points_table,
qst_handle_randomization,
)
from timApp.lecture.questionactivity import QuestionActivityKind, QuestionActivity
from timApp.lecture.runningquestion import Runningquestion
from timApp.lecture.showpoints import Showpoints
from timApp.lecture.useractivity import Useractivity
from timApp.plugin.qst.qst import get_question_data_from_document
from timApp.timdb.sqa import db, tim_main_execute
from timApp.user.user import User
from timApp.util.flask.requesthelper import (
get_option,
verify_json_params,
use_model,
RouteException,
NotExist,
)
from timApp.util.flask.responsehelper import json_response, ok_response, empty_response
from timApp.util.logger import log_info
from timApp.util.utils import get_current_time
lecture_routes = Blueprint("lecture", __name__, url_prefix="")
[docs]@dataclass
class AskedIdModel:
asked_id: int
[docs]@lecture_routes.get("/getLectureInfo")
def get_lecture_info():
"""Route to get info from lectures.
Gives answers, and messages and other necessary info.
"""
lecture = get_lecture_from_request(check_access=False)
messages = lecture.messages.order_by(Message.timestamp.asc()).all()
is_lecturer = is_lecturer_of(lecture)
u = get_current_user_object()
lecture_questions: list[AskedQuestion] = (
lecture.asked_questions.options(
joinedload(AskedQuestion.answers_all).raiseload(
LectureAnswer.asked_question
)
)
.options(
joinedload(AskedQuestion.answers_all)
.joinedload(LectureAnswer.user)
.raiseload(User.groups)
)
.all()
)
if is_lecturer or u.is_admin:
answers: list[LectureAnswer] = [
a for q in lecture_questions for a in q.answers_all
]
answerers = list({a.user for a in answers})
else:
answers = [
a for q in lecture_questions for a in q.answers_all if a.user_id == u.id
]
answerers = [get_current_user_object()]
return json_response(
{
"answerers": answerers,
"answers": [
a.to_json(include_question=False, include_user=False) for a in answers
],
"isLecturer": is_lecturer,
"messages": messages,
"questions": lecture_questions,
},
date_conversion=True,
)
[docs]@lecture_routes.get("/getLectureAnswerTotals/<int:lecture_id>")
def get_lecture_answer_totals(lecture_id):
lec = Lecture.find_by_id(lecture_id)
is_lecturer = is_lecturer_of(lec)
u = get_current_user_object()
results = get_totals(lec, None if is_lecturer or u.is_admin else u)
sum_field_name = get_option(request, "sum_field_name", "sum")
count_field_name = get_option(request, "count_field_name", "count")
def generate_text():
for u, p, c in results:
yield f"{u.name};{sum_field_name};{p}\n"
yield "\n"
for u, p, c in results:
yield f"{u.name};{count_field_name};{c}\n"
return Response(generate_text(), mimetype="text/plain")
[docs]@lecture_routes.get("/getAllMessages")
def get_all_messages():
"""Route to get all the messages from some lecture."""
lecture = get_lecture_from_request(check_access=False)
messages = lecture.messages.order_by(Message.timestamp.asc()).all()
return json_response(messages, date_conversion=True)
[docs]@dataclass
class GetUpdatesModel:
client_last_id: int = field(metadata={"data_key": "c"})
current_points_id: int | None = field(metadata={"data_key": "p"}, default=None)
current_question_id: int | None = field(metadata={"data_key": "i"}, default=None)
doc_id: int | None = field(metadata={"data_key": "d"}, default=None)
use_questions: bool = field(metadata={"data_key": "q"}, default=False)
use_wall: bool = field(metadata={"data_key": "m"}, default=False)
[docs]@lecture_routes.get("/getUpdates")
@use_model(GetUpdatesModel)
def get_updates(m: GetUpdatesModel):
# taketime("before update")
ret = do_get_updates(m)
db.session.commit()
# taketime("after update")
return json_response(ret, date_conversion=True)
[docs]@lecture_routes.before_request
def lecture_before_request():
tim_main_execute("SET LOCAL lock_timeout = '1s'")
EXTRA_FIELD_NAME = "extra"
[docs]@suppress_wuff(
OperationalError,
"https://gitlab.com/tim-jyu/tim/-/issues/1975",
r"LockNotAvailable.*canceling statement due to lock timeout",
)
def do_get_updates(m: GetUpdatesModel):
"""Gets updates from some lecture.
Checks updates in 1 second frequently and answers if there is updates.
"""
client_last_id = m.client_last_id
current_question_id = m.current_question_id
current_points_id = m.current_points_id
use_wall = m.use_wall
use_questions = m.use_questions
session["use_questions"] = use_questions
step = 0
lecture = get_current_lecture()
doc_id = m.doc_id
if not lecture:
return get_running_lectures(doc_id)
lecture_id = lecture.lecture_id
if not lecture.is_running:
empty_lecture(lecture)
db.session.commit()
return get_running_lectures(doc_id)
list_of_new_messages = []
lecturers = []
students = []
u = get_current_user_object()
user_name = u.name
update_activity(lecture, u)
options = lecture.options_parsed
teacher_poll = options.get("teacher_poll", "")
teacher_poll = teacher_poll.split(";")
poll_interval_ms = 4000
long_poll = False
poll_interval_t_ms = 4000
long_poll_t = False
# noinspection PyBroadException
try:
poll_interval_ms = int(float(options.get("poll_interval", 4)) * 1000)
long_poll = bool(options.get("long_poll", False))
except:
pass
# noinspection PyBroadException
try:
poll_interval_t_ms = int(float(options.get("poll_interval_t", 1)) * 1000)
long_poll_t = bool(options.get("long_poll_t", False))
except:
pass
poll_interval_ms += randrange(-100, 500)
if teacher_poll:
# noinspection PyBroadException
try:
if teacher_poll.index(user_name) >= 0:
poll_interval_ms = poll_interval_t_ms
long_poll = long_poll_t
except:
pass
is_lecturer = is_lecturer_of(lecture)
if is_lecturer:
poll_interval_ms = poll_interval_t_ms
long_poll = long_poll_t
if long_poll:
poll_interval_ms = 1000
lecture_ending = 100
base_resp = None
basic_info = {
"ms": poll_interval_ms,
}
while step <= 10:
lecture = get_current_lecture()
if not lecture:
return get_running_lectures(doc_id)
lecture_ending = check_if_lecture_is_ending(lecture)
if is_lecturer:
lecturers, students = get_lecture_users(lecture)
# Gets new messages if the wall is in use.
if use_wall:
list_of_new_messages = (
lecture.messages.filter(Message.msg_id > client_last_id)
.order_by(Message.msg_id.asc())
.all()
)
# Check if current question is still running and user hasn't already answered on it on another tab.
if current_question_id:
q = get_asked_question(current_question_id)
if q and q.running_question:
# Always report question end time in case it has been stopped or extended.
basic_info["question_end_time"] = q.running_question.end_time
base_resp = {
**basic_info,
"msgs": list_of_new_messages,
"lectureEnding": lecture_ending,
"lectureId": lecture_id,
"lecturers": lecturers,
"students": students,
}
if current_points_id:
q = get_asked_question(current_points_id)
if q and q.has_activity(QuestionActivityKind.Pointsclosed, u):
return {
**base_resp,
EXTRA_FIELD_NAME: {
"points_closed": True,
},
}
# Gets new questions if the questions are in use.
if use_questions:
new_question = get_new_question(
lecture, current_question_id, current_points_id
)
if new_question:
return {**base_resp, EXTRA_FIELD_NAME: new_question}
if list_of_new_messages:
return base_resp
if not long_poll or current_app.config["TESTING"]:
# Don't loop when testing.
break
# Database updates may have happened during sleep, so we have to expire all objects so that they will be
# reloaded. Additionally, we don't want to keep the connection open during sleep, so we call commit()
# instead of expire_all().
db.session.commit()
# For long poll wait 1 sec before new check.
time.sleep(1)
step += 1
if lecture_ending != 100 or lecturers or students:
return base_resp
# No new updates, except possibly new question end time.
return basic_info
[docs]@lecture_routes.get("/getQuestionManually")
def get_question_manually():
"""Route to use to get question manually (instead of getting question in /getUpdates)."""
lecture = get_current_lecture_or_abort()
new_question = get_new_question(lecture, None, None, True)
return json_response(new_question, date_conversion=True)
[docs]def hide_points_and_try_shuffle_question(question: AskedQuestion, user_id: int):
# Hides points from question json and shuffles rows if required
q_copy = question.to_json(hide_points=True)
q_json = {"markup": q_copy["json"]["json"], "user_id": user_id}
qst_handle_randomization(q_json)
q_copy["json"]["json"] = q_json["markup"]
return q_copy
[docs]def q_log(s: str):
log_info(s)
[docs]def get_new_question(
lecture: Lecture, current_question_id=None, current_points_id=None, force=False
):
"""
:param current_points_id: TODO: what is this?
:param current_question_id: The id of the current question.
:param lecture: lecture to get running questions from
:param force: Return question, even if it already has been shown to user
:return: None if no questions are running
dict with data of new question if there is a question running and user hasn't answered to that question.
{'type': 'already_answered'} if there is a question running and user has answered to that.
"""
current_user = get_current_user_id()
u = get_current_user_object()
rqs: list[Runningquestion] = lecture.running_questions
with user_activity_lock(u):
if rqs and rqs[0].asked_question.is_running:
question: AskedQuestion = rqs[0].asked_question
asked_id = question.asked_id
already_shown = question.has_activity(QuestionActivityKind.Usershown, u)
already_answered = question.has_activity(
QuestionActivityKind.Useranswered, u
)
s = f"q: {u.name}, r, as={already_shown is not None}, aa={already_answered is not None}, f={force}, aid={asked_id}"
if already_answered:
if force:
q_log(f"{s}, ret=already_answered")
return {"type": "already_answered"}
else:
q_log(f"{s}, ret=None")
return None
if (not already_shown or force) or (asked_id != current_question_id):
q = get_asked_question(asked_id)
answer = q.answers.filter_by(user_id=current_user).first()
question.add_activity(QuestionActivityKind.Usershown, u)
if answer:
q_log(f"{s}, ret=answer")
return {"type": "answer", "data": answer}
else:
q_log(f"{s}, ret=question")
return {
"type": "question",
"data": q
if lecture.lecturer == current_user
else hide_points_and_try_shuffle_question(q, current_user),
}
q_log(f"{s}, ret=None")
else:
question_to_show_points = get_shown_points(lecture)
s = ""
if question_to_show_points:
asked_id = question_to_show_points.asked_id
already_shown = question_to_show_points.has_activity(
QuestionActivityKind.Pointsshown, u
)
already_closed = question_to_show_points.has_activity(
QuestionActivityKind.Pointsclosed, u
)
s = f"q: {u.name}, nr, as={already_shown is not None}, ac={already_closed is not None}, f={force}, aid={asked_id}"
if already_closed:
if force:
db.session.delete(already_closed)
else:
q_log(f"{s}, ret=None")
return None
if not (already_shown or force) or (asked_id != current_points_id):
question = get_asked_question(asked_id)
question.add_activity(QuestionActivityKind.Pointsshown, u)
answer = question.answers.filter_by(user_id=current_user).first()
if answer:
q_log(f"{s}, ret=result")
return {"type": "result", "data": answer}
q_log(s or f"q: {u.name}, nr, f={force}, ret=None")
return None
[docs]def get_shown_points(lecture) -> AskedQuestion | None:
return lecture.asked_questions.join(Showpoints).first()
[docs]def check_if_lecture_is_ending(lecture: Lecture):
"""Checks if the lecture is about to end. 1 -> ends in 1 min. 5 -> ends in 5 min. 100 -> goes on atleast for 5 mins.
:param lecture: The lecture object.
:return:
"""
lecture_ending = 100
if is_lecturer_of(lecture):
time_now = get_current_time()
ending_time = lecture.end_time
time_left = ending_time - time_now
if time_left.total_seconds() <= 60:
return 1
elif time_left.total_seconds() <= 60 * 5:
return 5
return lecture_ending
[docs]@dataclass
class SendMessageModel:
message: str
[docs]@lecture_routes.post("/sendMessage")
@use_model(SendMessageModel)
def send_message(m: SendMessageModel):
"""Route to add message to database."""
lecture = get_current_lecture_or_abort()
msg = Message(message=m.message, user_id=get_current_user_id())
lecture.messages.append(msg)
db.session.commit()
return json_response(msg, date_conversion=True)
[docs]def get_lecture_session_data():
for k in ("use_questions",):
if session.get(k) is None:
session[k] = True
return {
"useQuestions": session["use_questions"],
}
[docs]def lecture_dict(lecture: Lecture):
info = get_current_lecture_info()
lecturers, students = get_lecture_users(lecture) if info.is_lecturer else ([], [])
return {
"lecture": lecture,
"isInLecture": info.in_lecture,
"isLecturer": info.is_lecturer,
"lecturers": lecturers,
"students": students,
**get_lecture_session_data(),
}
[docs]@lecture_routes.get("/checkLecture")
def check_lecture():
"""Route to check if the current user is in some lecture in specific document."""
lectures = get_current_user_object().lectures
lecture = lectures[0] if lectures else None
if lecture:
if lecture.is_running:
return json_response(lecture_dict(lecture), date_conversion=True)
else:
leave_lecture(lecture)
empty_lecture(lecture)
db.session.commit()
doc_id = request.args.get("doc_id")
if doc_id is not None:
return json_response(get_running_lectures(int(doc_id)), date_conversion=True)
else:
return empty_response()
[docs]def switch_to_lecture(l: Lecture):
u = get_current_user_object()
u.lectures = [l]
[docs]@lecture_routes.post("/startFutureLecture")
def start_future_lecture():
lecture = get_lecture_from_request(check_access=True)
time_now = get_current_time()
lecture.start_time = time_now
switch_to_lecture(lecture)
db.session.commit()
return json_response(lecture_dict(lecture), date_conversion=True)
[docs]@lecture_routes.get("/getAllLecturesFromDocument")
def get_all_lectures():
if not request.args.get("doc_id"):
raise RouteException("missing argument(s)")
doc_id = int(request.args.get("doc_id"))
lectures = Lecture.get_all_in_document(doc_id)
time_now = get_current_time()
current_lectures = []
past_lectures = []
future_lectures = []
for lecture in lectures:
if lecture.start_time <= time_now < lecture.end_time:
current_lectures.append(lecture)
elif lecture.end_time <= time_now:
past_lectures.append(lecture)
else:
future_lectures.append(lecture)
return json_response(
{
"currentLectures": current_lectures,
"futureLectures": future_lectures,
"pastLectures": past_lectures,
},
date_conversion=True,
)
[docs]@lecture_routes.get("/showLectureInfo/<int:lecture_id>")
def show_lecture_info(lecture_id):
lecture = Lecture.find_by_id(lecture_id)
if not lecture:
raise RouteException("Lecture not found")
doc = DocEntry.find_by_id(lecture.doc_id)
return render_template(
"lectureInfo.jinja2", item=doc, lecture=lecture, translations=doc.translations
)
[docs]@lecture_routes.get("/showLectureInfoGivenName")
def show_lecture_info_given_name():
lecture = get_lecture_from_request(check_access=False)
return json_response(
lecture.to_json(
show_password=is_lecturer_of(lecture) or get_current_user_object().is_admin
),
date_conversion=True,
)
[docs]@lecture_routes.get("/getLectureByCode")
def lecture_needs_password():
lecture = get_lecture_from_request(check_access=False)
return json_response(lecture, date_conversion=True)
[docs]def get_lecture_users(lecture: Lecture):
lecturers = []
students = []
activity = Useractivity.query.filter_by(lecture=lecture).all()
cur_time = get_current_time()
for ac in activity:
user_id = ac.user_id
active = ac.active
person = {
"user": ac.user,
"activeSecondsAgo": int((cur_time - active).total_seconds()),
}
if lecture.lecturer == user_id:
lecturers.append(person)
else:
students.append(person)
return lecturers, students
[docs]def get_running_lectures(doc_id=None):
"""Gets all running and future lectures.
:param doc_id: The document id for which to get lectures.
"""
time_now = get_current_time()
list_of_lectures = []
is_lecturer = False
if doc_id:
list_of_lectures = Lecture.get_all_in_document(doc_id, time_now)
d = get_doc_or_abort(doc_id)
is_lecturer = bool(has_ownership(d))
current_lectures = []
future_lectures = []
for lecture in list_of_lectures:
if lecture.start_time <= time_now < lecture.end_time:
current_lectures.append(lecture)
else:
future_lectures.append(lecture)
return {
"isLecturer": is_lecturer,
"lectures": current_lectures,
"futureLectures": future_lectures,
}
[docs]@lecture_routes.post("/createLecture")
def create_lecture():
doc_id, start_time, end_time, lecture_code = verify_json_params(
"doc_id", "start_time", "end_time", "lecture_code"
)
start_time = dateutil.parser.parse(start_time)
end_time = dateutil.parser.parse(end_time)
lecture_id, password, options = verify_json_params(
"lecture_id", "password", "options", require=False
)
d = get_doc_or_abort(doc_id)
verify_ownership(d)
if not options:
options = {}
if not password:
password = ""
current_user = get_current_user_id()
lec = Lecture.find_by_code(lecture_code, doc_id)
if lec and not lecture_id:
raise RouteException(
"Can't create two or more lectures with the same name to the same document."
)
options = json.dumps(options)
if lecture_id is None:
lecture = Lecture(doc_id=doc_id, lecturer=current_user)
db.session.add(lecture)
else:
lecture = Lecture.find_by_id(lecture_id)
if not lecture:
raise NotExist()
lecture.start_time = start_time
lecture.end_time = end_time
lecture.password = password
lecture.lecture_code = lecture_code
lecture.options = options
current_time = get_current_time()
if start_time <= current_time <= end_time and not get_current_lecture():
switch_to_lecture(lecture)
db.session.commit()
return json_response(lecture, date_conversion=True)
[docs]@suppress_wuff(
StaleDataError,
"https://gitlab.com/tim-jyu/tim/-/issues/1976",
r"DELETE statement on table 'lectureusers' expected to delete",
)
def empty_lecture(lec: Lecture):
lec.users = []
clean_dictionaries_by_lecture(lec)
[docs]@lecture_routes.post("/endLecture")
def end_lecture():
lecture = get_lecture_from_request()
now = get_current_time()
lecture.end_time = now
empty_lecture(lecture)
db.session.commit()
return json_response(get_running_lectures(lecture.doc_id), date_conversion=True)
[docs]def clean_dictionaries_by_lecture(lecture: Lecture):
"""Cleans data from lecture that isn't running anymore.
:param lecture: The lecture.
"""
for q in lecture.running_questions:
db.session.delete(q)
stop_showing_points(lecture)
for a in lecture.useractivity:
db.session.delete(a)
QuestionActivity.query.filter(
(
QuestionActivity.asked_id.in_(
AskedQuestion.query.filter_by(
lecture_id=lecture.lecture_id
).with_entities(AskedQuestion.asked_id)
)
)
& QuestionActivity.kind.in_(
[
QuestionActivityKind.Usershown,
QuestionActivityKind.Pointsshown,
QuestionActivityKind.Pointsclosed,
QuestionActivityKind.Useranswered,
]
)
).delete(synchronize_session="fetch")
[docs]def delete_question_temp_data(question: AskedQuestion, lecture: Lecture):
delete_activity(
question,
[
QuestionActivityKind.Usershown,
QuestionActivityKind.Useranswered,
QuestionActivityKind.Pointsclosed,
QuestionActivityKind.Pointsshown,
],
)
Runningquestion.query.filter_by(lecture_id=lecture.lecture_id).delete()
stop_showing_points(lecture)
[docs]@lecture_routes.post("/extendLecture")
def extend_lecture():
new_end_time = request.args.get("new_end_time")
if not new_end_time:
raise RouteException("missing argument(s)")
lecture = get_lecture_from_request()
lecture.end_time = new_end_time
db.session.commit()
return ok_response()
[docs]@dataclass
class DeleteLectureModel:
lecture_id: int
[docs]@lecture_routes.post("/deleteLecture")
@use_model(DeleteLectureModel)
def delete_lecture(m: DeleteLectureModel):
lecture = get_lecture_from_request(lecture_id=m.lecture_id)
with db.session.no_autoflush:
empty_lecture(lecture)
Message.query.filter_by(lecture_id=lecture.lecture_id).delete()
LectureAnswer.query.filter_by(lecture_id=lecture.lecture_id).delete()
AskedQuestion.query.filter_by(lecture_id=lecture.lecture_id).delete()
db.session.delete(lecture)
db.session.commit()
return json_response(get_running_lectures(lecture.doc_id), date_conversion=True)
[docs]def get_lecture_from_request(
check_access=True, lecture_id: int | None = None
) -> Lecture:
lecture_id = get_option(request, "lecture_id", lecture_id, cast=int)
if not lecture_id:
lecture_code = get_option(request, "lecture_code", None)
doc_id = get_option(request, "doc_id", None)
lecture = Lecture.find_by_code(lecture_code, doc_id)
else:
lecture = Lecture.find_by_id(lecture_id)
if not lecture:
raise NotExist("Lecture not found")
if check_access:
verify_is_lecturer(lecture)
return lecture
[docs]@lecture_routes.post("/joinLecture")
def join_lecture():
"""Route to join lecture.
Checks that the given password is correct.
"""
lecture = get_lecture_from_request(check_access=False)
password_quess = request.args.get("password_quess")
# TODO Allow lecturer always join, even if the lecture is full
lecture_full = lecture.is_full
correct_password = True
if lecture.password and lecture.password != password_quess:
correct_password = False
u = get_current_user_object()
if lecture.is_running and not lecture_full and correct_password:
if not logged_in():
u = log_in_as_anonymous(session)
g.user = u
switch_to_lecture(lecture)
update_activity(lecture, u)
db.session.commit()
return json_response(
{
"correctPassword": correct_password,
**lecture_dict(lecture),
},
date_conversion=True,
)
[docs]def update_activity(lecture: Lecture, u: User):
ua = Useractivity(user_id=u.id, lecture_id=lecture.lecture_id, active=func.now())
db.session.merge(ua)
[docs]@lecture_routes.post("/leaveLecture")
def leave_lecture_route():
lecture = get_lecture_from_request(check_access=False)
leave_lecture(lecture)
db.session.commit()
return ok_response()
[docs]def leave_lecture(lecture: Lecture):
u = get_current_user_object()
if u in lecture.users:
lecture.users.remove(u)
[docs]@lecture_routes.post("/extendQuestion")
def extend_question():
asked_id = int(request.args.get("asked_id"))
extend = int(request.args.get("extend"))
q = get_asked_question(asked_id)
if not q:
raise NotExist()
rq: Runningquestion = q.running_question
if not q.is_running:
raise RouteException("Question is not running")
rq.end_time += timedelta(seconds=extend)
db.session.commit()
return ok_response()
[docs]def get_current_lecture() -> Lecture | None:
u = get_current_user_object()
lectures: list[Lecture] = u.lectures
if not lectures:
return None
if len(lectures) > 1:
raise Exception(
f"User {u.name} has joined to multiple lectures which should not be possible."
)
return lectures[0]
[docs]def get_current_lecture_or_abort() -> Lecture:
lec = get_current_lecture()
if not lec:
raise RouteException("Not joined to any lecture")
return lec
[docs]@lecture_routes.post("/askQuestion")
def ask_question():
if not (
request.args.get("question_id")
or request.args.get("asked_id")
or request.args.get("par_id")
):
raise RouteException("Bad request")
question_id = None
asked_id = None
par_id = None
if request.args.get("question_id"):
question_id = int(request.args.get("question_id"))
elif request.args.get("asked_id"):
asked_id = int(request.args.get("asked_id"))
else:
par_id = request.args.get("par_id")
lecture = get_current_lecture_or_abort()
verify_is_lecturer(lecture)
if question_id or par_id:
doc_id = get_option(request, "doc_id", None, cast=int)
if not doc_id:
raise RouteException("doc_id missing")
if question_id:
question = Question.query.get(question_id) # Old version???
question_json_str = question.questionjson
markup = json.loads(question_json_str)
else:
d = get_doc_or_abort(doc_id)
markup = get_question_data_from_document(d, par_id)
question_json_str = json.dumps(markup.markup)
question_hash = hashfunc(question_json_str)
asked_json = get_asked_json_by_hash(question_hash)
if not asked_json:
asked_json = AskedJson(json=question_json_str, hash=question_hash)
asked_time = get_current_time()
# Set points and expl as None because they're already contained in the JSON.
# Only if /updatePoints is called, they are set.
question = AskedQuestion(
lecture=lecture,
doc_id=doc_id,
asked_time=asked_time,
points=None,
expl=None,
asked_json=asked_json,
par_id=par_id,
)
db.session.add(question)
elif asked_id:
question = get_asked_question(asked_id)
if not question:
raise NotExist("Asked question not found.")
question.asked_time = get_current_time()
lecture = question.lecture
else:
raise RouteException("Missing parameters")
delete_question_temp_data(question, lecture)
rq = Runningquestion(
lecture=lecture,
asked_question=question,
ask_time=question.asked_time,
end_time=question.end_time,
)
db.session.add(rq)
db.session.commit()
return json_response(question, date_conversion=True)
[docs]class ShowAnswerPointsModel(AskedIdModel):
current_question_id: int | None = None
current_points_id: int | None = None
[docs]@lecture_routes.post("/showAnswerPoints")
@use_model(ShowAnswerPointsModel)
def show_points(m: ShowAnswerPointsModel):
lecture = get_current_lecture_or_abort()
verify_is_lecturer(lecture)
asked_id = m.asked_id
q = get_asked_question(asked_id)
if not q:
raise NotExist()
stop_showing_points(lecture)
sp = Showpoints(asked_question=q)
db.session.add(sp)
current_question_id = m.current_question_id
current_points_id = m.current_points_id
new_question = get_new_question(lecture, current_question_id, current_points_id)
db.session.commit()
if new_question is not None:
return json_response(new_question, date_conversion=True)
return empty_response()
[docs]def stop_showing_points(lecture: Lecture):
Showpoints.query.filter(
Showpoints.asked_id.in_(
AskedQuestion.query.filter_by(lecture_id=lecture.lecture_id).with_entities(
AskedQuestion.asked_id
)
)
).delete(synchronize_session="fetch")
[docs]@lecture_routes.post("/updatePoints/")
def update_question_points():
"""Route to get add question to database."""
if "asked_id" not in request.args or "points" not in request.args:
raise RouteException("missing argument(s)")
asked_id = int(request.args.get("asked_id"))
points = request.args.get("points")
expl = request.args.get("expl")
asked_question = get_asked_question(asked_id)
verify_is_lecturer(asked_question.lecture)
asked_question.points = points
asked_question.expl = expl
points_table = create_points_table(points)
question_answers: list[LectureAnswer] = asked_question.answers.all()
default_points = asked_question.get_default_points()
for answer in question_answers:
answer.points = calculate_points_from_json_answer(
answer.get_parsed_answer(), points_table, default_points
)
db.session.commit()
return ok_response()
[docs]def delete_activity(question: AskedQuestion, kinds):
QuestionActivity.query.filter(
(QuestionActivity.asked_id == question.asked_id)
& QuestionActivity.kind.in_(kinds)
).delete(synchronize_session="fetch")
[docs]@lecture_routes.get("/getQuestionByParId")
def get_question_by_par_id():
if not request.args.get("par_id") or not request.args.get("doc_id"):
raise RouteException("missing argument(s)")
doc_id = int(request.args.get("doc_id"))
par_id = request.args.get("par_id")
edit = request.args.get("edit", False)
d = get_doc_or_abort(doc_id)
verify_edit_access(d)
question = get_question_data_from_document(d, par_id, edit)
return json_response(question)
[docs]@lecture_routes.get("/getAskedQuestionById")
def get_asked_question_by_id():
if not request.args.get("asked_id"):
raise RouteException("missing argument(s)")
asked_id = int(request.args.get("asked_id"))
question = get_asked_question(asked_id)
verify_is_lecturer(question.lecture)
return json_response(question, date_conversion=True)
[docs]@lecture_routes.get("/getQuestionAnswer")
def get_question_answer_by_id():
answer_id = get_option(request, "id", default=None, cast=int)
if answer_id:
raise RouteException("missing argument(s)")
ans = LectureAnswer.get_by_id(answer_id)
if not ans:
raise NotExist("Answer not found")
verify_is_lecturer(ans.asked_question.lecture)
return json_response(ans, date_conversion=True)
[docs]@lecture_routes.post("/stopQuestion")
@use_model(AskedIdModel)
def stop_question(m: AskedIdModel):
"""Route to stop question from running."""
asked_id = m.asked_id
aq = get_asked_question(asked_id)
if not aq:
raise NotExist("Asked question not found")
lecture = get_current_lecture_or_abort()
verify_is_lecturer(lecture)
if not aq.is_running:
return json_response({"status": "Question not running anymore"})
aq.running_question.end_time = get_current_time()
delete_activity(
aq, [QuestionActivityKind.Usershown, QuestionActivityKind.Useranswered]
)
db.session.commit()
return ok_response()
[docs]@lecture_routes.get("/getLectureAnswers")
def get_lecture_answers():
"""Changing this to long poll requires removing threads."""
asked_id = get_option(request, "asked_id", None, cast=int)
if not asked_id:
raise RouteException("Bad request")
question = get_asked_question(asked_id)
verify_is_lecturer(question.lecture)
if not question:
raise NotExist("Asked question not found")
after = get_option(
request, "after", default=question.asked_time, cast=dateutil.parser.parse
)
lecture_answers = (
question.answers.filter(LectureAnswer.answered_on > after)
.order_by(LectureAnswer.answered_on.asc())
.all()
)
return json_response(
[
a.to_json(include_question=False, include_user=False)
for a in lecture_answers
],
date_conversion=True,
)
[docs]@dataclass
class AnswerToQuestionModel(AskedIdModel):
input: list[list[str]]
[docs]@lecture_routes.put("/answerToQuestion")
@use_model(AnswerToQuestionModel)
def answer_to_question(m: AnswerToQuestionModel):
asked_id = m.asked_id
answer = m.input
lecture = get_current_lecture_or_abort()
lecture_id = lecture.lecture_id
u = get_current_user_object()
asked_question = get_asked_question(asked_id)
lecture_answer: LectureAnswer = asked_question.answers.filter_by(
user_id=u.id
).first()
with user_activity_lock(u):
already_answered = asked_question.has_activity(
QuestionActivityKind.Useranswered, u
)
if not asked_question.is_running:
if (
not asked_question.running_question
or get_current_time()
> asked_question.running_question.end_time + timedelta(seconds=5)
):
return json_response(
{
"questionLate": "The question has already finished. Your answer was not saved."
}
)
if already_answered:
return json_response(
{
"alreadyAnswered": "You have already answered to question. Your first answer is saved."
}
)
asked_question.add_activity(QuestionActivityKind.Useranswered, u)
if (not lecture_answer) or (lecture_answer and answer != lecture_answer.answer):
whole_answer = answer
time_now = get_current_time()
if not is_lecturer_of(lecture):
whole_answer, question_points = asked_question.build_answer_and_points(
whole_answer, u
)
else:
question_points = asked_question.get_effective_points()
points_table = create_points_table(question_points)
default_points = asked_question.get_default_points()
points = calculate_points_from_json_answer(answer, points_table, default_points)
answer = json.dumps(whole_answer)
if lecture_answer and u.id != 0:
lecture_answer.answered_on = time_now
lecture_answer.answer = answer
lecture_answer.points = points
else:
ans = LectureAnswer(
user_id=u.id,
question_id=asked_id,
lecture_id=lecture_id,
answer=answer,
answered_on=time_now,
points=points,
)
db.session.add(ans)
db.session.commit()
return ok_response()
[docs]@lecture_routes.put("/closePoints")
def close_points():
asked_id = get_option(request, "asked_id", None, cast=int)
if not asked_id:
raise RouteException("Missing asked_id")
lecture = get_current_lecture_or_abort()
q = get_asked_question(asked_id)
if not q:
raise NotExist()
points = get_shown_points(lecture)
if points:
q.add_activity(QuestionActivityKind.Pointsclosed, get_current_user_object())
db.session.commit()
return ok_response()