# Source code for timApp.folder.folder

from __future__ import annotations

from typing import Iterable, Any, TYPE_CHECKING

from sqlalchemy import true, and_

from timApp.auth.auth_models import BlockAccess
from timApp.document.docentry import DocEntry, get_documents
from timApp.document.docinfo import DocInfo
from timApp.document.specialnames import TEMPLATE_FOLDER_NAME
from timApp.folder.createopts import FolderCreationOptions
from timApp.item.block import Block, insert_block, copy_default_rights, BlockType
from timApp.item.item import Item
from timApp.timdb.exceptions import ItemAlreadyExistsException
from timApp.timdb.sqa import db
from timApp.user.usergroup import UserGroup
from timApp.util.utils import split_location, join_location, relative_location

if TYPE_CHECKING:
    from timApp.user.user import User

# Identifier used for the root folder: Folder.get_root() builds the root with
# this id, and Folder.get_by_id() special-cases it instead of querying.
ROOT_FOLDER_ID = -1


class Folder(db.Model, Item):
    """Represents a folder in the directory hierarchy.

    A folder is identified by its ``location`` (the path of its parent) and
    its ``name`` (the last path component). The root folder is not looked up
    from the database; it is built on demand by :meth:`get_root` with the id
    ``ROOT_FOLDER_ID`` and an empty name and location.
    """

    __tablename__ = "folder"

    id = db.Column(db.Integer, db.ForeignKey("block.id"), primary_key=True)
    """Folder identifier."""

    name = db.Column(db.Text, nullable=False)
    """Folder name (last part of path)."""

    location = db.Column(db.Text, nullable=False)
    """Folder location (first parts of the path)."""

    __table_args__ = (db.UniqueConstraint("name", "location", name="folder_uc"),)

    _block = db.relationship("Block", back_populates="folder", lazy="joined")

    @staticmethod
    def get_root() -> Folder:
        """Returns a new Folder object that represents the root folder."""
        return Folder(id=ROOT_FOLDER_ID, name="", location="")

    @staticmethod
    def get_by_id(fid) -> Folder | None:
        """Gets a folder by its id.

        :param fid: The folder id. ``ROOT_FOLDER_ID`` yields the root folder.
        :return: The folder, or None if the query finds no folder with that id.
        """
        if fid == ROOT_FOLDER_ID:
            return Folder.get_root()
        return Folder.query.get(fid)

    @staticmethod
    def find_by_location(location, name) -> Folder | None:
        """Finds the folder that has the given location and name.

        :param location: The path of the parent folder.
        :param name: The folder name (last part of the path).
        :return: The folder, or None if there is no such folder.
        """
        return Folder.query.filter_by(name=name, location=location).first()

    @staticmethod
    def find_by_path(path, fallback_to_id=False) -> Folder | None:
        """Finds a folder by its full path.

        :param path: The full path of the folder. An empty path means the root folder.
        :param fallback_to_id: If True and no folder exists at the path, the path
         is interpreted as an integer folder id.
        :return: The folder, or None if it was not found.
        """
        if path == "":
            return Folder.get_root()
        parent_loc, name = split_location(path)
        f = Folder.find_by_location(parent_loc, name)
        if f is None and fallback_to_id:
            try:
                return Folder.get_by_id(int(path))
            except ValueError:
                # The path is not a number, so it cannot be an id.
                return None
        return f

    @staticmethod
    def find_first_existing(path: str) -> Folder | None:
        """Finds the first existing folder for the given path.

        Only proper ancestors of the path are considered; the path itself is
        never returned. For example, if the folders ``a`` and ``a/b`` exist, then:

        * if path is ``a/b/c``, returns ``a/b``
        * if path is ``a/b``, returns ``a``
        * if path is ``a``, returns the root folder
        * if path is empty, returns None

        :param path: The path whose ancestors are searched.
        :return: The nearest existing ancestor folder, or None.
        """
        while path:
            # Drop the last path component and try the remaining prefix.
            path, _ = split_location(path)
            d = Folder.find_by_path(path)
            if d:
                return d
        return None

    @staticmethod
    def get_all_in_path(
        root_path: str = "",
        filter_ids: Iterable[int] | None = None,
        recurse=False,
    ) -> list[Folder]:
        """Gets all the folders under a path.

        :param recurse: Whether to search recursively.
        :param root_path: Restricts the search to a specific folder.
        :param filter_ids: An optional iterable of folder ids for filtering the folders.
         Must be non-empty if supplied.
        :return: A list of Folder objects.
        """
        f_filter = Folder.location == root_path
        if recurse:
            if root_path:
                # autoescape keeps "%" and "_" in the path from acting as
                # LIKE wildcards, which would match unrelated folders.
                f_filter = f_filter | Folder.location.startswith(
                    root_path + "/", autoescape=True
                )
            else:
                # Everything is under the root, so no restriction is needed.
                f_filter = true()
        q = Folder.query.filter(f_filter)
        if filter_ids:
            q = q.filter(Folder.id.in_(filter_ids))
        return q.all()

    def is_root(self) -> bool:
        """Returns whether this folder is the root folder."""
        return self.id == ROOT_FOLDER_ID

    def delete(self):
        """Deletes this folder together with its access rows and its block row.

        The folder must be empty.
        """
        # NOTE: this precondition is an assert, so it is skipped when Python
        # runs with -O. It is kept as an assert so that callers continue to
        # see AssertionError.
        assert self.is_empty
        db.session.delete(self)
        BlockAccess.query.filter_by(block_id=self.id).delete()
        Block.query.filter_by(type_id=BlockType.Folder.value, id=self.id).delete()

    def rename(self, new_name: str):
        """Changes the name (last path component) of the folder and updates its contents.

        :param new_name: The new name. Must not contain a slash.
        """
        assert "/" not in new_name
        old_path = self.path
        self.name = new_name
        self.rename_content(old_path, self.path)

    def rename_path(self, new_path: str) -> None:
        """Renames the folder, updating all the documents within.

        :param new_path: The new full path for the folder.
        """
        old_path = self.path
        self.path = new_path
        self.rename_content(old_path, new_path)

    def rename_content(self, old_path: str, new_path: str):
        """Renames contents of the folder.

        Replaces the ``old_path`` prefix with ``new_path`` in the names of all
        documents and in the locations of all folders that are under ``old_path``.

        :param old_path: The path the contents currently live under.
        :param new_path: The path the contents are moved under.
        """
        # autoescape keeps "%" and "_" in the path from acting as LIKE
        # wildcards; without it, renaming "a_b" would also select items under
        # e.g. "axb/" and could rewrite their paths.
        old_prefix = old_path + "/"

        docs_in_folder: list[DocEntry] = DocEntry.query.filter(
            DocEntry.name.startswith(old_prefix, autoescape=True)
        ).all()
        for d in docs_in_folder:
            d.name = d.name.replace(old_path, new_path, 1)

        folders_in_folder = Folder.query.filter(
            (Folder.location == old_path)
            | Folder.location.startswith(old_prefix, autoescape=True)
        ).all()
        for f in folders_in_folder:
            f.location = f.location.replace(old_path, new_path, 1)

    @property
    def is_empty(self):
        """Whether the folder has no direct subfolders and no documents under its path."""
        q = Folder.query.filter_by(location=self.path)
        if db.session.query(q.exists()).scalar():
            return False
        # autoescape keeps "%" and "_" in the path from acting as LIKE
        # wildcards, so documents of similarly named folders are not counted.
        q = DocEntry.query.filter(
            DocEntry.name.startswith(self.path + "/", autoescape=True)
        )
        return not db.session.query(q.exists()).scalar()

    @property
    def parent(self) -> Folder | None:
        """The parent folder, or None for the root folder."""
        if self.is_root():
            return None
        return super().parent

    @property
    def path(self):
        """The full path of the folder (location joined with name)."""
        return self.get_full_path()

    @path.setter
    def path(self, new_path: str):
        loc, name = split_location(new_path)
        self.location = loc
        self.name = name

    @property
    def path_without_lang(self):
        """Returns path without the language part.

        For folders, this is the same as the path itself.
        """
        return self.path

    @property
    def block(self) -> Block | None:
        """Overridden for optimization: root folder does not have a db entry, so we won't try to query for it."""
        if self.is_root():
            return None
        return Item.block.fget(self)

    def get_full_path(self) -> str:
        """Returns the full path of the folder (location joined with name)."""
        return join_location(self.location, self.name)

    def get_document(
        self, relative_path: str, create_if_not_exist=False, creator_group=None
    ) -> DocEntry | None:
        """Gets a document by its path relative to this folder.

        :param relative_path: The document path relative to this folder.
        :param create_if_not_exist: Whether to create the document (and the
         folders leading to it) if it does not exist.
        :param creator_group: The owner group for anything that gets created.
        :return: The document, or None if it does not exist and was not created.
        """
        doc_path = join_location(self.get_full_path(), relative_path)
        doc = DocEntry.query.filter_by(name=doc_path).first()
        if doc is not None:
            return doc
        if not create_if_not_exist:
            return None
        rel_folder, short_name = split_location(relative_path)
        Folder.create(
            join_location(self.get_full_path(), rel_folder),
            owner_groups=creator_group,
        )
        return DocEntry.create(
            doc_path,
            owner_group=creator_group,
            title=short_name,
        )

    def get_all_documents(
        self,
        relative_paths: list[str] | None = None,
        include_subdirs: bool = False,
        custom_filter: Any = None,
        query_options: Any = None,
        filter_user: User | None = None,
    ) -> list[DocInfo]:
        """Gets documents in this folder.

        :param relative_paths: If given, only documents at these paths (relative
         to this folder) are returned, and subdirectories are always searched.
        :param include_subdirs: Whether to search subdirectories.
        :param custom_filter: An additional filter passed on to ``get_documents``.
        :param query_options: Query options passed on to ``get_documents``.
        :param filter_user: Passed on to ``get_documents``.
        :return: The list of documents.
        """
        if relative_paths is not None:
            # The listed paths may point into subfolders.
            include_subdirs = True
            paths = [
                join_location(self.get_full_path(), path) for path in relative_paths
            ]
            if custom_filter is None:
                custom_filter = DocEntry.name.in_(paths)
            else:
                custom_filter = and_(custom_filter, DocEntry.name.in_(paths))
        return get_documents(
            include_nonpublic=True,
            filter_folder=self.get_full_path(),
            search_recursively=include_subdirs,
            custom_filter=custom_filter,
            query_options=query_options,
            filter_user=filter_user,
        )

    def get_all_folders(self) -> list[Folder]:
        """Returns the direct subfolders of this folder."""
        return Folder.get_all_in_path(self.path)

    def relative_path(self, item: Item) -> str:
        """Returns the path of the given item relative to this folder."""
        return relative_location(item.path, self.path)

    @staticmethod
    def create(
        path: str,
        owner_groups: list[UserGroup] | UserGroup | None = None,
        title=None,
        creation_opts: FolderCreationOptions = FolderCreationOptions(),
    ) -> Folder:
        """Creates a new folder with the specified name. If the folder already exists, it is returned.

        Missing parent folders are created recursively with the same owner groups.

        :param title: The folder title. Defaults to the last part of the path.
        :param creation_opts: Additional folder creation options.
        :param path: The path of the folder to be created. Leading and trailing
         slashes are ignored.
        :param owner_groups: The owner group or a list of owner groups.
        :returns: The created or existing folder.
        :raises ItemAlreadyExistsException: If a document already exists at the
         path. The database session is rolled back before raising.
        """
        path = path.strip("/")

        if DocEntry.find_by_path(path):
            db.session.rollback()
            raise ItemAlreadyExistsException(
                f"A document already exists at path {path}"
            )

        # Root folder is special case
        if not path:
            return Folder.get_root()

        rel_path, rel_name = split_location(path)
        folder = Folder.query.filter_by(name=rel_name, location=rel_path).first()
        if folder is not None:
            return folder

        # Make sure that the parent folder exists
        p_f = Folder.create(rel_path, owner_groups)

        # Templates is a special folder, so it should have the same owner as its parent,
        # except if we're in users folder.
        owner_groups = (
            p_f.owners
            if creation_opts.get_templates_rights_from_parent
            and rel_name == TEMPLATE_FOLDER_NAME
            and rel_path != "users"
            else owner_groups
        )
        if owner_groups is None:
            owner_groups = []
        # Normalize a single group into a list.
        owner_groups = (
            owner_groups if isinstance(owner_groups, list) else [owner_groups]
        )

        block = insert_block(
            BlockType.Folder,
            title or rel_name,
            owner_groups,
        )

        f = Folder(_block=block, name=rel_name, location=rel_path)
        db.session.add(f)
        if creation_opts.apply_default_rights:
            copy_default_rights(f, BlockType.Folder, owners_to_skip=owner_groups)
        return f

    def to_json(self):
        """Returns the JSON representation of the item with ``isFolder`` set to True."""
        return {**super().to_json(), "isFolder": True}
def path_includes(longer_path: str, shorter_path: str):
    """Tell whether ``longer_path`` is ``shorter_path`` itself or lies beneath it.

    Leading and trailing slashes are ignored on both arguments. The comparison
    is made on whole path components, so ``a/bc`` is not included in ``a/b``.
    An empty ``shorter_path`` only includes an empty ``longer_path``.

    :param longer_path: The path that is expected to be at or below the other one.
    :param shorter_path: The path that is expected to be the ancestor.
    :return: True if ``longer_path`` is included in ``shorter_path``.
    """
    # A trailing slash on both sides makes the prefix test respect component
    # boundaries.
    ancestor = shorter_path.strip("/") + "/"
    candidate = longer_path.strip("/") + "/"
    return candidate.startswith(ancestor)