Files
fail2ban-p2p/fail2ban-p2p/node.py
2026-04-23 01:55:12 +02:00

432 lines
18 KiB
Python

# Copyright 2013 Johannes Fuermann <johannes at fuermann.cc>
# Copyright 2013 Manuel Munz <manu at somakoma.de>
#
# This file is part of fail2ban-p2p.
#
# Licensed under the GNU GENERAL PUBLIC LICENSE Version 3. For details
# see the file COPYING or http://www.gnu.org/licenses/gpl-3.0.en.html.
import hashlib
import json
import os
import re
import socket
import threading
from select import select
from time import time
from M2Crypto import EVP, RSA
import config
import crypto
import customexceptions
import friend
import log
import server
import validators
import version
from command import Command
logger = log.initialize_logging("fail2ban-p2p." + __name__)
class Node:
"""Handles the self-awareness of the program."""
__shared_state = {}
uid = 0
name = ""
addresses = []
port = 0
ownerMail = ""
banTime = 0
banList = []
messageQueue = []
friends = []
running = True
lock = threading.Lock()
def __init__(self):
self.__dict__ = self.__shared_state
def openSocket(self):
logger.info("This is node %s (uid=%s) coming up", self.name, self.uid)
logger.debug("running version: %s", version.version)
try:
sockets = []
for address in self.addresses:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((address, int(self.port)))
sock.listen(1)
sockets.append(sock)
except Exception as e:
logger.warning("Couldn't bind to address %s (Reason: %s)", address, e)
while self.running:
readable, _, _ = select(sockets, [], [])
for sock in readable:
client_socket, address = sock.accept()
logger.debug("connection from %s", address[0])
thread = threading.Thread(
target=server.serve,
args=(self, client_socket, address),
daemon=True,
)
thread.start()
except Exception as e:
print(e)
def closeSocket(self):
logger.debug("closing socket")
self.running = False
def processMessages(self):
self.lock.acquire()
logger.debug("begin message handling")
for command in self.messageQueue:
if self.uid not in command.hops:
if command.hops[0] == "local":
del command.hops[0]
command.hops.append(self.uid)
if command.msgType == 1:
if "Trustlevel" not in command.parameter:
logger.warning("Incoming message has no Trustlevel, I won't trust it.")
command.parameter["Trustlevel"] = 0
if command.sender != "local":
command.parameter["Trustlevel"] = int(
(float(command.sender.trustLevel) / 100.0)
* (float(command.parameter["Trustlevel"]) / 100.0)
* 100
)
logger.debug("Message now has trust level %s", command.parameter["Trustlevel"])
relay = True
ipindb = False
if self.banList:
for ban in self.banList:
if ban["AttackerIP"] == command.parameter["AttackerIP"]:
ipindb = True
logger.debug("IP already in database.")
if int(ban["Timestamp"]) != int(command.parameter["Timestamp"]):
if command.hops[0] not in ban["Hops"]:
trustold = ban["Trustlevel"]
trustnew = int(trustold) + int(command.parameter["Trustlevel"])
if trustnew > 100:
trustnew = 100
ban["Trustlevel"] = trustnew
ban["Hops"].append(command.hops[0])
logger.debug("TrustLevel for this IP is now %s", trustnew)
command.parameter["Trustlevel"] = trustnew
else:
relay = False
logger.debug(
"There is already an entry from %s in our database, do nothing with this message.",
command.hops[0],
)
else:
relay = False
logger.debug("Timestamp has not changed, do nothing with this message")
if not ipindb:
self.banList.append(
{
"AttackerIP": command.parameter["AttackerIP"],
"Timestamp": command.parameter["Timestamp"],
"BanTime": self.banTime,
"Trustlevel": command.parameter["Trustlevel"],
"Hops": [command.hops[0]],
}
)
logger.debug("Added %s to internal banlist", command.parameter["AttackerIP"])
if relay:
if int(command.parameter["Trustlevel"]) >= int(config.Config().threshold):
logger.ban(command.parameter["AttackerIP"])
else:
logger.debug(
"Message's trust level (%s) was below our threshold (%s)",
command.parameter["Trustlevel"],
config.Config().threshold,
)
for peer in self.friends:
logger.debug("sending message to all friends")
peer.sendCommand(command)
if command.msgType == 3:
sender_uid = command.hops[0]
for peer in self.friends:
logger.debug(
"Comparing senders uid (%s) with one of our friends uid (%s)",
sender_uid,
peer.uid,
)
if peer.uid == sender_uid:
logger.debug("The message is from our friend %s (uid: %s)", peer.name, peer.uid)
logger.debug("Dumping banlist to %s (uid: %s)", peer.name, peer.uid)
if self.banList:
for ban in self.banList:
dump_cmd = Command()
dump_cmd.msgType = 1
dump_cmd.hops = [self.uid]
dump_cmd.protocolVersion = version.protocolVersion
dump_cmd.parameter = {
"AttackerIP": ban["AttackerIP"],
"Timestamp": ban["Timestamp"],
"Trustlevel": ban["Trustlevel"],
}
peer.sendCommand(dump_cmd)
else:
logger.debug("I know this message, I won't resend it to prevent loops")
logger.debug("end message handling")
self.messageQueue = []
logger.debug("deleted processed messages")
self.lock.release()
def addMessage(self, command):
logger.debug("command added to queue")
self.lock.acquire()
self.messageQueue.append(command)
self.lock.release()
self.processMessages()
def loadConfig(self):
c = config.Config()
self.configPath = c.configPath
self.configFile = c.configFile
pubkey_file = open(c.pubkey, "r", encoding="utf-8").read()
pubkey = re.findall(
"-----BEGIN PUBLIC KEY-----(.*?)-----END PUBLIC KEY-----",
pubkey_file,
re.DOTALL | re.M,
)[0]
logger.debug("our own pubkey is: %s", pubkey)
self.uid = hashlib.sha224(pubkey.encode("utf-8")).hexdigest()
logger.debug("that makes our own uid: %s", self.uid)
self.addresses = c.addresses
self.port = c.port
self.ownerMail = c.ownermail
self.banTime = int(c.banTime)
self.name = c.name
def getFriends(self):
error = False
friendPath = os.path.join(self.configPath, "friends")
friend_files = [f for f in os.listdir(friendPath) if os.path.isfile(os.path.join(friendPath, f))]
if not friend_files:
logger.warning("No friends found. In order to properly use fail2ban-p2p add at least one friend.")
for file in friend_files:
with open(os.path.join(self.configPath, "friends", file), "r", encoding="utf-8") as handle:
friendinfo = str(handle.read())
pubkey = None
try:
pubkey = re.findall(
"-----BEGIN PUBLIC KEY-----(.*?)-----END PUBLIC KEY-----",
friendinfo,
re.DOTALL | re.M,
)[0]
except IndexError:
logger.warning("No pubkey found in config for %s", file)
error = True
if pubkey:
logger.debug("read friend's public key: %s", pubkey)
uid = hashlib.sha224(pubkey.encode("utf-8")).hexdigest()
try:
address = re.search(r"address\s*=\s*(.*)", friendinfo).group(1)
except AttributeError:
logger.warning("address not found in config for %s", file)
error = True
try:
port = re.search(r"port\s*=\s*(.*)", friendinfo).group(1)
if not 0 < int(port) < 65536:
logger.warning("Port is invalid in '%s' friend file, must be between 0 and 65535", file)
error = True
except AttributeError:
logger.warning("port not found in config for %s", file)
error = True
try:
trustlevel = re.search(r"trustlevel\s*=\s*(.*)", friendinfo).group(1)
except AttributeError:
logger.warning("trustlevel not found in config for %s", file)
error = True
if not error:
obj = friend.Friend(
name=file,
uid=uid,
address=address,
port=int(port),
trustLevel=int(trustlevel),
publicKey=pubkey,
)
obj.configpath = os.path.join(self.configPath, "friends", file)
logger.debug(
"added friend %s (uid=%s, address=%s, port=%s, trustLevel=%s)",
file,
uid,
address,
port,
trustlevel,
)
self.friends.append(obj)
else:
logger.error("Could not add friend '%s' due to errors in the config file", file)
def verifyMessage(self, message):
logger.debug("signature in command class is: %s", message.signature)
logger.debug("attempting to verify command")
if not message.msgType:
logger.warning("Required parameter 'msgType' is missing in received message.")
raise customexceptions.InvalidMessage
if not validators.isInteger(message.msgType):
logger.warning("Invalid parameter 'msgType' in received message, can only be an integer.")
raise customexceptions.InvalidMessage
if version.protocolVersion != message.protocolVersion:
logger.warning(
"The protocol version of the received message (%s) does not match the protocol version of this node (%s).",
message.protocolVersion,
version.protocolVersion,
)
raise customexceptions.InvalidProtocolVersion
if not message.signature:
logger.warning("Signature is missing in received message")
raise customexceptions.InvalidMessage
for hop in message.hops:
if not validators.isAlphaNumeric(hop):
logger.warning("Invalid characters in hops. Only alphanumeric characters are allowed.")
raise customexceptions.InvalidMessage
if not message.parameter:
logger.warning("Message contains no parameters!")
raise customexceptions.InvalidMessage
if message.msgType == 1:
if "AttackerIP" not in message.parameter:
logger.warning("Required parameter 'AttackerIP' is missing in received message.")
raise customexceptions.InvalidMessage
if not validators.isIPv4address(message.parameter["AttackerIP"]):
logger.warning('Invalid parameter "AttackerIP" in received message.')
raise customexceptions.InvalidMessage
if "Timestamp" not in message.parameter:
logger.warning("Required parameter 'Timestamp' is missing in received message.")
raise customexceptions.InvalidMessage
if not validators.isInteger(message.parameter["Timestamp"]):
logger.warning('Invalid parameter "Timestamp" in received message.')
raise customexceptions.InvalidMessage
if "Trustlevel" not in message.parameter:
logger.warning('Required parameter "Trustlevel" in missing in received message.')
raise customexceptions.InvalidMessage
if not (
validators.isInteger(message.parameter["Trustlevel"])
and 0 <= int(message.parameter["Trustlevel"]) <= 100
):
logger.warning('Invalid parameter "Trustlevel" in received message.')
raise customexceptions.InvalidMessage
elif message.msgType in (2, 3):
if "TimeFrame" not in message.parameter:
logger.warning("Required parameter 'TimeFrame' is missing in received message.")
raise customexceptions.InvalidMessage
if not validators.isInteger(message.parameter["TimeFrame"]):
logger.warning('Invalid parameter "TimeFrame" in received message.')
raise customexceptions.InvalidMessage
else:
logger.warning("Unknown message type: %s", message.msgType)
raise customexceptions.InvalidMessage
logger.debug("attempting to verify signature")
last_hop_uid = message.hops[len(message.hops) - 1]
logger.debug("Last hop's uid is: %s", last_hop_uid)
sender = None
pk = None
for peer in self.friends:
logger.debug("Comparing last hops uid (%s) with one of our friends uid (%s)", last_hop_uid, peer.uid)
if peer.uid == last_hop_uid:
logger.debug("The message seems to be from our friend %s (uid: %s)", peer.name, peer.uid)
sender = peer
pk = RSA.load_pub_key(sender.configpath)
break
if last_hop_uid == "local":
logger.debug("This message was signed with our own key.")
c = config.Config()
pk = RSA.load_pub_key(c.pubkey)
sender = "local"
if sender is None or pk is None:
logger.warning("The message could not be mapped to one of our friends!")
raise customexceptions.InvalidSignature
verifier = EVP.PKey()
verifier.assign_rsa(pk)
verifier.verify_init()
verifier.verify_update(json.dumps(message.toSerializableDict()).encode("utf-8"))
if verifier.verify_final(bytes.fromhex(message.signature)) != 1:
logger.warning("Signature doesnt match!")
return False
logger.debug("Signature verified successfully")
message.sender = sender
return True
def dumpBanlist(self, timeframe):
banlist = []
if not timeframe:
timeframe = 3600
timeframestart = int(time()) - int(timeframe)
logger.debug("Dumping all nodes that were inserted after %s", timeframestart)
for entry in self.banList:
if int(entry["Timestamp"]) > int(timeframestart):
banlist.append(entry)
return json.dumps(banlist)
def requestBanlist(self):
for peer in self.friends:
logger.debug("Sending dump request to %s", peer.name)
command = Command()
command.msgType = 3
command.hops = [self.uid]
command.protocolVersion = version.protocolVersion
command.parameter = {"TimeFrame": self.banTime or 3600}
peer.sendCommand(command)
def cleanBanlist(self):
logger.debug("Purging all entries from banlist that are older than %s seconds.", self.banTime)
if self.banList:
banListKeep = []
for ban in self.banList:
if ban["Timestamp"] + self.banTime > time():
banListKeep.append(ban)
else:
logger.info("Removed %s from internal banlist because the ban has expired.", ban["AttackerIP"])
self.banList = banListKeep
global cleaner
cleaner = threading.Timer(60, self.cleanBanlist)
cleaner.start()
def cleanBanlistStop(self):
cleaner.cancel()