geminispace.info

gemini search engine
git clone https://git.clttr.info/geminispace.info.git

models.py (5616B)


import re
from datetime import datetime

from . import constants
from gus.lib.db_model import init_db, Page
from gus.lib.gemini import GeminiResource
from gus.lib.index_statistics import (
    compute_index_statistics,
    load_all_statistics_from_file,
)
from gus.lib.misc import bytes2human
import gus.lib.search as search

TEXT_CONTENT_TYPE = ["text/plain", "text/gemini", "text/markdown"]

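# GUS wraps the full-text index and the page database and precomputes several
# result sets (known hosts, newest hosts, newest pages, known feeds).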
class GUS:
    def __init__(self):
        self.index = search.Index(constants.INDEX_DIR)
        self.db = init_db(f"{constants.INDEX_DIR}/{constants.DB_FILENAME}")
        self.statistics = compute_index_statistics(self.db)
        self.statistics_historical_overall = load_all_statistics_from_file(
            constants.STATISTICS_FILE)
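        # all capsules (domains) with at least one page whose last fetch
        # succeeded (Gemini status 20)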
        hosts_query = Page.raw(
            """
            SELECT DISTINCT p.domain
            FROM page AS p
            WHERE last_success_status = 20
            ORDER BY p.domain
            """
        )
        self.hosts = hosts_query.execute()

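        # the 50 most recently discovered capsules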
        newest_hosts_query = Page.raw(
            """
            SELECT p.domain, p.first_seen_at
            FROM page AS p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            GROUP BY p.domain
            ORDER BY first_seen_at DESC
            LIMIT 50
            """
        )
        self.newest_hosts = newest_hosts_query.execute()

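        # the 50 most recently discovered pages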
        newest_pages_query = Page.raw(
            """SELECT p.url, p.first_seen_at FROM page AS p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            ORDER BY first_seen_at DESC
            LIMIT 50""")
        self.newest_pages = newest_pages_query.execute()

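        # known feed URLs, matched by common feed file names or by an
        # Atom/RSS content type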
        feeds_query = Page.raw(
            """SELECT DISTINCT p.url
            FROM page AS p
            WHERE last_success_status = 20
            AND (p.url LIKE '%atom.xml'
            OR p.url LIKE '%feed.xml'
            OR p.url LIKE '%rss.xml'
            OR p.url LIKE '%.rss'
            OR p.url LIKE '%.atom'
            OR p.url LIKE '%twtxt.txt'
            OR p.content_type IN ('application/atom+xml', 'application/rss+xml'))
            ORDER BY p.url
            """)
        self.feeds = feeds_query.execute()

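    # Runs a parsed query against the index and returns the total number of
    # hits plus a display-friendly dict for each result on the requested page
    # (10 results per page).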
    def search_index(self, query, requested_page):
        query = self.index.parse_query(query)
        results = self.index.search(query, requested_page, pagelen=10)
        return (
            len(results),
            [
                {
                    "score": result.score,
                    "indexed_at": result["indexed_at"],
                    "url": result["url"],
                    "content_type": result["content_type"],
                    "charset": result["charset"] if "charset" in result else "none",
                    "size": result["size"] if "size" in result else 0,
                    "prompt": result["prompt"] if "prompt" in result else "",
                    "highlights": self.index.highlight(result) if result["content_type"] in TEXT_CONTENT_TYPE else "",
                    "link_text": GUS._get_link_text(result),
                    "backlink_count": result["backlink_count"],
                }
                for result in results
            ],
        )

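    # Collects pages that link to the given URL (with and without a trailing
    # slash) and splits them into on-host and cross-host backlinks.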
    def get_backlinks(self, url):
        resource = GeminiResource(url)
        if not resource.is_valid:
            return [], []

        u = resource.fetchable_url.rstrip("/")
        backlinks_query = Page.raw(
            """SELECT p_from.url, l.is_cross_host_like
            FROM page AS p_from
            JOIN link AS l ON l.from_page_id = p_from.id
            JOIN page AS p_to ON p_to.id = l.to_page_id
            WHERE p_to.url IN (?, ?)
            AND p_from.url != ?
            GROUP BY p_from.url
            ORDER BY l.is_cross_host_like, p_from.url ASC""",
            u,
            f"{u}/",
            resource.fetchable_url,
        )
        backlinks = backlinks_query.execute()

        internal_backlink_urls = [b.url for b in backlinks if not b.is_cross_host_like]
        external_backlink_urls = [b.url for b in backlinks if b.is_cross_host_like]
        return internal_backlink_urls, external_backlink_urls

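    # Builds the link label shown in search results; result["url"][9:] strips
    # the leading "gemini://" scheme (9 characters).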
    @staticmethod
    def _get_link_text(result):
        if result["content_type"] == "input":
            prompt_suffix = ": {}".format(result["prompt"])
            link_text = "{} ({}{})".format(
                result["url"][9:], result["content_type"], prompt_suffix
            )
        else:
            link_text = "{} ({}, {})".format(
                result["url"][9:],
                result["content_type"],
                bytes2human(result["size"], format="%(value).0f%(symbol)s"),
            )
        return link_text

    def get_search_suggestions(self, query):
        return self.index.suggestions(query)


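# Request-path helpers: search URLs look like /search/<page>, optionally
# prefixed with /v for verbose output, as matched by the regex below.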
def compute_requested_results_page(request_path):
    page = 1
    p = re.compile(r"^(/v)?/search(/\d+)?/?")
    m = p.match(request_path)
    if m.group(2) is not None:
        page = int(m.group(2)[1:])
    return max(page, 1)


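# A leading /v on the request path switches the results page to verbose mode.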
def compute_verbose(request_path):
    verbose = False
    p = re.compile(r"^(/v)?/search(/\d+)?/?")
    m = p.match(request_path)
    if m.group(1) is not None:
        verbose = True
    return verbose


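# Appends a submitted seed URL to the seed request file, normalising a
# capitalised or missing gemini:// scheme first.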
def process_seed_request(seed_request):
    with open(constants.SEED_REQUEST_FILE, "a") as seed_file:
        if seed_request.startswith("Gemini://"):
            seed_request = seed_request.replace('G', 'g', 1)
        if not seed_request.startswith("gemini://"):
            seed_request = "gemini://{}".format(seed_request)
        seed_file.write("{}\n".format(seed_request))