"""Routes for manage view."""
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from re import Pattern
from typing import Generator
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
from flask import redirect
from flask import render_template, Response
from flask import request
from isodate import Duration
from sqlalchemy import inspect
from sqlalchemy.orm.state import InstanceState
from timApp.auth.accesshelper import (
verify_manage_access,
verify_ownership,
verify_view_access,
has_ownership,
verify_edit_access,
get_doc_or_abort,
get_item_or_abort,
get_folder_or_abort,
verify_copy_access,
AccessDenied,
get_single_view_access,
has_edit_access,
)
from timApp.auth.accesstype import AccessType
from timApp.auth.auth_models import AccessTypeModel, BlockAccess
from timApp.auth.sessioninfo import get_current_user_group_object
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.create_item import copy_document_and_enum_translations
from timApp.document.docentry import DocEntry
from timApp.document.docinfo import move_document, find_free_name, DocInfo
from timApp.document.exceptions import ValidationException
from timApp.document.translation.translation import Translation
from timApp.folder.createopts import FolderCreationOptions
from timApp.folder.folder import Folder, path_includes
from timApp.item.block import BlockType, Block, copy_default_rights
from timApp.item.copy_rights import copy_rights
from timApp.item.item import Item
from timApp.item.validation import (
validate_item,
validate_item_and_create_intermediate_folders,
has_special_chars,
)
from timApp.timdb.sqa import db
from timApp.user.user import User, ItemOrBlock
from timApp.user.usergroup import UserGroup
from timApp.user.users import (
remove_default_access,
get_default_rights_holders,
get_rights_holders,
remove_access,
)
from timApp.user.userutils import (
grant_access,
grant_default_access,
is_some_default_right_document,
)
from timApp.util.flask.requesthelper import (
get_option,
RouteException,
NotExist,
)
from timApp.util.flask.responsehelper import (
json_response,
ok_response,
get_grid_modules,
safe_redirect,
)
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.logger import log_info
from timApp.util.utils import (
remove_path_special_chars,
split_location,
join_location,
get_current_time,
cached_property,
seq_to_str,
)
manage_page = TypedBlueprint(
"manage_page", __name__, url_prefix=""
) # TODO: Better URL prefix.
[docs]@manage_page.get("/manage/<path:path>")
def manage(path: str) -> Response | str:
if has_special_chars(path):
qs = request.query_string.decode("utf-8")
return redirect(
remove_path_special_chars(request.path) + (f"?{qs}" if qs else "")
)
item = DocEntry.find_by_path(path, fallback_to_id=True)
if item is None:
item = Folder.find_by_path(path, fallback_to_id=True)
if item is None:
raise NotExist()
verify_view_access(item)
is_folder = isinstance(item, Folder)
if not is_folder and has_edit_access(item):
item.serialize_content = True
item.changelog_length = get_option(request, "history", 100)
return render_template(
"manage.jinja2",
route="manage",
translations=item.translations if not is_folder else None,
item=item,
js=["angular-ui-grid"],
jsMods=get_grid_modules(),
orgs=UserGroup.get_organizations(),
access_types=AccessTypeModel.query.all(),
)
[docs]@manage_page.get("/changelog/<int:doc_id>/<int:length>")
def get_changelog(doc_id: int, length: int) -> Response:
doc = get_doc_or_abort(doc_id)
verify_manage_access(doc)
return json_response({"versions": doc.get_changelog_with_names(length)})
[docs]class TimeType(Enum):
always = 0
range = 1
duration = 2
[docs]@dataclass
class TimeOpt:
type: TimeType
duration: Duration | None = None
to: datetime | None = None
ffrom: datetime | None = field(metadata={"data_key": "from"}, default=None)
durationTo: datetime | None = None
durationFrom: datetime | None = None
@cached_property
def effective_opt(self):
acc_to = None
dur_from = None
dur_to = None
duration = None
accessible_from = get_current_time()
if self.type == TimeType.range:
accessible_from = self.ffrom
acc_to = self.to
if self.type == TimeType.duration:
accessible_from = None
dur_from = self.durationFrom
acc_to = self.to
dur_to = self.durationTo
duration = self.duration_timedelta
return TimeOpt(
type=self.type,
duration=duration,
to=acc_to,
ffrom=accessible_from,
durationFrom=dur_from,
durationTo=dur_to,
)
@property
def duration_timedelta(self):
if not self.duration:
return None
if isinstance(self.duration, timedelta):
return self.duration
try:
return self.duration.totimedelta(start=datetime.min)
except (OverflowError, ValueError):
raise RouteException("Duration is too long.")
[docs]class EditOption(Enum):
Add = "add"
Remove = "remove"
[docs]@dataclass
class PermissionEditModel:
type: AccessType = field(metadata={"by_value": True})
time: TimeOpt
groups: list[str]
confirm: bool | None
def __post_init__(self):
if self.confirm and self.time.type == TimeType.range and self.time.ffrom:
raise RouteException("Cannot require confirm with start time set")
@cached_property
def group_objects(self):
return UserGroup.query.filter(UserGroup.name.in_(self.groups)).all()
@property
def nonexistent_groups(self):
return sorted(list(set(self.groups) - {g.name for g in self.group_objects}))
[docs]@dataclass
class PermissionSingleEditModel(PermissionEditModel):
id: int
[docs]class DefaultItemType(Enum):
document = 0
folder = 1
[docs]@dataclass
class DefaultPermissionModel(PermissionSingleEditModel):
item_type: DefaultItemType
[docs]@dataclass
class PermissionRemoveModel:
id: int
type: AccessType = field(metadata={"by_value": True})
group: int
[docs]@dataclass
class DefaultPermissionRemoveModel(PermissionRemoveModel):
item_type: DefaultItemType
[docs]@dataclass
class PermissionMassEditModel(PermissionEditModel):
ids: list[int]
action: EditOption = field(metadata={"by_value": True})
[docs]@manage_page.get("/permissions/add/<int:doc_id>/<username>")
def add_permission_basic(
doc_id: int, username: str, type: str, duration: int
) -> Response:
if type != "view":
raise RouteException("Only 'view' is allowed to prevent misuse")
i = get_item_or_abort(doc_id)
settings = i.document.get_settings()
if not settings.allow_url_permission_edits():
raise AccessDenied(
"The document permissions cannot be edited via URLs. Add `allow_url_permission_edits: true` to document settings to allow this."
)
verify_permission_edit_access(i, AccessType.view)
accs = add_perm(
PermissionEditModel(
type=AccessType.view,
groups=[username],
time=TimeOpt(
type=TimeType.duration,
duration=Duration(hours=duration),
),
confirm=False,
),
i,
replace_active_duration=False,
)
res_message = ""
if accs:
a = accs[0]
a_info: InstanceState = inspect(a)
if a_info.transient or a_info.pending:
log_right(f"added {a.info_str} for {username} in {i.path}")
res_message = "Added right"
elif a_info.modified:
log_right(f"updated to {a.info_str} for {username} in {i.path}")
res_message = "Updated existing right"
else:
log_right(
f"skipped {a.info_str} for {username} in {i.path} because an active right already exists"
)
res_message = "Skipped, because an active right already exists. Expire the active right first."
db.session.commit()
return json_response({"message": res_message})
[docs]@manage_page.put("/permissions/add", model=PermissionSingleEditModel)
def add_permission(m: PermissionSingleEditModel):
i = get_item_or_abort(m.id)
is_owner = verify_permission_edit_access(i, m.type)
accs = add_perm(m, i)
if accs:
a = accs[0]
check_ownership_loss(is_owner, i)
log_right(f"added {a.info_str} for {seq_to_str(m.groups)} in {i.path}")
db.session.commit()
return permission_response(m)
[docs]def permission_response(m: PermissionEditModel):
return json_response({"not_exist": m.nonexistent_groups})
[docs]def log_right(s: str):
u = get_current_user_object()
log_info(f"RIGHTS: {u.name} {s}")
[docs]def get_group_and_doc(doc_id: int, username: str) -> tuple[UserGroup, DocInfo]:
i = get_item_or_abort(doc_id)
verify_permission_edit_access(i, AccessType.view)
g = UserGroup.get_by_name(username)
if not g:
raise RouteException("User not found")
return g, i
[docs]def raise_or_redirect(message: str, redir: str | None = None) -> Response:
if not redir:
raise RouteException(message)
url = list(urlparse(redir))
query = dict(parse_qsl(url[4]))
query |= {"error": message}
url[4] = urlencode(query)
return safe_redirect(urlunparse(url))
[docs]@manage_page.get("/permissions/expire/<int:doc_id>/<username>")
def expire_permission_url(doc_id: int, username: str, redir: str | None = None):
g, i = get_group_and_doc(doc_id, username)
ba: BlockAccess | None = BlockAccess.query.filter_by(
type=AccessType.view.value,
block_id=i.id,
usergroup_id=g.id,
).first()
if not ba:
return raise_or_redirect("Right not found.", redir)
if ba.expired:
return raise_or_redirect("Right is already expired.", redir)
ba.accessible_to = get_current_time()
if ba.duration:
ba.duration = None
ba.duration_from = None
ba.duration_to = None
db.session.commit()
return ok_response() if not redir else safe_redirect(redir)
[docs]@manage_page.get("/permissions/confirm/<int:doc_id>/<username>")
def confirm_permission_url(doc_id: int, username: str, redir: str | None = None):
g, i = get_group_and_doc(doc_id, username)
m = PermissionRemoveModel(id=doc_id, type=AccessType.view, group=g.id)
return do_confirm_permission(m, i, redir)
[docs]@manage_page.put("/permissions/confirm", model=PermissionRemoveModel)
def confirm_permission(m: PermissionRemoveModel) -> Response:
i = get_item_or_abort(m.id)
verify_permission_edit_access(i, m.type)
return do_confirm_permission(m, i)
[docs]def do_confirm_permission(
m: PermissionRemoveModel, i: DocInfo, redir: str | None = None
):
ba: BlockAccess | None = BlockAccess.query.filter_by(
type=m.type.value,
block_id=m.id,
usergroup_id=m.group,
).first()
if not ba:
return raise_or_redirect("Right not found.", redir)
if not ba.require_confirm:
return raise_or_redirect(
f"{m.type.name} right for {ba.usergroup.name} does not require confirmation or it was already confirmed.",
redir,
)
ba.do_confirm()
ug: UserGroup = UserGroup.query.get(m.group)
log_right(f"confirmed {ba.info_str} for {ug.name} in {i.path}")
db.session.commit()
return ok_response() if not redir else safe_redirect(redir)
[docs]@manage_page.put("/permissions/edit", model=PermissionMassEditModel)
def edit_permissions(m: PermissionMassEditModel) -> Response:
groups = m.group_objects
nonexistent = set(m.groups) - {g.name for g in groups}
if nonexistent:
raise RouteException(f"Non-existent groups: {nonexistent}")
items = (
Block.query.filter(
Block.id.in_(m.ids)
& Block.type_id.in_([BlockType.Document.value, BlockType.Folder.value])
)
.order_by(Block.id)
.all()
)
a = None
owned_items_before = set()
for i in items:
checked_owner = verify_permission_edit_access(i, m.type)
if checked_owner:
owned_items_before.add(i)
if m.action == EditOption.Add:
accs = add_perm(m, i)
if accs:
a = accs[0]
else:
for g in groups:
a = remove_perm(g, i, m.type) or a
if m.type == AccessType.owner:
owned_items_after = set()
u = get_current_user_object()
for i in items:
if u.has_ownership(i):
owned_items_after.add(i)
if owned_items_before != owned_items_after:
raise AccessDenied("You cannot remove ownership from yourself.")
if a:
action = "added" if m.action == EditOption.Add else "removed"
log_right(
f"{action} {a.info_str} for {seq_to_str(m.groups)} in blocks: {seq_to_str(list(str(x) for x in m.ids))}"
)
db.session.commit()
return permission_response(m)
[docs]def add_perm(
p: PermissionEditModel,
item: Item,
replace_active_duration: bool = True,
) -> list[BlockAccess]:
if get_current_user_object().get_personal_folder().id == item.id:
if p.type == AccessType.owner:
raise AccessDenied("You cannot add owners to your personal folder.")
opt = p.time.effective_opt
accs = []
for group in p.group_objects:
a = grant_access(
group,
item,
p.type,
accessible_from=opt.ffrom,
accessible_to=opt.to,
duration_from=opt.durationFrom,
duration_to=opt.durationTo,
duration=opt.duration,
require_confirm=p.confirm,
replace_active_duration=replace_active_duration,
)
accs.append(a)
return accs
[docs]@manage_page.put("/permissions/remove", model=PermissionRemoveModel)
def remove_permission(m: PermissionRemoveModel) -> Response:
i = get_item_or_abort(m.id)
had_ownership = verify_permission_edit_access(i, m.type)
ug: UserGroup = UserGroup.query.get(m.group)
if not ug:
raise RouteException("User group not found")
a = remove_perm(ug, i.block, m.type)
check_ownership_loss(had_ownership, i)
log_right(f"removed {a.info_str} for {ug.name} in {i.path}")
db.session.commit()
return ok_response()
[docs]@dataclass
class PermissionClearModel:
paths: list[str]
type: AccessType = field(metadata={"by_value": True})
[docs]@manage_page.put("/permissions/clear", model=PermissionClearModel)
def clear_permissions(m: PermissionClearModel) -> Response:
for p in m.paths:
i = DocEntry.find_by_path(p, try_translation=True)
if not i:
i = Folder.find_by_path(p)
if not i:
raise RouteException(f"Item not found: {p}")
verify_ownership(i)
i.block.accesses = {
(ugid, permtype): v
for (ugid, permtype), v in i.block.accesses.items()
if permtype != m.type.value
}
db.session.commit()
return ok_response()
# noinspection PyShadowingBuiltins
[docs]@manage_page.post("/permissions/selfExpire")
def self_expire_permission(id: int) -> Response:
i = get_item_or_abort(id)
acc = verify_view_access(i, require=False)
if not acc:
return ok_response()
acc = get_single_view_access(i)
acc.accessible_to = get_current_time()
log_right(f"self-expired view access in {i.path}")
db.session.commit()
return ok_response()
[docs]def remove_perm(group: UserGroup, b: Block, t: AccessType):
return remove_access(group, b, t)
[docs]def check_ownership_loss(had_ownership, item):
db.session.flush()
db.session.refresh(item)
if had_ownership and not has_ownership(item):
raise AccessDenied("You cannot remove ownership from yourself.")
[docs]@manage_page.get("/alias/<int:doc_id>")
def get_doc_names(doc_id: int) -> Response:
d = get_doc_or_abort(doc_id)
verify_manage_access(d)
return json_response(d.aliases)
[docs]@manage_page.put("/alias/<int:doc_id>/<path:new_alias>")
def add_alias(doc_id: int, new_alias: str, public: bool = True) -> Response:
d = get_doc_or_abort(doc_id)
verify_manage_access(d)
new_alias = new_alias.strip("/")
validate_item_and_create_intermediate_folders(
new_alias, BlockType.Document, get_current_user_group_object()
)
d.add_alias(new_alias, public)
db.session.commit()
return ok_response()
[docs]@manage_page.post("/alias/<path:alias>")
def change_alias(alias: str, new_name: str, public: bool = True) -> Response:
alias = alias.strip("/")
new_alias = new_name.strip("/")
doc = DocEntry.find_by_path(alias, try_translation=False)
if doc is None:
raise NotExist("The document does not exist!")
verify_manage_access(doc)
if alias != new_alias:
dst_f = Folder.find_first_existing(new_alias)
if not get_current_user_object().can_write_to_folder(dst_f):
raise AccessDenied(
"You don't have permission to write to the destination folder."
)
validate_item_and_create_intermediate_folders(
new_alias, BlockType.Document, get_current_user_group_object()
)
doc.name = new_alias
doc.public = public
if all(not a.public for a in doc.aliases):
raise RouteException(
"This is the only visible name for this document, so you cannot make it invisible."
)
db.session.commit()
return ok_response()
[docs]@manage_page.delete("/alias/<path:alias>")
def remove_alias(alias: str) -> Response:
alias = alias.strip("/")
doc = DocEntry.find_by_path(alias, try_translation=False)
if doc is None:
raise NotExist("The document does not exist!")
verify_manage_access(doc)
if len(doc.aliases) <= 1:
raise AccessDenied("You can't delete the only name the document has.")
f = Folder.find_first_existing(alias)
if not get_current_user_object().can_write_to_folder(f):
raise AccessDenied("You don't have permission to write to that folder.")
db.session.delete(doc)
db.session.commit()
return ok_response()
[docs]@manage_page.put("/rename/<int:item_id>")
def rename_folder(item_id: int) -> Response:
new_name = request.get_json()["new_name"].strip("/")
d = DocEntry.find_by_id(item_id)
if d:
raise AccessDenied("Rename route is no longer supported for documents.")
f = get_folder_or_abort(item_id)
verify_manage_access(f)
parent, _ = split_location(new_name)
parent_f = Folder.find_by_path(parent)
if parent_f is None:
# Maybe do a recursive create with permission checks here later?
raise AccessDenied("The location does not exist.")
if parent_f.id == item_id:
raise AccessDenied("A folder cannot contain itself.")
validate_item(new_name, BlockType.Folder)
f.rename_path(new_name)
db.session.commit()
return json_response({"new_name": new_name})
[docs]@manage_page.get("/permissions/get/<int:item_id>")
def get_permissions(item_id: int) -> Response:
i = get_item_or_abort(item_id)
verify_manage_access(i)
grouprights = get_rights_holders(item_id)
return json_response(
{
"grouprights": grouprights,
"accesstypes": AccessTypeModel.query.all(),
"orgs": UserGroup.get_organizations(),
},
date_conversion=True,
)
[docs]@manage_page.get("/defaultPermissions/<object_type>/get/<int:folder_id>")
def get_default_document_permissions(folder_id: int, object_type: str) -> Response:
f = get_folder_or_abort(folder_id)
verify_manage_access(f)
grouprights = get_default_rights_holders(f, BlockType.from_str(object_type))
return json_response({"grouprights": grouprights}, date_conversion=True)
[docs]@manage_page.put("/defaultPermissions/add", model=DefaultPermissionModel)
def add_default_doc_permission(m: DefaultPermissionModel) -> Response:
i = get_folder_or_abort(m.id)
verify_permission_edit_access(i, m.type)
opt = m.time.effective_opt
grant_default_access(
m.group_objects,
i,
m.type,
BlockType.from_str(m.item_type.name),
accessible_from=opt.ffrom,
accessible_to=opt.to,
duration_from=opt.durationFrom,
duration_to=opt.durationTo,
duration=opt.duration,
)
db.session.commit()
return permission_response(m)
[docs]@manage_page.put("/defaultPermissions/remove", model=DefaultPermissionRemoveModel)
def remove_default_doc_permission(m: DefaultPermissionRemoveModel) -> Response:
f = get_folder_or_abort(m.id)
verify_permission_edit_access(f, m.type)
ug = UserGroup.query.get(m.group)
if not ug:
raise NotExist("Usergroup not found")
remove_default_access(ug, f, m.type, BlockType.from_str(m.item_type.name))
db.session.commit()
return ok_response()
[docs]def verify_permission_edit_access(i: ItemOrBlock, perm_type: AccessType) -> bool:
"""Verifies that the user has right to edit a permission.
:param i: The item to check for permission.
:param perm_type: The permission type.
:return: True if owner permission was checked, false if just manage access.
"""
if perm_type == AccessType.owner:
verify_ownership(i)
return True
else:
verify_manage_access(i)
return False
[docs]@manage_page.delete("/documents/<int:doc_id>")
def del_document(doc_id: int) -> Response:
d = get_doc_or_abort(doc_id)
verify_ownership(d)
f = get_trash_folder()
if d.path.startswith(f.path):
return ok_response()
if isinstance(d, Translation):
deleted_doc = DocEntry.create(
f"{f.path}/tl_{d.id}_{d.src_docid}_{d.lang_id}_deleted",
title=f"Deleted translation (src_docid: {d.src_docid}, lang_id: {d.lang_id})",
)
d.docentry = deleted_doc
else:
move_document(d, f)
db.session.commit()
return ok_response()
TRASH_FOLDER_PATH = f"roskis"
[docs]def get_trash_folder() -> Folder:
f = Folder.find_by_path(TRASH_FOLDER_PATH)
if not f:
f = Folder.create(
TRASH_FOLDER_PATH,
owner_groups=UserGroup.get_admin_group(),
title="Roskakori",
)
return f
[docs]@manage_page.delete("/folders/<int:folder_id>")
def delete_folder(folder_id: int) -> Response:
f = get_folder_or_abort(folder_id)
verify_ownership(f)
if f.location == "users":
raise AccessDenied("Personal folders cannot be deleted.")
trash = get_trash_folder()
if f.location == trash.path:
raise RouteException("Folder is already deleted.")
trash_path = find_free_name(trash, f)
f.rename_path(trash_path)
db.session.commit()
return ok_response()
[docs]@manage_page.put("/changeTitle/<int:item_id>")
def change_title(item_id: int, new_title: str) -> Response:
item = get_item_or_abort(item_id)
verify_edit_access(item)
item.title = new_title
db.session.commit()
return ok_response()
[docs]def get_copy_folder_params(folder_id: int, dest: str, exclude: str | None):
f = get_folder_or_abort(folder_id)
verify_copy_access(f, message=f"Missing copy access to folder {f.path}")
compiled = get_pattern(exclude)
if path_includes(dest, f.path):
raise AccessDenied("Cannot copy folder inside of itself.")
return f, dest, compiled
[docs]@dataclass
class CopyOptions:
copy_active_rights: bool = True
copy_expired_rights: bool = False
stop_on_errors: bool = True
[docs]@manage_page.post("/copy/<int:folder_id>")
def copy_folder_endpoint(
folder_id: int,
destination: str,
exclude: str | None,
copy_options: CopyOptions = field(default_factory=CopyOptions),
) -> Response:
f, dest, compiled = get_copy_folder_params(folder_id, destination, exclude)
o = get_current_user_group_object()
nf = Folder.find_by_path(dest)
if not nf:
validate_item_and_create_intermediate_folders(dest, BlockType.Folder, o)
nf = Folder.create(
dest, o, creation_opts=FolderCreationOptions(apply_default_rights=True)
)
u = get_current_user_object()
errors = copy_folder(f, nf, u, compiled, copy_options)
if errors and copy_options.stop_on_errors:
db.session.rollback()
else:
db.session.commit()
return json_response({"new_folder": nf, "errors": [str(e) for e in errors]})
[docs]def get_pattern(exclude: str | None) -> Pattern[str]:
if not exclude:
exclude = "a^"
try:
return re.compile(exclude)
except:
raise RouteException(f"Wrong pattern format: {exclude}")
[docs]@manage_page.post("/copy/<int:folder_id>/preview")
def copy_folder_preview(
folder_id: int, destination: str, exclude: str | None
) -> Response:
f, dest, compiled = get_copy_folder_params(folder_id, destination, exclude)
preview_list = []
for i in enum_items(f, compiled):
preview_list.append(
{
"to": join_location(dest, i.get_relative_path(f.path)),
"from": i.path,
}
)
return json_response(
{
"preview": preview_list,
"dest_exists": Folder.find_by_path(dest) is not None,
}
)
[docs]def enum_items(folder: Folder, exclude_re: Pattern) -> Generator[Item, None, None]:
for d in folder.get_all_documents(include_subdirs=False):
if not exclude_re.search(d.path):
yield d
for f in folder.get_all_folders():
if not exclude_re.search(f.path):
yield f
yield from enum_items(f, exclude_re)
[docs]def copy_folder(
from_folder: Folder,
to_folder: Folder,
user_who_copies: User,
exclude_re: Pattern,
options: CopyOptions,
) -> list[ValidationException]:
errors = []
process_queue: list[tuple[Folder, Folder]] = [(from_folder, to_folder)]
user_who_copies_group = user_who_copies.get_personal_group()
default_right_documents = []
folder_opts = FolderCreationOptions(get_templates_rights_from_parent=False)
def do_copy_doc(doc: DocInfo, folder_to: Folder) -> bool | None:
if exclude_re.search(doc.path):
return False
if not user_who_copies.has_copy_access(doc):
raise AccessDenied(f"Missing copy access to document {doc.path}")
nd_path = join_location(folder_to.path, doc.short_name)
if DocEntry.find_by_path(nd_path):
raise AccessDenied(f"Document already exists at path {nd_path}")
nd = DocEntry.create(
nd_path,
title=doc.title,
folder_opts=folder_opts,
)
copy_rights(
doc,
nd,
new_owner=user_who_copies,
copy_active=options.copy_active_rights,
copy_expired=options.copy_expired_rights,
)
copy_default_rights(
nd, BlockType.Document, owners_to_skip=[user_who_copies_group]
)
nd.document.modifier_group_id = user_who_copies_group.id
try:
for tr, new_tr in copy_document_and_enum_translations(
doc, nd, copy_uploads=True
):
copy_rights(
tr,
new_tr,
new_owner=user_who_copies,
copy_active=options.copy_active_rights,
copy_expired=options.copy_expired_rights,
)
copy_default_rights(
new_tr,
BlockType.Document,
owners_to_skip=[user_who_copies_group],
)
except ValidationException as e:
errors.append(e)
if options.stop_on_errors:
return None
return True
while process_queue:
f_from, f_to = process_queue.pop()
db.session.flush()
if not user_who_copies.can_write_to_folder(f_to):
raise AccessDenied(f"Missing edit access to folder {f_to.path}")
if not user_who_copies.has_copy_access(f_from):
raise AccessDenied(f"Missing copy access to folder {f_from.path}")
for d in f_from.get_all_documents(include_subdirs=False):
if is_some_default_right_document(d):
default_right_documents.append((d, f_to))
continue
match do_copy_doc(d, f_to):
case False:
continue
case None:
return errors
for f in f_from.get_all_folders():
if exclude_re.search(f.path):
continue
nf_path = join_location(f_to.path, f.short_name)
nf = Folder.find_by_path(nf_path)
if nf:
pass
else:
nf = Folder.create(
nf_path,
title=f.title,
creation_opts=folder_opts,
)
copy_rights(
f,
nf,
new_owner=user_who_copies,
copy_active=options.copy_active_rights,
copy_expired=options.copy_expired_rights,
)
copy_default_rights(
nf,
BlockType.Folder,
owners_to_skip=[user_who_copies_group],
)
process_queue.append((f, nf))
# Copy default permissions last to ensure they are not re-applied during copying, which
# can cause DB persistence errors
for d, f_to in default_right_documents:
db.session.flush()
match do_copy_doc(d, f_to):
case False:
continue
case None:
return errors
return errors