commit 7609836e3cd7b13f041996c2711853916aba3abe
parent af61f0143bc32d5dcbb8af0f854e5fe8140c6e85
Author: Remco <me@rwv.io>
Date: Fri, 13 Nov 2020 14:24:36 +0100
Move all whoosh-related code into a separate module
Signed-off-by: Natalie Pendragon <natpen@natpen.net>
Diffstat:
5 files changed, 165 insertions(+), 169 deletions(-)
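
In short: the whoosh schema, writer, and query-parser plumbing that previously lived as module-level globals in gus/build_index.py and serve/models.py is consolidated into a single gus.lib.search.Index class (new file gus/lib/search.py below). A minimal sketch of the indexing-side API as it appears in this diff; the reindex() helper and the documents iterable are illustrative, not part of the codebase:

    import gus.lib.search as search
    from gus import constants

    def reindex(documents, should_run_destructive=False):
        # Destructive runs build a fresh index under index.new; incremental
        # runs reopen the live index and skip already-indexed URLs.
        index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
        index = search.Index(index_dir, should_run_destructive)
        indexed_urls = [] if should_run_destructive else index.indexed_urls()
        for document in documents:
            if document["fetchable_url"] not in indexed_urls:
                index.add_document(document)  # document is a plain dict of schema fields
        index.close()                         # commits any pending writer
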
diff --git a/gus/build_index.py b/gus/build_index.py
@@ -2,15 +2,9 @@ import argparse
import logging
from datetime import datetime, timedelta
-import os
-import pathlib
from urllib.parse import uses_relative, uses_netloc
-from whoosh.analysis import FancyAnalyzer
-from whoosh.fields import Schema, TEXT, DATETIME, STORED, ID, NUMERIC
-from whoosh.filedb.filestore import FileStorage
-from whoosh.index import open_dir
-
+from . import constants
from gus.crawl import EXCLUDED_URL_PREFIXES
from gus.lib.db_model import init_db, Page
from gus.lib.index_statistics import (
@@ -18,57 +12,17 @@ from gus.lib.index_statistics import (
persist_statistics,
log_index_statistics,
)
-from gus.lib.whoosh_extensions import UrlAnalyzer
import gus.lib.logging
+from gus.lib.logging import strip_control_chars
+import gus.lib.search as search
# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")
-INDEX_DIR_CURRENT = "index"
-# INDEX_DIR_BACKUP = INDEX_DIR_CURRENT + ".bak"
-INDEX_DIR_NEW = INDEX_DIR_CURRENT + ".new"
-
-# def backup_old_index(index_dir, backup_dir):
-# last_index_modification_time = datetime.fromtimestamp(os.path.getmtime(index_dir))
-# print("Backing up last index from {:%Y-%m-%d}...".format(last_index_modification_time))
-# print("--------------------------")
-# backup_index_dir = backup_dir + "/{:%Y-%m-%d}".format(last_index_modification_time)
-# shutil.rmtree(backup_index_dir, ignore_errors=True)
-# shutil.copytree(index_dir, backup_index_dir)
-
-
-def create_index(index_dir):
- # shutil.rmtree(index_dir, ignore_errors=True)
- pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
- schema = Schema(
- url_id=ID(unique=True,),
- url=TEXT(field_boost=2.0, stored=True, analyzer=UrlAnalyzer(),),
- fetchable_url=STORED(),
- domain=TEXT(analyzer=UrlAnalyzer(),),
- port=NUMERIC(int, 32, signed=False, stored=True,),
- content_type=TEXT(stored=True,),
- charset=ID(stored=True,),
- lang=ID(stored=True,),
- content=TEXT(analyzer=FancyAnalyzer(), spelling=True, stored=True,),
- prompt=TEXT(analyzer=FancyAnalyzer(), stored=True,),
- size=NUMERIC(
- int,
- # this means GUS will have problems indexing responses over ~2GB
- 32,
- signed=False,
- stored=True,
- ),
- backlink_count=NUMERIC(
- int, 16, signed=False, stored=True, # num bits, so max value is 65k
- ),
- indexed_at=DATETIME(stored=True,),
- )
- index_storage.create_index(schema)
-
-def index_page(page, indexed_urls):
+def index_page(index, page, indexed_urls):
should_skip = False
for excluded_prefix in EXCLUDED_URL_PREFIXES:
if page.normalized_url.startswith(excluded_prefix):
@@ -77,17 +31,17 @@ def index_page(page, indexed_urls):
if should_skip:
logging.debug(
"URL prefix matches exclusion list, skipping: %s",
- gus.lib.logging.strip_control_chars(page.url),
+ strip_control_chars(page.url),
)
return False
if page.fetchable_url in indexed_urls:
logging.debug(
"Page already indexed, skipping: %s",
- gus.lib.logging.strip_control_chars(page.url),
+ strip_control_chars(page.url),
)
return False
- logging.info("Indexing page: %s", gus.lib.logging.strip_control_chars(page.url))
+ logging.info("Indexing page: %s", strip_control_chars(page.url))
u = page.url.rstrip("/")
external_backlinks = Page.raw(
@@ -108,7 +62,6 @@ GROUP BY p_from.normalized_url""",
backlink_urls = [b.url for b in external_backlinks.execute()]
backlink_count = len(backlink_urls)
- logging.info("Indexing page: %s", gus.lib.logging.strip_control_chars(page.url))
document = {
"url_id": page.url,
@@ -126,54 +79,35 @@ GROUP BY p_from.normalized_url""",
"content": page.content,
}
try:
- index_writer.add_document(**document)
+ index.add_document(document)
return True
- except:
- logging.warn(
- "Failed to index page: %s", gus.lib.logging.strip_control_chars(page.url)
+ except Exception as e:
+ logging.exception(
+ "Failed to index page: %s: %s",
+ strip_control_chars(page.url),
+ e
)
return False
-def load_indexed_urls(index_dir):
- indexed_urls = []
- ix = open_dir(index_dir)
- with ix.reader() as reader:
- all_stored_fields = reader.all_stored_fields()
- # TODO: change this (back) to normalized url
- # indexed_urls = [GeminiResource(f["url"]).normalized_url for f in all_stored_fields]
- indexed_urls = [f["fetchable_url"] for f in all_stored_fields]
- return indexed_urls
-
-
-def invalidate_recent_results(invalidation_window):
+def invalidate_recent_results(index, invalidation_window):
recency_minimum = datetime.now() - timedelta(hours=invalidation_window)
pages = Page.select().where(
Page.indexed_at.is_null(False), Page.indexed_at > recency_minimum
)
+    logging.debug('Invalidating %d pages indexed since %s', pages.count(), recency_minimum)
for page in pages:
- index_writer.delete_by_term("url_id", page.url, searcher=None)
+ index.delete_by_term("url_id", page.url)
def build_index(should_run_destructive=False, invalidation_window=0):
- global index_dir
- index_dir = INDEX_DIR_NEW if should_run_destructive else INDEX_DIR_CURRENT
- global index_storage
- index_storage = FileStorage(index_dir)
- if should_run_destructive:
- # backup_old_index(INDEX_DIR_CURRENT, INDEX_DIR_BACKUP)
- create_index(index_dir)
- global db
+ index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
+
db = init_db(index_dir + "/gus.sqlite")
- global ix
- ix = index_storage.open_index()
- global index_writer
- index_writer = ix.writer()
-
- invalidate_recent_results(invalidation_window)
- indexed_urls = (
- [] if should_run_destructive else load_indexed_urls(INDEX_DIR_CURRENT)
- )
+ index = search.Index(index_dir, should_run_destructive)
+
+ invalidate_recent_results(index, invalidation_window)
+ indexed_urls = ([] if should_run_destructive else index.indexed_urls())
pages = Page.raw(
"""SELECT p.*, MAX(c.timestamp) AS crawl_timestamp
@@ -183,31 +117,14 @@ ON p.id == c.page_id
GROUP BY p.normalized_url"""
)
- i = 0
for page in pages.iterator():
- was_indexed = index_page(page, indexed_urls)
- if was_indexed:
- i += 1
- # NOTE(np): Whoosh's index writing doesn't do any intermediate
- # flushing of index segments to disk, which can cause OOM
- # errors as Geminispace has grown. This bit of code will force
- # it to flush segments to disk every 5000 documents, which
- # should scale well with Geminispace going forward.
- if i % 5000 == 0:
- logging.debug("Committing index.")
- index_writer.commit()
- index_writer = ix.writer()
- logging.debug("Committing index for the last time.")
- index_writer.commit()
+ index_page(index, page, indexed_urls)
index_statistics = compute_index_statistics(db)
log_index_statistics(index_statistics)
persist_statistics(index_statistics, None, should_run_destructive, "statistics.csv")
- # if should_run_destructive:
- # # replace current index with new index
- # shutil.rmtree(INDEX_DIR_CURRENT, ignore_errors=True)
- # shutil.move(INDEX_DIR_NEW, INDEX_DIR_CURRENT)
+ index.close()
logging.info("Finished!")
diff --git a/gus/constants.py b/gus/constants.py
@@ -1,4 +1,5 @@
INDEX_DIR = "index"
+INDEX_DIR_NEW = "index.new"
SEED_REQUEST_FILE = "seed-requests.txt"
STATISTICS_FILE = "statistics.csv"
DB_FILENAME = "gus.sqlite"
diff --git a/gus/crawl.py b/gus/crawl.py
@@ -21,10 +21,6 @@ import gus.lib.logging
uses_relative.append("gemini")
uses_netloc.append("gemini")
-INDEX_DIR_CURRENT = "index"
-INDEX_DIR_BACKUP = INDEX_DIR_CURRENT + ".bak"
-INDEX_DIR_NEW = INDEX_DIR_CURRENT + ".new"
-
# These are checked against normalized_url, so they should be
# prepended with the gemini:// protocol, be all lowercased, and
# not have the port specified if it is 1965.
@@ -722,13 +718,13 @@ def resolve_feed_content_urls(feed_file=constants.FEED_FILE):
def recrawl_feeds():
content_urls = resolve_feed_content_urls()
global index_dir
- index_dir = INDEX_DIR_CURRENT
+ index_dir = constants.INDEX_DIR
global db
db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
global max_crawl_depth
max_crawl_depth = 0
global robot_file_map
- robot_file_map = unpickle_robot_file_map(INDEX_DIR_CURRENT)
+ robot_file_map = unpickle_robot_file_map(constants.INDEX_DIR)
global domain_hit_timings
domain_hit_timings = {}
@@ -747,14 +743,14 @@ def run_crawl(should_run_destructive=False, seed_urls=[]):
# TODO: track failed domain/page attempts, and don't reattempt for 15mins
global index_dir
- index_dir = INDEX_DIR_NEW if should_run_destructive else INDEX_DIR_CURRENT
+ index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
global db
db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
global robot_file_map
robot_file_map = (
- {} if should_run_destructive else unpickle_robot_file_map(INDEX_DIR_CURRENT)
+ {} if should_run_destructive else unpickle_robot_file_map(constants.INDEX_DIR)
)
global domain_hit_timings
domain_hit_timings = {}
diff --git a/gus/lib/search.py b/gus/lib/search.py
@@ -0,0 +1,125 @@
+from urllib.parse import quote
+import pathlib
+import logging
+
+import whoosh.qparser
+import whoosh.highlight
+from whoosh.analysis import FancyAnalyzer
+from whoosh.fields import Schema, TEXT, DATETIME, STORED, ID, NUMERIC
+from whoosh.filedb.filestore import FileStorage
+
+from gus.lib.whoosh_extensions import UrlAnalyzer, GeminiFormatter, GeminiScorer
+
+
+class Index:
+ def __init__(self, index_dir, should_run_destructive=False):
+ index_storage = FileStorage(index_dir)
+
+ if should_run_destructive:
+ pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
+ self._index = self._create(index_storage)
+ else:
+ self._index = index_storage.open_index()
+
+ self._searcher = self._index.searcher()
+
+ self._query_parser = whoosh.qparser.MultifieldParser(
+ ["content", "url", "prompt"],
+ self._index.schema,
+ group=whoosh.qparser.OrGroup.factory(0.99),
+ )
+ self._query_parser.add_plugin(whoosh.qparser.RegexPlugin())
+ self._query_parser.add_plugin(whoosh.qparser.GtLtPlugin())
+ self._query_parser.remove_plugin_class(whoosh.qparser.WildcardPlugin)
+ self._query_parser.remove_plugin_class(whoosh.qparser.BoostPlugin)
+ self._query_parser.remove_plugin_class(whoosh.qparser.RangePlugin)
+
+ self._highlighter = whoosh.highlight.Highlighter(
+ formatter=GeminiFormatter(),
+ fragmenter=whoosh.highlight.ContextFragmenter(maxchars=160, surround=80),
+ scorer=GeminiScorer(),
+ order=whoosh.highlight.SCORE,
+ )
+
+ self._write_counter = 0
+ self._writer = None
+
+ def _create(self, index_storage):
+ schema = Schema(
+ url_id=ID(unique=True),
+ url=TEXT(field_boost=2.0, stored=True, analyzer=UrlAnalyzer()),
+ fetchable_url=STORED(),
+ domain=TEXT(analyzer=UrlAnalyzer()),
+ port=NUMERIC(int, 32, signed=False, stored=True),
+ content_type=TEXT(stored=True),
+ charset=ID(stored=True),
+ lang=ID(stored=True),
+ content=TEXT(analyzer=FancyAnalyzer(), spelling=True, stored=True),
+ prompt=TEXT(analyzer=FancyAnalyzer(), stored=True),
+ size=NUMERIC(
+ int,
+ # this means GUS will have problems indexing responses over ~2GB
+ 32,
+ signed=False,
+ stored=True,
+ ),
+ backlink_count=NUMERIC(
+ int, 16, signed=False, stored=True, # num bits, so max value is 65k
+ ),
+ indexed_at=DATETIME(stored=True),
+ )
+ return index_storage.create_index(schema)
+
+ def close(self):
+ if self._writer:
+ self._writer.commit()
+ self._index.close()
+
+ def _rolling_writer(self):
+ self._write_counter += 1
+ if self._writer and self._write_counter % 10 == 0:
+ logging.debug("Committing index.")
+ self._writer.commit()
+ self._writer = None
+
+ if not self._writer:
+ self._writer = self._index.writer()
+
+ return self._writer
+
+ def add_document(self, document):
+ self._rolling_writer().add_document(**document)
+
+ def delete_by_term(self, key, val): # TODO delete_document
+ self._rolling_writer().delete_by_term(key, val, searcher=None)
+
+ def indexed_urls(self):
+ indexed_urls = []
+ with self._index.reader() as reader:
+ logging.debug("Loading list of known URLs from index")
+ all_stored_fields = reader.all_stored_fields()
+ indexed_urls = [f["fetchable_url"] for f in all_stored_fields]
+ return indexed_urls
+
+ def parse_query(self, query):
+ return self._query_parser.parse(query)
+
+ def highlight(self, result):
+ if "content" in result:
+ return self._highlighter.highlight_hit(result, "content", top=1)
+ else:
+ return ""
+
+ def search(self, query, pagenr, pagelen=10):
+ return self._searcher.search_page(query, pagenr, pagelen)
+
+ def suggestions(self, query):
+ suggestions = []
+ corrector = self._searcher.corrector("content")
+ for query_part in query.split(" "):
+ query_part_suggestions = corrector.suggest(query_part, limit=3)
+ suggestions.extend(
+ {"raw": suggestion, "quoted": quote(suggestion)}
+ for suggestion in query_part_suggestions
+ )
+ return suggestions
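
One behavioural detail worth noting: the removed build_index.py code forced a writer commit every 5000 documents to keep whoosh's memory use bounded, whereas Index._rolling_writer() above commits every 10 write operations and close() commits whatever is still pending. A rough illustration of that cadence, again with an illustrative documents iterable:

    import gus.lib.search as search
    from gus import constants

    index = search.Index(constants.INDEX_DIR_NEW, should_run_destructive=True)
    for document in documents:
        # Every 10th write call first commits the batch accumulated so far,
        # then reopens a fresh writer for the next batch.
        index.add_document(document)
    index.close()  # commits the remaining partial batch and closes the index
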
diff --git a/serve/models.py b/serve/models.py
@@ -1,56 +1,32 @@
import re
from datetime import datetime
-from urllib.parse import quote
-
-from peewee import fn, SqliteDatabase
-from whoosh import highlight, qparser
-from whoosh.index import open_dir
from . import constants
-from gus.lib.db_model import init_db, Crawl, Link, Page, Search, Thread
+from gus.lib.db_model import init_db, Page, Search, Thread
from gus.lib.gemini import GeminiResource
from gus.lib.index_statistics import (
compute_index_statistics,
load_all_statistics_from_file,
)
from gus.lib.misc import bytes2human
-from gus.lib.whoosh_extensions import GeminiFormatter, GeminiScorer
+import gus.lib.search as search
+
+TEXT_CONTENT_TYPE = ["text/plain", "text/gemini", "text/markdown"]
class GUS:
def __init__(self):
- self.ix = open_dir(constants.INDEX_DIR)
- self.searcher = self.ix.searcher()
- self.query_parser = GUS.init_query_parser(self.ix)
- self.gemini_highlighter = highlight.Highlighter(
- formatter=GeminiFormatter(),
- fragmenter=highlight.ContextFragmenter(maxchars=160, surround=80),
- scorer=GeminiScorer(),
- order=highlight.SCORE,
- )
-
+ self.index = search.Index(constants.INDEX_DIR)
self.db = init_db(f"{constants.INDEX_DIR}/{constants.DB_FILENAME}")
self.statistics = compute_index_statistics(self.db)
self.statistics_historical_overall = load_all_statistics_from_file(
constants.STATISTICS_FILE
)
- def init_query_parser(ix):
- or_group = qparser.OrGroup.factory(0.99)
- query_parser = qparser.MultifieldParser(
- ["content", "url", "prompt"], ix.schema, group=or_group
- )
- query_parser.add_plugin(qparser.RegexPlugin())
- query_parser.add_plugin(qparser.GtLtPlugin())
- query_parser.remove_plugin_class(qparser.WildcardPlugin)
- query_parser.remove_plugin_class(qparser.BoostPlugin)
- query_parser.remove_plugin_class(qparser.RangePlugin)
- return query_parser
-
def search_index(self, query, requested_page):
Search.create(query=query, timestamp=datetime.utcnow())
- query = self.query_parser.parse(query)
- results = self.searcher.search_page(query, requested_page, pagelen=10)
+ query = self.index.parse_query(query)
+ results = self.index.search(query, requested_page, pagelen=10)
return (
len(results),
[
@@ -63,13 +39,7 @@ class GUS:
"charset": result["charset"] if "charset" in result else "none",
"size": result["size"] if "size" in result else 0,
"prompt": result["prompt"] if "prompt" in result else "",
- "highlights": self.gemini_highlighter.highlight_hit(
- result, "content", top=1
- )
- if "content" in result
- and result["content_type"]
- in ["text/plain", "text/gemini", "text/markdown"]
- else "",
+ "highlights": self.index.highlight(result) if result["content_type"] in TEXT_CONTENT_TYPE else "",
"link_text": GUS._get_link_text(result),
"backlink_count": result["backlink_count"],
}
@@ -167,12 +137,12 @@ ORDER BY t.thread_length DESC, t.updated_at DESC, t.id ASC, tp.address ASC"""
for thread_member in threads_query.iterator():
if thread_member.updated_at.date() != last_date:
threads.append(
- {"threads": [], "date": thread_member.updated_at,}
+ {"threads": [], "date": thread_member.updated_at}
)
last_date = thread_member.updated_at.date()
if thread_member.id != last_id:
threads[-1]["threads"].append(
- {"members": [], "updated_at": thread_member.updated_at,}
+ {"members": [], "updated_at": thread_member.updated_at}
)
last_id = thread_member.id
threads[-1]["threads"][-1]["members"].append(
@@ -197,11 +167,6 @@ ORDER BY t.thread_length DESC, t.updated_at DESC, t.id ASC, tp.address ASC"""
result["url"][9:], result["content_type"], prompt_suffix
)
else:
- lang_str = (
- ", {}".format(result["lang"])
- if "lang" in result and result["lang"] != "none"
- else ""
- )
link_text = "{} ({}, {})".format(
result["url"][9:],
result["content_type"],
@@ -255,15 +220,7 @@ LIMIT 50
return newest_pages_query.execute()
def get_search_suggestions(self, query):
- suggestions = []
- corrector = self.searcher.corrector("content")
- for query_part in query.split(" "):
- query_part_suggestions = corrector.suggest(query_part, limit=3)
- suggestions.extend(
- {"raw": suggestion, "quoted": quote(suggestion)}
- for suggestion in query_part_suggestions
- )
- return suggestions
+ return self.index.suggestions(query)
def compute_requested_results_page(request_path):
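
For completeness, the query side of the same facade as serve/models.py now uses it: parse_query() builds the multifield whoosh query, search() returns a results page, highlight() produces an excerpt for hits with stored content (the serving code additionally restricts this to the types in TEXT_CONTENT_TYPE), and suggestions() wraps the per-word spelling corrector. A sketch with an illustrative query string:

    import gus.lib.search as search
    from gus import constants

    index = search.Index(constants.INDEX_DIR)    # opens the existing on-disk index
    query = index.parse_query("gemini capsules")
    results = index.search(query, 1, pagelen=10) # page 1, ten hits per page
    for result in results:
        excerpt = index.highlight(result)        # "" when the hit has no stored content
    corrections = index.suggestions("capsules")  # [{"raw": ..., "quoted": ...}, ...]
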