#!/usr/bin/env python3
|
|
|
|
"""
|
|
Storage to PostgreSQL.
|
|
"""
|
|
|
|
import datetime
|
|
import json
|
|
import re
|
|
import time
|
|
from collections import defaultdict
|
|
from traceback import format_exc
|
|
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from systemd import journal
|
|
import settings
|
|
from storage_setup import (
|
|
get_create_table_stmts,
|
|
get_sql_prepared_statement,
|
|
get_sql_execute_prepared_statement,
|
|
table_fields,
|
|
)
|
|
|
|
|
|
def get_latest_timestamp(curs: psycopg2.extras.RealDictCursor) -> int:
    """
    Fetch the latest timestamp from the database.

    Return the latest timestamp of a message transfer from the
    database as seconds since the Unix epoch. If there are no
    records yet, return 0.
    """
    # Reference point for converting naive timestamps to epoch seconds.
    epoch = datetime.datetime(1970, 1, 1)
    last = 0
    # Both delivery tables carry t_i/t_f timestamps; take the overall
    # maximum across the two tables. (The table names are constants,
    # so interpolating them into the SQL is safe.)
    for table in ('delivery_from', 'delivery_to'):
        curs.execute(
            f"SELECT greatest(max(t_i), max(t_f)) AS last FROM {table}"
        )
        value = curs.fetchone()['last']
        if value:
            last = max(last, (value - epoch).total_seconds())
    return last
|
|
|
|
|
|
def delete_old_deliveries(curs: psycopg2.extras.RealDictCursor) -> None:
    """
    Delete deliveries older than the configured number of days.

    See config param *delete_deliveries_after_days*.
    """
    max_days = settings.delete_deliveries_after_days
    if not max_days:
        # Retention is disabled; keep everything.
        return
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=max_days)
    for sql in (
        "DELETE FROM delivery_from WHERE t_i < %s",
        "DELETE FROM delivery_to WHERE t_i < %s",
        "DELETE FROM noqueue WHERE t < %s",
    ):
        curs.execute(sql, (cutoff,))
|
|
|
|
|
|
def store_delivery_items(
    cursor,
    cache: List[dict],
    debug: Optional[List[str]] = None,
) -> None:
    """
    Store cached delivery items into the database.

    Find queue_ids in *cache* and group delivery items by
    them, but separately for delivery types 'from' and 'to'.
    In addition, collect delivery items with queue_id is None.

    After grouping we merge all items within a group into a
    single item. So we can combine several SQL queries into
    a single one, which improves performance significantly.

    Then store the merged items and the deliveries with
    queue_id is None.
    """
    # Avoid a mutable default argument; None means "no debug output".
    debug = debug if debug is not None else []
    if 'all' in debug or 'sql' in debug:
        print(f'Storing {len(cache)} messages.')
    if not cache:
        return
    from_items, to_items, noqueue_items = _group_delivery_items(cache)
    deliveries_from = _merge_delivery_items(from_items, item_type='from')
    deliveries_to = _merge_delivery_items(to_items, item_type='to')
    _store_deliveries(cursor, 'delivery_from', deliveries_from, debug=debug)
    _store_deliveries(cursor, 'delivery_to', deliveries_to, debug=debug)
    _store_deliveries(cursor, 'noqueue', noqueue_items, debug=debug)
|
|
|
|
|
|
# Delivery items of type 'from', grouped by queue_id.
FromItems = Dict[str, List[dict]]

# Delivery items of type 'to', grouped by (queue_id, recipient).
ToItems = Dict[Tuple[str, Optional[str]], List[dict]]

# Delivery items without a queue_id, keyed by a 1-based counter.
NoqueueItems = Dict[int, dict]
|
|
|
|
|
|
def _group_delivery_items(
    cache: List[dict]
) -> Tuple[FromItems, ToItems, NoqueueItems]:
    """
    Group delivery items by type and queue_id.

    Return items of type 'from', of type 'to' and items without
    queue_id.
    """
    from_groups: FromItems = defaultdict(list)
    to_groups: ToItems = defaultdict(list)
    unqueued: NoqueueItems = {}
    for entry in cache:
        queue_id = entry.get('queue_id')
        if not queue_id:
            # Items without a queue_id are kept individually,
            # keyed by a 1-based counter.
            unqueued[len(unqueued) + 1] = entry
        elif entry.get('type') == 'from':
            from_groups[queue_id].append(entry)
        else:
            # Anything that is not a 'from' item is treated as 'to';
            # group by the (queue_id, recipient) pair.
            to_groups[(queue_id, entry.get('recipient'))].append(entry)
    return from_groups, to_groups, unqueued
|
|
|
|
|
|
def _merge_delivery_items(
    delivery_items: Union[FromItems, ToItems],
    item_type: str = 'from',
) -> Dict[Union[str, Tuple[str, Optional[str]]], dict]:
    """
    Compute deliveries by combining multiple delivery items.

    Take lists of delivery items for each queue_id (in case
    of item_type=='from') or for (queue_id, recipient)-pairs
    (in case of item_type='to').
    Each delivery item is a dict obtained from one log message.
    The dicts are consecutively updated (merged), except for the
    raw log messages (texts) which are collected into a list.
    The fields of the resulting delivery are filtered according
    to the target table.
    Returned is a dict mapping queue_ids (in case
    of item_type=='from') or (queue_id, recipient)-pairs
    (in case of item_type='to') to deliveries.

    NOTE(review): this pops 'message', 'identifier' and 'pid' from
    the incoming item dicts, i.e. it mutates its input.
    """
    merged: Dict[Union[str, Tuple[str, Optional[str]]], dict] = {}
    for key, group in delivery_items.items():
        combined: dict = {}
        texts: List[str] = []
        for entry in group:
            # The raw log text is collected separately instead of
            # being merged into the delivery fields.
            text = entry.pop('message')
            ident = entry.pop('identifier')
            proc = entry.pop('pid')
            texts.append(f'{ident}[{proc}]: {text}')
            combined.update(entry)
        combined['messages'] = texts
        merged[key] = combined
    return merged
|
|
|
|
|
|
def _store_deliveries(
    cursor: psycopg2.extras.RealDictCursor,
    table_name: str,
    deliveries: Dict[Any, dict],
    debug: Optional[List[str]] = None,
) -> None:
    """
    Store grouped and merged delivery items.

    Write *deliveries* to table *table_name* inside a single
    transaction; with debug flag 'all' or 'sql' set, report the
    transaction time.
    """
    if not deliveries:
        return
    # Avoid a mutable default argument; None means "no debug output".
    debug = debug if debug is not None else []
    n = len(deliveries)
    t0 = time.time()
    cursor.execute('BEGIN')
    _store_deliveries_batch(cursor, table_name, deliveries.values())
    cursor.execute('COMMIT')
    t1 = time.time()
    if 'all' in debug or 'sql' in debug:
        milliseconds = (t1 - t0) * 1000
        print(
            '*' * 10,
            f'SQL transaction time {table_name}: '
            f'{milliseconds:.2f} ms ({n} deliveries)',
        )
|
|
|
|
|
|
def _store_deliveries_batch(
    cursor: psycopg2.extras.RealDictCursor,
    table_name: str,
    deliveries: Iterable[dict]
) -> None:
    """
    Store *deliveries* (i.e., grouped and merged delivery items).

    We use a prepared statement and execute_batch() from
    psycopg2.extras to improve performance.

    On SQL failure the error is logged to the journal (best effort;
    the exception is not re-raised).
    """
    rows = []
    for delivery in deliveries:
        # Get values for all fields of the table; t_i/t_f both start
        # out as the item's timestamp 't'. (This mutates *delivery*.)
        field_values: List[Any] = []
        t = delivery.get('t')
        delivery['t_i'] = t
        delivery['t_f'] = t
        for field in table_fields[table_name]:
            if field in delivery:
                if field == 'messages':
                    # The list of raw log messages is stored as JSON.
                    field_values.append(json.dumps(delivery[field]))
                else:
                    field_values.append(delivery[field])
            else:
                field_values.append(None)
        rows.append(field_values)
    sql = get_sql_execute_prepared_statement(table_name)
    try:
        psycopg2.extras.execute_batch(cursor, sql, rows)
    except Exception as err:
        # Include the actual error: previously `err` was caught but
        # never logged, making failures hard to diagnose.
        msg = (
            f'SQL statement failed: "{sql}" -- the values were: {rows}'
            f' -- error: {err}'
        )
        journal.send(msg, PRIORITY=journal.LOG_ERR)
|
|
|
|
|
|
def init_db(config: dict) -> Optional[str]:
    """
    Initialize database; if ok return DSN, else None.

    Try to get parameters for database access,
    check existence of tables and possibly create them.
    """
    dsn = _get_dsn(config)
    if not dsn:
        return None
    if not _create_tables(dsn):
        return None
    return dsn
|
|
|
|
|
|
def _get_dsn(config: dict) -> Optional[str]:
|
|
"""
|
|
Return the DSN (data source name) from the *config*.
|
|
"""
|
|
try:
|
|
postgresql_config = config['postgresql']
|
|
hostname = postgresql_config['hostname']
|
|
port = postgresql_config['port']
|
|
database = postgresql_config['database']
|
|
username = postgresql_config['username']
|
|
password = postgresql_config['password']
|
|
except Exception:
|
|
msg = f"""ERROR: invalid config in {settings.main_config_file}
|
|
The config file must contain a section like this:
|
|
|
|
postgresql:
|
|
hostname: <HOSTNAME_OR_IP>
|
|
port: <PORT>
|
|
database: <DATABASE_NAME>
|
|
username: <USERNAME>
|
|
password: <PASSWORD>
|
|
"""
|
|
journal.send(msg, PRIORITY=journal.LOG_CRIT)
|
|
return None
|
|
dsn = f'host={hostname} port={port} dbname={database} '\
|
|
f'user={username} password={password}'
|
|
return dsn
|
|
|
|
|
|
def _create_tables(dsn: str) -> bool:
    """
    Check existence of tables and possibly create them, returning success.
    """
    try:
        with psycopg2.connect(dsn) as conn, conn.cursor() as curs:
            for table_name, sql_stmts in get_create_table_stmts().items():
                if not _create_table(curs, table_name, sql_stmts):
                    return False
    except Exception:
        # Connection (or table creation) failed fatally.
        journal.send(
            f'ERROR: cannot connect to database, check params'
            f' in {settings.main_config_file}',
            PRIORITY=journal.LOG_CRIT,
        )
        return False
    return True
|
|
|
|
|
|
def _create_table(
    cursor: psycopg2.extras.RealDictCursor,
    table_name: str,
    sql_stmts: List[str]
) -> bool:
    """
    Try to create a table if it does not exist and return whether it exists.

    If creation failed, emit an error to the journal.
    """
    cursor.execute("SELECT EXISTS(SELECT * FROM "
                   "information_schema.tables WHERE table_name=%s)",
                   (table_name,))
    if cursor.fetchone()[0]:
        # Table already present; nothing to do.
        return True
    for sql_stmt in sql_stmts:
        try:
            cursor.execute(sql_stmt)
        except Exception:
            # Most likely a privilege problem; tell the admin how to
            # create the table by hand.
            journal.send(
                'ERROR: database user needs privilege to create tables.\n'
                'Alternatively, you can create the table manually like'
                ' this:\n\n'
                + '\n'.join(sql + ';' for sql in sql_stmts),
                PRIORITY=journal.LOG_CRIT,
            )
            return False
    return True
|
|
|
|
|
|
def init_session(cursor: psycopg2.extras.RealDictCursor) -> None:
    """
    Init a database session.

    Define prepared statements.
    """
    # One prepared statement per target table.
    for table_name in ('delivery_from', 'delivery_to', 'noqueue'):
        cursor.execute(get_sql_prepared_statement(table_name))
|