geminispace.info

gemini search engine
git clone https://git.clttr.info/geminispace.info.git

crawl.py (21436B)


import argparse
import random
import logging
import re
from concurrent.futures import ThreadPoolExecutor

from datetime import datetime, timedelta
import os
import pathlib
import time
from urllib.parse import urljoin, uses_relative, uses_netloc

import peewee

from gus.excludes import EXCLUDED_URL_PREFIXES, EXCLUDED_URL_PATHS
from . import constants
from gus.lib.db_model import init_db, Page, PageContent, Link
from gus.lib.gemini import GeminiResource, GeminiRobotFileParser
import gus.lib.logging
from gus.lib.logging import strip_control_chars

# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

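# Extra per-host crawl delays in milliseconds. When a host appears here, this
# value is used in crawl_page() instead of the default 300 ms delay or any
# robots.txt crawl-delay (presumably these hosts want a gentler crawl).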
CRAWL_DELAYS = {
    "alexschroeder.ch": 5000,
    "communitywiki.org": 5000,
}

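# Hard-coded exclude pattern, checked by should_skip() in addition to the
# EXCLUDED_URL_PREFIXES / EXCLUDED_URL_PATHS lists imported from gus.excludes.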
EXCLUDED_URL_PATTERN = re.compile(
    r"^gemini://(\d{6}\.ch|almp\d{4}\.app|.*/_(revert|history)/).*",
    flags=re.IGNORECASE
)

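# The index_* helpers below share the same upsert pattern: build a dict of
# column values for the Page model, reuse the id (and, where tracked, the
# first_seen_at timestamp) of an existing row for the same URL, adjust the
# change_frequency backoff, then save.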
def index_binary(resource, response):
    logging.debug(
        "Indexing binary for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": response.content_type,
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("binary"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("binary")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "binary"
        )

    page = Page(**doc)
    try:
        page.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)

    return page


def index_redirect(resource, response):
    logging.debug(
        "Indexing redirect for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "change_frequency": resource.get_default_change_frequency("redirect"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("redirect")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "redirect"
        )

    page = Page(**doc)
    try:
        page.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)

    return page


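# Record a failed crawl. is_temporary selects the "temp_error" or "perm_error"
# change_frequency category; response may be None when the request failed
# before any Gemini response was received.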
def index_error(resource, is_temporary, response):
    category = "temp_error" if is_temporary else "perm_error"
    default_change_frequency = resource.get_default_change_frequency(category)
    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "change_frequency": default_change_frequency,
        "last_crawl_at": datetime.utcnow(),
        "last_status": None if response is None else response.status,
        "last_status_message": None if response is None else response.error_message,
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        existing_change_frequency = (
            existing_page.change_frequency or default_change_frequency
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, category
        )
    page = Page(**doc)
    try:
        page.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)

    return page


def index_prompt(resource, response):
    logging.debug(
        "Indexing prompt for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": "input",
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("prompt"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("prompt")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "prompt"
        )

    page = Page(**doc)
    try:
        page.save()
        content = {
            "page_id": page.id,
            "prompt": response.prompt,
            "content": None,
        }
        existing_pagecontent = PageContent.get_or_none(page_id=page.id)
        if existing_pagecontent:
            content["id"] = existing_pagecontent.id

        pagecontent = PageContent(**content)
        pagecontent.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)

    return page


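# Index a successful text response: update the Page row, store the body in
# PageContent (for responses up to MAXIMUM_TEXT_PAGE_SIZE), and report whether
# the content changed since the last crawl, which resets the change_frequency
# backoff instead of incrementing it.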
def index_content(resource, response):
    logging.debug(
        "Storing content for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": response.content_type,
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("content"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    if response.content_type == "text/gemini":
        doc["lang"] = response.lang or "none"
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    is_different = False
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at

        existing_pagecontent = PageContent.get_or_none(page_id=existing_page.id)
        is_different = existing_pagecontent is None or response.content != existing_pagecontent.content
        if is_different:
            doc["change_frequency"] = resource.get_default_change_frequency("content")
        else:
            existing_change_frequency = (
                existing_page.change_frequency
                or resource.get_default_change_frequency("content")
            )
            doc["change_frequency"] = resource.increment_change_frequency(
                existing_change_frequency, "content"
            )

    page = Page(**doc)
    try:
        page.save()
        if response.num_bytes <= constants.MAXIMUM_TEXT_PAGE_SIZE:
            content = {
                "page_id": page.id,
                "prompt": None,
                "content": response.content,
            }

            existing_pagecontent = PageContent.get_or_none(page_id=page.id)
            if existing_pagecontent:
                content["id"] = existing_pagecontent.id

            pagecontent = PageContent(**content)
            pagecontent.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)

    return page, is_different


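# A resource is skipped if its URL matches an excluded prefix, an excluded path
# suffix, or EXCLUDED_URL_PATTERN; if the check itself fails, we err on the
# side of skipping.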
def should_skip(resource):
    should_skip = False
    try:
        for excluded_prefix in EXCLUDED_URL_PREFIXES:
            if resource.fetchable_url.startswith(excluded_prefix):
                should_skip = True
                break
        for excluded_path in EXCLUDED_URL_PATHS:
            if resource.urlsplit.path.lower().endswith(excluded_path):
                should_skip = True
                break
        m = EXCLUDED_URL_PATTERN.match(resource.fetchable_url)
        if m:
            should_skip = True
    except Exception as e:
        logging.error("Error checking for exclude of url %s: %s", strip_control_chars(resource.raw_url), e)
        should_skip = True

    return should_skip


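# Replace the outgoing links of a page: delete its existing Link rows, then
# insert one row per non-skipped resource found in its content, creating stub
# Page rows for link targets that have not been crawled yet.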
def index_links(from_resource, contained_resources):
    from_page, created = Page.get_or_create(url=from_resource.fetchable_url)

    # first delete all links that this page has had before,
    # then add the new links
    try:
        Link.delete().where(Link.from_page == from_page).execute()
    except Exception as e:
        logging.error("Error deleting existing links of page %s: %s", strip_control_chars(from_resource.fetchable_url), e)
    data = []
    for cr in contained_resources:
        if should_skip(cr):
            continue
        to_page = Page.get_or_none(url=cr.fetchable_url)
        if not to_page:
            to_page = Page.create(
                url=cr.fetchable_url,
                domain=cr.normalized_host,
                port=cr.urlsplit.port or 1965,
                first_seen_at=datetime.utcnow(),
            )
        data.append(
            {
                "from_page": from_page,
                "to_page": to_page,
                "is_cross_host_like": Link.get_is_cross_host_like(from_resource, cr),
            }
        )
    try:
        Link.insert_many(data).execute()
    except Exception as e:
        logging.error("Error inserting links: %s", e)


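# robots.txt handling: each host's robots file is fetched once and cached in
# the module-level robot_file_map for the remainder of the crawl run.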
def fetch_robots_file(robot_host):
    robot_url = urljoin("gemini://{}/".format(robot_host), "robots.txt")
    logging.info(
        "Fetching robots file: %s", strip_control_chars(robot_url)
    )
    rp = GeminiRobotFileParser(robot_url)
    rp.read()
    return rp


def get_robots_file(robot_host):
    logging.debug("Looking for robots file for host: %s", robot_host)
    if robot_host not in robot_file_map:
        robot_file_map[robot_host] = fetch_robots_file(robot_host)
    return robot_file_map[robot_host]


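# Recursive heart of the crawler. A resource is fetched only after passing a
# series of gates (validity, crawl depth, exclude lists, per-host failure
# limit, change_frequency recrawl window, robots.txt, per-host crawl delay);
# the response is then dispatched on its Gemini status code: errors, redirects,
# prompts and binaries are indexed, while text/gemini content additionally has
# its links extracted and crawled one level deeper.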
def crawl_page(gemini_resource, current_depth, redirect_chain=[]):
    gr = gemini_resource
    url = gr.fetchable_url
    if not gemini_resource.is_valid:
        logging.warning(
            "Not a valid gemini resource, skipping: %s",
            strip_control_chars(gemini_resource.url),
        )
        return
    if max_crawl_depth >= 0 and current_depth > max_crawl_depth:
        logging.warning(
            "Going too deep, skipping: %s", strip_control_chars(url)
        )
        return
    if should_skip(gr):
        logging.debug(
            "URL is excluded, skipping: %s",
            strip_control_chars(url),
        )
        return
    if gr.normalized_host in failure_count and failure_count[gr.normalized_host] > constants.MAXIMUM_FAILED_REQUEST_COUNT:
        logging.debug(
            "Too many failed requests for host, skipping: %s", strip_control_chars(url)
        )
        return

    existing_page = Page.get_or_none(url=gr.fetchable_url)
    if existing_page and existing_page.change_frequency is not None:
        most_recent_crawl = existing_page.last_crawl_at
        if most_recent_crawl and datetime.utcnow() < most_recent_crawl + timedelta(
            hours=existing_page.change_frequency):
            logging.debug(
                "Too soon to recrawl, skipping: %s",
                strip_control_chars(gr.fetchable_url),
            )
            return

    # ROBOTS
    # use the normalized_host_like to fetch user-specific robots.txt of pubnixes
    robots_file = get_robots_file(gr.normalized_host_like)
    crawl_delay = None
    if robots_file is not None:
        logging.debug("Found robots.txt for URI: %s", gr.fetchable_url)
        # only fetch if allowed for a matching user-agent:
        # in priority order "gus" > "indexer" > "*"
        can_fetch = robots_file.can_fetch_prioritized(["gus", "indexer", "*"], gr.fetchable_url)

        # honor a crawl delay if robots.txt specifies one for the "indexer" user-agent
        crawl_delay = robots_file.crawl_delay("indexer")

        if not can_fetch:
            logging.debug(
                "Blocked by robots.txt, skipping: %s",
                strip_control_chars(url),
            )
            return

    # crawl delay
    if gr.normalized_host in domain_hit_timings:
        if gr.normalized_host in CRAWL_DELAYS:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=CRAWL_DELAYS[gr.normalized_host]
            )
        elif not crawl_delay:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=300
            )
        else:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=crawl_delay
            )
        sleep_duration = max((next_allowed_hit - datetime.utcnow()).total_seconds(), 0)
        time.sleep(sleep_duration)
    domain_hit_timings[gr.normalized_host] = datetime.utcnow()

    # Actually fetch!
    logging.info("Fetching resource: %s", strip_control_chars(url))
    if gr.fully_qualified_parent_url is not None:
        logging.debug(
            "with parent: %s",
            strip_control_chars(gr.fully_qualified_parent_url),
        )
    response = gr.fetch()

    if response is None:
        # problem before getting a response
        logging.warning("Failed to fetch: %s", strip_control_chars(url))
        page = index_error(gr, True, None)

        failure_count[gr.normalized_host] = failure_count[gr.normalized_host] + 1 if gr.normalized_host in failure_count else 1
        logging.warning("Failed request count for host %s is %d", gr.normalized_host, failure_count[gr.normalized_host])
        return

    failure_count[gr.normalized_host] = 0
    if response.status.startswith("4"):
        # temporary error status
        logging.debug(
            "Got temporary error: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.error_message,
        )
        page = index_error(gr, True, response)

    elif response.status.startswith("5"):
        # permanent error status
        logging.debug(
            "Got permanent error: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.error_message,
        )
        page = index_error(gr, False, response)

    elif response.status.startswith("3"):
        # redirect status
        logging.debug(
            "Got redirected: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.url,
        )
        if len(redirect_chain) > constants.MAXIMUM_REDIRECT_CHAIN_LENGTH:
            logging.info(
                "Aborting, maximum redirect chain length reached: %s",
                strip_control_chars(url),
            )
            return
        redirect_resource = GeminiResource(
            response.url, gr.fetchable_url, gr.normalized_host
        )
        if redirect_resource.fetchable_url == gr.fetchable_url:
            logging.info(
                "Aborting, redirecting to self: %s",
                strip_control_chars(url),
            )
            return
        page = index_redirect(gr, response)
        index_links(gr, [redirect_resource])
        try:
            crawl_page(redirect_resource, current_depth,
                redirect_chain=redirect_chain + [gr.fetchable_url])
        except Exception as e:
            logging.error("Failed to crawl redirect target %s with error: %s", redirect_resource.fetchable_url, e)

    elif response.status.startswith("1"):
        # input status
        logging.debug(
            "Input requested at: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.prompt,
        )
        page = index_prompt(gr, response)
    elif response.status.startswith("2"):
        # success status
        logging.debug(
            "Successful request: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.content_type,
        )
        if response.content_type.startswith("text/"):
            page, is_different = index_content(gr, response)
            if response.content_type != "text/gemini":
                logging.debug(
                    "Content is not gemini text: %s: %s",
                    strip_control_chars(url),
                    response.content_type,
                )
            else:
                logging.debug(
                    "Got gemini text, extracting and crawling links: %s",
                    strip_control_chars(url),
                )
                contained_resources = gr.extract_contained_resources(response.content)
                index_links(gr, contained_resources)
                for resource in contained_resources:
                    try:
                        crawl_page(resource, current_depth + 1)
                    except Exception as e:
                        logging.error("Failed to crawl linked resource %s with error: %s", resource.fetchable_url, e)
        else:
            page = index_binary(gr, response)
    else:
        logging.warning(
            "Got unhandled status: %s: %s",
            strip_control_chars(url),
            response.status,
        )


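# Select the URLs of all pages whose recrawl window has passed (or that were
# never crawled). change_frequency is stored in hours; the
# REPLACE('fnord hours', 'fnord', change_frequency) trick builds a per-row
# SQLite datetime modifier such as '24 hours'.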
def load_expired_urls():
    expired_pages = Page.raw(
        """SELECT p.url
           FROM page as p
           WHERE datetime(last_crawl_at, REPLACE('fnord hours', 'fnord', change_frequency)) < datetime('now')
              OR last_crawl_at IS NULL"""
    )
    return [page.url for page in expired_pages.execute()]


def load_seed_request_urls():
    with open("seed-requests.txt") as f:
        content = f.readlines()
    # remove whitespace characters like `\n` at the end of each line
    content = [x.strip() for x in content]
    return content


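# Entry point for the thread pool workers: crawl one resource tree, starting at
# depth 0.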
def crawl_resource(resource):
    crawl_page(resource, 0)


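# Set up the module-global crawl state, then run two passes with a small thread
# pool: first recrawl whatever load_expired_urls() returns, then crawl the URLs
# submitted via seed-requests.txt. (The seed_urls parameter is currently
# unused.)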
def run_crawl(should_run_destructive=False, seed_urls=[]):
    global index_dir
    index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
    pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
    global db
    db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
    global robot_file_map
    robot_file_map = {}
    global domain_hit_timings
    domain_hit_timings = {}
    global max_crawl_depth
    max_crawl_depth = 700

    global failure_count
    failure_count = {}

    expired_resources = [GeminiResource(url) for url in load_expired_urls()]
    random.shuffle(expired_resources)
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(crawl_resource, expired_resources)
    executor.shutdown(wait=True, cancel_futures=False)

    submitted_resources = [GeminiResource(url) for url in load_seed_request_urls()]
    random.shuffle(submitted_resources)
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(crawl_resource, submitted_resources)
    executor.shutdown(wait=True, cancel_futures=False)

    logging.info("Finished!")


def main():
    args = parse_args()
    gus.lib.logging.handle_arguments(args)

    run_crawl(args.should_run_destructive, seed_urls=args.seed_urls)


def parse_args():
    parser = argparse.ArgumentParser(description="Crawl Geminispace.")
    parser.add_argument(
        "--destructive",
        "-d",
        dest="should_run_destructive",
        action="store_true",
        default=False,
        help="create a fresh index and perform a full Geminispace crawl",
    )
    parser.add_argument(
        "--seeds",
        "-s",
        metavar="URL",
        dest="seed_urls",
        nargs="+",
        default=[],
        help="one or more URLs with which to extend the seeds of the crawl",
    )
    gus.lib.logging.add_arguments(parser)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()