Files
fail2ban-p2p/fail2ban-p2p-client.py
2026-04-22 23:24:29 +02:00

131 lines
3.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""Send a signed ban message to this host's own node, or ask it for its list of banned hosts."""
import argparse
import json
import os
import socket
import sys
from time import time
import M2Crypto
sys.path.insert(1, "./fail2ban-p2p")
sys.path.insert(2, "/usr/share/fail2ban-p2p/fail2ban-p2p")
import config
import util
import version
# Command-line interface.  Each entry pairs an option flag with the keyword
# arguments handed to add_argument(); entries are registered in this order.
_CLI_OPTIONS = (
    ('-b', dict(help='IP address to ban', metavar='IP')),
    ('-c', dict(default='/etc/fail2ban-p2p/', help='Read configuration from DIR.', metavar='DIR')),
    ('-d', dict(help='Dump table of blocked hosts in the format <FORMAT> (table, json or count).',
                metavar='FORMAT')),
    ('-t', dict(help='The list of blocked hosts should go back that many seconds.', metavar='SECONDS')),
    ('-q', dict(action='store_true', help='Quiet, no output')),
    ('-v', dict(action='store_true', help='Verbose output')),
)
parser = argparse.ArgumentParser(description='fail2ban-p2p-client help.')
for _flag, _settings in _CLI_OPTIONS:
    parser.add_argument(_flag, **_settings)
args = parser.parse_args()

# Point a fresh configuration object at the chosen directory; the key pair
# is expected to live in that same directory.
c = config.Config()
c.configPath = args.c if args.c else "/etc/fail2ban-p2p"
for _attribute, _pem_name in (('privkey', 'private.pem'), ('pubkey', 'public.pem')):
    setattr(c, _attribute, os.path.join(c.configPath, _pem_name))

# Only an explicit False from loadConfig() aborts the script; any other
# return value lets it continue.
_config_loaded = c.loadConfig()
if _config_loaded is False:
    sys.exit(1)
# The client needs something to do: either ban an address or dump the list.
if not (args.d or args.b):
    print("Please use the -b argument to specify an IP to ban or -d to request information about banned nodes.")
    sys.exit(1)

# Dump mode is selected by the mere presence of -d; its value picks the
# output format and must be one of the three known ones.
dump = bool(args.d)
if dump and args.d not in ("table", "json", "count"):
    print("invalid value for -d argument!")
    sys.exit(1)

# Look-back window in seconds for dump requests.  A non-integer -t value is
# reported and the default is kept.
timeframe = 3600
if args.t:
    try:
        timeframe = int(args.t)
    except ValueError:
        print("Invalid Timeframe specified, only use integers! Using default value of 3600 instead")

quiet = True if args.q else False
# Choose the request type and its parameters.  A dump request (msgType 2)
# takes precedence over a ban request (msgType 1) when both -d and -b are
# given.
if dump:
    _msg_type = 2
    _parameter = {"TimeFrame": timeframe}
elif args.b:
    _msg_type = 1
    _parameter = {"Timestamp": int(time()), "AttackerIP": args.b, "Trustlevel": 100}

# Assemble the request body only when one of the two modes was selected.
if dump or args.b:
    unordered_dict = {
        "msgType": _msg_type,
        "parameter": _parameter,
        "hops": ['local'],
    }
    # NOTE(review): sort_recursive presumably yields a canonical key order so
    # the signed text is reproducible -- confirm in util.
    serializable_dict = util.sort_recursive(unordered_dict)
# The signature covers the UTF-8 bytes of the JSON-serialized message.
_payload = json.dumps(serializable_dict).encode("utf-8")

# Sign the payload with the private key named in the configuration.
_private_key = M2Crypto.EVP.load_key(c.privkey)
_private_key.sign_init()
_private_key.sign_update(_payload)
_signature_hex = _private_key.sign_final().hex()

# Wire format: the message itself, its hex-encoded signature and the
# protocol version, serialized as one JSON document.
cmdsigned = json.dumps({
    "msg": serializable_dict,
    "signature": _signature_hex,
    "protocolVersion": version.protocolVersion,
})
# Exchange one request/reply with the node at the first configured address.
# `ret` ends up holding the decoded reply, or stays None when any step
# failed; the output code below checks `if ret:` before printing anything.
ret = None
try:
    # The context manager closes the socket on every path, success or error.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(10)  # do not block forever on an unresponsive node
        s.connect((c.addresses[0], int(c.port)))
        s.sendall(cmdsigned.encode("utf-8"))
        # NOTE(review): a single recv() returns at most this many bytes and
        # may return fewer than the node sent; a very large reply could
        # arrive truncated.  Reading until EOF would fix that, but only if
        # the node closes the connection after replying -- confirm first.
        ret = s.recv(1048576).decode("utf-8")
    if not quiet and args.v:
        print("Message sent: " + cmdsigned)
except Exception as err:
    # Best effort: any failure (connect, send, receive, decode) is reported
    # with the same message and the script carries on without a reply.
    if not quiet:
        print("could not connect to " + c.name + " (" + c.addresses[0] + ":" + str(c.port) + ")")
        if args.v:
            # Surface the underlying cause, which used to be discarded.
            print("Reason: " + repr(err))
# Present the node's reply.  Nothing is printed when there is no reply.
if ret:
    if not args.d:
        # Ban request: echo whatever the node answered.
        print("Answer: " + ret)
    elif "ERROR" in ret:
        print("An error occured:\n")
        print(ret)
    elif args.d == "json":
        # Raw passthrough; the reply is printed without being parsed.
        print(ret)
    else:
        # "count" and "table" both need the decoded list.  A malformed or
        # truncated reply is reported cleanly instead of with a traceback;
        # the exit status stays non-zero as it was for the uncaught error.
        try:
            banList = json.loads(ret)
        except ValueError as err:
            print("Could not parse the answer as JSON (" + str(err) + "):\n")
            print(ret)
            sys.exit(1)

        if args.d == "count":
            print(len(banList))
        elif banList:
            # Convert the threshold once rather than once per row.
            threshold = int(c.threshold)
            print("IP".ljust(15, ' ') + "\tTimestamp\t\tBantime\t\tTrustlevel\tStatus")
            for ban in banList:
                # An entry counts as banned once its trust level reaches
                # the configured threshold; below that it is pending.
                status = "BANNED" if threshold <= int(ban['Trustlevel']) else "PENDING"
                print("{}\t{}\t\t{}\t\t{}\t\t{}".format(
                    ban['AttackerIP'].ljust(15, ' '),
                    ban['Timestamp'],
                    ban['BanTime'],
                    ban['Trustlevel'],
                    status,
                ))
        else:
            print("No hosts in banlist")