Source code for app.models.ensemble_techniques_implementation.majority_vote
from app.bicep_utils.models.ids_base import Alert
from app.logger import LOGGER
from app.utils import extract_ts_srcip_srcport_dstip_dstport_from_alert
[docs]
async def majority_vote(alerts_dict: dict, ensemble) -> list[Alert]:
    """
    Determine which alerts of an ensemble are backed by a majority of its IDS.

    The alerts are first grouped per alert key via
    ``combine_alerts_for_ids_in_alert_dict``. For every key, as long as more than
    half of the ensemble's IDS still hold an unprocessed alert for that key, one
    alert is taken from each of those IDS and merged into a single voted alert
    whose severity is the average of the taken alerts (a severity of ``None``
    counts as 0).

    Note: the voted alert is one of the input ``Alert`` objects; its ``severity``
    attribute is overwritten with the averaged value.

    Args:
        alerts_dict (dict): Dict that holds for each IDS in the ensemble a list of alerts
        ensemble (Ensemble): Ensemble object according to the ORM; only the length of
            ``ensemble.ensemble_ids`` is read here
    Returns:
        majority_voted_alerts (list[Alert]): List of alerts the ensemble voted for
    """
    common_alerts = await combine_alerts_for_ids_in_alert_dict(alerts_dict)
    # A strict majority is required: more than half of all IDS in the ensemble.
    majority_threshold = len(ensemble.ensemble_ids) / 2
    majority_voted_alerts = []

    for container_dict in common_alerts.values():
        # Only containers that still have at least one alert left for this key can vote.
        voting_alert_lists = [alerts for alerts in container_dict.values() if alerts]
        while len(voting_alert_lists) > majority_threshold:
            # An IDS may have reported several alerts for the same key, so the lists can
            # differ in length. Pop exclusively from the non-empty ones; popping from an
            # exhausted list would raise IndexError while the others still form a majority.
            cumulative_severity = 0
            alert = None
            for alerts in voting_alert_lists:
                alert = alerts.pop()
                # A missing severity contributes 0 to the average.
                cumulative_severity += alert.severity if alert.severity is not None else 0

            # The last popped alert represents the vote and carries the averaged severity.
            alert.severity = cumulative_severity / len(voting_alert_lists)
            majority_voted_alerts.append(alert)

            # Re-evaluate which containers can still vote in the next round.
            voting_alert_lists = [alerts for alerts in voting_alert_lists if alerts]

    LOGGER.debug(f"length of total majority voted alerts is {len(majority_voted_alerts)}")
    return majority_voted_alerts
[docs]
async def combine_alerts_for_ids_in_alert_dict(alerts_dict: dict) -> dict:
    """
    Regroup per-IDS alert lists by the connection attributes of each alert.

    Every alert is passed to ``extract_ts_srcip_srcport_dstip_dstport_from_alert``;
    the five values it yields form the grouping key. Alerts that share a key are
    collected per IDS name, in the order in which they are encountered.

    Args:
        alerts_dict (dict): A dictionary where each key is an IDS name and the
            value is a list of `Alert` objects.
    Returns:
        dict: A dictionary whose keys are tuples
            `(timestamp, source_ip, source_port, destination_ip, destination_port)`
            and whose values map IDS names to the lists of alerts matching that key.
    """
    grouped = {}
    for ids_name, ids_alerts in alerts_dict.items():
        for current_alert in ids_alerts:
            # Unpack explicitly so the key is always a 5-tuple.
            ts, src_ip, src_port, dst_ip, dst_port = extract_ts_srcip_srcport_dstip_dstport_from_alert(current_alert)
            group_key = (ts, src_ip, src_port, dst_ip, dst_port)

            # Create the per-key mapping on first sight of this key.
            if group_key not in grouped:
                grouped[group_key] = {}
            per_ids = grouped[group_key]

            # Create the per-IDS list on first sight of this IDS for the key.
            if ids_name not in per_ids:
                per_ids[ids_name] = []
            per_ids[ids_name].append(current_alert)
    return grouped