Source code for timApp.tests.server.test_importdata

"""Server tests for importData plugin."""
import json
from contextlib import contextmanager

import responses
from requests import PreparedRequest

from timApp.tests.browser.browsertest import BrowserTest
from timApp.timdb.sqa import db
from timApp.user.personaluniquecode import SchacPersonalUniqueCode, PersonalUniqueCode
from timApp.user.user import User
from timApp.user.usergroup import UserGroup


[docs]def field_result( missing_users=None, created_users=None, user_key="username", changed=0, unchanged=0, result_text="", codes=None, ): if not missing_users: missing_users = [] if not created_users: created_users = [] if not codes: codes = [[]] * len(missing_users) empty_user = { "email": None, "full_name": None, "given_name": None, "last_name": None, "origin": None, "password": None, "password_hash": None, "unique_codes": [], "username": None, } return { "result": result_text, "fieldresult": { "fields_changed": changed, "fields_ignored": 0, "fields_unchanged": unchanged, "users_created": created_users, "users_missing": [ { **empty_user, user_key: u, "unique_codes": c, } for u, c in zip(missing_users, codes) ], }, }
[docs]class ImportDataTestBase(BrowserTest):
[docs] def imp(self, d, data, expect, status: int, task=None, aalto_return=None): if not task: task = "t" with self.importdata_ctx(aalto_return): self.post_answer( "importData", f"{d.id}.{task}", data, expect_content=expect, expect_status=status, )
[docs] def grant_user_creation_right(self): self.current_user.add_to_group( UserGroup.get_user_creator_group(), added_by=None ) db.session.commit() db.session.refresh(self.current_user)
[docs]class ImportDataTest(ImportDataTestBase):
[docs] def test_importdata(self): self.login_test1() d = self.create_doc( initial_par=""" #- {defaultplugin=textfield} {#a#} {#b#} #- {plugin=importData #t} #- {plugin=importData #t2} prefilter: |!! return ["x;y;z", "a;b;c"]; !! #- {plugin=importData #t3} prefilter: |!! data.push("u;x;1"); return data; !! #- {plugin=importData #t4} joinProperty: x #- {plugin=importData #t5} joinProperty: studentID #- {plugin=importData #t6} joinProperty: id #- {plugin=importData #t7} joinProperty: email #- {plugin=importData #t8} joinProperty: username #- {plugin=importData #t9} joinProperty: studentID(jyu.fi) """ ) def imp_200(data, expect_web=None, fields=None, expect=None, task=None): import_input = {"data": data} if fields: import_input["fields"] = fields self.imp( d, import_input, expect if expect else {"web": expect_web}, 200, task ) def imp_400(data, expect, fields=None, task=None): import_input = {"data": data} if fields: import_input["fields"] = fields self.imp(d, import_input, expect, 400, task) imp_400("testuser1;x;x", "Task not found in the document: x") imp_400("testuser1;;x", "Invalid task name: ") imp_400("testuser1;a;x;z;y", "Task not found in the document: z") imp_200( "testuser1;a;x;z;y;w", field_result( result_text="Wrong lines: 1\n\ntestuser1;a;x;z;y;w: odd number of field-value pairs" ), ) imp_400("testuser1;1;2", "Task not found in the document: z", fields=["z"]) imp_400("testuser1;a;2", "Task not found in the document: z", fields=["a=z"]) imp_400( "testuser1;x;1;y;2\ntestuser1;z;3", "Task not found in the document: z", fields=["x=a", "y=b", "*"], ) imp_200("", field_result()) imp_200("x", field_result(result_text="Wrong lines: 1\n\nx: too few parts")) imp_200("x;x;x", field_result(missing_users=["x"])) imp_200("x;x", field_result(result_text="Wrong lines: 1\n\nx;x: too few parts")) imp_200("testuser1;a;x", field_result(changed=1)) imp_200("testuser1;a;x;b;y", field_result(changed=1, unchanged=1)) imp_200("testuser1;a;x\ntestuser1;b;y", field_result(unchanged=2)) imp_200( "testuser1;x;x", expect={"savedNew": 3, "valid": True, "web": field_result(unchanged=1)}, fields=["a"], ) imp_200( "testuser1;x;x", expect={ "savedNew": 5, "valid": True, "web": field_result(changed=1, unchanged=1), }, fields=["a", "b"], ) imp_200( "testuser1;x", expect={"savedNew": None, "valid": True, "web": field_result(unchanged=1)}, fields=["a", "b"], ) imp_200( "testuser1", expect={"savedNew": None, "valid": True, "web": field_result()}, fields=["a", "b"], ) imp_200("testuser1", field_result(missing_users=["x", "a"]), task="t2") imp_200( "testuser1;x;1;y;2\ntestuser1;z;3\ntestuser1", expect={"savedNew": 8, "valid": True, "web": field_result(changed=2)}, fields=["x=a", "y=b"], ) imp_200("x;a;1", field_result(missing_users=["x", "u"]), task="t3") imp_200("x;a;1", {"error": "Invalid joinProperty: x"}, task="t4") imp_200("x;a;1", {"error": "Invalid joinProperty: studentID"}, task="t5") imp_200( "x;a;1", { "error": "User ids must be ints (invalid literal for int() with base " "10: 'x')" }, task="t6", ) imp_200(f"{self.current_user.id};a;1", field_result(unchanged=1), task="t6") imp_200(f"{self.current_user.email};a;1", field_result(unchanged=1), task="t7") imp_200( f"{self.current_user.id};a;1", field_result( missing_users=[str(self.current_user.id)], user_key="email", ), task="t7", ) imp_200(f"{self.current_user.name};a;1", field_result(unchanged=1), task="t8") imp_200( f"{self.current_user.email};a;1", field_result(missing_users=[self.current_user.email]), task="t8", ) imp_200( f"x;a;1", field_result( missing_users=["imported_studentid_x"], codes=[[{"code": "x", "codetype": "studentID", "org": "jyu.fi"}]], ), task="t9", ) self.current_user.set_unique_codes( [ SchacPersonalUniqueCode(code="x", codetype="studentID", org="jyu.fi"), ] ) db.session.commit() imp_200(f"x;a;1", field_result(unchanged=1), task="t9")
[docs]class AaltoImportTest(ImportDataTestBase):
[docs] def test_aalto_import(self): self.login_test1() d = self.create_doc( initial_par=""" ``` {#t plugin="importData"} allowMissing: true aplus: course: 1234 ``` """ ) self.imp( d, { "data": """ UserID,StudentID,Email,Tags,1 Count,1 Total,1 Ratio,2 Count,2 Total,2 Ratio 123,12345X,matti.meikalainen@aalto.fi,aalto,5,200,1.0,4,700,1.0 """.strip(), }, {"web": {"error": "cannot send data from browser if aplus is given"}}, 200, ) self.imp( d, { "token": "xxx", }, "You do not have permission to create users.", 403, aalto_return=[ { "UserID": 123, "StudentID": "12345X", "Email": "matti.meikalainen@aalto.fi", "Tags": "aalto", "1 Count": 2, "1 Total": 100, "1 Ratio": 0.125, "2 Count": 0, "2 Total": 0, "2 Ratio": 0.0, } ], ) self.grant_user_creation_right() self.imp( d, { "token": "xxx", }, { "web": field_result( changed=5, # Fields with zero values are skipped, so that's why 5 and not 6 here. created_users=[ { "email": "matti.meikalainen@aalto.fi", "id": 5, "name": "matti.meikalainen@aalto.fi", "real_name": "Meikalainen Matti", "student_id": None, }, ], ), }, 200, aalto_return=[ { "UserID": 123, "StudentID": "12345X", "Email": "matti.meikalainen@aalto.fi", "Tags": "aalto", "1 Count": 5, "1 Total": 200, "1 Ratio": 1.0, "2 Count": 0, "2 Total": 700, "2 Ratio": 1.0, } ], ) matti_user = User.get_by_email("matti.meikalainen@aalto.fi") self.assertEqual( PersonalUniqueCode.find_by_student_id("12345X", "aalto.fi").user, matti_user ) self.verify_answer_content(f"{d.id}.count1", "c", "5", matti_user) self.verify_answer_content(f"{d.id}.total1", "c", "200", matti_user) self.verify_answer_content(f"{d.id}.ratio1", "c", "1.0", matti_user) self.verify_answer_content( f"{d.id}.count2", "c", None, matti_user, expected_count=0 ) self.verify_answer_content(f"{d.id}.total2", "c", "700", matti_user) self.verify_answer_content(f"{d.id}.ratio2", "c", "1.0", matti_user) self.imp( d, { "token": "xxx", }, { "web": field_result(unchanged=5), }, 200, aalto_return=[ { "UserID": 123, "StudentID": "12345X", "Email": "matti.meikalainen@aalto.fi", "Tags": "aalto", "1 Count": 5, "1 Total": 200, "1 Ratio": 1.0, "2 Count": 0, "2 Total": 700, "2 Ratio": 1.0, } ], ) puc = PersonalUniqueCode.find_by_student_id("12345X", "aalto.fi") self.assertIsNotNone(puc) u = puc.user self.assertEqual("matti.meikalainen@aalto.fi", u.email) self.assertEqual("matti.meikalainen@aalto.fi", u.name) self.assertEqual("Meikalainen Matti", u.real_name) self.imp( d, { "token": "xxx", }, 'Invalid email: ""', 400, aalto_return=[ { "UserID": 123, "StudentID": "12345Z", "Email": "", "Tags": "aalto", "1 Count": 5, "1 Total": 200, "1 Ratio": 1.0, "2 Count": 4, "2 Total": 700, "2 Ratio": 1.0, } ], )
[docs] def test_import_add_to_group(self): self.login_test1() d = self.create_doc( initial_par=""" #- {#t plugin=importData} addUsersToGroup: tg1 allowMissing: true """ ) self.imp( d, {"data": "testuser2;x;1"}, "Group does not exist: tg1", 400, ) ug = UserGroup.create("tg1") db.session.add(ug) ug.admin_doc = self.create_doc().block db.session.commit() self.imp( d, {"data": "testuser2;x;1"}, { "web": field_result(changed=1), }, 200, ) self.imp( d, {"data": "testuser3;x;1"}, { "web": field_result(changed=1), }, 200, ) ug = UserGroup.get_by_name("tg1") self.assertEqual( [self.test_user_2, self.test_user_3], sorted(ug.users, key=lambda u: u.name) )