fail2ban-analyze/main.py

82 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import re
from datetime import datetime
from itertools import islice, chain
import argparse
import numpy as np
import termplotlib as tpl
# --- CLI definition ---------------------------------------------------------

def build_parser():
    """Build the argument parser with the 'histogram' and 'rank' subcommands."""
    parser = argparse.ArgumentParser(description='Analyze fail2ban logs')
    subparsers = parser.add_subparsers(dest='subparser_name', help='sub-command help')

    parser_histogram = subparsers.add_parser(
        'histogram', help='Show a cli histogram of the hits in time')
    parser_histogram.add_argument(
        '--bucket-size', type=int, default=6,
        help='histogram bucket size in hours, min 1 hour')
    parser_histogram.add_argument(
        '--only-bans', help='only count ban hits', action='store_true', default=False)

    parser_rank = subparsers.add_parser(
        'rank', help='Rank the ip\'s by number of hits', description='''
Rank the ip\'s by number of hits. For example passing 2 as subnets means that all the
ips that start with the same two subnets will be counted as one in the final rank''')
    parser_rank.add_argument('subnets', type=int,
                             help='Number of consecutive subnets to match.')
    parser_rank.add_argument('--count', type=int, default=10,
                             help='Show the first count entries')
    parser_rank.add_argument(
        '--only-bans', help='only count ban hits', action='store_true', default=False)
    return parser


# Timestamp prefix of every fail2ban log line, e.g. "2021-03-01 12:34:56,789".
RE_DATE = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})')
# "Found" / "Ban" events for the sshd jail. Dots in the IP are escaped:
# the original pattern's bare '.' matched ANY character between octets.
RE_FOUND = re.compile(r'\[sshd\] (Found) (\d+\.\d+\.\d+\.\d+)')
RE_BAN = re.compile(r'\[sshd\] (Ban) (\d+\.\d+\.\d+\.\d+)')


def parse_entries(lines, ip_masks):
    """Scan fail2ban log lines for sshd Found/Ban events.

    Parameters:
        lines: iterable of log lines (e.g. sys.stdin).
        ip_masks: number of leading IP octets to keep when grouping (1-4);
            4 keeps the full address.

    Returns:
        (found_entries, ban_entries, min_date, max_date) where the two dicts
        map an IP prefix to the list of event datetimes, and min_date/max_date
        bound all matched timestamps (both None when nothing matched).
    """
    found_entries = {}
    ban_entries = {}
    min_date = None
    max_date = None
    for line in lines:
        match_entry = RE_FOUND.search(line) or RE_BAN.search(line)
        if match_entry is None:
            continue
        match_date = RE_DATE.search(line)
        if match_date is None:
            # Defensive: an event line without a leading timestamp would
            # otherwise raise AttributeError on .group().
            continue
        # fail2ban separates milliseconds with a comma; ISO format wants a dot.
        d = datetime.fromisoformat(match_date.group(1).replace(',', '.'))
        ip = '.'.join(match_entry.group(2).split('.')[:ip_masks])
        min_date = d if min_date is None else min(d, min_date)
        max_date = d if max_date is None else max(d, max_date)
        target = found_entries if match_entry.group(1) == 'Found' else ban_entries
        target.setdefault(ip, []).append(d)
    return found_entries, ban_entries, min_date, max_date


def show_rank(entries, count):
    """Print the `count` IP prefixes with the most hits, most frequent first."""
    counted_ips = ((ip, len(hits)) for ip, hits in entries.items())
    sorted_ips = sorted(counted_ips, key=lambda x: x[1], reverse=True)
    for ip, hits in islice(sorted_ips, count):
        print(f'{ip}: found {hits} times')


def show_histogram(entries, min_date, max_date, bucket_size):
    """Render a terminal histogram of hit timestamps in `bucket_size`-hour buckets."""
    if min_date is None:
        # Without any matched entry the original code crashed on None - None.
        print("No matching log entries found", file=sys.stderr)
        exit(1)
    dates = chain.from_iterable(entries.values())
    timestamps = [d.timestamp() for d in dates]
    bucket_size = max(bucket_size, 1)  # enforce the documented 1-hour minimum
    span_hours = (max_date - min_date).total_seconds() / (bucket_size * 3600)
    num_buckets = max(1, int(span_hours))
    counts, bin_edges = np.histogram(timestamps, bins=num_buckets)
    fig = tpl.figure()
    fig.hist(counts, bin_edges)
    fig.show()


def main():
    """Entry point: parse argv, read the log from stdin, dispatch to a subcommand."""
    parser = build_parser()
    args = parser.parse_args()
    if args.subparser_name is None:
        print("Must choose a subprogram {rank, histogram}", file=sys.stderr)
        parser.print_help()
        exit(1)
    # 'rank' groups by a clamped number of leading octets; 'histogram' keeps full IPs.
    ip_masks = max(min(args.subnets, 4), 1) if args.subparser_name == 'rank' else 4
    found_entries, ban_entries, min_date, max_date = parse_entries(sys.stdin, ip_masks)
    entries = ban_entries if args.only_bans else found_entries
    if args.subparser_name == 'rank':
        show_rank(entries, args.count)
    else:
        show_histogram(entries, min_date, max_date, args.bucket_size)


if __name__ == '__main__':
    main()