Source code for app.models.ensemble_technique
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from app.database import Base
from app.logger import LOGGER
from app.models.ensemble_techniques_implementation import *
import importlib
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
[docs]
class EnsembleTechnique(Base):
__tablename__ = "ensemble_technique"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), nullable=False)
description = Column(String(2048), nullable=False)
function_name = Column(String(128), nullable=False)
ensemble = relationship('Ensemble', back_populates='ensemble_technique', lazy="selectin")
[docs]
async def execute_technique_by_name_on_alerts(self, alerts_dict: dict, ensemble):
module = self._import_ensemble_technique_module()
func = getattr(module, self.function_name)
# common_alerts = await combine_alerts_for_ids_in_alert_dict(alerts_dict)
return await func(alerts_dict=alerts_dict, ensemble=ensemble)
def _import_ensemble_technique_module(self):
# in the container the code is injected as backend, not as app, therefor backend.models....
module_name = f"app.models.ensemble_techniques_implementation.{self.function_name.lower()}"
try:
module = importlib.import_module(module_name)
return module
except ModuleNotFoundError as e:
LOGGER.error(f"Module {module_name} not found: {e}")
raise
[docs]
async def get_all_ensemble_techniques(db: AsyncSession):
stmt = select(EnsembleTechnique)
result = await db.execute(stmt)
return result.scalars().all()
[docs]
async def get_ensemble_technique_by_id(db: AsyncSession, id: int):
stmt = select(EnsembleTechnique).where(EnsembleTechnique.id == id)
return await db.execute(stmt).scalar_one_or_none()