"""
|
|
atextcrawler application.
|
|
"""
|
|
|
|
import asyncio
|
|
import importlib
|
|
import logging
|
|
import signal
|
|
import sys
|
|
|
|
from systemd.journal import JournalHandler
|
|
|
|
from .config import Config
|
|
from .crawl import CrawlWorker
|
|
from .db import PGPool
|
|
from .search import shutdown_engine, startup_engine
|
|
from .site import load_seeds, process_site_queue
|
|
|
|
plugin_names = ['filter_site', 'filter_site_path', 'filter_resource_path']
|
|
|
|
|
|


class Application:
    """
    atextcrawler application.

    The basic structure of the application is this:

    * One site crawler works just on the site_queue: it fetches the start
      pages of sites and stores updated site information in the site table.
    * N other CrawlWorkers each do this in a loop: check out a site that is
      due for a crawl and crawl its resources; they fill the site_queue.
    """

    running = True

    def __init__(self, config=None):
        if config is None:
            config = Config().get()
        self.config = config
        self.instance_name = config['instance_name']
        self.instance_type = config['instance_type']
        # Fall back to CRITICAL if the configured log level is not recognized.
        log_level = getattr(
            logging, config['log_level'].upper(), logging.CRITICAL
        )
        self.logger = logging.getLogger('atextcrawler')
        self.logger.setLevel(log_level)
        # Log to stderr in development, to the systemd journal otherwise.
        if self.instance_type == 'dev':
            self.logger.addHandler(logging.StreamHandler())
        else:
            self.logger.addHandler(
                JournalHandler(SYSLOG_IDENTIFIER=self.instance_name)
            )
        self.logger.propagate = False
        self.channel = 'atextcrawler_' + self.config['instance_name']
        msg = f'Instance "{self}" initializing'
        self.logger.info(msg)
        self.plugins = self._load_plugins()

    def __str__(self):
        return self.instance_name

    def _load_plugins(self):
        """
        Return a dict mapping plugin names to modules.
        """
        modules = {}
        # Copy sys.path so the insert below can be undone afterwards
        # (a plain assignment would alias the same list object).
        old_path = sys.path.copy()
        for name in plugin_names:
            try:
                plugins_dir = self.config['plugins_dir']
                sys.path.insert(0, plugins_dir)
                module = importlib.import_module(name)
                msg = f'Loading plugin "{name}" from {plugins_dir}'
            except (KeyError, ImportError):
                # No plugins_dir configured, or no custom plugin found
                # there: fall back to the bundled default plugin.
                module = importlib.import_module(
                    'atextcrawler.plugin_defaults.' + name
                )
                msg = f'Loading plugin "{name}" from default location'
            self.logger.info(msg)
            modules[name] = module
        sys.path = old_path
        return modules

    async def run(self):
        """
        Application lifecycle.
        """
        await asyncio.gather(self.wait_for_shutdown(), self.startup())
        await self.shutdown()

    async def startup(self):
        """
        Asynchronous startup.
        """
        msg = f'Instance "{self}" starting components'
        self.logger.info(msg)
        self.search_engine = await startup_engine(self.config)
        self.pgpool = await PGPool(self.config['postgresql'])
        self.pool = self.pgpool.pool
        await load_seeds(self.config, self.pool)
        await reset_site_locks(self.pool)
        worker_count = self.config['crawl']['workers']
        self.workers = []
        for worker_number in range(worker_count):
            worker = await CrawlWorker(self, worker_number, self.pool)
            self.workers.append(worker)
        worker_coros = [worker.run() for worker in self.workers]
        await asyncio.gather(
            process_site_queue(self, self.pool),
            self.handle_notifications(),
            *worker_coros,
        )

    async def wait_for_shutdown(self):
        """
        Create a shutdown event (:class:`asyncio.Event`) and wait for it.

        The event will be set by a signal handler for SIGINT
        and SIGTERM signals (see :meth:`Application.handle_shutdown_signal`).
        """
        self.shutdown_event = asyncio.Event()
        for sig in (signal.SIGINT, signal.SIGTERM):
            # add_signal_handler is only available on Unix event loops.
            asyncio.get_running_loop().add_signal_handler(
                sig, self.handle_shutdown_signal
            )
        self.logger.debug(f'{self} waiting for shutdown event')
        await self.shutdown_event.wait()
        self.logger.info(f'Instance "{self}" received shutdown event')

    def handle_shutdown_signal(self):
        """
        Handle shutdown signal.
        """
        if self.shutdown_event.is_set():
            return
        self.shutdown_event.set()
        self.running = False

    async def shutdown(self):
        """
        Asynchronous shutdown.
        """
        self.logger.debug(f'Instance "{self}" shutting down')
        await self.notify_conn.remove_listener(
            self.channel, self.listen_callback
        )
        await self.pool.release(self.notify_conn)
        for worker in self.workers:
            await worker.shutdown()
        await shutdown_engine(self.search_engine)
        await self.pgpool.shutdown()
        self.logger.info(f'Instance "{self}" shutdown completed')

    async def handle_notifications(self):
        """
        Handle notifications using PostgreSQL's NOTIFY/LISTEN.
        """
        # Use a dedicated connection for LISTEN; it is released in shutdown().
        self.notify_conn = await self.pool.acquire()
        await self.notify_conn.add_listener(self.channel, self.listen_callback)
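
    # For illustration (assuming an instance_name of "main"): such a
    # notification can be raised from SQL, e.g. by a trigger on the
    # site table:
    #
    #     SELECT pg_notify('atextcrawler_main', 'site_update 123');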

    def listen_callback(self, *args):
        """
        Handle notify event from PostgreSQL.
        """
        # The listener is called with (connection, pid, channel, payload).
        channel = args[2]
        if channel != self.channel:
            return
        message = args[3]
        if message.startswith('site_update '):
            try:
                site_id = int(message.removeprefix('site_update '))
            except ValueError:
                # Ignore malformed payloads.
                return
            for worker in self.workers:
                if worker.site and site_id == worker.site.id_:
                    msg = (
                        f'Cancelling worker {worker.worker_number}'
                        f' (site={site_id}) due to site_update'
                    )
                    self.logger.info(msg)
                    worker.running = False

    async def sleep(self, duration, t_slice=3):
        """
        Sleep for *duration* seconds while self.running.

        Check self.running every *t_slice* seconds.
        """
        remaining = duration
        while remaining > 0 and self.running:
            await asyncio.sleep(min(t_slice, remaining))
            remaining -= t_slice
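
    # Usage sketch (hypothetical call site): inside a worker loop, prefer
    #     await self.app.sleep(600)
    # over asyncio.sleep(600), so that a shutdown request is noticed
    # within roughly *t_slice* seconds instead of after the full duration.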


async def reset_site_locks(pool):
    """
    Remove locks left over from the last run: set crawl_active=false for
    all sites.

    This is relevant when the application was not shut down properly
    (e.g. when the process was killed).
    """
    async with pool.acquire() as conn:
        sql = "UPDATE site SET crawl_active = false WHERE crawl_active = true"
        await conn.execute(sql)
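

# Minimal sketch of starting the application (assuming this module is
# atextcrawler.application and default configuration; the package's real
# entry point may differ):
#
#     from atextcrawler.application import Application
#
#     asyncio.run(Application().run())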