#!/usr/bin/env python3

"""
Storage of delivery items to PostgreSQL.
"""

import datetime
import json
import time
from collections import defaultdict
from traceback import format_exc
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import psycopg2
import psycopg2.extras
from systemd import journal

import settings
from storage_setup import (
    get_create_table_stmts,
    get_sql_prepared_statement,
    get_sql_execute_prepared_statement,
    table_fields,
)
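
# Note on the local imports: *settings* and *storage_setup* are project
# modules. Judging from their use below, settings provides config values
# (e.g., main_config_file, delete_deliveries_after_days) and storage_setup
# provides the SQL for table creation and for the prepared statements.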


def get_latest_timestamp(curs: psycopg2.extras.RealDictCursor) -> float:
    """
    Fetch the latest timestamp from the database.

    Return the latest timestamp of a message transfer from the
    database as seconds since the epoch. If there are no records
    yet, return 0.
    """
    last = 0.0
    # Columns t_i and t_f hold naive UTC datetimes, so convert the most
    # recent one to seconds since the epoch. The table names are fixed
    # literals, so the f-string interpolation is safe here.
    epoch = datetime.datetime(1970, 1, 1)
    for table_name in ('delivery_from', 'delivery_to'):
        curs.execute(
            f"SELECT greatest(max(t_i), max(t_f)) AS last FROM {table_name}"
        )
        res = curs.fetchone()['last']
        if res:
            last = max(last, (res - epoch).total_seconds())
    return last
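
# Example (illustrative values only): with the most recent delivery stored
# at 2023-11-14 22:13:20 UTC, get_latest_timestamp(curs) returns
# 1700000000.0 -- seconds since the epoch, as computed above.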


def delete_old_deliveries(curs: psycopg2.extras.RealDictCursor) -> None:
    """
    Delete deliveries older than the configured number of days.

    See config param *delete_deliveries_after_days*.
    """
    max_days = settings.delete_deliveries_after_days
    if max_days:
        now = datetime.datetime.utcnow()
        dt = datetime.timedelta(days=max_days)
        t0 = now - dt
        curs.execute("DELETE FROM delivery_from WHERE t_i < %s", (t0,))
        curs.execute("DELETE FROM delivery_to WHERE t_i < %s", (t0,))
        curs.execute("DELETE FROM noqueue WHERE t < %s", (t0,))


def store_delivery_items(
    cursor: psycopg2.extras.RealDictCursor,
    cache: List[dict],
    debug: Optional[List[str]] = None,
) -> None:
    """
    Store cached delivery items into the database.

    Find queue_ids in *cache* and group delivery items by
    them, but separately for delivery types 'from' and 'to'.
    In addition, collect delivery items whose queue_id is None.

    After grouping we merge all items within a group into a
    single item. This lets us combine several SQL queries into
    a single one, which improves performance significantly.

    Then store the merged items and the deliveries whose
    queue_id is None.
    """
    debug = debug or []
    if 'all' in debug or 'sql' in debug:
        print(f'Storing {len(cache)} messages.')
    if not cache:
        return
    from_items, to_items, noqueue_items = _group_delivery_items(cache)
    deliveries_from = _merge_delivery_items(from_items, item_type='from')
    deliveries_to = _merge_delivery_items(to_items, item_type='to')
    _store_deliveries(cursor, 'delivery_from', deliveries_from, debug=debug)
    _store_deliveries(cursor, 'delivery_to', deliveries_to, debug=debug)
    _store_deliveries(cursor, 'noqueue', noqueue_items, debug=debug)
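
# A sketch of the expected *cache* structure; the field names are the ones
# accessed below, the concrete values are invented for illustration:
#
#     cache = [
#         {'queue_id': '4KxTr1y3Vz', 'type': 'from', 't': <datetime>,
#          'identifier': 'postfix/smtpd', 'pid': 1234,
#          'message': 'client=mail.example.org[198.51.100.7]'},
#         {'queue_id': '4KxTr1y3Vz', 'type': 'to', 't': <datetime>,
#          'recipient': 'alice@example.org',
#          'identifier': 'postfix/smtp', 'pid': 1235,
#          'message': 'status=sent (250 2.0.0 Ok)'},
#     ]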


# Grouping types used between _group_delivery_items() and
# _merge_delivery_items():

# queue_id -> delivery items of type 'from'
FromItems = Dict[str, List[dict]]

# (queue_id, recipient) -> delivery items of type 'to'
ToItems = Dict[Tuple[str, Optional[str]], List[dict]]

# running index -> delivery item without queue_id
NoqueueItems = Dict[int, dict]
|  | 
 | ||
|  | 
 | ||
|  | def _group_delivery_items( | ||
|  |     cache: List[dict] | ||
|  | ) -> Tuple[FromItems, ToItems, NoqueueItems]: | ||
|  |     """
 | ||
|  |     Group delivery items by type and queue_id. | ||
|  | 
 | ||
|  |     Return items of type 'from', of type 'to' and items without | ||
|  |     queue_id. | ||
|  |     """
 | ||
|  |     delivery_from_items: FromItems = defaultdict(list) | ||
|  |     delivery_to_items: ToItems = defaultdict(list) | ||
|  |     noqueue_items: NoqueueItems = {} | ||
|  |     noqueue_i = 1 | ||
|  |     for item in cache: | ||
|  |         if item.get('queue_id'): | ||
|  |             queue_id = item['queue_id'] | ||
|  |             if item.get('type') == 'from': | ||
|  |                 delivery_from_items[queue_id].append(item) | ||
|  |             else: | ||
|  |                 recipient = item.get('recipient') | ||
|  |                 delivery_to_items[(queue_id, recipient)].append(item) | ||
|  |         else: | ||
|  |             noqueue_items[noqueue_i] = item | ||
|  |             noqueue_i += 1 | ||
|  |     return delivery_from_items, delivery_to_items, noqueue_items | ||
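
# For the illustrative cache above, the grouping would yield:
#
#     from_items == {'4KxTr1y3Vz': [<the 'from' item>]}
#     to_items == {('4KxTr1y3Vz', 'alice@example.org'): [<the 'to' item>]}
#     noqueue_items == {}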


def _merge_delivery_items(
    delivery_items: Union[FromItems, ToItems],
    item_type: str = 'from',
) -> Dict[Union[str, Tuple[str, Optional[str]]], dict]:
    """
    Compute deliveries by combining multiple delivery items.

    Take lists of delivery items for each queue_id (in case of
    item_type=='from') or for each (queue_id, recipient) pair
    (in case of item_type=='to').
    Each delivery item is a dict obtained from one log message.
    The dicts are consecutively updated (merged), except for the
    raw log messages (texts), which are collected into a list.
    The fields of the resulting delivery are filtered according
    to the target table when storing (see _store_deliveries_batch).
    Return a dict mapping queue_ids (in case of item_type=='from')
    or (queue_id, recipient) pairs (in case of item_type=='to')
    to deliveries.
    """
    deliveries = {}
    for group, items in delivery_items.items():
        delivery = {}
        messages = []
        for item in items:
            # Keep the raw log text in syslog style; the remaining fields
            # are merged, with later items overriding earlier ones.
            message = item.pop('message')
            identifier = item.pop('identifier')
            pid = item.pop('pid')
            messages.append(f'{identifier}[{pid}]: {message}')
            delivery.update(item)
        delivery['messages'] = messages
        deliveries[group] = delivery
    return deliveries
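
# Continuing the example: a group holding the single 'to' item above is
# merged into a delivery whose raw log text moved into the 'messages' list:
#
#     {'queue_id': '4KxTr1y3Vz', 'type': 'to', 't': <datetime>,
#      'recipient': 'alice@example.org',
#      'messages': ['postfix/smtp[1235]: status=sent (250 2.0.0 Ok)']}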


def _store_deliveries(
    cursor: psycopg2.extras.RealDictCursor,
    table_name: str,
    deliveries: Dict[Any, dict],
    debug: Optional[List[str]] = None,
) -> None:
    """
    Store grouped and merged delivery items.
    """
    if not deliveries:
        return
    debug = debug or []
    n = len(deliveries)
    t0 = time.time()
    cursor.execute('BEGIN')
    _store_deliveries_batch(cursor, table_name, deliveries.values())
    cursor.execute('COMMIT')
    t1 = time.time()
    if 'all' in debug or 'sql' in debug:
        milliseconds = (t1 - t0) * 1000
        print(
            '*' * 10,
            f'SQL transaction time {table_name}: '
            f'{milliseconds:.2f} ms ({n} deliveries)',
        )


def _store_deliveries_batch(
    cursor: psycopg2.extras.RealDictCursor,
    table_name: str,
    deliveries: Iterable[dict]
) -> None:
    """
    Store *deliveries* (i.e., grouped and merged delivery items).

    We use a prepared statement and execute_batch() from
    psycopg2.extras to improve performance.
    """
    rows = []
    for delivery in deliveries:
        # Collect values for all fields of the table; fields missing
        # from the delivery are stored as NULL.
        field_values: List[Any] = []
        t = delivery.get('t')
        delivery['t_i'] = t
        delivery['t_f'] = t
        for field in table_fields[table_name]:
            if field in delivery:
                if field == 'messages':
                    field_values.append(json.dumps(delivery[field]))
                else:
                    field_values.append(delivery[field])
            else:
                field_values.append(None)
        rows.append(field_values)
    sql = get_sql_execute_prepared_statement(table_name)
    try:
        psycopg2.extras.execute_batch(cursor, sql, rows)
    except Exception:
        msg = (
            f'SQL statement failed: "{sql}" -- the values were: {rows}\n'
            + format_exc()
        )
        journal.send(msg, PRIORITY=journal.LOG_ERR)
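
# Assumption about the helper SQL: get_sql_execute_prepared_statement() is
# taken to return an EXECUTE statement for the prepared statement defined
# in init_session() below (something like 'EXECUTE ... (%s, %s, ...)'), so
# execute_batch() can send many inserts per server round trip.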


def init_db(config: dict) -> Optional[str]:
    """
    Initialize database; if ok return DSN, else None.

    Try to get parameters for database access,
    check existence of tables and possibly create them.
    """
    dsn = _get_dsn(config)
    if dsn:
        ok = _create_tables(dsn)
        if not ok:
            return None
    return dsn


def _get_dsn(config: dict) -> Optional[str]:
    """
    Return the DSN (data source name) from the *config*.
    """
    try:
        postgresql_config = config['postgresql']
        hostname = postgresql_config['hostname']
        port = postgresql_config['port']
        database = postgresql_config['database']
        username = postgresql_config['username']
        password = postgresql_config['password']
    except Exception:
        msg = f"""ERROR: invalid config in {settings.main_config_file}

The config file must contain a section like this:

postgresql:
    hostname: <HOSTNAME_OR_IP>
    port: <PORT>
    database: <DATABASE_NAME>
    username: <USERNAME>
    password: <PASSWORD>
"""
        journal.send(msg, PRIORITY=journal.LOG_CRIT)
        return None
    dsn = f'host={hostname} port={port} dbname={database} '\
          f'user={username} password={password}'
    return dsn
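
# The returned DSN is a libpq-style connection string, e.g. (illustrative
# values): 'host=127.0.0.1 port=5432 dbname=mail user=mailuser password=...'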


def _create_tables(dsn: str) -> bool:
    """
    Check existence of tables and possibly create them, returning success.
    """
    try:
        with psycopg2.connect(dsn) as conn:
            with conn.cursor() as curs:
                for table_name, sql_stmts in get_create_table_stmts().items():
                    ok = _create_table(curs, table_name, sql_stmts)
                    if not ok:
                        return False
    except Exception:
        journal.send(
            'ERROR: cannot connect to database, check params'
            f' in {settings.main_config_file}',
            PRIORITY=journal.LOG_CRIT,
        )
        return False
    return True


def _create_table(
    cursor: psycopg2.extensions.cursor,
    table_name: str,
    sql_stmts: List[str]
) -> bool:
    """
    Try to create a table if it does not exist and return whether it exists.

    If creation failed, emit an error to the journal.
    """
    # This receives the plain cursor opened in _create_tables(), so
    # fetchone() returns a tuple, not a dict.
    cursor.execute("SELECT EXISTS(SELECT * FROM "
                   "information_schema.tables WHERE table_name=%s)",
                   (table_name,))
    table_exists = cursor.fetchone()[0]
    if not table_exists:
        for sql_stmt in sql_stmts:
            try:
                cursor.execute(sql_stmt)
            except Exception:
                journal.send(
                    'ERROR: database user needs privilege to create tables.\n'
                    'Alternatively, you can create the table manually like'
                    ' this:\n\n'
                    + '\n'.join([sql + ';' for sql in sql_stmts]),
                    PRIORITY=journal.LOG_CRIT,
                )
                return False
    return True


def init_session(cursor: psycopg2.extras.RealDictCursor) -> None:
    """
    Init a database session.

    Define prepared statements, one per target table.
    """
    for table_name in ('delivery_from', 'delivery_to', 'noqueue'):
        cursor.execute(get_sql_prepared_statement(table_name))
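
# Minimal usage sketch (assumes the caller has loaded the main config file
# into a dict *config*; error and connection handling simplified):
#
#     dsn = init_db(config)
#     if dsn:
#         conn = psycopg2.connect(dsn)
#         with conn.cursor(
#                 cursor_factory=psycopg2.extras.RealDictCursor) as curs:
#             init_session(curs)
#             store_delivery_items(curs, cache, debug=['sql'])
#             delete_old_deliveries(curs)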