Source code for app.models.configuration

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from app.models.ids_system import IdsSystem
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import Base
import aiofiles
import base64


[docs] class Configuration(Base): __tablename__ = "configuration" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), nullable=False) file_path = Column(String(1024), nullable=False) file_type = Column(String(32), nullable=False) description = Column(String(2048)) config_type = Column(String(32), nullable=False, default="CONFIGURATION") __mapper_args__ = { "polymorphic_identity": "CONFIGURATION", "polymorphic_on": config_type, } container = relationship("IdsSystem", foreign_keys=[IdsSystem.configuration_id]) containerRuleset = relationship("IdsSystem", foreign_keys=[IdsSystem.ruleset_id]) # TODO add methods to retrieve configurations content
[docs] async def read_content(self) -> str: async with aiofiles.open(self.file_path, mode="r") as f: content = await f.read() return content
[docs] class DeploymentConfig(Configuration): __mapper_args__ = {"polymorphic_identity": "DEPLOYMENT"}
[docs] class RuntimeConfig(Configuration): __mapper_args__ = {"polymorphic_identity": "RUNTIME"}
[docs] class RulesetConfig(Configuration): __mapper_args__ = {"polymorphic_identity": "RULESET"}
[docs] async def get_config_by_id(db: AsyncSession, config_id: int): stmt = select(Configuration).where(Configuration.id == config_id) result = await db.execute(stmt) return result.scalar_one_or_none()
[docs] async def get_all_configurations(db: AsyncSession): stmt = select(Configuration) result = await db.execute(stmt) return result.scalars().all()
[docs] async def remove_configuration_by_id(db: AsyncSession, config_id: int): config = await get_config_by_id(db, config_id) if config: await db.delete(config) await db.commit()
[docs] async def add_config(db: AsyncSession, configuration: Configuration): db.add(configuration) await db.commit() await db.refresh(configuration)
[docs] async def get_all_configurations_by_type(db: AsyncSession, file_type: str): stmt = select(Configuration).where(Configuration.file_type == file_type) result = await db.execute(stmt) return result.scalars().all()
[docs] async def get_serialized_configuration(configuration): serialized_config = { "id": configuration.id, "name": configuration.name, "file_content": base64.b64encode(await configuration.read_content()).decode( "utf-8" ), # Encode binary data to Base64, otherwise error when returning pcap files "file_type": configuration.file_type, "file_path": configuration.file_path, "description": configuration.description, "config_type": configuration.config_type, } return serialized_config