Source code for timApp.lecture.askedquestion

import json
from contextlib import contextmanager
from datetime import timedelta, datetime
from typing import Optional

from sqlalchemy import func
from sqlalchemy.exc import InvalidRequestError

from timApp.lecture.askedjson import AskedJson
from timApp.lecture.lecture import Lecture
from timApp.lecture.question_utils import qst_rand_array, qst_filter_markup_points
from timApp.lecture.questionactivity import QuestionActivityKind, QuestionActivity
from timApp.timdb.sqa import db
from timApp.timtypes import UserType
from timApp.util.utils import get_current_time


[docs]class AskedQuestion(db.Model): __tablename__ = "askedquestion" asked_id = db.Column(db.Integer, primary_key=True) lecture_id = db.Column( db.Integer, db.ForeignKey("lecture.lecture_id"), nullable=False ) doc_id = db.Column(db.Integer, db.ForeignKey("block.id")) par_id = db.Column(db.Text) asked_time = db.Column(db.DateTime(timezone=True), nullable=False) points = db.Column(db.Text) # not a single number; cannot be numeric asked_json_id = db.Column( db.Integer, db.ForeignKey("askedjson.asked_json_id"), nullable=False ) expl = db.Column(db.Text) asked_json: AskedJson = db.relationship( "AskedJson", back_populates="asked_questions", lazy="joined" ) lecture: Lecture = db.relationship( "Lecture", back_populates="asked_questions", lazy="joined" ) answers = db.relationship( "LectureAnswer", back_populates="asked_question", lazy="dynamic" ) answers_all = db.relationship("LectureAnswer", back_populates="asked_question") running_question = db.relationship( "Runningquestion", back_populates="asked_question", lazy="select", uselist=False ) questionactivity = db.relationship( "QuestionActivity", back_populates="asked_question", lazy="dynamic" ) showpoints = db.relationship( "Showpoints", back_populates="asked_question", lazy="select" ) @property def end_time(self) -> datetime | None: timelimit = self.time_limit if not timelimit: return None return self.asked_time + timedelta(seconds=timelimit)
[docs] def has_activity(self, kind: QuestionActivityKind, user: UserType): return self.questionactivity.filter_by(kind=kind, user_id=user.id).first()
[docs] def add_activity(self, kind: QuestionActivityKind, user: UserType): if self.has_activity(kind, user): return a = QuestionActivity(kind=kind, user=user, asked_question=self) db.session.add(a)
@property def time_limit(self): return self.asked_json.to_json()["json"].get("timeLimit")
[docs] def to_json(self, hide_points=False): aj = self.asked_json.to_json(hide_points) if self.expl: aj["json"]["expl"] = json.loads(self.expl) if not hide_points and self.points: aj["json"]["points"] = self.points return { "asked_id": self.asked_id, "json": aj, "asked_time": self.asked_time, "doc_id": self.doc_id, "lecture_id": self.lecture_id, "par_id": self.par_id, }
[docs] def get_effective_points(self): if self.points: return self.points aj = self.asked_json.to_json() return aj["json"].get("points")
[docs] def get_default_points(self): aj = self.asked_json.to_json() return aj["json"].get("defaultPoints", 0)
[docs] def build_answer_and_points(self, answer, u: UserType): """ Checks whether question was randomized If so, set question point input accordingly and expand answer to contain randomization data """ q_data = json.loads(self.asked_json.json) random_rows = q_data.get("randomizedRows", 0) if random_rows: row_count = len(q_data.get("rows", [])) q_type = q_data.get("questionType") rand_arr = qst_rand_array( row_count, random_rows, str(u.id), locks=q_data.get("doNotMove") ) question_points = self.get_effective_points() if question_points: question_points = qst_filter_markup_points( question_points, q_type, rand_arr ) # If lecture question's rows are randomized, it will be saved as a dict # containing additional information about how the answerer saw the question. # e.g {"c": [["2"]], "order": [4, 3, 5], "rows": 5, "question_type": "radio-vertical"} return { "c": answer, "order": rand_arr, "rows": row_count, "question_type": q_type, }, question_points return answer, self.get_effective_points()
@property def is_running(self): rq = self.running_question if not rq: return False et = rq.end_time if not et: return True return get_current_time() < et
[docs]def get_asked_question(asked_id: int) -> AskedQuestion | None: return AskedQuestion.query.get(asked_id)
[docs]@contextmanager def user_activity_lock(user: UserType): db.session.query(func.pg_advisory_xact_lock(user.id)).all() yield return
# db.session.query(func.pg_advisory_lock(user.id)).all() # try: # yield # finally: # try: # r = db.session.query(func.pg_advisory_unlock(user.id)).scalar() # except InvalidRequestError: # db.session.rollback() # r = db.session.query(func.pg_advisory_unlock(user.id)).scalar() # if not r: # raise Exception(f'Failed to release lock: {user.id}')