av98.py (63960B)
1 #!/usr/bin/env python3 2 # AV-98 Gemini client 3 # Dervied from VF-1 (https://github.com/solderpunk/VF-1), 4 # (C) 2019, 2020 Solderpunk <solderpunk@sdf.org> 5 # With contributions from: 6 # - danceka <hannu.hartikainen@gmail.com> 7 # - <jprjr@tilde.club> 8 # - <vee@vnsf.xyz> 9 # - Klaus Alexander Seistrup <klaus@seistrup.dk> 10 # - govynnus <govynnus@sdf.org> 11 # - Björn Wärmedal <bjorn.warmedal@gmail.com> 12 # - <jake@rmgr.dev> 13 14 import argparse 15 import cmd 16 import codecs 17 import collections 18 import datetime 19 from email.message import EmailMessage 20 import fnmatch 21 import getpass 22 import glob 23 import hashlib 24 import io 25 import mimetypes 26 import os 27 import os.path 28 import random 29 import shlex 30 import shutil 31 import socket 32 import sqlite3 33 import ssl 34 from ssl import CertificateError 35 import subprocess 36 import sys 37 import tempfile 38 import time 39 import urllib.parse 40 import uuid 41 import webbrowser 42 43 try: 44 import ansiwrap as textwrap 45 except ModuleNotFoundError: 46 import textwrap 47 48 try: 49 from cryptography import x509 50 from cryptography.hazmat.backends import default_backend 51 _HAS_CRYPTOGRAPHY = True 52 _BACKEND = default_backend() 53 except ModuleNotFoundError: 54 _HAS_CRYPTOGRAPHY = False 55 56 _VERSION = "1.0.4dev" 57 58 _MAX_REDIRECTS = 5 59 _MAX_CACHE_SIZE = 10 60 _MAX_CACHE_AGE_SECS = 180 61 62 # Command abbreviations 63 _ABBREVS = { 64 "a": "add", 65 "b": "back", 66 "bb": "blackbox", 67 "bm": "bookmarks", 68 "book": "bookmarks", 69 "f": "fold", 70 "fo": "forward", 71 "g": "go", 72 "h": "history", 73 "hist": "history", 74 "l": "less", 75 "n": "next", 76 "p": "previous", 77 "prev": "previous", 78 "q": "quit", 79 "r": "reload", 80 "s": "save", 81 "se": "search", 82 "/": "search", 83 "t": "tour", 84 "u": "up", 85 } 86 87 _MIME_HANDLERS = { 88 "application/pdf": "xpdf %s", 89 "audio/mpeg": "mpg123 %s", 90 "audio/ogg": "ogg123 %s", 91 "image/*": "feh %s", 92 "text/html": "lynx -dump -force_html %s", 93 "text/*": "cat %s", 94 } 95 96 # monkey-patch Gemini support in urllib.parse 97 # see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py 98 urllib.parse.uses_relative.append("gemini") 99 urllib.parse.uses_netloc.append("gemini") 100 101 102 def fix_ipv6_url(url): 103 if not url.count(":") > 2: # Best way to detect them? 104 return url 105 # If there's a pair of []s in there, it's probably fine as is. 106 if "[" in url and "]" in url: 107 return url 108 # Easiest case is a raw address, no schema, no path. 109 # Just wrap it in square brackets and whack a slash on the end 110 if "/" not in url: 111 return "[" + url + "]/" 112 # Now the trickier cases... 113 if "://" in url: 114 schema, schemaless = url.split("://") 115 else: 116 schema, schemaless = None, url 117 if "/" in schemaless: 118 netloc, rest = schemaless.split("/",1) 119 schemaless = "[" + netloc + "]" + "/" + rest 120 if schema: 121 return schema + "://" + schemaless 122 return schemaless 123 124 standard_ports = { 125 "gemini": 1965, 126 "gopher": 70, 127 } 128 129 class GeminiItem(): 130 131 def __init__(self, url, name=""): 132 if "://" not in url and ("./" not in url and url[0] != "/"): 133 url = "gemini://" + url 134 self.url = fix_ipv6_url(url) 135 self.name = name 136 self.local = False 137 parsed = urllib.parse.urlparse(self.url) 138 self.scheme = parsed.scheme 139 self.host = parsed.hostname 140 self.port = parsed.port or standard_ports.get(self.scheme, 0) 141 self.path = parsed.path 142 if self.host == None: 143 h = self.url.split('/') 144 self.host = h[0:len(h)-1] 145 self.local = True 146 self.scheme = 'local' 147 def root(self): 148 return GeminiItem(self._derive_url("/")) 149 150 def up(self): 151 pathbits = list(os.path.split(self.path.rstrip('/'))) 152 # Don't try to go higher than root 153 if len(pathbits) == 1: 154 return self 155 # Get rid of bottom component 156 pathbits.pop() 157 new_path = os.path.join(*pathbits) 158 return GeminiItem(self._derive_url(new_path)) 159 160 def query(self, query): 161 query = urllib.parse.quote(query) 162 return GeminiItem(self._derive_url(query=query)) 163 164 def _derive_url(self, path="", query=""): 165 """ 166 A thin wrapper around urlunparse which avoids inserting standard ports 167 into URLs just to keep things clean. 168 """ 169 return urllib.parse.urlunparse((self.scheme, 170 self.host if self.port == standard_ports[self.scheme] else self.host + ":" + str(self.port), 171 path or self.path, "", query, "")) 172 173 def absolutise_url(self, relative_url): 174 """ 175 Convert a relative URL to an absolute URL by using the URL of this 176 GeminiItem as a base. 177 """ 178 return urllib.parse.urljoin(self.url, relative_url) 179 180 def to_map_line(self, name=None): 181 if name or self.name: 182 return "=> {} {}\n".format(self.url, name or self.name) 183 else: 184 return "=> {}\n".format(self.url) 185 186 @classmethod 187 def from_map_line(cls, line, origin_gi): 188 assert line.startswith("=>") 189 assert line[2:].strip() 190 bits = line[2:].strip().split(maxsplit=1) 191 bits[0] = origin_gi.absolutise_url(bits[0]) 192 return cls(*bits) 193 194 CRLF = '\r\n' 195 196 # Cheap and cheerful URL detector 197 def looks_like_url(word): 198 return "." in word and word.startswith("gemini://") 199 200 class UserAbortException(Exception): 201 pass 202 203 # GeminiClient Decorators 204 def needs_gi(inner): 205 def outer(self, *args, **kwargs): 206 if not self.gi: 207 print("You need to 'go' somewhere, first") 208 return None 209 else: 210 return inner(self, *args, **kwargs) 211 outer.__doc__ = inner.__doc__ 212 return outer 213 214 def restricted(inner): 215 def outer(self, *args, **kwargs): 216 if self.restricted: 217 print("Sorry, this command is not available in restricted mode!") 218 return None 219 else: 220 return inner(self, *args, **kwargs) 221 outer.__doc__ = inner.__doc__ 222 return outer 223 224 class GeminiClient(cmd.Cmd): 225 226 def __init__(self, restricted=False): 227 cmd.Cmd.__init__(self) 228 229 # Set umask so that nothing we create can be read by anybody else. 230 # The certificate cache and TOFU database contain "browser history" 231 # type sensitivie information. 232 os.umask(0o077) 233 234 # Find config directory 235 ## Look for something pre-existing 236 for confdir in ("~/.av98/", "~/.config/av98/"): 237 confdir = os.path.expanduser(confdir) 238 if os.path.exists(confdir): 239 self.config_dir = confdir 240 break 241 ## Otherwise, make one in .config if it exists 242 else: 243 if os.path.exists(os.path.expanduser("~/.config/")): 244 self.config_dir = os.path.expanduser("~/.config/av98/") 245 else: 246 self.config_dir = os.path.expanduser("~/.av98/") 247 print("Creating config directory {}".format(self.config_dir)) 248 os.makedirs(self.config_dir) 249 250 self.no_cert_prompt = "\x1b[38;5;76m" + "AV-98" + "\x1b[38;5;255m" + "> " + "\x1b[0m" 251 self.cert_prompt = "\x1b[38;5;202m" + "AV-98" + "\x1b[38;5;255m" 252 self.prompt = self.no_cert_prompt 253 self.gi = None 254 self.history = [] 255 self.hist_index = 0 256 self.idx_filename = "" 257 self.index = [] 258 self.index_index = -1 259 self.lookup = self.index 260 self.marks = {} 261 self.page_index = 0 262 self.permanent_redirects = {} 263 self.previous_redirectors = set() 264 self.restricted = restricted 265 self.tmp_filename = "" 266 self.visited_hosts = set() 267 self.waypoints = [] 268 269 self.client_certs = { 270 "active": None 271 } 272 self.active_cert_domains = [] 273 self.active_is_transient = False 274 self.transient_certs_created = [] 275 276 self.options = { 277 "debug" : False, 278 "ipv6" : True, 279 "timeout" : 600, 280 "width" : 80, 281 "auto_follow_redirects" : True, 282 "gopher_proxy" : None, 283 "tls_mode" : "tofu", 284 "http_proxy": None, 285 "cache" : False 286 } 287 288 self.log = { 289 "start_time": time.time(), 290 "requests": 0, 291 "ipv4_requests": 0, 292 "ipv6_requests": 0, 293 "bytes_recvd": 0, 294 "ipv4_bytes_recvd": 0, 295 "ipv6_bytes_recvd": 0, 296 "dns_failures": 0, 297 "refused_connections": 0, 298 "reset_connections": 0, 299 "timeouts": 0, 300 "cache_hits": 0, 301 } 302 303 self._connect_to_tofu_db() 304 305 self.cache = {} 306 self.cache_timestamps = {} 307 308 def _connect_to_tofu_db(self): 309 310 db_path = os.path.join(self.config_dir, "tofu.db") 311 self.db_conn = sqlite3.connect(db_path) 312 self.db_cur = self.db_conn.cursor() 313 314 self.db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache 315 (hostname text, address text, fingerprint text, 316 first_seen date, last_seen date, count integer)""") 317 318 def _go_to_gi(self, gi, update_hist=True, check_cache=True, handle=True): 319 """This method might be considered "the heart of AV-98". 320 Everything involved in fetching a gemini resource happens here: 321 sending the request over the network, parsing the response if 322 its a menu, storing the response in a temporary file, choosing 323 and calling a handler program, and updating the history.""" 324 325 # Don't try to speak to servers running other protocols 326 if gi.scheme in ("http", "https"): 327 if not self.options.get("http_proxy",None): 328 webbrowser.open_new_tab(gi.url) 329 return 330 else: 331 print("Do you want to try to open this link with a http proxy?") 332 resp = input("(Y)/N ") 333 if resp.strip().lower() in ("n","no"): 334 webbrowser.open_new_tab(gi.url) 335 return 336 elif gi.scheme == "gopher" and not self.options.get("gopher_proxy", None): 337 print("""AV-98 does not speak Gopher natively. 338 However, you can use `set gopher_proxy hostname:port` to tell it about a 339 Gopher-to-Gemini proxy (such as a running Agena instance), in which case 340 you'll be able to transparently follow links to Gopherspace!""") 341 return 342 elif gi.local: 343 if os.path.exists(gi.path): 344 with open(gi.path,'r') as f: 345 body = f.read() 346 self._handle_gemtext(body,gi) 347 self.gi = gi 348 self._update_history(gi) 349 return 350 else: 351 print("Sorry, that file does not exist.") 352 return 353 elif gi.scheme not in ("gemini", "gopher"): 354 print("Sorry, no support for {} links.".format(gi.scheme)) 355 return 356 357 # Obey permanent redirects 358 if gi.url in self.permanent_redirects: 359 new_gi = GeminiItem(self.permanent_redirects[gi.url], name=gi.name) 360 self._go_to_gi(new_gi) 361 return 362 363 # Use cache, or hit the network if resource is not cached 364 if check_cache and self.options["cache"] and self._is_cached(gi.url): 365 mime, body, tmpfile = self._get_cached(gi.url) 366 else: 367 try: 368 gi, mime, body, tmpfile = self._fetch_over_network(gi) 369 except UserAbortException: 370 return 371 except Exception as err: 372 # Print an error message 373 if isinstance(err, socket.gaierror): 374 self.log["dns_failures"] += 1 375 print("ERROR: DNS error!") 376 elif isinstance(err, ConnectionRefusedError): 377 self.log["refused_connections"] += 1 378 print("ERROR: Connection refused!") 379 elif isinstance(err, ConnectionResetError): 380 self.log["reset_connections"] += 1 381 print("ERROR: Connection reset!") 382 elif isinstance(err, (TimeoutError, socket.timeout)): 383 self.log["timeouts"] += 1 384 print("""ERROR: Connection timed out! 385 Slow internet connection? Use 'set timeout' to be more patient.""") 386 else: 387 print("ERROR: " + str(err)) 388 return 389 390 # Pass file to handler, unless we were asked not to 391 if handle: 392 if mime == "text/gemini": 393 self._handle_gemtext(body, gi) 394 else: 395 cmd_str = self._get_handler_cmd(mime) 396 try: 397 subprocess.call(shlex.split(cmd_str % tmpfile)) 398 except FileNotFoundError: 399 print("Handler program %s not found!" % shlex.split(cmd_str)[0]) 400 print("You can use the ! command to specify another handler program or pipeline.") 401 402 # Update state 403 self.gi = gi 404 self.mime = mime 405 if update_hist: 406 self._update_history(gi) 407 408 def _fetch_over_network(self, gi): 409 410 # Be careful with client certificates! 411 # Are we crossing a domain boundary? 412 if self.active_cert_domains and gi.host not in self.active_cert_domains: 413 if self.active_is_transient: 414 print("Permanently delete currently active transient certificate?") 415 resp = input("Y/N? ") 416 if resp.strip().lower() in ("y", "yes"): 417 print("Destroying certificate.") 418 self._deactivate_client_cert() 419 else: 420 print("Staying here.") 421 raise UserAbortException() 422 else: 423 print("PRIVACY ALERT: Deactivate client cert before connecting to a new domain?") 424 resp = input("Y/N? ") 425 if resp.strip().lower() in ("n", "no"): 426 print("Keeping certificate active for {}".format(gi.host)) 427 else: 428 print("Deactivating certificate.") 429 self._deactivate_client_cert() 430 431 # Suggest reactivating previous certs 432 if not self.client_certs["active"] and gi.host in self.client_certs: 433 print("PRIVACY ALERT: Reactivate previously used client cert for {}?".format(gi.host)) 434 resp = input("Y/N? ") 435 if resp.strip().lower() in ("y", "yes"): 436 self._activate_client_cert(*self.client_certs[gi.host]) 437 else: 438 print("Remaining unidentified.") 439 self.client_certs.pop(gi.host) 440 441 # Is this a local file? 442 if gi.local: 443 address, f = None, open(gi.path, "rb") 444 else: 445 address, f = self._send_request(gi) 446 447 # Spec dictates <META> should not exceed 1024 bytes, 448 # so maximum valid header length is 1027 bytes. 449 header = f.readline(1027) 450 header = header.decode("UTF-8") 451 if not header or header[-1] != '\n': 452 raise RuntimeError("Received invalid header from server!") 453 header = header.strip() 454 self._debug("Response header: %s." % header) 455 456 # Validate header 457 status, meta = header.split(maxsplit=1) 458 if len(meta) > 1024 or len(status) != 2 or not status.isnumeric(): 459 f.close() 460 raise RuntimeError("Received invalid header from server!") 461 462 # Update redirect loop/maze escaping state 463 if not status.startswith("3"): 464 self.previous_redirectors = set() 465 466 # Handle non-SUCCESS headers, which don't have a response body 467 # Inputs 468 if status.startswith("1"): 469 print(meta) 470 if status == "11": 471 user_input = getpass.getpass("> ") 472 else: 473 user_input = input("> ") 474 return self._fetch_over_network(gi.query(user_input)) 475 476 # Redirects 477 elif status.startswith("3"): 478 new_gi = GeminiItem(gi.absolutise_url(meta)) 479 if new_gi.url == gi.url: 480 raise RuntimeError("URL redirects to itself!") 481 elif new_gi.url in self.previous_redirectors: 482 raise RuntimeError("Caught in redirect loop!") 483 elif len(self.previous_redirectors) == _MAX_REDIRECTS: 484 raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS) 485 # Never follow cross-domain redirects without asking 486 elif new_gi.host != gi.host: 487 follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) 488 # Never follow cross-protocol redirects without asking 489 elif new_gi.scheme != gi.scheme: 490 follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) 491 # Don't follow *any* redirect without asking if auto-follow is off 492 elif not self.options["auto_follow_redirects"]: 493 follow = input("Follow redirect to %s? (y/n) " % new_gi.url) 494 # Otherwise, follow away 495 else: 496 follow = "yes" 497 if follow.strip().lower() not in ("y", "yes"): 498 raise UserAbortException() 499 self._debug("Following redirect to %s." % new_gi.url) 500 self._debug("This is consecutive redirect number %d." % len(self.previous_redirectors)) 501 self.previous_redirectors.add(gi.url) 502 if status == "31": 503 # Permanent redirect 504 self.permanent_redirects[gi.url] = new_gi.url 505 return self._fetch_over_network(new_gi) 506 507 # Errors 508 elif status.startswith("4") or status.startswith("5"): 509 raise RuntimeError(meta) 510 511 # Client cert 512 elif status.startswith("6"): 513 self._handle_cert_request(meta) 514 return self._fetch_over_network(gi) 515 516 # Invalid status 517 elif not status.startswith("2"): 518 raise RuntimeError("Server returned undefined status code %s!" % status) 519 520 # If we're here, this must be a success and there's a response body 521 assert status.startswith("2") 522 523 mime = meta 524 if mime == "": 525 mime = "text/gemini; charset=utf-8" 526 527 msg = EmailMessage() 528 msg['Content-Type'] = mime 529 mime, mime_options = msg.get_content_type(), msg['Content-Type'].params 530 if "charset" in mime_options: 531 try: 532 codecs.lookup(mime_options["charset"]) 533 except LookupError: 534 raise RuntimeError("Header declared unknown encoding %s" % value) 535 536 # Read the response body over the network 537 body = f.read() 538 539 # Save the result in a temporary file 540 ## Set file mode 541 if mime.startswith("text/"): 542 mode = "w" 543 encoding = mime_options.get("charset", "UTF-8") 544 try: 545 body = body.decode(encoding) 546 except UnicodeError: 547 raise RuntimeError("Could not decode response body using %s encoding declared in header!" % encoding) 548 else: 549 mode = "wb" 550 encoding = None 551 ## Write 552 tmpf = tempfile.NamedTemporaryFile(mode, encoding=encoding, delete=False) 553 size = tmpf.write(body) 554 tmpf.close() 555 self.tmp_filename = tmpf.name 556 self._debug("Wrote %d byte response to %s." % (size, self.tmp_filename)) 557 558 # Maintain cache and log 559 if self.options["cache"]: 560 self._add_to_cache(gi.url, mime, self.tmp_filename) 561 self._log_visit(gi, address, size) 562 563 return gi, mime, body, self.tmp_filename 564 565 def _send_request(self, gi): 566 """Send a selector to a given host and port. 567 Returns the resolved address and binary file with the reply.""" 568 if gi.scheme == "gemini": 569 # For Gemini requests, connect to the host and port specified in the URL 570 host, port = gi.host, gi.port 571 elif gi.scheme == "gopher": 572 # For Gopher requests, use the configured proxy 573 host, port = self.options["gopher_proxy"].rsplit(":", 1) 574 self._debug("Using gopher proxy: " + self.options["gopher_proxy"]) 575 elif gi.scheme in ("http", "https"): 576 host, port = self.options["http_proxy"].rsplit(":",1) 577 self._debug("Using http proxy: " + self.options["http_proxy"]) 578 # Do DNS resolution 579 addresses = self._get_addresses(host, port) 580 581 # Prepare TLS context 582 protocol = ssl.PROTOCOL_TLS_CLIENT # if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2 583 context = ssl.SSLContext(protocol) 584 # Use CAs or TOFU 585 if self.options["tls_mode"] == "ca": 586 context.verify_mode = ssl.CERT_REQUIRED 587 context.check_hostname = True 588 context.load_default_certs() 589 else: 590 context.check_hostname = False 591 context.verify_mode = ssl.CERT_NONE 592 # Impose minimum TLS version 593 ## In 3.7 and above, this is easy... 594 if sys.version_info.minor >= 7: 595 context.minimum_version = ssl.TLSVersion.TLSv1_2 596 ## Otherwise, it seems very hard... 597 ## The below is less strict than it ought to be, but trying to disable 598 ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures 599 ## with recent versions of OpenSSL. What a mess... 600 else: 601 context.options |= ssl.OP_NO_SSLv3 602 context.options |= ssl.OP_NO_SSLv2 603 # Try to enforce sensible ciphers 604 try: 605 context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH") 606 except ssl.SSLError: 607 # Rely on the server to only support sensible things, I guess... 608 pass 609 # Load client certificate if needed 610 if self.client_certs["active"]: 611 certfile, keyfile = self.client_certs["active"] 612 context.load_cert_chain(certfile, keyfile) 613 614 # Connect to remote host by any address possible 615 err = None 616 for address in addresses: 617 self._debug("Connecting to: " + str(address[4])) 618 s = socket.socket(address[0], address[1]) 619 s.settimeout(self.options["timeout"]) 620 s = context.wrap_socket(s, server_hostname = gi.host) 621 try: 622 s.connect(address[4]) 623 break 624 except OSError as e: 625 err = e 626 else: 627 # If we couldn't connect to *any* of the addresses, just 628 # bubble up the exception from the last attempt and deny 629 # knowledge of earlier failures. 630 raise err 631 632 if sys.version_info.minor >=5: 633 self._debug("Established {} connection.".format(s.version())) 634 self._debug("Cipher is: {}.".format(s.cipher())) 635 636 # Do TOFU 637 if self.options["tls_mode"] != "ca": 638 cert = s.getpeercert(binary_form=True) 639 self._validate_cert(address[4][0], host, cert) 640 641 # Remember that we showed the current cert to this domain... 642 if self.client_certs["active"]: 643 self.active_cert_domains.append(gi.host) 644 self.client_certs[gi.host] = self.client_certs["active"] 645 646 # Send request and wrap response in a file descriptor 647 self._debug("Sending %s<CRLF>" % gi.url) 648 s.sendall((gi.url + CRLF).encode("UTF-8")) 649 return address, s.makefile(mode = "rb") 650 651 def _get_addresses(self, host, port): 652 # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled 653 if ":" in host: 654 # This is likely a literal IPv6 address, so we can *only* ask for 655 # IPv6 addresses or getaddrinfo will complain 656 family_mask = socket.AF_INET6 657 elif socket.has_ipv6 and self.options["ipv6"]: 658 # Accept either IPv4 or IPv6 addresses 659 family_mask = 0 660 else: 661 # IPv4 only 662 family_mask = socket.AF_INET 663 addresses = socket.getaddrinfo(host, port, family=family_mask, 664 type=socket.SOCK_STREAM) 665 # Sort addresses so IPv6 ones come first 666 addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True) 667 668 return addresses 669 670 def _is_cached(self, url): 671 if url not in self.cache: 672 return False 673 now = time.time() 674 cached = self.cache_timestamps[url] 675 if now - cached > _MAX_CACHE_AGE_SECS: 676 self._debug("Expiring old cached copy of resource.") 677 self._remove_from_cache(url) 678 return False 679 self._debug("Found cached copy of resource.") 680 return True 681 682 def _remove_from_cache(self, url): 683 self.cache_timestamps.pop(url) 684 mime, filename = self.cache.pop(url) 685 os.unlink(filename) 686 self._validate_cache() 687 688 def _add_to_cache(self, url, mime, filename): 689 690 self.cache_timestamps[url] = time.time() 691 self.cache[url] = (mime, filename) 692 if len(self.cache) > _MAX_CACHE_SIZE: 693 self._trim_cache() 694 self._validate_cache() 695 696 def _trim_cache(self): 697 # Order cache entries by age 698 lru = [(t, u) for (u, t) in self.cache_timestamps.items()] 699 lru.sort() 700 # Drop the oldest entry no matter what 701 _, url = lru[0] 702 self._debug("Dropping cached copy of {} from full cache.".format(url)) 703 self._remove_from_cache(url) 704 # Drop other entries if they are older than the limit 705 now = time.time() 706 for cached, url in lru[1:]: 707 if now - cached > _MAX_CACHE_AGE_SECS: 708 self._debug("Dropping cached copy of {} from full cache.".format(url)) 709 self._remove_from_cache(url) 710 else: 711 break 712 self._validate_cache() 713 714 def _get_cached(self, url): 715 mime, filename = self.cache[url] 716 self.log["cache_hits"] += 1 717 if mime.startswith("text/gemini"): 718 with open(filename, "r") as fp: 719 body = fp.read() 720 return mime, body, filename 721 else: 722 return mime, None, filename 723 724 def _empty_cache(self): 725 for mime, filename in self.cache.values(): 726 if os.path.exists(filename): 727 os.unlink(filename) 728 729 def _validate_cache(self): 730 assert self.cache.keys() == self.cache_timestamps.keys() 731 for _, filename in self.cache.values(): 732 assert os.path.isfile(filename) 733 734 def _handle_cert_request(self, meta): 735 736 # Don't do client cert stuff in restricted mode, as in principle 737 # it could be used to fill up the disk by creating a whole lot of 738 # certificates 739 if self.restricted: 740 print("The server is requesting a client certificate.") 741 print("These are not supported in restricted mode, sorry.") 742 raise UserAbortException() 743 744 print("SERVER SAYS: ", meta) 745 # Present different messages for different 6x statuses, but 746 # handle them the same. 747 if status in ("64", "65"): 748 print("The server rejected your certificate because it is either expired or not yet valid.") 749 elif status == "63": 750 print("The server did not accept your certificate.") 751 print("You may need to e.g. coordinate with the admin to get your certificate fingerprint whitelisted.") 752 else: 753 print("The site {} is requesting a client certificate.".format(gi.host)) 754 print("This will allow the site to recognise you across requests.") 755 756 # Give the user choices 757 print("What do you want to do?") 758 print("1. Give up.") 759 print("2. Generate a new transient certificate.") 760 print("3. Generate a new persistent certificate.") 761 print("4. Load a previously generated persistent.") 762 print("5. Load certificate from an external file.") 763 choice = input("> ").strip() 764 if choice == "2": 765 self._generate_transient_cert_cert() 766 elif choice == "3": 767 self._generate_persistent_client_cert() 768 elif choice == "4": 769 self._choose_client_cert() 770 elif choice == "5": 771 self._load_client_cert() 772 else: 773 print("Giving up.") 774 raise UserAbortException() 775 776 def _validate_cert(self, address, host, cert): 777 """ 778 Validate a TLS certificate in TOFU mode. 779 780 If the cryptography module is installed: 781 - Check the certificate Common Name or SAN matches `host` 782 - Check the certificate's not valid before date is in the past 783 - Check the certificate's not valid after date is in the future 784 785 Whether the cryptography module is installed or not, check the 786 certificate's fingerprint against the TOFU database to see if we've 787 previously encountered a different certificate for this IP address and 788 hostname. 789 """ 790 now = datetime.datetime.utcnow() 791 if _HAS_CRYPTOGRAPHY: 792 # Using the cryptography module we can get detailed access 793 # to the properties of even self-signed certs, unlike in 794 # the standard ssl library... 795 c = x509.load_der_x509_certificate(cert, _BACKEND) 796 797 sha = hashlib.sha256() 798 sha.update(cert) 799 fingerprint = sha.hexdigest() 800 801 # Have we been here before? 802 self.db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count 803 FROM cert_cache 804 WHERE hostname=? AND address=?""", (host, address)) 805 cached_certs = self.db_cur.fetchall() 806 807 # If so, check for a match 808 if cached_certs: 809 max_count = 0 810 most_frequent_cert = None 811 for cached_fingerprint, first, last, count in cached_certs: 812 if count > max_count: 813 max_count = count 814 most_frequent_cert = cached_fingerprint 815 if fingerprint == cached_fingerprint: 816 # Matched! 817 self._debug("TOFU: Accepting previously seen ({} times) certificate {}".format(count, fingerprint)) 818 self.db_cur.execute("""UPDATE cert_cache 819 SET last_seen=?, count=? 820 WHERE hostname=? AND address=? AND fingerprint=?""", 821 (now, count+1, host, address, fingerprint)) 822 self.db_conn.commit() 823 break 824 else: 825 if _HAS_CRYPTOGRAPHY: 826 # Load the most frequently seen certificate to see if it has 827 # expired 828 certdir = os.path.join(self.config_dir, "cert_cache") 829 with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp: 830 previous_cert = fp.read() 831 previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND) 832 previous_ttl = previous_cert.not_valid_after - now 833 print(previous_ttl) 834 835 self._debug("TOFU: Unrecognised certificate {}! Raising the alarm...".format(fingerprint)) 836 print("****************************************") 837 print("[SECURITY WARNING] Unrecognised certificate!") 838 print("The certificate presented for {} ({}) has never been seen before.".format(host, address)) 839 print("This MIGHT be a Man-in-the-Middle attack.") 840 print("A different certificate has previously been seen {} times.".format(max_count)) 841 if _HAS_CRYPTOGRAPHY: 842 if previous_ttl < datetime.timedelta(): 843 print("That certificate has expired, which reduces suspicion somewhat.") 844 else: 845 print("That certificate is still valid for: {}".format(previous_ttl)) 846 print("****************************************") 847 print("Attempt to verify the new certificate fingerprint out-of-band:") 848 print(fingerprint) 849 choice = input("Accept this new certificate? Y/N ").strip().lower() 850 if choice in ("y", "yes"): 851 self.db_cur.execute("""INSERT INTO cert_cache 852 VALUES (?, ?, ?, ?, ?, ?)""", 853 (host, address, fingerprint, now, now, 1)) 854 self.db_conn.commit() 855 with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp: 856 fp.write(cert) 857 else: 858 raise Exception("TOFU Failure!") 859 860 # If not, cache this cert 861 else: 862 self._debug("TOFU: Blindly trusting first ever certificate for this host!") 863 self.db_cur.execute("""INSERT INTO cert_cache 864 VALUES (?, ?, ?, ?, ?, ?)""", 865 (host, address, fingerprint, now, now, 1)) 866 self.db_conn.commit() 867 certdir = os.path.join(self.config_dir, "cert_cache") 868 if not os.path.exists(certdir): 869 os.makedirs(certdir) 870 with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp: 871 fp.write(cert) 872 873 def _get_handler_cmd(self, mimetype): 874 # Now look for a handler for this mimetype 875 # Consider exact matches before wildcard matches 876 exact_matches = [] 877 wildcard_matches = [] 878 for handled_mime, cmd_str in _MIME_HANDLERS.items(): 879 if "*" in handled_mime: 880 wildcard_matches.append((handled_mime, cmd_str)) 881 else: 882 exact_matches.append((handled_mime, cmd_str)) 883 for handled_mime, cmd_str in exact_matches + wildcard_matches: 884 if fnmatch.fnmatch(mimetype, handled_mime): 885 break 886 else: 887 # Use "xdg-open" as a last resort. 888 cmd_str = "xdg-open %s" 889 self._debug("Using handler: %s" % cmd_str) 890 return cmd_str 891 892 def _handle_gemtext(self, body, menu_gi, display=True): 893 self.options["width"] = os.get_terminal_size().columns 894 self.index = [] 895 preformatted = False 896 if self.idx_filename: 897 os.unlink(self.idx_filename) 898 tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False) 899 self.idx_filename = tmpf.name 900 for line in body.splitlines(): 901 if line.startswith("```"): 902 preformatted = not preformatted 903 elif preformatted: 904 tmpf.write(line + "\n") 905 elif line.startswith("=>"): 906 try: 907 gi = GeminiItem.from_map_line(line, menu_gi) 908 self.index.append(gi) 909 tmpf.write(self._format_geminiitem(len(self.index), gi) + "\n") 910 except: 911 self._debug("Skipping possible link: %s" % line) 912 elif line.startswith("* "): 913 line = line[1:].lstrip("\t ") 914 tmpf.write(textwrap.fill(line, self.options["width"], 915 initial_indent = "• ", subsequent_indent=" ") + "\n") 916 elif line.startswith(">"): 917 line = line[1:].lstrip("\t ") 918 tmpf.write(textwrap.fill(line, self.options["width"], 919 initial_indent = "> ", subsequent_indent="> ") + "\n") 920 elif line.startswith("###"): 921 line = line[3:].lstrip("\t ") 922 tmpf.write("\x1b[4m" + line + "\x1b[0m""\n") 923 elif line.startswith("##"): 924 line = line[2:].lstrip("\t ") 925 tmpf.write("\x1b[1m" + line + "\x1b[0m""\n") 926 elif line.startswith("#"): 927 line = line[1:].lstrip("\t ") 928 tmpf.write("\x1b[1m\x1b[4m" + line + "\x1b[0m""\n") 929 else: 930 tmpf.write(textwrap.fill(line, self.options["width"]) + "\n") 931 tmpf.close() 932 933 self.lookup = self.index 934 self.page_index = 0 935 self.index_index = -1 936 937 if display: 938 cmd_str = self._get_handler_cmd("text/gemini") 939 subprocess.call(shlex.split(cmd_str % self.idx_filename)) 940 941 def _format_geminiitem(self, index, gi, url=False): 942 protocol = "" if gi.scheme == "gemini" else " %s" % gi.scheme 943 line = f"[{index}{protocol}] {gi.url} {gi.name}" if gi.name and url else f"[{index}{protocol}] {gi.name}" 944 return line 945 946 def _show_lookup(self, offset=0, end=None, url=False): 947 for n, gi in enumerate(self.lookup[offset:end]): 948 print(self._format_geminiitem(n+offset+1, gi, url)) 949 950 def _update_history(self, gi): 951 # Don't duplicate 952 if self.history and self.history[self.hist_index] == gi: 953 return 954 self.history = self.history[0:self.hist_index+1] 955 self.history.append(gi) 956 self.hist_index = len(self.history) - 1 957 958 def _log_visit(self, gi, address, size): 959 if not address: 960 return 961 self.log["requests"] += 1 962 self.log["bytes_recvd"] += size 963 self.visited_hosts.add(address) 964 if address[0] == socket.AF_INET: 965 self.log["ipv4_requests"] += 1 966 self.log["ipv4_bytes_recvd"] += size 967 elif address[0] == socket.AF_INET6: 968 self.log["ipv6_requests"] += 1 969 self.log["ipv6_bytes_recvd"] += size 970 971 def _get_active_tmpfile(self): 972 if self.mime == "text/gemini": 973 return self.idx_filename 974 else: 975 return self.tmp_filename 976 977 def _debug(self, debug_text): 978 if not self.options["debug"]: 979 return 980 debug_text = "\x1b[0;32m[DEBUG] " + debug_text + "\x1b[0m" 981 print(debug_text) 982 983 def _load_client_cert(self): 984 """ 985 Interactively load a TLS client certificate from the filesystem in PEM 986 format. 987 """ 988 print("Loading client certificate file, in PEM format (blank line to cancel)") 989 certfile = input("Certfile path: ").strip() 990 if not certfile: 991 print("Aborting.") 992 return 993 certfile = os.path.expanduser(certfile) 994 if not os.path.isfile(certfile): 995 print("Certificate file {} does not exist.".format(certfile)) 996 return 997 print("Loading private key file, in PEM format (blank line to cancel)") 998 keyfile = input("Keyfile path: ").strip() 999 if not keyfile: 1000 print("Aborting.") 1001 return 1002 keyfile = os.path.expanduser(keyfile) 1003 if not os.path.isfile(keyfile): 1004 print("Private key file {} does not exist.".format(keyfile)) 1005 return 1006 self._activate_client_cert(certfile, keyfile) 1007 1008 def _generate_transient_cert_cert(self): 1009 """ 1010 Use `openssl` command to generate a new transient client certificate 1011 with 24 hours of validity. 1012 """ 1013 certdir = os.path.join(self.config_dir, "transient_certs") 1014 name = str(uuid.uuid4()) 1015 self._generate_client_cert(certdir, name, transient=True) 1016 self.active_is_transient = True 1017 self.transient_certs_created.append(name) 1018 1019 def _generate_persistent_client_cert(self): 1020 """ 1021 Interactively use `openssl` command to generate a new persistent client 1022 certificate with one year of validity. 1023 """ 1024 certdir = os.path.join(self.config_dir, "client_certs") 1025 print("What do you want to name this new certificate?") 1026 print("Answering `mycert` will create `{0}/mycert.crt` and `{0}/mycert.key`".format(certdir)) 1027 name = input("> ") 1028 if not name.strip(): 1029 print("Aborting.") 1030 return 1031 self._generate_client_cert(certdir, name) 1032 1033 def _generate_client_cert(self, certdir, basename, transient=False): 1034 """ 1035 Use `openssl` binary to generate a client certificate (which may be 1036 transient or persistent) and save the certificate and private key to the 1037 specified directory with the specified basename. 1038 """ 1039 if not os.path.exists(certdir): 1040 os.makedirs(certdir) 1041 certfile = os.path.join(certdir, basename+".crt") 1042 keyfile = os.path.join(certdir, basename+".key") 1043 cmd = "openssl req -x509 -newkey rsa:2048 -days {} -nodes -keyout {} -out {}".format(1 if transient else 365, keyfile, certfile) 1044 if transient: 1045 cmd += " -subj '/CN={}'".format(basename) 1046 os.system(cmd) 1047 self._activate_client_cert(certfile, keyfile) 1048 1049 def _choose_client_cert(self): 1050 """ 1051 Interactively select a previously generated client certificate and 1052 activate it. 1053 """ 1054 certdir = os.path.join(self.config_dir, "client_certs") 1055 certs = glob.glob(os.path.join(certdir, "*.crt")) 1056 if len(certs) == 0: 1057 print("There are no previously generated certificates.") 1058 return 1059 certdir = {} 1060 for n, cert in enumerate(certs): 1061 certdir[str(n+1)] = (cert, os.path.splitext(cert)[0] + ".key") 1062 print("{}. {}".format(n+1, os.path.splitext(os.path.basename(cert))[0])) 1063 choice = input("> ").strip() 1064 if choice in certdir: 1065 certfile, keyfile = certdir[choice] 1066 self._activate_client_cert(certfile, keyfile) 1067 else: 1068 print("What?") 1069 1070 def _activate_client_cert(self, certfile, keyfile): 1071 self.client_certs["active"] = (certfile, keyfile) 1072 self.active_cert_domains = [] 1073 self.prompt = self.cert_prompt + "+" + os.path.basename(certfile).replace('.crt','') + "> " + "\x1b[0m" 1074 self._debug("Using ID {} / {}.".format(*self.client_certs["active"])) 1075 1076 def _deactivate_client_cert(self): 1077 if self.active_is_transient: 1078 for filename in self.client_certs["active"]: 1079 os.remove(filename) 1080 for domain in self.active_cert_domains: 1081 self.client_certs.pop(domain) 1082 self.client_certs["active"] = None 1083 self.active_cert_domains = [] 1084 self.prompt = self.no_cert_prompt 1085 self.active_is_transient = False 1086 1087 # Cmd implementation follows 1088 1089 def default(self, line): 1090 if line.strip() == "EOF": 1091 return self.onecmd("quit") 1092 elif line.strip() == "..": 1093 return self.do_up() 1094 elif line.startswith("/"): 1095 return self.do_search(line[1:]) 1096 1097 # Expand abbreviated commands 1098 first_word = line.split()[0].strip() 1099 if first_word in _ABBREVS: 1100 full_cmd = _ABBREVS[first_word] 1101 expanded = line.replace(first_word, full_cmd, 1) 1102 return self.onecmd(expanded) 1103 1104 # Try to parse numerical index for lookup table 1105 try: 1106 n = int(line.strip()) 1107 except ValueError: 1108 print("What?") 1109 return 1110 1111 try: 1112 gi = self.lookup[n-1] 1113 except IndexError: 1114 print ("Index too high!") 1115 return 1116 1117 self.index_index = n 1118 self._go_to_gi(gi) 1119 1120 ### Settings 1121 @restricted 1122 def do_set(self, line): 1123 """View or set various options.""" 1124 if not line.strip(): 1125 # Show all current settings 1126 for option in sorted(self.options.keys()): 1127 print("%s %s" % (option, self.options[option])) 1128 elif len(line.split()) == 1: 1129 # Show current value of one specific setting 1130 option = line.strip() 1131 if option in self.options: 1132 print("%s %s" % (option, self.options[option])) 1133 else: 1134 print("Unrecognised option %s" % option) 1135 else: 1136 # Set value of one specific setting 1137 option, value = line.split(" ", 1) 1138 if option not in self.options: 1139 print("Unrecognised option %s" % option) 1140 return 1141 # Validate / convert values 1142 if option == "gopher_proxy": 1143 if ":" not in value: 1144 value += ":1965" 1145 else: 1146 host, port = value.rsplit(":",1) 1147 if not port.isnumeric(): 1148 print("Invalid proxy port %s" % port) 1149 return 1150 elif option == "tls_mode": 1151 if value.lower() not in ("ca", "tofu"): 1152 print("TLS mode must be `ca` or `tofu`!") 1153 return 1154 elif value.isnumeric(): 1155 value = int(value) 1156 elif value.lower() == "false": 1157 value = False 1158 elif value.lower() == "true": 1159 value = True 1160 else: 1161 try: 1162 value = float(value) 1163 except ValueError: 1164 pass 1165 self.options[option] = value 1166 1167 @restricted 1168 def do_cert(self, line): 1169 """Manage client certificates""" 1170 print("Managing client certificates") 1171 if self.client_certs["active"]: 1172 print("Active certificate: {}".format(self.client_certs["active"][0])) 1173 print("1. Deactivate client certificate.") 1174 print("2. Generate new certificate.") 1175 print("3. Load previously generated certificate.") 1176 print("4. Load externally created client certificate from file.") 1177 print("Enter blank line to exit certificate manager.") 1178 choice = input("> ").strip() 1179 if choice == "1": 1180 print("Deactivating client certificate.") 1181 self._deactivate_client_cert() 1182 elif choice == "2": 1183 self._generate_persistent_client_cert() 1184 elif choice == "3": 1185 self._choose_client_cert() 1186 elif choice == "4": 1187 self._load_client_cert() 1188 else: 1189 print("Aborting.") 1190 1191 @restricted 1192 def do_handler(self, line): 1193 """View or set handler commands for different MIME types.""" 1194 if not line.strip(): 1195 # Show all current handlers 1196 for mime in sorted(_MIME_HANDLERS.keys()): 1197 print("%s %s" % (mime, _MIME_HANDLERS[mime])) 1198 elif len(line.split()) == 1: 1199 mime = line.strip() 1200 if mime in _MIME_HANDLERS: 1201 print("%s %s" % (mime, _MIME_HANDLERS[mime])) 1202 else: 1203 print("No handler set for MIME type %s" % mime) 1204 else: 1205 mime, handler = line.split(" ", 1) 1206 _MIME_HANDLERS[mime] = handler 1207 if "%s" not in handler: 1208 print("Are you sure you don't want to pass the filename to the handler?") 1209 1210 def do_abbrevs(self, *args): 1211 """Print all AV-98 command abbreviations.""" 1212 header = "Command Abbreviations:" 1213 self.stdout.write("\n{}\n".format(str(header))) 1214 if self.ruler: 1215 self.stdout.write("{}\n".format(str(self.ruler * len(header)))) 1216 for k, v in _ABBREVS.items(): 1217 self.stdout.write("{:<7} {}\n".format(k, v)) 1218 self.stdout.write("\n") 1219 1220 ### Stuff for getting around 1221 def do_go(self, line): 1222 """Go to a gemini URL or marked item.""" 1223 line = line.strip() 1224 if not line: 1225 print("Go where?") 1226 # First, check for possible marks 1227 elif line in self.marks: 1228 gi = self.marks[line] 1229 self._go_to_gi(gi) 1230 # or a local file 1231 elif os.path.exists(os.path.expanduser(line)): 1232 self._go_to_gi(GeminiItem(line)) 1233 # If this isn't a mark, treat it as a URL 1234 else: 1235 self._go_to_gi(GeminiItem(line)) 1236 1237 @needs_gi 1238 def do_reload(self, *args): 1239 """Reload the current URL.""" 1240 self._go_to_gi(self.gi, check_cache=False) 1241 1242 @needs_gi 1243 def do_up(self, *args): 1244 """Go up one directory in the path.""" 1245 self._go_to_gi(self.gi.up()) 1246 1247 def do_back(self, *args): 1248 """Go back to the previous gemini item.""" 1249 if not self.history or self.hist_index == 0: 1250 return 1251 self.hist_index -= 1 1252 gi = self.history[self.hist_index] 1253 self._go_to_gi(gi, update_hist=False) 1254 1255 def do_forward(self, *args): 1256 """Go forward to the next gemini item.""" 1257 if not self.history or self.hist_index == len(self.history) - 1: 1258 return 1259 self.hist_index += 1 1260 gi = self.history[self.hist_index] 1261 self._go_to_gi(gi, update_hist=False) 1262 1263 def do_next(self, *args): 1264 """Go to next item after current in index.""" 1265 return self.onecmd(str(self.index_index+1)) 1266 1267 def do_previous(self, *args): 1268 """Go to previous item before current in index.""" 1269 self.lookup = self.index 1270 return self.onecmd(str(self.index_index-1)) 1271 1272 @needs_gi 1273 def do_root(self, *args): 1274 """Go to root selector of the server hosting current item.""" 1275 self._go_to_gi(self.gi.root()) 1276 1277 def do_tour(self, line): 1278 """Add index items as waypoints on a tour, which is basically a FIFO 1279 queue of gemini items. 1280 1281 Items can be added with `tour 1 2 3 4` or ranges like `tour 1-4`. 1282 All items in current menu can be added with `tour *`. 1283 Current tour can be listed with `tour ls` and scrubbed with `tour clear`.""" 1284 line = line.strip() 1285 if not line: 1286 # Fly to next waypoint on tour 1287 if not self.waypoints: 1288 print("End of tour.") 1289 else: 1290 gi = self.waypoints.pop(0) 1291 self._go_to_gi(gi) 1292 elif line == "ls": 1293 old_lookup = self.lookup 1294 self.lookup = self.waypoints 1295 self._show_lookup() 1296 self.lookup = old_lookup 1297 elif line == "clear": 1298 self.waypoints = [] 1299 elif line == "*": 1300 self.waypoints.extend(self.lookup) 1301 elif looks_like_url(line): 1302 self.waypoints.append(GeminiItem(line)) 1303 else: 1304 for index in line.split(): 1305 try: 1306 pair = index.split('-') 1307 if len(pair) == 1: 1308 # Just a single index 1309 n = int(index) 1310 gi = self.lookup[n-1] 1311 self.waypoints.append(gi) 1312 elif len(pair) == 2: 1313 # Two endpoints for a range of indices 1314 if int(pair[0]) < int(pair[1]): 1315 for n in range(int(pair[0]), int(pair[1]) + 1): 1316 gi = self.lookup[n-1] 1317 self.waypoints.append(gi) 1318 else: 1319 for n in range(int(pair[0]), int(pair[1]) - 1, -1): 1320 gi = self.lookup[n-1] 1321 self.waypoints.append(gi) 1322 1323 else: 1324 # Syntax error 1325 print("Invalid use of range syntax %s, skipping" % index) 1326 except ValueError: 1327 print("Non-numeric index %s, skipping." % index) 1328 except IndexError: 1329 print("Invalid index %d, skipping." % n) 1330 1331 @needs_gi 1332 def do_mark(self, line): 1333 """Mark the current item with a single letter. This letter can then 1334 be passed to the 'go' command to return to the current item later. 1335 Think of it like marks in vi: 'mark a'='ma' and 'go a'=''a'.""" 1336 line = line.strip() 1337 if not line: 1338 for mark, gi in self.marks.items(): 1339 print("[%s] %s (%s)" % (mark, gi.name, gi.url)) 1340 elif line.isalpha() and len(line) == 1: 1341 self.marks[line] = self.gi 1342 else: 1343 print("Invalid mark, must be one letter") 1344 1345 def do_version(self, line): 1346 """Display version information.""" 1347 print("AV-98 " + _VERSION) 1348 1349 ### Stuff that modifies the lookup table 1350 def do_ls(self, line): 1351 """List contents of current index. 1352 Use 'ls -l' to see URLs.""" 1353 self.lookup = self.index 1354 self._show_lookup(url = "-l" in line) 1355 self.page_index = 0 1356 1357 def do_gus(self, line): 1358 """Submit a search query to the geminispace.info search engine.""" 1359 gus = GeminiItem("gemini://geminispace.info/search") 1360 self._go_to_gi(gus.query(line)) 1361 1362 def do_history(self, *args): 1363 """Display history.""" 1364 self.lookup = self.history 1365 self._show_lookup(url=True) 1366 self.page_index = 0 1367 1368 def do_search(self, searchterm): 1369 """Search index (case insensitive).""" 1370 results = [ 1371 gi for gi in self.lookup if searchterm.lower() in gi.name.lower()] 1372 if results: 1373 self.lookup = results 1374 self._show_lookup() 1375 self.page_index = 0 1376 else: 1377 print("No results found.") 1378 1379 def emptyline(self): 1380 """Page through index ten lines at a time.""" 1381 i = self.page_index 1382 if i > len(self.lookup): 1383 return 1384 self._show_lookup(offset=i, end=i+10) 1385 self.page_index += 10 1386 1387 ### Stuff that does something to most recently viewed item 1388 @needs_gi 1389 def do_cat(self, *args): 1390 """Run most recently visited item through "cat" command.""" 1391 subprocess.call(shlex.split("cat %s" % self._get_active_tmpfile())) 1392 1393 @needs_gi 1394 def do_less(self, *args): 1395 """Run most recently visited item through "less" command.""" 1396 cmd_str = self._get_handler_cmd(self.mime) 1397 cmd_str = cmd_str % self._get_active_tmpfile() 1398 subprocess.call("%s | less -R" % cmd_str, shell=True) 1399 1400 @needs_gi 1401 def do_fold(self, *args): 1402 """Run most recently visited item through "fold" command.""" 1403 cmd_str = self._get_handler_cmd(self.mime) 1404 cmd_str = cmd_str % self._get_active_tmpfile() 1405 subprocess.call("%s | fold -w 70 -s" % cmd_str, shell=True) 1406 1407 @restricted 1408 @needs_gi 1409 def do_shell(self, line): 1410 """'cat' most recently visited item through a shell pipeline.""" 1411 subprocess.call(("cat %s |" % self._get_active_tmpfile()) + line, shell=True) 1412 1413 @restricted 1414 @needs_gi 1415 def do_save(self, line): 1416 """Save an item to the filesystem. 1417 'save n filename' saves menu item n to the specified filename. 1418 'save filename' saves the last viewed item to the specified filename. 1419 'save n' saves menu item n to an automagic filename.""" 1420 args = line.strip().split() 1421 1422 # First things first, figure out what our arguments are 1423 if len(args) == 0: 1424 # No arguments given at all 1425 # Save current item, if there is one, to a file whose name is 1426 # inferred from the gemini path 1427 if not self.tmp_filename: 1428 print("You need to visit an item first!") 1429 return 1430 else: 1431 index = None 1432 filename = None 1433 elif len(args) == 1: 1434 # One argument given 1435 # If it's numeric, treat it as an index, and infer the filename 1436 try: 1437 index = int(args[0]) 1438 filename = None 1439 # If it's not numeric, treat it as a filename and 1440 # save the current item 1441 except ValueError: 1442 index = None 1443 filename = os.path.expanduser(args[0]) 1444 elif len(args) == 2: 1445 # Two arguments given 1446 # Treat first as an index and second as filename 1447 index, filename = args 1448 try: 1449 index = int(index) 1450 except ValueError: 1451 print("First argument is not a valid item index!") 1452 return 1453 filename = os.path.expanduser(filename) 1454 else: 1455 print("You must provide an index, a filename, or both.") 1456 return 1457 1458 # Next, fetch the item to save, if it's not the current one. 1459 if index: 1460 last_gi = self.gi 1461 try: 1462 gi = self.lookup[index-1] 1463 self._go_to_gi(gi, update_hist = False, handle = False) 1464 except IndexError: 1465 print ("Index too high!") 1466 self.gi = last_gi 1467 return 1468 else: 1469 gi = self.gi 1470 1471 # Derive filename from current GI's path, if one hasn't been set 1472 if not filename: 1473 filename = os.path.basename(gi.path) 1474 1475 # Check for filename collisions and actually do the save if safe 1476 if os.path.exists(filename): 1477 print("File %s already exists!" % filename) 1478 else: 1479 # Don't use _get_active_tmpfile() here, because we want to save the 1480 # "source code" of menus, not the rendered view - this way AV-98 1481 # can navigate to it later. 1482 shutil.copyfile(self.tmp_filename, filename) 1483 print("Saved to %s" % filename) 1484 1485 # Restore gi if necessary 1486 if index != None: 1487 self._go_to_gi(last_gi, handle=False) 1488 1489 @needs_gi 1490 def do_url(self, *args): 1491 """Print URL of most recently visited item.""" 1492 print(self.gi.url) 1493 1494 ### Bookmarking stuff 1495 @restricted 1496 @needs_gi 1497 def do_add(self, line): 1498 """Add the current URL to the bookmarks menu. 1499 Optionally, specify the new name for the bookmark.""" 1500 with open(os.path.join(self.config_dir, "bookmarks.gmi"), "a") as fp: 1501 fp.write(self.gi.to_map_line(line)) 1502 1503 def do_bookmarks(self, line): 1504 """Show or access the bookmarks menu. 1505 'bookmarks' shows all bookmarks. 1506 'bookmarks n' navigates immediately to item n in the bookmark menu. 1507 Bookmarks are stored using the 'add' command.""" 1508 bm_file = os.path.join(self.config_dir, "bookmarks.gmi") 1509 if not os.path.exists(bm_file): 1510 print("You need to 'add' some bookmarks, first!") 1511 return 1512 args = line.strip() 1513 if len(args.split()) > 1 or (args and not args.isnumeric()): 1514 print("bookmarks command takes a single integer argument!") 1515 return 1516 with open(bm_file, "r") as fp: 1517 body = fp.read() 1518 gi = GeminiItem("localhost/" + bm_file) 1519 self._handle_gemtext(body, gi, display = not args) 1520 if args: 1521 # Use argument as a numeric index 1522 self.default(line) 1523 1524 ### Help 1525 def do_help(self, arg): 1526 """ALARM! Recursion detected! ALARM! Prepare to eject!""" 1527 if arg == "!": 1528 print("! is an alias for 'shell'") 1529 elif arg == "?": 1530 print("? is an alias for 'help'") 1531 else: 1532 cmd.Cmd.do_help(self, arg) 1533 1534 ### Flight recorder 1535 def do_blackbox(self, *args): 1536 """Display contents of flight recorder, showing statistics for the 1537 current gemini browsing session.""" 1538 lines = [] 1539 # Compute flight time 1540 now = time.time() 1541 delta = now - self.log["start_time"] 1542 hours, remainder = divmod(delta, 3600) 1543 minutes, seconds = divmod(remainder, 60) 1544 # Count hosts 1545 ipv4_hosts = len([host for host in self.visited_hosts if host[0] == socket.AF_INET]) 1546 ipv6_hosts = len([host for host in self.visited_hosts if host[0] == socket.AF_INET6]) 1547 # Assemble lines 1548 lines.append(("Patrol duration", "%02d:%02d:%02d" % (hours, minutes, seconds))) 1549 lines.append(("Requests sent:", self.log["requests"])) 1550 lines.append((" IPv4 requests:", self.log["ipv4_requests"])) 1551 lines.append((" IPv6 requests:", self.log["ipv6_requests"])) 1552 lines.append(("Bytes received:", self.log["bytes_recvd"])) 1553 lines.append((" IPv4 bytes:", self.log["ipv4_bytes_recvd"])) 1554 lines.append((" IPv6 bytes:", self.log["ipv6_bytes_recvd"])) 1555 lines.append(("Unique hosts visited:", len(self.visited_hosts))) 1556 lines.append((" IPv4 hosts:", ipv4_hosts)) 1557 lines.append((" IPv6 hosts:", ipv6_hosts)) 1558 lines.append(("DNS failures:", self.log["dns_failures"])) 1559 lines.append(("Timeouts:", self.log["timeouts"])) 1560 lines.append(("Refused connections:", self.log["refused_connections"])) 1561 lines.append(("Reset connections:", self.log["reset_connections"])) 1562 lines.append(("Cache hits:", self.log["cache_hits"])) 1563 # Print 1564 for key, value in lines: 1565 print(key.ljust(24) + str(value).rjust(8)) 1566 1567 ### The end! 1568 def do_quit(self, *args): 1569 """Exit AV-98.""" 1570 # Close TOFU DB 1571 self.db_conn.commit() 1572 self.db_conn.close() 1573 # Clean up after ourself 1574 self._empty_cache() 1575 if self.tmp_filename and os.path.exists(self.tmp_filename): 1576 os.unlink(self.tmp_filename) 1577 if self.idx_filename and os.path.exists(self.idx_filename): 1578 os.unlink(self.idx_filename) 1579 1580 for cert in self.transient_certs_created: 1581 for ext in (".crt", ".key"): 1582 certfile = os.path.join(self.config_dir, "transient_certs", cert+ext) 1583 if os.path.exists(certfile): 1584 os.remove(certfile) 1585 print() 1586 print("Thank you for flying AV-98!") 1587 sys.exit() 1588 1589 do_exit = do_quit 1590 1591 # Main function 1592 def main(): 1593 1594 # Parse args 1595 parser = argparse.ArgumentParser(description='A command line gemini client.') 1596 parser.add_argument('--bookmarks', action='store_true', 1597 help='start with your list of bookmarks') 1598 parser.add_argument('--tls-cert', metavar='FILE', help='TLS client certificate file') 1599 parser.add_argument('--tls-key', metavar='FILE', help='TLS client certificate private key file') 1600 parser.add_argument('--restricted', action="store_true", help='Disallow shell, add, and save commands') 1601 parser.add_argument('--version', action='store_true', 1602 help='display version information and quit') 1603 parser.add_argument('url', metavar='URL', nargs='*', 1604 help='start with this URL') 1605 args = parser.parse_args() 1606 1607 # Handle --version 1608 if args.version: 1609 print("AV-98 " + _VERSION) 1610 sys.exit() 1611 1612 # Instantiate client 1613 gc = GeminiClient(args.restricted) 1614 1615 # Process config file 1616 rcfile = os.path.join(gc.config_dir, "av98rc") 1617 if os.path.exists(rcfile): 1618 print("Using config %s" % rcfile) 1619 with open(rcfile, "r") as fp: 1620 for line in fp: 1621 line = line.strip() 1622 if ((args.bookmarks or args.url) and 1623 any((line.startswith(x) for x in ("go", "g", "tour", "t"))) 1624 ): 1625 if args.bookmarks: 1626 print("Skipping rc command \"%s\" due to --bookmarks option." % line) 1627 else: 1628 print("Skipping rc command \"%s\" due to provided URLs." % line) 1629 continue 1630 gc.cmdqueue.append(line) 1631 1632 # Say hi 1633 print("Welcome to AV-98!") 1634 if args.restricted: 1635 print("Restricted mode engaged!") 1636 print("Enjoy your patrol through Geminispace...") 1637 1638 # Act on args 1639 if args.tls_cert: 1640 # If tls_key is None, python will attempt to load the key from tls_cert. 1641 gc._activate_client_cert(args.tls_cert, args.tls_key) 1642 if args.bookmarks: 1643 gc.cmdqueue.append("bookmarks") 1644 elif args.url: 1645 if len(args.url) == 1: 1646 gc.cmdqueue.append("go %s" % args.url[0]) 1647 else: 1648 for url in args.url: 1649 if not url.startswith("gemini://"): 1650 url = "gemini://" + url 1651 gc.cmdqueue.append("tour %s" % url) 1652 gc.cmdqueue.append("tour") 1653 1654 # Endless interpret loop 1655 while True: 1656 try: 1657 gc.cmdloop() 1658 except KeyboardInterrupt: 1659 print("") 1660 1661 if __name__ == '__main__': 1662 main()