import os
import re
from pathlib import Path
from typing import Optional, NamedTuple, Union
import magic
from werkzeug.utils import secure_filename
from timApp.answer.answer_models import AnswerUpload
from timApp.document.docinfo import DocInfo
from timApp.item.block import insert_block, Block, BlockType
from timApp.item.item import ItemBase
from timApp.timdb.dbaccess import get_files_path
from timApp.timdb.exceptions import TimDbException
from timApp.timdb.sqa import db
from timApp.user.user import User
DIR_MAPPING = {
BlockType.File: "files",
BlockType.Image: "images",
BlockType.Upload: "uploads",
}
[docs]class PluginUploadInfo(NamedTuple):
"""Additional information required for saving a :class:`PluginUpload`."""
task_id_name: str
doc: DocInfo
user: User
[docs]def get_storage_path(block_type: BlockType):
"""Gets the storage path for the given block type.
:param block_type: The block type.
:return: The storage path.
"""
return get_files_path() / "blocks" / DIR_MAPPING[block_type]
[docs]class UploadedFile(ItemBase):
"""A file that has been uploaded by a user."""
def __init__(self, b: Block):
self._block = b
[docs] @staticmethod
def find_by_id(block_id: int) -> Optional["UploadedFile"]:
b: Block | None = Block.query.get(block_id)
if not b:
return None
klass = CLASS_MAPPING.get(BlockType(b.type_id))
if not klass:
return None
return klass(b)
[docs] @staticmethod
def get_by_url(url: str) -> Optional["UploadedFile"]:
"""
Get file matching the given URL string.
:param url: File url as a string.
:return: UploadedFile, StampedPDF, or None, if neither was found.
"""
match = re.search(r"/files/(?P<id>\d+)/(?P<filename>.+)", url)
if not match:
return None
file_id = int(match.group("id"))
filename = str(match.group("filename"))
return UploadedFile.get_by_id_and_filename(file_id, filename)
[docs] @staticmethod
def get_by_id_and_filename(
file_id: int, filename: str
) -> Union["UploadedFile", "StampedPDF"] | None:
"""
Get uploaded file or its stamped version in case file name differs (i.e. it has "_stamped" in it).
:param file_id: File id.
:param filename: File name, which may contain "_stamped".
:return: UploadedFile, StampedPDF, or None, if neither was found.
"""
f = UploadedFile.find_by_id(file_id)
if not f:
return None
if filename != f.filename:
# Try to find stamped PDF file.
s = StampedPDF(f.block)
if filename != s.filename:
return None
if not s.filesystem_path.exists():
return None
f = s
return f
@property
def id(self):
return self.block.id
@property
def block_type(self):
return BlockType(self.block.type_id)
@property
def relative_filesystem_path(self):
assert self.id is not None
return Path(str(self.id)) / self.filename
@property
def base_path(self):
return get_storage_path(self.block_type)
@property
def filesystem_path(self):
return self.base_path / self.relative_filesystem_path
@property
def filename(self):
return self.block.description
@property
def data(self):
with self.filesystem_path.open(mode="rb") as f:
return f.read()
@property
def size(self):
return os.path.getsize(self.filesystem_path)
@property
def content_mimetype(self):
return get_mimetype(self.filesystem_path.as_posix())
@property
def is_content_pdf(self):
return self.content_mimetype == "application/pdf"
[docs] @classmethod
def save_new(
cls,
file_filename: str,
block_type: BlockType,
file_data: bytes | None = None,
original_file: Path | None = None,
upload_info: PluginUploadInfo | None = None,
) -> "UploadedFile":
if file_data is None and original_file is None:
raise TimDbException(
"Either file data or original file location must be given"
)
if block_type not in DIR_MAPPING:
raise TimDbException(f"Invalid block type given: {block_type}")
secured_name = secure_filename(file_filename)
if block_type == BlockType.Upload:
assert upload_info
base_path = get_storage_path(block_type)
path = (
base_path
/ str(upload_info.doc.id)
/ upload_info.task_id_name
/ upload_info.user.name
)
path.mkdir(parents=True, exist_ok=True)
file_id = len(os.listdir(path)) + 1
path = path / str(file_id) / secured_name
file_block = insert_block(
block_type=block_type,
description=path.relative_to(base_path).as_posix(),
)
au = AnswerUpload(block=file_block)
db.session.add(au)
else:
file_block = insert_block(block_type=block_type, description=secured_name)
db.session.flush()
f = CLASS_MAPPING[block_type](file_block)
p = f.filesystem_path
p.parent.mkdir(parents=True)
if file_data is not None:
with p.open(mode="wb") as fi:
fi.write(file_data)
else:
original_file.rename(p)
return f
[docs]class StampedPDF(UploadedFile):
@property
def filename(self):
return Path(self.block.description).stem + "_stamped.pdf"
[docs]class PluginUpload(UploadedFile):
"""A file that is associated with an :class:`~.Answer`."""
@property
def relative_filesystem_path(self) -> Path:
return Path(self.block.description)
@property
def filename(self):
return self.relative_filesystem_path.parts[-1]
CLASS_MAPPING = {
BlockType.File: UploadedFile,
BlockType.Image: UploadedFile,
BlockType.Upload: PluginUpload,
}
WHITELIST_MIMETYPES = {
"application/pdf",
"image/gif",
"image/jpeg",
"image/jpg",
"image/png",
"image/svg+xml",
"text/plain",
"text/xml",
"application/octet-stream",
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.wordprocessingml.template",
"application/vnd.ms-word.document.macroEnabled.12",
"application/vnd.ms-word.template.macroEnabled.12",
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.spreadsheetml.template",
"application/vnd.ms-excel.sheet.macroEnabled.12",
"application/vnd.ms-excel.template.macroEnabled.12",
"application/vnd.ms-excel.addin.macroEnabled.12",
"application/vnd.ms-excel.sheet.binary.macroEnabled.12",
"application/vnd.ms-powerpoint",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"application/vnd.openxmlformats-officedocument.presentationml.template",
"application/vnd.openxmlformats-officedocument.presentationml.slideshow",
"application/vnd.ms-powerpoint.addin.macroEnabled.12",
"application/vnd.ms-powerpoint.presentation.macroEnabled.12",
"application/vnd.ms-powerpoint.template.macroEnabled.12",
"application/vnd.ms-powerpoint.slideshow.macroEnabled.12",
"application/vnd.ms-access",
}
REMAP_MIMETYPES = {
"application/csv": "text/csv",
}
[docs]def get_mimetype(p):
mime = magic.Magic(mime=True)
mt = mime.from_file(p)
if mt == "image/svg":
mt += "+xml"
if isinstance(mt, bytes):
mt = mt.decode("utf-8")
if remapped := REMAP_MIMETYPES.get(mt):
mt = remapped
if mt not in WHITELIST_MIMETYPES:
if mt.startswith("text/"):
mt = "text/plain"
else:
mt = "application/octet-stream"
return mt