from abc import ABC, abstractmethod
from datetime import datetime
import json
from http.client import HTTPResponse
import asyncio
import httpx
try:
from ..general_utilities import (
LOGGER,
get_env_variable,
wait_for_process_completion,
create_and_activate_network_interface,
mirror_network_traffic_to_interface,
remove_network_interface,
stop_process,
)
except ImportError: # allow running as a top-level module in tests
from general_utilities import (
LOGGER,
get_env_variable,
wait_for_process_completion,
create_and_activate_network_interface,
mirror_network_traffic_to_interface,
remove_network_interface,
stop_process,
)
import ast
"""
Module to provide generic base classes foir the IDS containers to implement their functionality and parse log lines into the common Alert format
"""
[docs]
class Alert:
"""
Class which contains the most important fields of an alert (one line of anomaly).
It presents a standardized interface for the different IDS to map their distinct alerts to.
"""
[docs]
def __init__(
self,
time=None,
source_ip=None,
source_port=None,
destination_ip=None,
destination_port=None,
severity=None,
type=None,
message=None,
):
"""
Initializes an Alert object with optional attributes.
Args:
time (str, optional): Timestamp of the alert.
source_ip (str, optional): Source IP address.
source_port (str, optional): Source port number.
destination_ip (str, optional): Destination IP address.
destination_port (str, optional): Destination port number.
severity (float, optional): Severity level of the alert.
type (str, optional): Type of the alert.
message (str, optional): Description of the alert.
"""
self.time = time
self.source_ip = source_ip
self.source_port = source_port
self.destination_ip = destination_ip
self.destination_port = destination_port
self.severity = severity
self.type = type
self.message = message
def __eq__(self, other):
if not isinstance(other, Alert):
return False
return (
self.time == other.time
and self.source_ip == other.source_ip
and self.source_port == other.source_port
and self.destination_ip == other.destination_ip
and self.destination_port == other.destination_port
)
def __hash__(self):
return hash(
(
self.time,
self.source_ip,
self.source_port,
self.destination_ip,
self.destination_port,
)
)
[docs]
@classmethod
def from_json(cls, json_alert: str):
"""
Creates an Alert object from a JSON string.
Args:
json_alert (str): JSON representation of an alert.
Returns:
Alert: An instance of the Alert class.
"""
try:
alert_dict = ast.literal_eval(json_alert)
except Exception as e:
json_str = json_alert.replace("None", "null").replace("'", '"')
alert_dict = json.loads(json_str)
return Alert(
time=alert_dict["time"],
source_ip=alert_dict["source_ip"],
source_port=alert_dict["source_port"],
destination_ip=alert_dict["destination_ip"],
destination_port=alert_dict["destination_port"],
severity=alert_dict["severity"],
type=alert_dict["type"],
message=alert_dict["message"],
)
[docs]
def __str__(self):
"""
Returns a string representation of the alert.
Returns:
str: Readable format of the alert.
"""
return f"{self.time}, From: {self.source_ip}:{self.source_port}, To: {self.destination_ip}:{self.destination_port}, Type: {self.type}, Content: {self.message}, Severity: {self.severity}"
[docs]
def to_dict(self):
"""
Converts the alert object to a dictionary.
Returns:
dict: Dictionary representation of the alert.
"""
return {
"time": self.time,
"source_ip": self.source_ip,
"source_port": self.source_port,
"destination_ip": self.destination_ip,
"destination_port": self.destination_port,
"severity": self.severity,
"type": self.type,
"message": self.message,
}
[docs]
def to_json(self):
"""
Converts the alert object to a JSON string.
Returns:
str: JSON representation of the alert.
"""
return json.dumps(self.to_dict())
[docs]
class IDSParser(ABC):
"""
Abstract base class for parsing alerts from IDS logs.
"""
# use the isoformat as printed below to return the timestamps of the parsed lines
timestamp_format = "%Y-%m-%dT%H:%M:%S.%f%z"
@property
@abstractmethod
async def alert_file_location(self):
"""Abstract property for specifying the location of the alert file."""
pass
[docs]
@abstractmethod
async def parse_alerts(self) -> list[Alert]:
"""
Method triggered once after the static analysis is complete or periodically for a network analysis.
Takes in the whole file, reads it, parses it, deletes it.
Returns:
list[Alert]: List of parsed alerts.
"""
pass
[docs]
@abstractmethod
async def parse_line(self, line) -> Alert:
"""
Parses a single line into an Alert object.
Args:
line (str): A single log line.
Returns:
Alert: Parsed alert object.
"""
pass
[docs]
@abstractmethod
async def normalize_threat_levels(self, threat: int) -> float:
"""
Normalizes threat levels to a range of 0 to 1.
Args:
threat (int): Threat level from the IDS.
Returns:
float: Normalized threat level rounded to two decimals.
"""
pass
[docs]
class IDSBase(ABC):
"""
Abstract base class for all IDS supported by BICEP
Each IDS involved needs to inherit from this base class and implement the following methods and attributes
"""
[docs]
def __init__(
self,
container_id: int = None,
container_name: str = None,
ensemble_id: int = None,
pids: list[int] = [],
dataset_id: int = None,
static_analysis_running: bool = False,
send_alerts_periodically_task=None,
tap_interface_name: str = None,
background_tasks: set = set(),
):
"""
Constructor of the IDSBase class
Args:
container_id (int): = None,
container_name (str): = None,
ensemble_id (int): = None,
pids (list[int]): = [],
dataset_id (int): = None,
static_analysis_running (bool): = False,
send_alerts_periodically_task : = None,
tap_interface_name (str): = None,
background_tasks (set): = set(),
"""
self.container_id: int = container_id
self.container_name: str = container_name
self.ensemble_id: int = ensemble_id
self.pids: list[int] = pids
# Id of the dataset used to trigger a static analysis
self.dataset_id: int = dataset_id
self.static_analysis_running: bool = static_analysis_running
self.send_alerts_periodically_task = send_alerts_periodically_task
self.tap_interface_name: str = tap_interface_name
self.background_tasks = background_tasks
self.analysis_start_time = None
self.analysis_stop_time = None
@property
@abstractmethod
async def parser(self):
"""
Abstract property to reference the repsective IDS Parser.
"""
pass
@property
@abstractmethod
async def log_location(self):
"""Abstract property specifying the log location."""
pass
@property
@abstractmethod
async def configuration_location(self):
"""Abstract property specifying the configuration location."""
pass
[docs]
@abstractmethod
async def execute_static_analysis_command(self, file_path: str) -> int:
"""
Executes the IDS command for static analysis using a pcap file.
Args:
file_path (str): Path to the pcap file.
Returns:
int: Process ID of the spawned IDS process.
"""
pass
[docs]
@abstractmethod
async def execute_network_analysis_command(self) -> int:
"""
Method that takes all actions necessary to execute the IDS command for a network analysis on the self.tap_interface.
Returns:
int: Process ID of the spawned IDS process.
"""
pass
[docs]
async def stop_all_processes(self):
"""
Stops all running IDS processes (static or network analysis tasks).
"""
remove_process_ids = []
for pid in self.pids:
await stop_process(pid)
remove_process_ids.append(pid)
for removed_pid in remove_process_ids:
self.pids.remove(removed_pid)
[docs]
async def send_alerts_to_core_periodically(self, period: float = 300):
"""
Background method to collect all currently available alerts, parses them and sends them to the Core.
The method will erase all logfiles so far after the collection to ensure that the same alerts are not send twice.
Method stops only when the analysis gets stopped.
Args:
period (float): The period in seconds when to send the next batch to the core
"""
try:
if self.ensemble_id == None:
endpoint = f"/ids/publish/alerts"
else:
endpoint = f"/ensemble/publish/alerts"
# tell the core to stop/set status to idle again
core_url = await get_env_variable("CORE_URL")
while True:
alerts: list[Alert] = await self.parser.parse_alerts()
json_alerts = [a.to_dict() for a in alerts]
data = {
"container_id": self.container_id,
"ensemble_id": self.ensemble_id,
"alerts": json_alerts,
"analysis_type": "network",
"dataset_id": None,
}
try:
async with httpx.AsyncClient() as client:
# set timeout to 90 seconds to be able to send all alerts
response: HTTPResponse = await client.post(
core_url + endpoint, json=data, timeout=90
)
except Exception as e:
LOGGER.error(
"Something went wrong during alert sending... retrying on next iteration"
)
await asyncio.sleep(period)
except asyncio.CancelledError as e:
LOGGER.info(f"Canceled the sending of alerts")
[docs]
async def send_alerts_to_core(self) -> HTTPResponse:
"""
Method to collect all currently available alerts, parses them and sends them to the Core.
The method will erase all logfiles so far after the collection to ensure that the same alerts are not send twice.
This method will be executed once after a static analysis.
"""
if self.ensemble_id == None:
endpoint = f"/ids/publish/alerts"
else:
endpoint = f"/ensemble/publish/alerts"
# tell the core to stop/set status to idle again
core_url = await get_env_variable("CORE_URL")
alerts: list[Alert] = await self.parser.parse_alerts()
json_alerts = [a.to_dict() for a in alerts]
data = {
"container_id": self.container_id,
"ensemble_id": self.ensemble_id,
"alerts": json_alerts,
"analysis_type": "static",
"dataset_id": self.dataset_id,
"start_time": self.analysis_start_time,
"stop_time": self.analysis_stop_time,
}
async with httpx.AsyncClient() as client:
# set timeout to 600, to be able to send all alerts
response: HTTPResponse = await client.post(
core_url + endpoint, json=data, timeout=300
)
LOGGER.info("Send all alerts to the core")
# remove dataset here, becasue removing it in tell_core function removes the id before using it here otehrwise
if self.dataset_id != None:
self.dataset_id = None
return response
# TODO 0: make prints to correct log statements
[docs]
async def finish_static_analysis_in_background(self):
await self.send_alerts_to_core()
await self.tell_core_analysis_has_finished()
[docs]
async def tell_core_analysis_has_finished(self) -> HTTPResponse:
"""
Method to tell the Core that the analysis has been finished.
"""
if self.ensemble_id == None:
endpoint = f"/ids/analysis/finished"
else:
endpoint = f"/ensemble/analysis/finished"
data = {"container_id": self.container_id, "ensemble_id": self.ensemble_id}
# tell the core to stop/set status to idle again
core_url = await get_env_variable("CORE_URL")
# reset ensemble id to wait if next analysis is for ensemble or ids solo
async with httpx.AsyncClient() as client:
response: HTTPResponse = await client.post(
core_url + endpoint, json=data
)
# reset ensemble id after each analysis is completed to keep track if analysis has been triggered for ensemble or not
if self.ensemble_id != None:
self.ensemble_id = None
if self.analysis_start_time != None:
self.analysis_start_time = None
if self.analysis_stop_time != None:
self.analysis_stop_time = None
return response
[docs]
async def start_network_analysis(self) -> str:
"""
Method to start a network anaylsis. Ensures that necessary tap interface is available and that traffic replication has started for that tap interface.
Returns:
str: Confirmation string that the analysis has been started.
"""
# set tap name if not done already
if self.tap_interface_name is None:
self.tap_interface_name = f"tap{self.container_id}"
await create_and_activate_network_interface(self.tap_interface_name)
default_interface = await self.get_default_interface_name()
pid = await mirror_network_traffic_to_interface(
default_interface=default_interface, tap_interface=self.tap_interface_name
)
self.pids.append(pid)
start_ids = await self.execute_network_analysis_command()
self.pids.append(start_ids)
self.send_alerts_periodically_task = asyncio.create_task(
self.send_alerts_to_core_periodically()
)
LOGGER.debug(f"started network analysis for container with {self.container_id}")
return f"started network analysis for container with {self.container_id}"
[docs]
async def get_default_interface_name(self) -> str:
"""
Method to receive the name of the main interface by looking into the ip routes.
Returns:
interface_name (str): The interface name of the main network interface
"""
# command retourns the device of the default route configured
# As the container is mounted in the host network, this is alwas the hosts primary interface
command = "ip route list | grep default | awk '{print $5} '"
try:
process = await asyncio.create_subprocess_shell(
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise Exception(f"Command failed: {stderr.decode().strip()}")
interface_name = stdout.decode().strip()
return interface_name
except Exception as e:
LOGGER.error(
f"During the command execution something went wrong in the environment"
)
raise e
[docs]
async def start_static_analysis(self, file_path):
"""
Method to start a static analysis
Args:
file_path (str): The file path to the dataset file to trigger the static analysis on.
"""
pid = await self.execute_static_analysis_command(file_path)
self.pids.append(pid)
self.analysis_start_time = datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")
await wait_for_process_completion(pid)
self.analysis_stop_time = datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")
if pid in self.pids:
self.pids.remove(pid)
else:
print(
f"PID {pid} was already removed from pid list {self.pids} via another subprocess"
)
LOGGER.info(f"Process for static analysis finished")
if self.static_analysis_running:
task = asyncio.create_task(self.finish_static_analysis_in_background())
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
self.static_analysis_running = False
else:
await self.stop_analysis()
# overrides the default method
[docs]
async def stop_analysis(self):
"""
Method to stop any analysis by stopping all processes in the background.
Afterward, tells the core that the analysis has been comlpeted.
"""
self.static_analysis_running = False
await self.stop_all_processes()
if self.send_alerts_periodically_task != None:
if not self.send_alerts_periodically_task.done():
self.send_alerts_periodically_task.cancel()
self.send_alerts_periodically_task = None
if self.tap_interface_name != None:
await remove_network_interface(self.tap_interface_name)
await self.tell_core_analysis_has_finished()