Source code for app.models.ensemble_ids
from sqlalchemy import Column, ForeignKey, Integer, String
from app.database import Base
from app.utils import ANALYSIS_STATUS
from sqlalchemy.future import select
from app.logger import LOGGER
from sqlalchemy.ext.asyncio import AsyncSession
[docs]
class EnsembleIds(Base):
__tablename__ = "ensemble_ids"
id = Column(Integer, primary_key=True, autoincrement=True)
ensemble_id = Column(Integer, ForeignKey("ensemble.id"))
ids_system_id = Column(Integer, ForeignKey("ids_system.id"))
status = Column(String(32))
[docs]
async def get_ensemble_ids_by_ids(db: AsyncSession, ensemble_id: int, system_id: int):
stmt = select(EnsembleIds).where(
EnsembleIds.ensemble_id == ensemble_id, EnsembleIds.ids_system_id == system_id
)
result = await db.execute(stmt)
return result.scalar_one_or_none()
[docs]
async def get_all_ensemble_container(db: AsyncSession):
stmt = select(EnsembleIds)
result = await db.execute(stmt)
return result.scalars().all()
[docs]
async def last_container_sending_logs(db: AsyncSession, container, ensemble):
stmt = select(EnsembleIds).where(
EnsembleIds.ensemble_id == ensemble.id,
EnsembleIds.ids_system_id != container.id,
)
result = await db.execute(stmt)
analysis_status_of_other_containers_in_ensemble = result.scalars().all()
for entry in analysis_status_of_other_containers_in_ensemble:
await db.refresh(entry)
if entry.status == ANALYSIS_STATUS.PROCESSING.value:
return False
return True
[docs]
async def update_sendig_logs_status(
db: AsyncSession, container, ensemble, status: ANALYSIS_STATUS
):
stmt = select(EnsembleIds).where(
EnsembleIds.ensemble_id == ensemble.id,
EnsembleIds.ids_system_id == container.id,
)
result = await db.execute(stmt)
entry: EnsembleIds = result.scalar_one_or_none()
if entry:
await db.refresh(entry)
entry.status = status
await db.commit()
await db.refresh(entry)