Source code for timApp.util.flask.search

"""Routes for searching."""
import os
import re
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime
from io import StringIO
from pathlib import Path

from flask import Blueprint, json
from flask import request
from sqlalchemy.orm import joinedload, lazyload, defaultload

from timApp.auth.accesshelper import has_view_access, verify_admin, has_edit_access
from timApp.auth.accesstype import AccessType
from timApp.auth.auth_models import BlockAccess
from timApp.auth.sessioninfo import get_current_user_object
from timApp.document.docentry import DocEntry, get_documents
from timApp.document.docinfo import DocInfo
from timApp.folder.folder import Folder
from timApp.item.block import Block
from timApp.item.routes import get_document_relevance
from timApp.item.tag import Tag
from timApp.timdb.dbaccess import get_files_path
from timApp.timdb.exceptions import InvalidReferenceException
from timApp.util.flask.requesthelper import (
    get_option,
    use_model,
    RouteException,
    NotExist,
)
from timApp.util.flask.responsehelper import json_response
from timApp.util.logger import log_error, log_warning
from timApp.util.utils import get_error_message, cache_folder_path

search_routes = Blueprint("search", __name__, url_prefix="/search")

WHITE_LIST = ["c#"]  # Ignore query length limitations
MIN_QUERY_LENGTH = 3  # For word and title search. Tags have no limitations.
MIN_WHOLE_WORDS_QUERY_LENGTH = 1  # For whole word search.
PREVIEW_LENGTH = 40  # Before and after the search word separately.
PREVIEW_MAX_LENGTH = 160
SEARCH_CACHE_FOLDER = cache_folder_path / "searchcache"
PROCESSED_CONTENT_FILE_PATH = SEARCH_CACHE_FOLDER / "content_all_processed.log"
PROCESSED_TITLE_FILE_PATH = SEARCH_CACHE_FOLDER / "titles_all_processed.log"
RAW_CONTENT_FILE_PATH = SEARCH_CACHE_FOLDER / "all.log"
DEFAULT_RELEVANCE = 10


@dataclass
class GetFoldersModel:
    folder: str

@search_routes.get("getFolders")
@use_model(GetFoldersModel)
def get_subfolders(m: GetFoldersModel):
    """
    Returns subfolders of the starting folder.

    :return: Response containing a list of subfolder paths.
    """
    root_path = m.folder
    if root_path == "":
        return json_response([])
    folders = Folder.query.filter(Folder.location.like(root_path + "%")).limit(50)
    folders_viewable = [root_path]
    for folder in folders:
        if has_view_access(folder):
            folders_viewable.append(folder.path)
    return json_response(folders_viewable)

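# Example request (illustrative; the folder path is hypothetical). Because the
# blueprint uses the /search prefix:
#
#   GET /search/getFolders?folder=kurssit
#
# returns a JSON list such as ["kurssit", "kurssit/tie"]: the starting folder
# followed by up to 50 view-accessible subfolder paths.
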
def get_common_search_params(req) -> tuple[str, str, bool, bool, bool, bool]:
    """
    Picks the parameters that are common to the search routes from a request.

    :param req: Request.
    :return: A tuple with six values.
    """
    query = req.args.get("query", "")
    case_sensitive = get_option(req, "caseSensitive", default=False, cast=bool)
    folder = req.args.get("folder", "")
    regex = get_option(req, "regex", default=False, cast=bool)
    search_owned_docs = get_option(req, "searchOwned", default=False, cast=bool)
    search_whole_words = get_option(req, "searchWholeWords", default=False, cast=bool)
    return query, folder, regex, case_sensitive, search_whole_words, search_owned_docs

def log_search_error(
    error: str,
    query: str,
    doc: str,
    tag: str = "",
    par: str = "",
    title: bool = False,
    path: bool = False,
) -> None:
    """
    Forms an error report and sends it to timLog.

    :param error: The error message.
    :param query: Search word.
    :param doc: Document identifier.
    :param tag: Tag name.
    :param par: Paragraph id.
    :param title: If the error occurred in a title search.
    :param path: If the error occurred in a path search.
    :return: None.
    """
    if not error:
        error = "Unknown error"
    common_part = f"'{error}' while searching '{query}' in document {doc}"
    tag_part = ""
    par_part = ""
    title_part = ""
    path_part = ""
    if tag:
        tag_part = f" tag {tag}"
    if par:
        par_part = f" paragraph {par}"
    if title:
        title_part = " title"
    if path:
        path_part = " path"
    log_error(common_part + tag_part + par_part + title_part + path_part)

def preview_result(
    md: str,
    query,
    m: re.Match[str],
    snippet_length: int = PREVIEW_LENGTH,
    max_length: int = PREVIEW_MAX_LENGTH,
) -> str:
    """
    Forms a preview of the matched paragraph.

    :param md: Paragraph markdown to preview.
    :param query: Search word.
    :param m: Match object.
    :param snippet_length: The length of the preview before and after the search word.
    :param max_length: The maximum allowed length of the preview.
    :return: Preview with the set amount of characters around the search word.
    """
    start_index = m.start() - snippet_length
    end_index = m.end() + snippet_length
    # If the match is longer than the given threshold, limit its size.
    if end_index - start_index > max_length:
        end_index = m.start() + len(query) + snippet_length
    prefix = "..."
    postfix = "..."
    if start_index < 0:
        start_index = 0
        prefix = ""
    if end_index > len(md):
        end_index = len(md)
        postfix = ""
    return prefix + md[start_index:end_index] + postfix

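# A minimal usage sketch for preview_result (not called by the routes); the
# sample markdown below is made up for illustration.
def _demo_preview_result() -> None:
    md = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor."
    m = re.search("consectetur", md)
    if m:
        # Prints up to PREVIEW_LENGTH characters on each side of the match,
        # ellipsized on the sides where the paragraph continues.
        print(preview_result(md, "consectetur", m))
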
class WordResult:
    """
    One matched word with its location.
    """

    def __init__(self, match_word: str, match_start: int, match_end: int):
        """
        Title or paragraph word result object constructor.

        :param match_word: String that matched the query.
        :param match_start: Match start index.
        :param match_end: Match end index.
        """
        self.match_word = match_word
        self.match_start = match_start
        self.match_end = match_end

    def to_json(self):
        """
        :return: A dictionary containing the object's data, suitable for JSON conversion.
        """
        return {
            "match_word": self.match_word,
            "match_start": self.match_start,
            "match_end": self.match_end,
        }

class ParResult:
    """
    Paragraph search results.
    """

    def __init__(
        self, par_id: str = "", preview: str = "", word_results=None, alt_num_results=0
    ):
        """
        Paragraph result object constructor.

        :param par_id: Paragraph id.
        :param preview: A snippet from the paragraph markdown.
        :param word_results: List of word search results in the paragraph.
        :param alt_num_results: Alternative to listing the word results.
        """
        if word_results is None:
            word_results = []
        self.par_id = par_id
        self.preview = preview
        self.word_results = word_results
        self.alt_num_results = alt_num_results

    def add_result(self, result: WordResult) -> None:
        """
        Add a new word result.

        :param result: New word result from the paragraph markdown.
        :return: None.
        """
        self.word_results.append(result)

    def has_results(self) -> bool:
        """
        :return: True if the object contains results.
        """
        return len(self.word_results) > 0 or self.alt_num_results > 0

    def to_json(self):
        """
        :return: A dictionary of attributes and derived attributes.
        """
        results_dicts = []
        for r in self.word_results:
            results_dicts.append(r)
        return {
            "par_id": self.par_id,
            "preview": self.preview,
            "results": results_dicts,
            "num_results": self.get_match_count(),
        }

    def get_match_count(self) -> int:
        """
        :return: How many matches there are in this paragraph.
        """
        if len(self.word_results) > 0:
            return len(self.word_results)
        else:
            return self.alt_num_results

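# A minimal usage sketch (not called by the routes): collecting word results
# into a paragraph result. The id, preview and indices are made up.
def _demo_par_result() -> None:
    par = ParResult(par_id="abc123", preview="...the search word in context...")
    par.add_result(WordResult(match_word="search", match_start=7, match_end=13))
    assert par.has_results()
    assert par.get_match_count() == 1
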
class TitleResult:
    """
    Title search result containing a list of match data.
    """

    def __init__(self, word_results=None, alt_num_results: int = 0):
        """
        Title result object constructor.

        :param word_results: List of word results from the title string.
        :param alt_num_results: Alternative to listing the word results.
        """
        if word_results is None:
            word_results = []
        self.word_results = word_results
        self.alt_num_results = alt_num_results

    def add_result(self, result: WordResult) -> None:
        """
        Add a new result to the list.

        :param result: New word result.
        :return: None.
        """
        self.word_results.append(result)

    def has_results(self) -> bool:
        """
        :return: Whether the object contains any results.
        """
        return len(self.word_results) > 0 or self.alt_num_results > 0

    def to_json(self):
        """
        :return: A dictionary of attributes and derived attributes, suitable for JSON conversion.
        """
        results = []
        for r in self.word_results:
            results.append(r)
        return {"results": results, "num_results": self.get_match_count()}

    def get_match_count(self) -> int:
        """
        :return: How many match words the title has.
        """
        if len(self.word_results) > 0:
            return len(self.word_results)
        else:
            return self.alt_num_results

class DocResult:
    """
    Contains one document's title and word search information.
    """

    def __init__(
        self, doc_info: DocInfo, par_results=None, title_results=None, incomplete=False
    ):
        if par_results is None:
            par_results = []
        if title_results is None:
            title_results = []
        self.doc_info = doc_info
        self.par_results = par_results
        self.title_results = title_results
        self.incomplete = incomplete

    def add_par_result(self, result: ParResult) -> None:
        """
        Add a new paragraph search result to the list.

        :param result: New paragraph result.
        :return: None.
        """
        self.par_results.append(result)

    def add_title_result(self, result: TitleResult) -> None:
        """
        Add a new title search result to the list.

        :param result: New title result.
        :return: None.
        """
        self.title_results.append(result)

    def has_results(self) -> bool:
        """
        :return: Whether the document has any results in it.
        """
        return len(self.par_results) > 0 or len(self.title_results) > 0

    def to_json(self):
        """
        :return: A dictionary of the object, suitable for JSON conversion.
        """
        par_result_dicts = []
        for r in self.par_results:
            par_result_dicts.append(r)
        title_result_dicts = []
        for r in self.title_results:
            title_result_dicts.append(r)
        return {
            "doc": self.doc_info,
            "incomplete": self.incomplete,
            "title_results": title_result_dicts,
            "num_title_results": self.get_title_match_count(),
            "par_results": par_result_dicts,
            "num_par_results": self.get_par_match_count(),
        }

    def get_par_match_count(self) -> int:
        """
        :return: Total number of paragraph word matches in the document.
        """
        count = 0
        for p in self.par_results:
            count += p.get_match_count()
        return count

    def get_title_match_count(self) -> int:
        """
        :return: Total number of title matches in the document.
        """
        count = 0
        for p in self.title_results:
            count += p.get_match_count()
        return count

def result_response(
    results,
    title_result_count: int = 0,
    word_result_count: int = 0,
    incomplete_search_reason="",
):
    """
    Formats result data for a JSON response.

    :param results: List of result dictionaries.
    :param title_result_count: Number of title results.
    :param word_result_count: Number of paragraph word results.
    :param incomplete_search_reason: Reason the search was cut short, if any.
    :return: Dictionary containing the search results.
    """
    return {
        "title_result_count": title_result_count,
        "word_result_count": word_result_count,
        "errors": [],
        "incomplete_search_reason": incomplete_search_reason,
        "results": results,
    }

def validate_query(query: str, search_whole_words: bool) -> None:
    """
    Abort if the query is too short.

    :param query: Search word(s).
    :param search_whole_words: Whole word search has different limits.
    :return: None.
    """
    if len(query.strip()) < MIN_QUERY_LENGTH and not search_whole_words:
        if query.strip().lower() not in WHITE_LIST:
            raise RouteException(
                f"Search text must be at least {MIN_QUERY_LENGTH} character(s) long with whitespace stripped."
            )
    if len(query.strip()) < MIN_WHOLE_WORDS_QUERY_LENGTH and search_whole_words:
        raise RouteException(
            f"Whole word search text must be at least {MIN_WHOLE_WORDS_QUERY_LENGTH} character(s) "
            f"long with whitespace stripped."
        )

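# A minimal usage sketch (not called by the routes): "c#" is shorter than
# MIN_QUERY_LENGTH but passes validation because it is on the whitelist.
def _demo_validate_query() -> None:
    validate_query("c#", search_whole_words=False)  # ok: whitelisted
    try:
        validate_query("ab", search_whole_words=False)
    except RouteException as e:
        print(e)  # rejected: too short and not whitelisted
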
# Query options for loading DocEntry relevance eagerly; this should speed up search
# cache processing because we know we'll need the relevance.
docentry_eager_relevance_opt = (
    defaultload(DocEntry._block).joinedload(Block.relevance),
)

def add_doc_info_title_line(doc_id: int) -> str | None:
    """
    Forms a JSON-compatible line with the doc id, relevance and title.

    :param doc_id: Document id.
    :return: String with doc data.
    """
    doc_info = DocEntry.find_by_id(
        doc_id, docentry_load_opts=docentry_eager_relevance_opt
    )
    if not doc_info:
        return None
    doc_relevance = get_document_relevance(doc_info)
    return (
        json.dumps(
            {"doc_id": doc_id, "d_r": doc_relevance, "doc_title": doc_info.title},
            ensure_ascii=False,
        )
        + "\n"
    )

def add_doc_info_content_line(
    doc_id: int, par_data, remove_deleted_pars: bool = True, add_title: bool = False
) -> str | None:
    """
    Forms a JSON-compatible line with the doc_id and a list of paragraph data
    with id and md attributes.

    :param doc_id: Document id.
    :param par_data: List of paragraph dictionaries.
    :param remove_deleted_pars: Check paragraph existence and leave deleted ones out.
    :param add_title: Add the document title.
    :return: String with paragraph data grouped under a document.
    """
    if not par_data:
        return None
    doc_info = DocEntry.find_by_id(
        doc_id, docentry_load_opts=docentry_eager_relevance_opt
    )
    if not doc_info:
        return None
    par_json_list = []
    doc_relevance = get_document_relevance(doc_info)
    for par in par_data:
        par_dict = json.loads(f"{{{par}}}")
        par_id = par_dict["id"]
        if remove_deleted_pars:
            # If the par can't be found (deleted), don't add it.
            if not doc_info.document.has_paragraph(par_id):
                continue
        # Resolve the markdown in full (including references) for better search.
        doc_par = doc_info.document.get_paragraph(par_id)
        par_md_buf = StringIO()
        if doc_par.is_par_reference() or doc_par.is_area_reference():
            try:
                ref_pars = doc_par.get_referenced_pars()
            except InvalidReferenceException:
                par_md_buf.write(doc_par.md)
            else:
                for p in ref_pars:
                    par_md_buf.write(f"{p.md}\n")
        else:
            par_md_buf.write(doc_par.md)
        par_md = par_md_buf.getvalue().replace("\r", " ").replace("\n", " ")
        # Cherry-pick attributes, because the others are unnecessary for the search.
        par_attrs = par_dict["attrs"]
        par_json_list.append({"id": par_id, "attrs": par_attrs, "md": par_md})
    if add_title:
        doc_title = doc_info.title
        return (
            json.dumps(
                {
                    "doc_id": doc_id,
                    "d_r": doc_relevance,
                    "doc_title": doc_title,
                    "pars": par_json_list,
                },
                ensure_ascii=False,
            )
            + "\n"
        )
    else:
        return (
            json.dumps(
                {"doc_id": doc_id, "d_r": doc_relevance, "pars": par_json_list},
                ensure_ascii=False,
            )
            + "\n"
        )

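# The cache files built from these lines therefore contain one JSON object per
# line, for example (values are illustrative):
#
#   titles file:  {"doc_id": 5, "d_r": 10, "doc_title": "Example document"}
#   content file: {"doc_id": 5, "d_r": 10, "pars": [{"id": "abc123", "attrs": {}, "md": "Hello"}]}
#
# where "d_r" is the document relevance used to filter search results.
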
def get_doc_par_id(line: str) -> tuple[int, str, str] | None:
    """
    Takes the doc id, par id and par data from one grep search result line.

    :param line: TIM pars grep search result line.
    :return: Triple containing the ids and the par data.
    """
    if line and len(line) > 10:
        temp = line[2:].split("/", 2)
        doc_id = int(temp[0])
        par_id = temp[1]
        par_data = temp[2].replace("current:", "", 1)
        par_data = par_data[1:-2]
        return doc_id, par_id, par_data
    else:
        return None

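# A minimal usage sketch (not called by the routes): parsing one line of grep
# output from the pars directory. The ids and contents are made up; real lines
# have the form "./<doc_id>/<par_id>/current:{...}".
def _demo_get_doc_par_id() -> None:
    line = './5/abc123/current:{"id": "abc123", "md": "Hello", "attrs": {}}\n'
    parsed = get_doc_par_id(line)
    # parsed == (5, "abc123", '"id": "abc123", "md": "Hello", "attrs": {}')
    print(parsed)
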
def create_search_files(remove_deleted_pars=True):
    """
    Groups all TIM paragraphs under documents and combines them into a single file.
    Also creates a similar file for title searches and a raw file without grouping.

    :param remove_deleted_pars: Check paragraph existence before adding.
    :return: Status code and a message confirming the success of the file creation.
    """
    temp_content_file_name = (
        SEARCH_CACHE_FOLDER / f"temp_{PROCESSED_CONTENT_FILE_PATH.name}"
    )
    temp_title_file_name = (
        SEARCH_CACHE_FOLDER / f"temp_{PROCESSED_TITLE_FILE_PATH.name}"
    )
    index_log_file_name = SEARCH_CACHE_FOLDER / "index_log.log"

    f: Path = RAW_CONTENT_FILE_PATH.parent
    f.mkdir(exist_ok=True)

    try:
        subprocess.Popen(
            f'grep -R "" --include="current" . > {RAW_CONTENT_FILE_PATH} 2>&1',
            cwd=(get_files_path() / "pars"),
            shell=True,
        ).communicate()
    except Exception as e:
        return (
            400,
            f"Failed to create preliminary file {RAW_CONTENT_FILE_PATH}: {get_error_message(e)}",
        )
    try:
        raw_file = RAW_CONTENT_FILE_PATH.open("r", encoding="utf-8")
    except FileNotFoundError:
        return 400, f"Failed to open preliminary file {RAW_CONTENT_FILE_PATH}"
    try:
        with raw_file, temp_content_file_name.open(
            "w+", encoding="utf-8"
        ) as temp_content_file, temp_title_file_name.open(
            "w+", encoding="utf-8"
        ) as temp_title_file, index_log_file_name.open(
            "w+", encoding="utf-8"
        ) as index_log_file:
            current_doc, current_pars = None, []
            for line in raw_file:
                try:
                    line_info = get_doc_par_id(line)
                    if not line_info:
                        continue
                    doc_id, par_id, par = line_info
                    if not current_doc:
                        current_doc = doc_id
                    # If same doc as on the previous line (or the first one), just add the par data to the list.
                    if current_doc == doc_id:
                        current_pars.append(par)
                    # Otherwise save the previous doc and empty the par data.
                    else:
                        new_content_line = add_doc_info_content_line(
                            current_doc, current_pars, remove_deleted_pars
                        )
                        new_title_line = add_doc_info_title_line(current_doc)
                        if new_content_line:
                            temp_content_file.write(new_content_line)
                        if new_title_line:
                            temp_title_file.write(new_title_line)
                        current_doc = doc_id
                        current_pars.clear()
                        current_pars.append(par)
                except Exception as e:
                    err = f"SEARCH_INDEX: '{get_error_message(e)}' while writing search file line '{line}'"
                    index_log_file.write(f"{err}\n")
                    print(err)
            # Write the last doc separately, because the loop leaves it unsaved.
            if current_doc and current_pars:
                new_content_line = add_doc_info_content_line(
                    current_doc, current_pars, remove_deleted_pars
                )
                new_title_line = add_doc_info_title_line(current_doc)
                if new_content_line:
                    temp_content_file.write(new_content_line)
                if new_title_line:
                    temp_title_file.write(new_title_line)
            temp_content_file.flush()
            temp_title_file.flush()
            os.fsync(temp_content_file)
            os.fsync(temp_title_file)

        temp_content_file_name.rename(PROCESSED_CONTENT_FILE_PATH)
        temp_title_file_name.rename(PROCESSED_TITLE_FILE_PATH)
        return (
            200,
            f"Combined and processed paragraph files created at "
            f"{PROCESSED_CONTENT_FILE_PATH} and {PROCESSED_TITLE_FILE_PATH}",
        )
    except Exception as e:
        return (
            400,
            f"Creating files {PROCESSED_CONTENT_FILE_PATH} and {PROCESSED_TITLE_FILE_PATH} failed: {get_error_message(e)}!",
        )

@search_routes.get("createContentFile")
def create_search_files_route():
    """
    Route for grouping all TIM paragraphs under documents and combining them into
    a single file. Also creates a similar file for title searches and a raw file
    without grouping.

    Note: may take several minutes, so timeout settings need to be lenient.

    :return: A message confirming the success of the file creation.
    """
    verify_admin()
    # 'removeDeletedPars' checks paragraph existence before adding, at the cost of taking more time.
    status, msg = create_search_files(
        get_option(request, "removeDeletedPars", default=True, cast=bool)
    )
    return json_response(status_code=status, jsondata=msg)

def compile_regex(
    query: str, regex: bool, case_sensitive: bool, search_whole_words: bool
):
    """
    Sets the flags and compiles the regular expression. Aborts if the regex is invalid.

    :param query: Search word.
    :param regex: Regex search.
    :param case_sensitive: Distinguish between upper and lower case in the search.
    :param search_whole_words: Search words separated by spaces, commas etc.
    :return: Compiled regex.
    """
    if case_sensitive:
        flags = re.DOTALL
    else:
        flags = re.DOTALL | re.IGNORECASE
    if regex:
        term = query
    else:
        term = re.escape(query)
    if search_whole_words:
        term = rf"\b{term}\b"
    try:
        term_regex = re.compile(term, flags)
    except re.error as e:
        raise RouteException(f"Invalid regex: {str(e)}")
    return term_regex

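# A minimal usage sketch (not called by the routes): a plain-text whole word
# search term is escaped and wrapped in word boundaries.
def _demo_compile_regex() -> None:
    term_regex = compile_regex(
        "cat", regex=False, case_sensitive=False, search_whole_words=True
    )
    assert term_regex.search("A cat sat.") is not None
    assert term_regex.search("concatenate") is None
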
def is_excluded(relevance: int, relevance_threshold: int) -> bool:
    """
    Exclude if the relevance is less than the relevance threshold.

    :param relevance: Document relevance value.
    :param relevance_threshold: Minimum included relevance.
    :return: True if the document relevance is less than the relevance threshold.
    """
    return relevance < relevance_threshold

def is_timeouted(start_time: float, timeout: float) -> bool:
    """
    Compares the elapsed time against the timeout limit.

    :param start_time: The time the comparison starts from.
    :param timeout: Maximum allowed elapsed time.
    :return: True if the timeout has been passed, False if not.
    """
    elapsed_time = time.time() - start_time
    return elapsed_time > timeout

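# A minimal usage sketch (not called by the routes): abort a long-running
# search loop once the time limit has passed. The timeout value is made up.
def _demo_is_timeouted() -> None:
    start_time = time.time()
    timeout = 0.01  # seconds
    while not is_timeouted(start_time, timeout):
        pass  # placeholder for incremental search work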