213 lines
6.8 KiB
Python
213 lines
6.8 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
"""
|
||
|
Main script to be run as a systemd unit or manually.
|
||
|
"""
|
||
|
|
||
|
import argparse
|
||
|
import datetime
|
||
|
import os
|
||
|
import sys
|
||
|
from pprint import pprint
|
||
|
from typing import Iterable, List, Optional, Tuple, Union
|
||
|
import psycopg2
|
||
|
import psycopg2.extras
|
||
|
from systemd import journal
|
||
|
import settings
|
||
|
from parser import init_parser, parse_entry, extract_delivery
|
||
|
from sources import (
|
||
|
iter_journal_messages_since,
|
||
|
iter_journal_messages_follow,
|
||
|
iter_logfile_messages,
|
||
|
)
|
||
|
from storage import (
|
||
|
init_db,
|
||
|
init_session,
|
||
|
get_latest_timestamp,
|
||
|
delete_old_deliveries,
|
||
|
store_delivery_items,
|
||
|
)
|
||
|
|
||
|
|
||
|
exit_code_without_restart = 97
|
||
|
|
||
|
|
||
|
def run(
|
||
|
dsn: str,
|
||
|
verp_marker: Optional[str] = None,
|
||
|
filepath: Optional[str] = None,
|
||
|
year: Optional[int] = None,
|
||
|
debug: List[str] = [],
|
||
|
) -> None:
|
||
|
"""
|
||
|
Determine loop(s) and run them within a database context.
|
||
|
"""
|
||
|
init_parser(verp_marker=verp_marker)
|
||
|
with psycopg2.connect(dsn) as conn:
|
||
|
with conn.cursor(
|
||
|
cursor_factory=psycopg2.extras.RealDictCursor
|
||
|
) as curs:
|
||
|
init_session(curs)
|
||
|
if filepath and year:
|
||
|
run_loop(
|
||
|
iter_logfile_messages(filepath, year), curs, debug=debug
|
||
|
)
|
||
|
else:
|
||
|
begin_timestamp = get_latest_timestamp(curs)
|
||
|
run_loop(
|
||
|
iter_journal_messages_since(begin_timestamp),
|
||
|
curs,
|
||
|
debug=debug,
|
||
|
)
|
||
|
begin_timestamp = get_latest_timestamp(curs)
|
||
|
run_loop(
|
||
|
iter_journal_messages_follow(begin_timestamp),
|
||
|
curs,
|
||
|
debug=debug,
|
||
|
)
|
||
|
|
||
|
|
||
|
def run_loop(
|
||
|
iterable: Iterable[Tuple[bool, Optional[dict]]],
|
||
|
curs: psycopg2.extras.RealDictCursor,
|
||
|
debug: List[str] = []
|
||
|
) -> None:
|
||
|
"""
|
||
|
Loop over log entries obtained from *iterable*.
|
||
|
|
||
|
Parse the message, extract delivery information from it and store
|
||
|
that delivery information.
|
||
|
|
||
|
For performance reasons delivery items are collected in a cache
|
||
|
before writing them (i.e., committing a database transaction).
|
||
|
"""
|
||
|
cache = []
|
||
|
msg_count = settings.max_messages_per_commit
|
||
|
last_delete = None
|
||
|
for commit, msg_details in iterable:
|
||
|
parsed_entry = None
|
||
|
if msg_details:
|
||
|
parsed_entry = parse_entry(msg_details)
|
||
|
if 'all' in debug or (
|
||
|
parsed_entry and parsed_entry.get('comp') in debug
|
||
|
):
|
||
|
print('_' * 80)
|
||
|
print('MSG_DETAILS:', msg_details)
|
||
|
print('PARSED_ENTRY:', parsed_entry)
|
||
|
if parsed_entry:
|
||
|
errors, delivery = extract_delivery(msg_details, parsed_entry)
|
||
|
if not errors and delivery:
|
||
|
if 'all' in debug or parsed_entry.get('comp') in debug:
|
||
|
print('DELIVERY:')
|
||
|
pprint(delivery)
|
||
|
# it may happen that a delivery of type 'from' has
|
||
|
# a recipient; in this case add a second delivery
|
||
|
# of type 'to' to the cache, but only for deliveries
|
||
|
# with queue_id
|
||
|
if (
|
||
|
delivery['type'] == 'from'
|
||
|
and 'recipient' in delivery
|
||
|
and delivery.get('queue_id')
|
||
|
):
|
||
|
delivery2 = delivery.copy()
|
||
|
delivery2['type'] = 'to'
|
||
|
cache.append(delivery2)
|
||
|
del delivery['recipient']
|
||
|
cache.append(delivery)
|
||
|
msg_count -= 1
|
||
|
if msg_count == 0:
|
||
|
commit = True
|
||
|
elif errors:
|
||
|
msg = (
|
||
|
f'Extracting delivery from parsed entry failed: '
|
||
|
f'errors={errors}; msg_details={msg_details}; '
|
||
|
f'parsed_entry={parsed_entry}'
|
||
|
)
|
||
|
journal.send(msg, PRIORITY=journal.LOG_CRIT)
|
||
|
if 'all' in debug or parsed_entry.get('comp') in debug:
|
||
|
print('EXTRACTION ERRORS:', errors)
|
||
|
if commit:
|
||
|
if 'all' in debug:
|
||
|
print('.' * 40, 'committing')
|
||
|
# store cache, clear cache, reset message counter
|
||
|
store_delivery_items(curs, cache, debug=debug)
|
||
|
cache = []
|
||
|
msg_count = settings.max_messages_per_commit
|
||
|
now = datetime.datetime.utcnow()
|
||
|
if last_delete is None or last_delete < now - settings.delete_interval:
|
||
|
delete_old_deliveries(curs)
|
||
|
last_delete = now
|
||
|
if 'all' in debug:
|
||
|
print('.' * 40, 'deleting old deliveries')
|
||
|
else:
|
||
|
store_delivery_items(curs, cache, debug=debug)
|
||
|
|
||
|
|
||
|
def main() -> None:
|
||
|
parser = argparse.ArgumentParser()
|
||
|
parser.add_argument(
|
||
|
'--debug',
|
||
|
help='Comma-separated list of components to be debugged; '
|
||
|
'valid component names are the Postfix components '
|
||
|
'plus "sql" plus "all".',
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'--file',
|
||
|
help='File path of a Postfix logfile in syslog '
|
||
|
'format to be parsed instead of the journal',
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'--year',
|
||
|
help='If --file is given, we need to know '
|
||
|
'the year of the first line in the logfile',
|
||
|
)
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
config = settings.get_config()
|
||
|
if config:
|
||
|
# check if startup is enabled or fail
|
||
|
msg = None
|
||
|
if 'startup' not in config:
|
||
|
msg = 'Parameter "startup" is not configured.'
|
||
|
elif not config['startup']:
|
||
|
msg = 'Startup is not enabled in the config file.'
|
||
|
if msg:
|
||
|
journal.send(msg, PRIORITY=journal.LOG_CRIT)
|
||
|
sys.exit(exit_code_without_restart)
|
||
|
# check more params and call run
|
||
|
try:
|
||
|
verp_marker = config['postfix']['verp_marker']
|
||
|
except Exception:
|
||
|
verp_marker = None
|
||
|
debug: List[str] = []
|
||
|
if args.debug:
|
||
|
debug = args.debug.split(',')
|
||
|
filepath = None
|
||
|
year = None
|
||
|
if args.file:
|
||
|
filepath = args.file
|
||
|
if not args.year:
|
||
|
print(
|
||
|
'If --file is given, we need to know the year'
|
||
|
' of the first line in the logfile. Please use --year.'
|
||
|
)
|
||
|
sys.exit(1)
|
||
|
else:
|
||
|
year = int(args.year)
|
||
|
dsn = init_db(config)
|
||
|
if dsn:
|
||
|
run(
|
||
|
dsn,
|
||
|
verp_marker=verp_marker,
|
||
|
filepath=filepath,
|
||
|
year=year,
|
||
|
debug=debug,
|
||
|
)
|
||
|
else:
|
||
|
print('Config invalid, see journal.')
|
||
|
sys.exit(exit_code_without_restart)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main()
|