179 lines
5.5 KiB
Python
179 lines
5.5 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
"""
|
||
|
Data sources.
|
||
|
|
||
|
Note: python-systemd journal docs are at
|
||
|
https://www.freedesktop.org/software/systemd/python-systemd/journal.html
|
||
|
"""
|
||
|
|
||
|
import datetime
|
||
|
import select
|
||
|
from typing import Iterable, Optional, Tuple, Union
|
||
|
from systemd import journal
|
||
|
import settings
|
||
|
|
||
|
|
||
|
def iter_journal_messages_since(
    timestamp: Union[int, float]
) -> Iterable[Tuple[bool, dict]]:
    """
    Yield False and message details from the journal since *timestamp*.

    This is the loading phase (loading messages that already existed
    when we start).

    Argument *timestamp* is a UNIX timestamp in seconds.

    Only journal entries for systemd unit settings.systemd_unitname with
    loglevel INFO and above are retrieved.

    The journal reader is closed when the iteration is exhausted or the
    generator is closed.
    """
    # Reader.seek_realtime() interprets an int as *microseconds* since
    # the epoch and a float as *seconds*, so force the seconds form.
    timestamp = float(timestamp)
    with journal.Reader() as sdj:
        sdj.log_level(journal.LOG_INFO)
        sdj.add_match(_SYSTEMD_UNIT=settings.systemd_unitname)
        sdj.seek_realtime(timestamp)
        for entry in sdj:
            yield False, _get_msg_details(entry)
|
||
|
|
||
|
|
||
|
def iter_journal_messages_follow(
    timestamp: Union[int, float]
) -> Iterable[Tuple[bool, Optional[dict]]]:
    """
    Yield commit and message details from the journal through polling.

    This is the polling phase (after we have read pre-existing messages
    in the loading phase).

    Argument *timestamp* is a UNIX timestamp in seconds.

    Only journal entries for systemd unit settings.systemd_unitname with
    loglevel INFO and above are retrieved.

    *commit* (bool) tells whether it is time to store the delivery
    information obtained from the messages yielded by us.
    It is set to True if settings.max_delay_before_commit has elapsed.
    After this delay delivery information will be written; to be exact:
    the delay may increase by up to one settings.journal_poll_interval.

    When a commit is due but no message was yielded in that polling
    round, ``(True, None)`` is yielded so the signal is never lost.

    This generator never finishes on its own; the journal reader is
    closed when the generator is closed.
    """
    # Same conversion as in the loading phase: Reader.seek_realtime()
    # treats an int as microseconds, but a float as seconds.
    timestamp = float(timestamp)
    interval_ms = settings.journal_poll_interval * 1000
    with journal.Reader() as sdj:
        sdj.log_level(journal.LOG_INFO)
        sdj.add_match(_SYSTEMD_UNIT=settings.systemd_unitname)
        sdj.seek_realtime(timestamp)
        poller = select.poll()
        poller.register(sdj, sdj.get_events())
        last_commit = datetime.datetime.now(datetime.timezone.utc)
        while True:
            # returns after new journal events or after the poll interval
            poller.poll(interval_ms)
            commit = False
            now = datetime.datetime.now(datetime.timezone.utc)
            if last_commit + settings.max_delay_before_commit < now:
                commit = True
                last_commit = now
            yielded = False
            # APPEND: entries were appended; INVALIDATE: journal files were
            # added or removed (e.g. rotation), new entries may be readable.
            if sdj.process() in (journal.APPEND, journal.INVALIDATE):
                for entry in sdj:
                    yielded = True
                    yield commit, _get_msg_details(entry)
            if commit and not yielded:
                # last_commit was already reset above, so the caller must
                # get the commit signal even if no entry matched our filter.
                yield commit, None
|
||
|
|
||
|
|
||
|
def iter_logfile_messages(
    filepath: str,
    year: int,
    commit_after_lines=settings.max_messages_per_commit,
) -> Iterable[Tuple[bool, dict]]:
    """
    Yield messages and a commit flag from a logfile.

    Loop through all lines of the file with given *filepath* and
    extract the time and log message. If the log message starts
    with 'postfix/', then extract the syslog_identifier, pid and
    message text.

    Lines are expected in the traditional syslog layout
    ``'Mmm dd HH:MM:SS hostname identifier[pid]: text'``. Lines with an
    unparsable timestamp, and postfix lines lacking the
    ``identifier[pid]`` part, are skipped.

    Since syslog lines do not contain the year, the *year* to which
    the first log line belongs must be given.

    Yield a commit flag and a dict with these keys:
    't': timestamp
    'message': message text
    'identifier': syslog identifier (e.g., 'postfix/smtpd')
    'pid': process id (as a string)

    The commit flag will be set to True for every
    (commit_after_lines)-th filtered message and serves
    as a signal to the caller to commit this chunk of data
    to the database.
    """
    dt_prev = None  # datetime of the last line with a *valid* timestamp
    cnt = 0
    # Bytes that cannot be decoded (e.g. client-supplied strings logged
    # verbatim) must not abort the whole import, hence errors='replace'.
    with open(filepath, 'r', errors='replace') as fh:
        for line in fh:
            # get datetime (first 15 characters, e.g. 'Jan  1 00:00:00')
            timestamp = line[:15]
            dt = _parse_logfile_timestamp(timestamp, year)
            if dt is None:
                continue  # discard log message with invalid timestamp

            # if we transgress a year boundary, then increment the year;
            # dt_prev only tracks valid lines, so an unparsable line in
            # between cannot hide the boundary
            if dt_prev is not None and dt + datetime.timedelta(days=1) < dt_prev:
                year += 1
                dt = _parse_logfile_timestamp(timestamp, year)
                if dt is None:
                    continue
            dt_prev = dt

            # drop the hostname field (of any length) after the timestamp
            msg = line[16:].partition(' ')[2].strip()

            # filter postfix messages
            if not msg.startswith('postfix/'):
                continue
            parts = _split_syslog_message(msg)
            if parts is None:
                continue  # no 'identifier[pid]' prefix: cannot attribute
            syslog_identifier, pid, message = parts
            cnt += 1
            commit = cnt % commit_after_lines == 0
            yield commit, {
                't': dt,
                'message': message,
                'identifier': syslog_identifier,
                'pid': pid,
            }


def _split_syslog_message(msg: str) -> Optional[Tuple[str, str, str]]:
    """
    Split ``'identifier[pid]: text'`` into (identifier, pid, text).

    Return None if *msg* has no ``[pid]`` part.
    """
    identifier, sep, rest = msg.partition('[')
    if not sep:
        return None
    pid, sep, rest = rest.partition(']')
    if not sep:
        return None
    return identifier, pid, rest[2:]  # drop the ': ' following ']'
|
||
|
|
||
|
|
||
|
def _get_msg_details(journal_entry: dict) -> dict:
    """
    Convert a journal entry into this module's message dict.

    Result keys: 't' (from '__REALTIME_TIMESTAMP'), 'message' (from
    'MESSAGE'), 'identifier' (from 'SYSLOG_IDENTIFIER', None if absent)
    and 'pid' (from 'SYSLOG_PID', None if absent).
    A missing '__REALTIME_TIMESTAMP' or 'MESSAGE' raises KeyError.
    """
    details = {
        't': journal_entry['__REALTIME_TIMESTAMP'],
        'message': journal_entry['MESSAGE'],
    }
    # optional fields: absent ones map to None
    for key, field in (('identifier', 'SYSLOG_IDENTIFIER'),
                       ('pid', 'SYSLOG_PID')):
        details[key] = journal_entry.get(field)
    return details
|
||
|
|
||
|
|
||
|
def _parse_logfile_timestamp(
    timestamp: Optional[str],
    year: int
) -> Optional[datetime.datetime]:
    """
    Parse a given syslog *timestamp* and return a datetime.

    Since the timestamp does not contain the year, it is an
    extra argument. The year is parsed together with the timestamp:
    parsing without a year would fall back to 1900 (not a leap year)
    and reject every 'Feb 29' timestamp.

    Return None if *timestamp* is None or cannot be parsed, including
    dates that do not exist in *year* (e.g. 'Feb 29' in 2023).

    Note: Successful parsing of the month's name depends on
    the locale under which this script runs.
    """
    if timestamp is None:
        return None
    try:
        # strptime matches a run of input whitespace against a single
        # space in the format, so space-padded days ('Jan  1') parse fine.
        return datetime.datetime.strptime(
            '{} {}'.format(year, timestamp), '%Y %b %d %H:%M:%S'
        )
    except ValueError:
        return None
|