Source code for app.models.dataset

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from app.database import Base
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession

class Dataset(Base):
    """ORM model mapped to the ``dataset`` table.

    Every column is declared ``nullable=False``. Each row references one
    ``dataset_type`` row through ``dataset_type_id``.
    """

    __tablename__ = "dataset"

    # Auto-incrementing integer primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Name of the dataset (at most 64 characters).
    name = Column(String(64), nullable=False)

    # Paths of the data file and the labels file (at most 1024 characters
    # each). ``remove_dataset_by_id`` splits ``labels_file_path`` on "/" to
    # find the directories it removes.
    data_file_path = Column(String(1024), nullable=False)
    labels_file_path = Column(String(1024), nullable=False)

    # Free-text description (at most 2048 characters).
    description = Column(String(2048), nullable=False)

    # Presumably the number of benign / malicious entries in the dataset --
    # TODO confirm. The "ammount" spelling is part of the schema and is kept
    # as is.
    ammount_benign = Column(Integer, nullable=False)
    ammount_malicious = Column(Integer, nullable=False)

    # Timestamp precision stored as a string; the accepted values are not
    # visible in this module.
    timestamp_precision = Column(String(64), nullable=False)

    # Foreign key to ``dataset_type.id``.
    dataset_type_id = Column(
        Integer,
        ForeignKey("dataset_type.id"),
        nullable=False,
    )

    # Related ``DatasetType`` object, loaded with the "selectin" strategy and
    # paired with the ``dataset`` attribute on ``DatasetType``.
    dataset_type = relationship(
        "DatasetType",
        back_populates="dataset",
        lazy="selectin",
    )
async def get_dataset_by_id(db: AsyncSession, dataset_id: int):
    """Look up a single dataset by its primary key.

    Args:
        db: Async session used to run the query.
        dataset_id: Value compared against ``Dataset.id``.

    Returns:
        The matching ``Dataset``, or ``None`` when no row matches.
    """
    matches_id = Dataset.id == dataset_id
    query = select(Dataset).where(matches_id)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
async def get_all_datasets(db: AsyncSession):
    """Load every row of the ``dataset`` table.

    Args:
        db: Async session used to run the query.

    Returns:
        All ``Dataset`` objects produced by an unfiltered select.
    """
    rows = await db.execute(select(Dataset))
    datasets = rows.scalars()
    return datasets.all()
async def remove_dataset_by_id(db: AsyncSession, id: int):
    """Remove a dataset's directory from disk and delete its database row.

    The directory containing ``labels_file_path`` is passed to
    ``remove_directory``. If its parent directory is reported empty
    afterwards, the parent is removed as well. Finally the row is deleted
    and the session is committed. Nothing happens when no dataset has the
    given id.

    Args:
        db: Async session used for the lookup, the delete and the commit.
        id: Primary key of the dataset to remove.
    """
    import logging

    # Imported inside the function as in the original code -- presumably to
    # avoid a circular import; verify before moving it to module level.
    from ..utils import remove_directory, directory_is_empty

    logger = logging.getLogger(__name__)

    dataset: Dataset = await get_dataset_by_id(db, id)
    if not dataset:
        return

    # Datasets are saved like so: dataset-name/uuid/files.fileending
    # Dropping the last path component gives the dataset's own directory;
    # dropping the last two gives the directory shared by same-named datasets.
    path_parts = dataset.labels_file_path.split("/")
    dataset_directory = "/".join(path_parts[:-1])
    dataset_parent_directory = "/".join(path_parts[:-2])

    # A path with too few components yields an empty string here; skip the
    # filesystem call instead of handing "" to the helper.
    if dataset_directory:
        logger.debug("Removing dataset directory %s", dataset_directory)
        await remove_directory(dataset_directory)

    # NOTE(review): directory_is_empty is called without ``await`` while
    # remove_directory is awaited. If directory_is_empty is a coroutine
    # function this condition is always truthy -- confirm in utils.
    if dataset_parent_directory and directory_is_empty(dataset_parent_directory):
        logger.debug(
            "Removing empty parent directory %s", dataset_parent_directory
        )
        await remove_directory(dataset_parent_directory)

    await db.delete(dataset)
    await db.commit()
async def add_dataset(db: AsyncSession, dataset: Dataset):
    """Persist a new dataset and reload its state from the database.

    Args:
        db: Async session the dataset is added to and committed with.
        dataset: The ``Dataset`` instance to store; it is refreshed in place
            after the commit.
    """
    # Stage the object in the session.
    db.add(dataset)

    # Write it out, then re-read the row into the same instance.
    await db.commit()
    await db.refresh(dataset)