Source code for timApp.tests.server.test_admin

from timApp.admin.user_cli import (
    find_and_merge_users,
    find_and_soft_delete,
    do_merge_users,
    do_soft_delete,
)
from timApp.document.docentry import DocEntry
from timApp.messaging.messagelist.listinfo import Channel
from timApp.tests.db.timdbtest import TEST_USER_1_ID, TEST_USER_2_ID, TEST_USER_3_ID
from timApp.tests.server.timroutetest import TimRouteTest
from timApp.timdb.sqa import db
from timApp.user.special_group_names import SPECIAL_USERNAMES
from timApp.user.user import User, UserInfo
from timApp.user.usercontact import ContactOrigin
from timApp.user.usergroup import UserGroup
from timApp.util.flask.requesthelper import RouteException, NotExist


[docs]class SearchTest(TimRouteTest):
[docs]class MergeTest(TimRouteTest):
[docs] def test_user_merge(self): self.login_test1() with self.assertRaises(RouteException): find_and_merge_users("testuser1", "testuser1") User.create_with_group( UserInfo( username="someguy", full_name="Some Guy", email="some.guy@example.com" ) ) db.session.commit() with self.assertRaises(RouteException): find_and_merge_users("testuser1", "someguy") with self.assertRaises(NotExist): find_and_merge_users("testuser1", "x") for u in SPECIAL_USERNAMES: with self.assertRaises(RouteException): find_and_merge_users("testuser1", u) d = self.create_doc(initial_par="#- {plugin=textfield #t}") self.post_answer("textfield", f"{d.id}.t", user_input={"c": "x"}) path = d.path tg1 = UserGroup.create("tg1") tg2 = UserGroup.create("tg2") self.test_user_1.add_to_group(tg1, added_by=self.test_user_3) self.test_user_1.add_to_group(tg2, added_by=None) t1pg = self.test_user_1.get_personal_group() t2pg = self.test_user_2.get_personal_group() db.session.commit() def membership_set(user: User): return {(m.usergroup_id, m.added_by) for m in user.memberships} def check_memberships( primary: User, secondary: User, primary_personal: UserGroup, secondary_personal: UserGroup, ): db.session.refresh(primary) db.session.refresh(secondary) self.assertEqual( {(primary_personal.id, None), (tg1.id, TEST_USER_3_ID), (tg2.id, None)}, membership_set(primary), ) self.assertEqual({(secondary_personal.id, None)}, membership_set(secondary)) def check_contacts( primary: User, secondary: User, extra_emails: list[str] = None ): db.session.refresh(primary) db.session.refresh(secondary) extra_emails = extra_emails or [] self.assertEqual( {(primary.email, Channel.EMAIL), (secondary.email, Channel.EMAIL)} | {(e, Channel.EMAIL) for e in extra_emails}, {(c.contact, c.channel) for c in primary.contacts}, ) self.assertEqual( {(secondary.email, Channel.EMAIL)}, {(c.contact, c.channel) for c in secondary.contacts}, ) r = find_and_merge_users("testuser2", "testuser1") self.assertEqual(1, r.accesses) self.assertEqual(0, r.annotations) self.assertEqual(1, r.answers) self.assertEqual(0, r.lectureanswers) self.assertEqual(0, r.messages) self.assertEqual(0, r.notes) self.assertEqual(0, r.owned_lectures) self.assertEqual(0, r.readparagraphs) self.assertEqual(0, r.velps) self.assertEqual(2, r.groups) self.assertEqual(1, r.contacts) db.session.commit() self.assertIsNone(DocEntry.find_by_path(path)) check_memberships(self.test_user_2, self.test_user_1, t2pg, t1pg) check_contacts(self.test_user_2, self.test_user_1) r = find_and_merge_users("testuser2", "testuser1") self.assertEqual(0, r.accesses) self.assertEqual(0, r.annotations) self.assertEqual(0, r.answers) self.assertEqual(0, r.lectureanswers) self.assertEqual(0, r.messages) self.assertEqual(0, r.notes) self.assertEqual(0, r.owned_lectures) self.assertEqual(0, r.readparagraphs) self.assertEqual(0, r.velps) self.assertEqual(0, r.groups) self.assertEqual(0, r.contacts) db.session.commit() check_memberships(self.test_user_2, self.test_user_1, t2pg, t1pg) check_contacts(self.test_user_2, self.test_user_1) r = find_and_merge_users("testuser1", "testuser2") self.assertEqual(1, r.accesses) self.assertEqual(0, r.annotations) self.assertEqual(1, r.answers) self.assertEqual(0, r.lectureanswers) self.assertEqual(0, r.messages) self.assertEqual(0, r.notes) self.assertEqual(0, r.owned_lectures) self.assertEqual(0, r.readparagraphs) self.assertEqual(0, r.velps) self.assertEqual(2, r.groups) self.assertEqual(1, r.contacts) db.session.commit() check_memberships(self.test_user_1, self.test_user_2, t1pg, t2pg) check_contacts(self.test_user_1, self.test_user_2) self.assertIsNotNone(DocEntry.find_by_path(path)) r = find_and_merge_users("testuser1", "testuser2") self.assertEqual(0, r.accesses) self.assertEqual(0, r.annotations) self.assertEqual(0, r.answers) self.assertEqual(0, r.lectureanswers) self.assertEqual(0, r.messages) self.assertEqual(0, r.notes) self.assertEqual(0, r.owned_lectures) self.assertEqual(0, r.readparagraphs) self.assertEqual(0, r.velps) self.assertEqual(0, r.groups) self.assertEqual(0, r.contacts) db.session.commit() test_user_2 = self.test_user_2 check_memberships(self.test_user_1, test_user_2, t1pg, t2pg) check_contacts(self.test_user_1, self.test_user_2) self.test_user_2.set_emails( ["some_email@example.com"], ContactOrigin.Custom, remove=False, force_verify=True, ) r = find_and_merge_users("testuser1", "testuser2") self.assertEqual(0, r.accesses) self.assertEqual(0, r.annotations) self.assertEqual(0, r.answers) self.assertEqual(0, r.lectureanswers) self.assertEqual(0, r.messages) self.assertEqual(0, r.notes) self.assertEqual(0, r.owned_lectures) self.assertEqual(0, r.readparagraphs) self.assertEqual(0, r.velps) self.assertEqual(0, r.groups) self.assertEqual(1, r.contacts) db.session.commit() test_user_2 = self.test_user_2 check_memberships(self.test_user_1, test_user_2, t1pg, t2pg) check_contacts(self.test_user_1, self.test_user_2, ["some_email@example.com"]) find_and_soft_delete("testuser2") self.assertIsNone(User.get_by_name("testuser2")) u = User.get_by_name(f"testuser2_deleted_{test_user_2.id}") db.session.refresh(u) self.assertIsNotNone(u) self.assertEqual(u.email, u.primary_email_contact.contact) with self.assertRaises(RouteException): find_and_soft_delete("testuser2") with self.assertRaises(RouteException): find_and_soft_delete(f"testuser2_deleted_{test_user_2.id}")
[docs] def test_merge_multi(self): u1, ug1 = User.create_with_group( UserInfo(username="user1", email="user1@example.com", full_name="User 1") ) u1_new, ug1_new = User.create_with_group( UserInfo( username="user1_new", email="user1_new@example.com", full_name="User 1 New", ) ) db.session.commit() # First, merge two users do_merge_users(u1_new, u1) do_soft_delete(u1) db.session.commit() self.assertEqual(u1.email, f"user1@example.com_deleted_{u1.id}") # Create a user again with the same info as original u1 # This should pass since previous u1 has been disabled u1_again, ug1 = User.create_with_group( UserInfo(username="user1", email="user1@example.com", full_name="User 1") ) db.session.commit() # Simulate u1_new logging in again (e.g. via HAKA) do_merge_users(u1_new, u1_again) do_soft_delete(u1_again) db.session.commit() self.assertEqual(u1_again.email, f"user1@example.com_deleted_{u1_again.id}")
[docs]class UserDeleteTest(TimRouteTest):
[docs] def test_deleted_user_logout(self): u, _ = User.create_with_group( UserInfo(username="m@example.com", email="m@example.com", full_name="M") ) db.session.commit() self.get("/") self.login(username="m@example.com") d = self.create_doc() find_and_soft_delete("m@example.com") db.session.commit() self.get(d.url, expect_status=302, expect_content="/") self.get(d.url, expect_status=403) self.login(username=f"m@example.com_deleted_{u.id}") self.post_answer( "x", f"{d.id}.t", user_input={}, expect_status=403, expect_content="Please refresh the page and log in again.", ) self.post_answer( "x", f"{d.id}.t", user_input={}, expect_status=400, expect_content="Task not found in the document: t", )