models.py (5616B)
import re
from datetime import datetime

from . import constants
from gus.lib.db_model import init_db, Page
from gus.lib.gemini import GeminiResource
from gus.lib.index_statistics import (
    compute_index_statistics,
    load_all_statistics_from_file,
)
from gus.lib.misc import bytes2human
import gus.lib.search as search

TEXT_CONTENT_TYPE = ["text/plain", "text/gemini", "text/markdown"]

# Request paths understood by the search helpers below: "/search" or
# "/v/search", optionally followed by "/<digits>" and a trailing slash.
# Group 1 is the "/v" prefix, group 2 is "/<digits>" (slash included).
_SEARCH_PATH_RE = re.compile(r"^(/v)?/search(/\d+)?/?")

_GEMINI_SCHEME = "gemini://"


class GUS:
    """Bundle the search index, the page database and some precomputed listings.

    Constructing an instance opens the index and database found under
    ``constants.INDEX_DIR`` and runs the listing queries once; their results
    are kept on the instance as ``hosts``, ``newest_hosts``, ``newest_pages``
    and ``feeds``.
    """

    def __init__(self):
        self.index = search.Index(constants.INDEX_DIR)
        self.db = init_db(f"{constants.INDEX_DIR}/{constants.DB_FILENAME}")
        self.statistics = compute_index_statistics(self.db)
        self.statistics_historical_overall = load_all_statistics_from_file(
            constants.STATISTICS_FILE
        )

        # Every distinct domain that has a page whose last successful
        # status was 20.
        hosts_query = Page.raw(
            """
            SELECT DISTINCT p.domain
            FROM page AS p
            WHERE last_success_status = 20
            ORDER BY p.domain
            """
        )
        self.hosts = hosts_query.execute()

        # The 50 most recently seen domains.
        # NOTE(review): first_seen_at is a bare column under GROUP BY, so
        # which page's timestamp represents a domain is left to the database
        # engine - confirm that this is the intended ordering.
        newest_hosts_query = Page.raw(
            """
            SELECT p.domain, p.first_seen_at
            FROM page AS p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            GROUP BY p.domain
            ORDER BY first_seen_at DESC
            LIMIT 50
            """
        )
        self.newest_hosts = newest_hosts_query.execute()

        # The 50 most recently seen page URLs.
        newest_pages_query = Page.raw(
            """SELECT p.url, p.first_seen_at FROM page as p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            ORDER BY first_seen_at DESC
            LIMIT 50"""
        )
        self.newest_pages = newest_pages_query.execute()

        # Pages that look like feeds, judged by URL suffix or content type.
        feeds_query = Page.raw(
            """SELECT DISTINCT p.url
            FROM page AS p
            WHERE last_success_status = 20
            AND (p.url LIKE '%atom.xml'
            OR p.url LIKE '%feed.xml'
            OR p.url LIKE '%rss.xml'
            OR p.url LIKE '%.rss'
            OR p.url LIKE '%.atom'
            OR p.url LIKE '%twtxt.txt'
            OR p.content_type IN ('application/atom+xml', 'application/rss+xml'))
            ORDER BY p.url
            """
        )
        self.feeds = feeds_query.execute()

    def search_index(self, query, requested_page):
        """Run ``query`` against the index and return one page of results.

        Args:
            query: Query text; it is parsed with ``self.index.parse_query``.
            requested_page: Page number handed to ``self.index.search``
                (the page length is fixed at 10).

        Returns:
            A ``(count, entries)`` tuple where ``count`` is ``len(results)``
            of the object returned by the index and ``entries`` is a list
            with one dict per result.
        """
        query = self.index.parse_query(query)
        results = self.index.search(query, requested_page, pagelen=10)
        return (
            len(results),
            [
                {
                    "score": result.score,
                    "indexed_at": result["indexed_at"],
                    "url": result["url"],
                    "content_type": result["content_type"],
                    # These three fields may be absent from a result, so
                    # fall back to a placeholder value.
                    "charset": result["charset"] if "charset" in result else "none",
                    "size": result["size"] if "size" in result else 0,
                    "prompt": result["prompt"] if "prompt" in result else "",
                    # Highlights are only requested for textual content types.
                    "highlights": self.index.highlight(result)
                    if result["content_type"] in TEXT_CONTENT_TYPE
                    else "",
                    "link_text": GUS._get_link_text(result),
                    "backlink_count": result["backlink_count"],
                }
                for result in results
            ],
        )

    def get_backlinks(self, url):
        """Return the URLs of pages that link to ``url``.

        The target is matched both with and without a trailing slash, and
        the target page itself is excluded from the result.

        Args:
            url: URL of the link target.

        Returns:
            A ``(internal, external)`` tuple of URL lists, split on each
            row's ``is_cross_host_like`` flag. Both lists are empty when
            ``url`` does not yield a valid ``GeminiResource``.
        """
        resource = GeminiResource(url)
        if not resource.is_valid:
            return [], []

        u = resource.fetchable_url.rstrip("/")
        backlinks_query = Page.raw(
            """SELECT p_from.url, l.is_cross_host_like
            FROM page AS p_from
            JOIN link as l ON l.from_page_id == p_from.id
            JOIN page as p_to ON p_to.id == l.to_page_id
            WHERE p_to.url IN (?, ?)
            AND p_from.url != ?
            GROUP BY p_from.url
            ORDER BY l.is_cross_host_like, p_from.url ASC""",
            u,
            f"{u}/",
            resource.fetchable_url,
        )
        backlinks = backlinks_query.execute()

        internal_backlink_urls = [b.url for b in backlinks if not b.is_cross_host_like]
        external_backlink_urls = [b.url for b in backlinks if b.is_cross_host_like]
        return internal_backlink_urls, external_backlink_urls

    @staticmethod
    def _get_link_text(result):
        """Build the display text for one search result.

        The first nine characters of the URL (the length of ``gemini://``)
        are dropped. Results whose content type is ``"input"`` show their
        prompt; every other result shows a human-readable size.

        Declared as a static method so that it can be called through the
        class or through an instance alike.
        """
        if result["content_type"] == "input":
            prompt_suffix = ": {}".format(result["prompt"])
            link_text = "{} ({}{})".format(
                result["url"][9:], result["content_type"], prompt_suffix
            )
        else:
            link_text = "{} ({}, {})".format(
                result["url"][9:],
                result["content_type"],
                bytes2human(result["size"], format="%(value).0f%(symbol)s"),
            )
        return link_text

    def get_search_suggestions(self, query):
        """Return the index's suggestions for ``query``."""
        return self.index.suggestions(query)


def compute_requested_results_page(request_path):
    """Extract the 1-based results page number from a search request path.

    Args:
        request_path: A path such as ``/search/3`` or ``/v/search/3/``.

    Returns:
        The page number from the path, never lower than 1. Paths without a
        page component, and paths that do not match the search pattern at
        all, yield 1.
    """
    m = _SEARCH_PATH_RE.match(request_path)
    if m is None or m.group(2) is None:
        return 1
    # Group 2 still carries its leading slash, e.g. "/3".
    return max(int(m.group(2)[1:]), 1)


def compute_verbose(request_path):
    """Tell whether a search request path carries the ``/v`` prefix.

    Args:
        request_path: A path such as ``/search`` or ``/v/search/2``.

    Returns:
        ``True`` when the path starts with ``/v/search``; ``False`` otherwise,
        including for paths that do not match the search pattern.
    """
    m = _SEARCH_PATH_RE.match(request_path)
    return m is not None and m.group(1) is not None


def process_seed_request(seed_request):
    """Append a seed URL to ``constants.SEED_REQUEST_FILE``, one per line.

    A ``gemini://`` scheme written in any letter case is rewritten in lower
    case; a request without that scheme gets ``gemini://`` prepended.

    Args:
        seed_request: The URL (or bare host/path) to record.
    """
    scheme_length = len(_GEMINI_SCHEME)
    if seed_request[:scheme_length].lower() == _GEMINI_SCHEME:
        # Keep everything after the scheme untouched; only the scheme is
        # case-normalised.
        seed_request = _GEMINI_SCHEME + seed_request[scheme_length:]
    else:
        seed_request = "gemini://{}".format(seed_request)

    with open(constants.SEED_REQUEST_FILE, "a") as seed_file:
        seed_file.write("{}\n".format(seed_request))