crawl.py (21436B)
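# Crawler for Geminispace (part of the gus package). It walks pages that are due
# for a recrawl plus any URLs submitted via seed-requests.txt, fetches them over
# the Gemini protocol, and records the results in the Page, PageContent, and Link
# tables, while honouring robots.txt rules and per-host crawl delays.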
import argparse
import random
import logging
import re
from concurrent.futures import ThreadPoolExecutor

from datetime import datetime, timedelta
import os
import pathlib
import time
from urllib.parse import urljoin, uses_relative, uses_netloc

import peewee

from gus.excludes import EXCLUDED_URL_PREFIXES, EXCLUDED_URL_PATHS
from . import constants
from gus.lib.db_model import init_db, Page, PageContent, Link
from gus.lib.gemini import GeminiResource, GeminiRobotFileParser
import gus.lib.logging
from gus.lib.logging import strip_control_chars

# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

CRAWL_DELAYS = {
    "alexschroeder.ch": 5000,
    "communitywiki.org": 5000,
}

EXCLUDED_URL_PATTERN = re.compile(
    r"^gemini://(\d{6}\.ch|almp\d{4}\.app|.*/_(revert|history)/).*",
    flags=re.IGNORECASE
)


def index_binary(resource, response):
    logging.debug(
        "Indexing binary for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": response.content_type,
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("binary"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("binary")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "binary"
        )

    page = Page(**doc)
    try:
        page.save()
    except Exception:
        logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url))

    return page


def index_redirect(resource, response):
    logging.debug(
        "Indexing redirect for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "change_frequency": resource.get_default_change_frequency("redirect"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("redirect")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "redirect"
        )

    page = Page(**doc)
    try:
        page.save()
    except Exception:
        logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url))

    return page


def index_error(resource, is_temporary, response):
    category = "temp_error" if is_temporary else "perm_error"
    default_change_frequency = resource.get_default_change_frequency(category)
    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "change_frequency": default_change_frequency,
        "last_crawl_at": datetime.utcnow(),
        "last_status": None if response is None else response.status,
        "last_status_message": None if response is None else response.error_message,
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        existing_change_frequency = (
            existing_page.change_frequency or default_change_frequency
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, category
        )
    page = Page(**doc)
    try:
        page.save()
    except Exception:
        logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url))

    return page


def index_prompt(resource, response):
    logging.debug(
        "Indexing prompt for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": "input",
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("prompt"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if existing_page.first_seen_at is not None:
            doc["first_seen_at"] = existing_page.first_seen_at
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("prompt")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "prompt"
        )

    page = Page(**doc)
    try:
        page.save()
        content = {
            "page_id": page.id,
            "prompt": response.prompt,
            "content": None,
        }
        existing_pagecontent = PageContent.get_or_none(page_id=page.id)
        if existing_pagecontent:
            content["id"] = existing_pagecontent.id

        pagecontent = PageContent(**content)
        pagecontent.save()
    except Exception:
        logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url))

    return page


def index_content(resource, response):
    logging.debug(
        "Storing content for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": response.content_type,
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("content"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status": response.status,
        "last_success_status": response.status,
        "last_status_message": response.error_message,
        "first_seen_at": datetime.utcnow(),
    }
    if response.content_type == "text/gemini":
"text/gemini": 226 doc["lang"] = (response.lang or "none",) 227 existing_page = Page.get_or_none(url=resource.fetchable_url) 228 is_different = False 229 if existing_page: 230 doc["id"] = existing_page.id 231 if not (existing_page.first_seen_at is None): 232 doc["first_seen_at"] = existing_page.first_seen_at 233 234 existing_pagecontent = PageContent.get_or_none(page_id=existing_page.id) 235 is_different = existing_pagecontent is None or response.content != existing_pagecontent.content 236 if is_different: 237 doc["change_frequency"] = resource.get_default_change_frequency("content") 238 else: 239 existing_change_frequency = ( 240 existing_page.change_frequency 241 or resource.get_default_change_frequency("content") 242 ) 243 doc["change_frequency"] = resource.increment_change_frequency( 244 existing_change_frequency, "content" 245 ) 246 247 page = Page(**doc) 248 try: 249 page.save() 250 if response.num_bytes <= constants.MAXIMUM_TEXT_PAGE_SIZE: 251 content = { 252 "page_id": page.id, 253 "prompt": None, 254 "content": response.content 255 } 256 257 existing_pagecontent = PageContent.get_or_none(page_id=page.id) 258 if existing_pagecontent: 259 content["id"] = existing_pagecontent.id 260 261 pagecontent = PageContent(**content) 262 pagecontent.save() 263 except Exception as e: 264 logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e) 265 266 return page, is_different 267 268 269 def should_skip(resource): 270 should_skip = False 271 try: 272 for excluded_prefix in EXCLUDED_URL_PREFIXES: 273 if resource.fetchable_url.startswith(excluded_prefix): 274 should_skip = True 275 break 276 for excluded_path in EXCLUDED_URL_PATHS: 277 if resource.urlsplit.path.lower().endswith(excluded_path): 278 should_skip = True 279 break 280 m = EXCLUDED_URL_PATTERN.match(resource.fetchable_url) 281 if m: 282 should_skip = True 283 except: 284 logging.error("Error checking for exclude of url: %s", strip_control_chars(resource.raw_url)) 285 should_skip = True 286 287 return should_skip 288 289 290 def index_links(from_resource, contained_resources): 291 from_page, created = Page.get_or_create(url=from_resource.fetchable_url) 292 293 ## first delete all links that this page as had before 294 ## than add new links 295 try: 296 Link.delete().where(Link.from_page == from_page).execute() 297 except: 298 logging.error("Error deleting a link: %s", Link.from_page) 299 data = [] 300 for cr in contained_resources: 301 if should_skip(cr): 302 continue 303 to_page = Page.get_or_none(url=cr.fetchable_url) 304 if not to_page: 305 to_page = Page.create( 306 url=cr.fetchable_url, 307 domain=cr.normalized_host, 308 port=cr.urlsplit.port or 1965, 309 first_seen_at=datetime.utcnow() 310 ) 311 data.append( 312 { 313 "from_page": from_page, 314 "to_page": to_page, 315 "is_cross_host_like": Link.get_is_cross_host_like(from_resource, cr), 316 } 317 ) 318 try: 319 Link.insert_many(data).execute() 320 except Exception as e: 321 logging.error("Error insert links: %s",e) 322 323 324 def fetch_robots_file(robot_host): 325 robot_url = urljoin("gemini://{}/".format(robot_host), "robots.txt") 326 logging.info( 327 "Fetching robots file: %s", strip_control_chars(robot_url) 328 ) 329 rp = GeminiRobotFileParser(robot_url) 330 rp.read() 331 return rp 332 333 334 def get_robots_file(robot_host): 335 logging.debug("Looking for robots file for host: %s", robot_host) 336 if robot_host not in robot_file_map: 337 robot_file_map[robot_host] = fetch_robots_file(robot_host) 338 return robot_file_map[robot_host] 339 340 
def crawl_page(
    gemini_resource, current_depth, redirect_chain=[]
):
    gr = gemini_resource
    url = gr.fetchable_url
    if not gemini_resource.is_valid:
        logging.warning(
            "Not a valid gemini resource, skipping: %s",
            strip_control_chars(gemini_resource.url),
        )
        return
    if max_crawl_depth >= 0 and current_depth > max_crawl_depth:
        logging.warning(
            "Going too deep, skipping: %s", strip_control_chars(url)
        )
        return
    if should_skip(gr):
        logging.debug(
            "URL is excluded, skipping: %s",
            strip_control_chars(url),
        )
        return
    if gr.normalized_host in failure_count and failure_count[gr.normalized_host] > constants.MAXIMUM_FAILED_REQUEST_COUNT:
        logging.debug(
            "Too many failed requests for host, skipping: %s", strip_control_chars(url)
        )
        return

    existing_page = Page.get_or_none(url=gr.fetchable_url)
    if existing_page and existing_page.change_frequency is not None:
        most_recent_crawl = existing_page.last_crawl_at
        if most_recent_crawl and datetime.utcnow() < most_recent_crawl + timedelta(
            hours=existing_page.change_frequency
        ):
            logging.debug(
                "Too soon to recrawl, skipping: %s",
                strip_control_chars(gr.fetchable_url),
            )
            return

    # ROBOTS
    # use the normalized_host_like to fetch user-specific robots.txt of pubnixes
    robots_file = get_robots_file(gr.normalized_host_like)
    crawl_delay = None
    if robots_file is not None:
        logging.debug("Found robots.txt for URI: %s", gr.fetchable_url)
        # only fetch if allowed for a matching user-agent:
        # in priority order "gus" > "indexer" > "*"
        can_fetch = robots_file.can_fetch_prioritized(["gus", "indexer", "*"], gr.fetchable_url)

        # same approach as above - last value wins
        crawl_delay = robots_file.crawl_delay("indexer")

        if not can_fetch:
            logging.debug(
                "Blocked by robots.txt, skipping: %s",
                strip_control_chars(url),
            )
            return

    # crawl delay
    if gr.normalized_host in domain_hit_timings:
        if gr.normalized_host in CRAWL_DELAYS:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=CRAWL_DELAYS[gr.normalized_host]
            )
        elif not crawl_delay:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=300
            )
        else:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=crawl_delay
            )
        sleep_duration = max((next_allowed_hit - datetime.utcnow()).total_seconds(), 0)
        time.sleep(sleep_duration)
    domain_hit_timings[gr.normalized_host] = datetime.utcnow()

    # Actually fetch!
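    # The resource is fetched below, and the Gemini status class of the response
    # decides how it is indexed: 1x input prompt -> index_prompt, 2x success ->
    # index_content or index_binary, 3x redirect -> index_redirect plus a recursive
    # crawl of the target, 4x -> temporary error, 5x -> permanent error; anything
    # else is only logged as an unhandled status.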
    logging.info("Fetching resource: %s", strip_control_chars(url))
    if gr.fully_qualified_parent_url is not None:
        logging.debug(
            "with parent: %s",
            strip_control_chars(gr.fully_qualified_parent_url),
        )
    response = gr.fetch()

    if response is None:
        # problem before getting a response
        logging.warning("Failed to fetch: %s", strip_control_chars(url))
        page = index_error(gr, True, None)

        failure_count[gr.normalized_host] = failure_count[gr.normalized_host] + 1 if gr.normalized_host in failure_count else 1
        logging.warning("Failed request count for host %s is %d", gr.normalized_host, failure_count[gr.normalized_host])
        return

    failure_count[gr.normalized_host] = 0
    if response.status.startswith("4"):
        # temporary error status
        logging.debug(
            "Got temporary error: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.error_message,
        )
        page = index_error(gr, True, response)

    elif response.status.startswith("5"):
        # permanent error status
        logging.debug(
            "Got permanent error: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.error_message,
        )
        page = index_error(gr, False, response)

    elif response.status.startswith("3"):
        # redirect status
        logging.debug(
            "Got redirected: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.url,
        )
        if len(redirect_chain) > constants.MAXIMUM_REDIRECT_CHAIN_LENGTH:
            logging.info(
                "Aborting, maximum redirect chain length reached: %s",
                strip_control_chars(url),
            )
            return
        redirect_resource = GeminiResource(
            response.url, gr.fetchable_url, gr.normalized_host
        )
        if redirect_resource.fetchable_url == gr.fetchable_url:
            logging.info(
                "Aborting, redirecting to self: %s",
                strip_control_chars(url),
            )
            return
        page = index_redirect(gr, response)
        index_links(gr, [redirect_resource])
        try:
            crawl_page(redirect_resource, current_depth,
                       redirect_chain=redirect_chain + [gr.fetchable_url])
        except Exception as e:
            logging.error("Failed to crawl redirect target %s with error: %s", redirect_resource.fetchable_url, e)

    elif response.status.startswith("1"):
        # input status
        logging.debug(
            "Input requested at: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.prompt,
        )
        page = index_prompt(gr, response)
    elif response.status.startswith("2"):
        # success status
        logging.debug(
            "Successful request: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.content_type,
        )
        if response.content_type.startswith("text/"):
            page, is_different = index_content(gr, response)
            if response.content_type != "text/gemini":
                logging.debug(
                    "Content is not gemini text: %s: %s",
                    strip_control_chars(url),
                    response.content_type,
                )
            else:
                logging.debug(
                    "Got gemini text, extracting and crawling links: %s",
                    strip_control_chars(url),
                )
                contained_resources = gr.extract_contained_resources(response.content)
                index_links(gr, contained_resources)
                for resource in contained_resources:
                    try:
                        crawl_page(resource, current_depth + 1)
                    except Exception as e:
                        logging.error("Failed to crawl contained resource %s with error: %s", resource.fetchable_url, e)
        else:
            page = index_binary(gr, response)
    else:
        logging.warning(
            "Got unhandled status: %s: %s",
            strip_control_chars(url),
            response.status,
        )

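
# load_expired_urls() leans on SQLite's datetime() modifier syntax: the
# REPLACE('fnord hours', 'fnord', change_frequency) expression turns the stored
# change_frequency (in hours) into an interval string such as '24 hours', so a
# page is due again once last_crawl_at plus that interval lies in the past, or
# if it has never been crawled at all.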
def load_expired_urls():
    expired_pages = Page.raw(
        """SELECT p.url
           FROM page as p
           WHERE datetime(last_crawl_at, REPLACE('fnord hours', 'fnord', change_frequency)) < datetime('now')
              OR last_crawl_at IS NULL"""
    )
    return [page.url for page in expired_pages.execute()]


def load_seed_request_urls():
    with open("seed-requests.txt") as f:
        content = f.readlines()
    # remove whitespace characters like `\n` at the end of each line
    content = [x.strip() for x in content]
    return content


def crawl_resource(resource):
    crawl_page(resource, 0)


def run_crawl(should_run_destructive=False, seed_urls=[]):
    global index_dir
    index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
    pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
    global db
    db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
    global robot_file_map
    robot_file_map = {}
    global domain_hit_timings
    domain_hit_timings = {}
    global max_crawl_depth
    max_crawl_depth = 700

    global failure_count
    failure_count = {}

    expired_resources = [GeminiResource(url) for url in load_expired_urls()]
    random.shuffle(expired_resources)
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(crawl_resource, expired_resources)
        executor.shutdown(wait=True, cancel_futures=False)

    submitted_resources = [GeminiResource(url) for url in load_seed_request_urls()]
    random.shuffle(submitted_resources)
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(crawl_resource, submitted_resources)
        executor.shutdown(wait=True, cancel_futures=False)

    logging.info("Finished!")


def main():
    args = parse_args()
    gus.lib.logging.handle_arguments(args)

    run_crawl(args.should_run_destructive, seed_urls=args.seed_urls)


def parse_args():
    parser = argparse.ArgumentParser(description="Crawl Geminispace.")
    parser.add_argument(
        "--destructive",
        "-d",
        dest="should_run_destructive",
        action="store_true",
        default=False,
        help="create a fresh index and perform a full Geminispace crawl",
    )
    parser.add_argument(
        "--seeds",
        "-s",
        metavar="URL",
        dest="seed_urls",
        nargs="+",
        default=[],
        help="one or more URLs with which to extend the seeds of the crawl",
    )
    gus.lib.logging.add_arguments(parser)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
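
# Example invocations, assuming this file is run as the gus.crawl module (the
# module path is inferred from the package-relative imports above and may differ):
#
#   python -m gus.crawl                    # recrawl pages whose change_frequency has expired
#   python -m gus.crawl --destructive      # build a fresh index under INDEX_DIR_NEW
#   python -m gus.crawl -s gemini://example.org/   # accepted on the command line, though
#                                                  # run_crawl() currently ignores seed_urls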