Source code for app.models.docker_host_system

from app.database import Base, SessionLocal
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.utils import (
    DOCKER_HOST_STATUS,
    METRIC_SERVICE_STATUS,
    get_core_external_ip,
    get_external_prometheus_push_gateway_url,
    get_core_host_ip,
    get_core_url,
    get_prometheus_push_gateway_url,
)
from app.deployment.deployment_plugins.docker import (
    DOCKER_CONTROL_CLIENT_TIMEOUT,
    DOCKER_DEPLOYMENT_CLIENT_TIMEOUT,
    ensure_image_present,
    get_docker_client,
)
from app.logger import LOGGER
import asyncio
import httpx
import os
import random
import socket
from datetime import datetime, timezone
from urllib.parse import urlparse
import docker as docker_sdk
from app.models.metric_service import (
    get_metric_service_by_host_id,
    get_or_create_metric_service,
    update_metric_service,
)
# --- Metric service container defaults ------------------------------------
METRIC_SERVICE_DEFAULT_IMAGE_NAME = "ghcr.io/bicep-pump/metric-service"
METRIC_SERVICE_DEFAULT_IMAGE_VERSION = "latest"
METRIC_SERVICE_DEFAULT_NAME = "bicep-metric-service"
# These three are handed to the container as environment variables, which is
# why they are kept as strings.
METRIC_SERVICE_DEFAULT_SCRAPE_INTERVAL = "5"
METRIC_SERVICE_DEFAULT_BATCH_SIZE = "10"
METRIC_SERVICE_DEFAULT_EXPORT_MODE = "prometheus"
METRIC_SERVICE_HEALTHCHECK_PATH = "/health"

# --- Timeouts in seconds, overridable through the environment -------------
METRIC_SERVICE_REGISTRATION_STUCK_TIMEOUT_SECONDS = int(
    os.environ.get("METRIC_SERVICE_REGISTRATION_STUCK_TIMEOUT_SECONDS", "45")
)
METRIC_SERVICE_REDEPLOY_TIMEOUT_SECONDS = int(
    os.environ.get("METRIC_SERVICE_REDEPLOY_TIMEOUT_SECONDS", "120")
)

# --- Range and retry budget for picking a random metric service port ------
METRIC_SERVICE_PORT_RANGE_START, METRIC_SERVICE_PORT_RANGE_END = 20000, 60000
METRIC_SERVICE_PORT_SELECTION_ATTEMPTS = 25

# --- Outcomes reported when scheduling a background deployment ------------
(
    METRIC_SERVICE_DEPLOYMENT_SCHEDULED,
    METRIC_SERVICE_DEPLOYMENT_ALREADY_RUNNING,
    METRIC_SERVICE_DEPLOYMENT_FAILED,
) = ("scheduled", "already_running", "failed")

# In-process bookkeeping keyed by DockerHostSystem.id.
_metric_service_deployment_tasks: dict[int, asyncio.Task] = {}
_metric_service_unhealthy_since: dict[int, datetime] = {}


class DockerHostSystem(Base):
    """ORM model for a Docker host plus the lifecycle of its metric service.

    Besides the persisted columns, the class deploys, health-checks and
    redeploys the per-host metric service container. Background deployments
    are tracked in the module-level ``_metric_service_deployment_tasks`` dict
    (one task per host id), and the time a host's metric service first looked
    unhealthy is tracked in ``_metric_service_unhealthy_since``.
    """

    __tablename__ = "docker_host_system"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(128), nullable=False)
    host = Column(String(1024), nullable=False)
    docker_port = Column(Integer)
    status = Column(String(64))
    container = relationship("IdsSystem", back_populates="host_system", lazy="selectin")
    metric_service = relationship(
        "MetricService",
        back_populates="docker_host_system",
        lazy="selectin",
        uselist=False,
        cascade="all, delete-orphan",
    )

    # ------------------------------------------------------------------
    # Naming / configuration helpers
    # ------------------------------------------------------------------

    def get_metric_service_container_name(self) -> str:
        """Return the Docker container name of this host's metric service."""
        return f"bicep-metric-service-{self.id}"

    def get_metric_service_prometheus_job_name(self) -> str:
        """Return the Pushgateway job name this host's metric service pushes to."""
        return f"metric_service_host_{self.id}"

    def get_metric_service_image_name(self) -> str:
        """Return the image repository (env ``METRIC_SERVICE_IMAGE_NAME`` overrides)."""
        return os.environ.get(
            "METRIC_SERVICE_IMAGE_NAME", METRIC_SERVICE_DEFAULT_IMAGE_NAME
        )

    def get_metric_service_image_version(self) -> str:
        """Return the image tag (env ``METRIC_SERVICE_IMAGE_VERSION`` overrides)."""
        return os.environ.get(
            "METRIC_SERVICE_IMAGE_VERSION", METRIC_SERVICE_DEFAULT_IMAGE_VERSION
        )

    def get_metric_service_image(self) -> str:
        """Return the full ``name:tag`` image reference of the metric service."""
        return (
            f"{self.get_metric_service_image_name()}:"
            f"{self.get_metric_service_image_version()}"
        )

    def is_core_host(self) -> bool:
        """Return True if this host is treated as the machine running the core.

        A host counts as the core host when its address is ``"localhost"`` or
        its name contains ``"Core"``.
        """
        return self.host == "localhost" or "Core" in self.name

    def get_metric_service_core_base_url(self) -> str:
        """Return the core API base URL as the metric service should reach it."""
        if self.is_core_host():
            # The container uses host networking, so on the core host the API
            # is reachable on loopback.
            return f"http://127.0.0.1:{os.getenv('EXTERNAL_FASTAPI_PORT', '8000')}"
        return get_core_url()

    def get_metric_service_pushgateway_base_url(self) -> str:
        """Return the Pushgateway base URL as the metric service should reach it."""
        if self.is_core_host():
            # Keep scheme/port/path of the configured gateway but point it at
            # loopback; default to http and port 9091 when unspecified.
            parsed = urlparse(get_prometheus_push_gateway_url())
            scheme = parsed.scheme or "http"
            port = parsed.port or 9091
            path = parsed.path.rstrip("/")
            return f"{scheme}://127.0.0.1:{port}{path}"
        return get_external_prometheus_push_gateway_url()

    def get_metric_service_metric_endpoint(self) -> str:
        """Return the Pushgateway URL the metric service pushes metrics to."""
        return (
            f"{self.get_metric_service_pushgateway_base_url()}/metrics/job/"
            f"{self.get_metric_service_prometheus_job_name()}"
        )

    def get_host_and_docker_port(self) -> tuple:
        """Return ``(address, docker_port)`` used to contact this host.

        For the core host the address comes from ``get_core_host_ip()``
        instead of the stored ``host`` value.
        """
        if self.is_core_host():
            return (get_core_host_ip(), self.docker_port)
        return (self.host, self.docker_port)

    def resolve_host_aliases(self) -> set[str]:
        """Return lower-cased names/IPs that may refer to this host.

        Includes the stored host, the contact address, the registration IP,
        loopback names for ``localhost``, and every IPv4 address DNS returns
        for those candidates. Resolution failures are ignored (best effort).
        """
        aliases = {self.host.lower()}
        accessible_host, _ = self.get_host_and_docker_port()
        aliases.add(accessible_host.lower())
        registration_ip = self.get_metric_service_registration_ip()
        aliases.add(registration_ip.lower())
        if self.host == "localhost":
            aliases.add("127.0.0.1")
            aliases.add("localhost")
        for candidate in {self.host, accessible_host, registration_ip}:
            try:
                _, _, ips = socket.gethostbyname_ex(candidate)
            except Exception:
                # Best effort: an unresolvable candidate just adds nothing.
                continue
            aliases.update(ip.lower() for ip in ips)
        return aliases

    def get_metric_service_registration_ip(self) -> str:
        """Return the first IPv4 address of the contact address.

        Falls back to the unresolved contact address when DNS lookup fails or
        returns nothing. Performs a blocking DNS lookup.
        """
        accessible_host, _ = self.get_host_and_docker_port()
        try:
            _, _, ips = socket.gethostbyname_ex(accessible_host)
            if ips:
                return ips[0]
        except Exception:
            pass
        return accessible_host

    async def get_metric_service_registration_ip_async(self) -> str:
        """Run the blocking registration-IP lookup in a worker thread."""
        return await asyncio.to_thread(self.get_metric_service_registration_ip)

    def get_metric_service_registration_endpoint(self) -> str:
        """Return the core URL the metric service registers itself at."""
        base_endpoint = f"{self.get_metric_service_core_base_url()}/metric-services/register"
        if self.id is None:
            return base_endpoint
        return f"{base_endpoint}/{self.id}"

    async def is_metric_service_port_available(
        self, port: int, timeout: float = 0.5
    ) -> bool:
        """Heuristically check whether ``port`` is free on this host.

        A port is considered free when a TCP connect to it fails or times out.
        A successful connect means something is listening, so the port is in
        use.
        """
        host, _ = self.get_host_and_docker_port()
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout
            )
        except Exception:
            return True
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The port is occupied regardless of how the close went.
            pass
        return False

    async def choose_metric_service_port(self) -> int:
        """Pick a random free port in the configured range.

        Raises:
            RuntimeError: if no free port was found within
                ``METRIC_SERVICE_PORT_SELECTION_ATTEMPTS`` tries.
        """
        for _ in range(METRIC_SERVICE_PORT_SELECTION_ATTEMPTS):
            candidate = random.randint(
                METRIC_SERVICE_PORT_RANGE_START, METRIC_SERVICE_PORT_RANGE_END
            )
            if await self.is_metric_service_port_available(candidate):
                return candidate
        raise RuntimeError("Could not allocate an available port for the metric service.")

    # ------------------------------------------------------------------
    # Deployment
    # ------------------------------------------------------------------

    async def _deploy_metric_service(self, db: AsyncSession):
        """Create and start this host's metric service container.

        Run from the background task created by
        ``_ensure_metric_service_deployment_task``. Progress is written to the
        host's metric service row. Any exception raised while creating the
        Docker client, pulling the image, picking a port or starting the
        container is recorded as ``UNAVAILABLE`` and logged instead of raised.
        """
        metric_service = await get_or_create_metric_service(
            db,
            self.id,
            name=METRIC_SERVICE_DEFAULT_NAME,
            status=METRIC_SERVICE_STATUS.DEPLOYING.value,
            status_message="Metric service missing. Deploying it now.",
        )
        client = None
        try:
            # Created inside the try so a connection failure is recorded on
            # the metric service row instead of leaving it in DEPLOYING.
            client = get_docker_client(
                self,
                timeout=DOCKER_DEPLOYMENT_CLIENT_TIMEOUT,
            )
            image_name = self.get_metric_service_image()
            await ensure_image_present(client, image_name)
            selected_port = await self.choose_metric_service_port()
            registration_ip = await self.get_metric_service_registration_ip_async()
            await update_metric_service(
                db,
                metric_service,
                name=METRIC_SERVICE_DEFAULT_NAME,
                port=selected_port,
                status=METRIC_SERVICE_STATUS.DEPLOYING.value,
                status_message="Metric service missing. Deploying it now.",
                clear_registration=True,
            )
            container = await asyncio.to_thread(
                client.containers.create,
                image=image_name,
                name=self.get_metric_service_container_name(),
                network_mode="host",
                privileged=True,
                cap_add=["SYS_ADMIN"],
                detach=True,
                restart_policy={"Name": "unless-stopped"},
                volumes={
                    "/var/run/docker.sock": {
                        "bind": "/var/run/docker.sock",
                        "mode": "ro",
                    },
                    "/sys/fs/cgroup": {
                        "bind": "/sys/fs/cgroup",
                        "mode": "ro",
                    },
                },
                labels={
                    "bicep.metric-service": "true",
                    "bicep.host-id": str(self.id),
                },
                environment={
                    "METRIC_EXPORT_MODE": METRIC_SERVICE_DEFAULT_EXPORT_MODE,
                    "METRIC_ENDPOINT": self.get_metric_service_metric_endpoint(),
                    "REGISTRATION_ENDPOINT": self.get_metric_service_registration_endpoint(),
                    "SCRAPE_INTERVAL": METRIC_SERVICE_DEFAULT_SCRAPE_INTERVAL,
                    "BATCH_SIZE": METRIC_SERVICE_DEFAULT_BATCH_SIZE,
                    "SERVICE_NAME": METRIC_SERVICE_DEFAULT_NAME,
                    "SERVICE_PORT": str(selected_port),
                    "SERVICE_IP": registration_ip,
                },
            )
            await asyncio.to_thread(container.start)
            await update_metric_service(
                db,
                metric_service,
                name=METRIC_SERVICE_DEFAULT_NAME,
                port=selected_port,
                status=METRIC_SERVICE_STATUS.REGISTERING.value,
                status_message="Metric service deployed. Waiting for registration.",
            )
        except Exception as exc:
            await update_metric_service(
                db,
                metric_service,
                status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
                status_message=f"Metric service deployment failed: {exc}",
            )
            LOGGER.error(f"Failed to deploy metric service on {self.name}: {exc}")
        finally:
            if client is not None:
                client.close()

    def _ensure_metric_service_deployment_task(self) -> str:
        """Schedule a background deployment unless one is already running.

        Returns one of ``METRIC_SERVICE_DEPLOYMENT_SCHEDULED``,
        ``METRIC_SERVICE_DEPLOYMENT_ALREADY_RUNNING`` or
        ``METRIC_SERVICE_DEPLOYMENT_FAILED`` (no id or no session factory).
        Must be called with a running event loop.
        """
        if self.id is None or SessionLocal is None:
            LOGGER.error(
                "Could not schedule metric service deployment for host %s.",
                self.name,
            )
            return METRIC_SERVICE_DEPLOYMENT_FAILED

        # Capture plain values now: the background task must not touch this
        # ORM instance, whose attributes may be expired by later commits.
        host_id = self.id
        host_name = self.name

        existing_task = _metric_service_deployment_tasks.get(host_id)
        if existing_task is not None:
            if not existing_task.done():
                return METRIC_SERVICE_DEPLOYMENT_ALREADY_RUNNING
            _metric_service_deployment_tasks.pop(host_id, None)

        # Detached copy with only the columns the deployment needs.
        host_snapshot = DockerHostSystem(
            id=host_id,
            name=host_name,
            host=self.host,
            docker_port=self.docker_port,
            status=self.status,
        )

        async def run_deployment() -> None:
            db = SessionLocal()
            try:
                await host_snapshot._deploy_metric_service(db)
            except Exception as exc:
                LOGGER.error(
                    "Background metric service deployment failed on %s: %s",
                    host_name,
                    exc,
                )
            finally:
                try:
                    await db.close()
                finally:
                    # Only drop the entry if it is still ours; it may have been
                    # removed and replaced while this task was running.
                    if _metric_service_deployment_tasks.get(host_id) is asyncio.current_task():
                        _metric_service_deployment_tasks.pop(host_id, None)

        _metric_service_deployment_tasks[host_id] = asyncio.create_task(
            run_deployment(),
            name=f"metric-service-deploy-{host_id}",
        )
        return METRIC_SERVICE_DEPLOYMENT_SCHEDULED

    async def _mark_metric_service_unavailable(
        self, db: AsyncSession | None, message: str
    ) -> None:
        """Record the metric service as UNAVAILABLE with ``message``.

        No-op when there is no session or the host has no id yet.
        """
        if db is None or self.id is None:
            return
        metric_service = await get_or_create_metric_service(
            db,
            self.id,
            name=METRIC_SERVICE_DEFAULT_NAME,
            status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
            status_message=message,
        )
        await update_metric_service(
            db,
            metric_service,
            status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
            status_message=message,
        )

    # ------------------------------------------------------------------
    # Health evaluation
    # ------------------------------------------------------------------

    async def _metric_service_healthcheck(self, ip: str, port: int) -> bool:
        """Return True if ``GET http://ip:port/health`` answers 200 within 3s."""
        try:
            # Literal IPv6 addresses must be bracketed in a URL authority.
            url_host = f"[{ip}]" if ":" in ip and not ip.startswith("[") else ip
            async with httpx.AsyncClient(timeout=3.0) as client:
                response = await client.get(
                    f"http://{url_host}:{port}{METRIC_SERVICE_HEALTHCHECK_PATH}"
                )
                return response.status_code == 200
        except Exception:
            return False

    @staticmethod
    def _normalize_docker_timestamp(value: str) -> str:
        """Rewrite a Docker RFC 3339 timestamp so ``fromisoformat`` accepts it.

        Docker reports times such as ``2024-01-01T12:00:00.123456789Z``
        (nanoseconds, trailing zeros dropped, ``Z`` suffix). Before Python
        3.11, ``datetime.fromisoformat`` only accepts exactly 3 or 6
        fractional digits and no ``Z``. The fraction is therefore
        truncated/padded to 6 digits and ``Z`` becomes ``+00:00``.
        """
        value = value.replace("Z", "+00:00")
        head, dot, tail = value.partition(".")
        if not dot:
            return value
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        fraction, offset = tail[:digit_count], tail[digit_count:]
        if not fraction:
            return f"{head}{offset}"
        return f"{head}.{fraction[:6].ljust(6, '0')}{offset}"

    def _get_metric_service_started_at(self, state: dict) -> datetime | None:
        """Parse ``State.StartedAt`` of the container, or None if unset/invalid."""
        started_at = state.get("StartedAt")
        # Docker uses the zero time 0001-01-01 for "never started".
        if not started_at or started_at.startswith("0001-01-01"):
            return None
        try:
            return datetime.fromisoformat(
                self._normalize_docker_timestamp(started_at)
            )
        except ValueError:
            LOGGER.warning(
                "Could not parse metric service StartedAt timestamp for host %s: %s",
                self.name,
                started_at,
            )
            return None

    def _get_metric_service_runtime_seconds(self, state: dict) -> float | None:
        """Return seconds since the container started, or None if unknown."""
        started_at = self._get_metric_service_started_at(state)
        if started_at is None:
            return None
        return (
            datetime.now(timezone.utc) - started_at.astimezone(timezone.utc)
        ).total_seconds()

    def _is_metric_service_registration_stuck(self, state: dict) -> bool:
        """True once the container has run for the registration-stuck timeout."""
        runtime_seconds = self._get_metric_service_runtime_seconds(state)
        return (
            runtime_seconds is not None
            and runtime_seconds >= METRIC_SERVICE_REGISTRATION_STUCK_TIMEOUT_SECONDS
        )

    def _should_redeploy_metric_service(self, state: dict, metric_service) -> bool:
        """True if the service never registered and has run past the redeploy timeout."""
        if metric_service.last_registration_at:
            return False
        runtime_seconds = self._get_metric_service_runtime_seconds(state)
        return (
            runtime_seconds is not None
            and runtime_seconds >= METRIC_SERVICE_REDEPLOY_TIMEOUT_SECONDS
        )

    def _clear_metric_service_unhealthy_tracker(self) -> None:
        """Forget when this host's metric service was first seen unhealthy."""
        if self.id is None:
            return
        _metric_service_unhealthy_since.pop(self.id, None)

    def _should_redeploy_unhealthy_metric_service(self) -> bool:
        """Start or check the unhealthy timer for this host.

        The first call records "now" and returns False. Later calls return
        True once ``METRIC_SERVICE_REDEPLOY_TIMEOUT_SECONDS`` have passed
        since that first unhealthy observation.
        """
        if self.id is None:
            return False
        now = datetime.now(timezone.utc)
        unhealthy_since = _metric_service_unhealthy_since.get(self.id)
        if unhealthy_since is None:
            _metric_service_unhealthy_since[self.id] = now
            return False
        return (
            now - unhealthy_since.astimezone(timezone.utc)
        ).total_seconds() >= METRIC_SERVICE_REDEPLOY_TIMEOUT_SECONDS

    async def _redeploy_metric_service_container(
        self,
        db: AsyncSession,
        metric_service,
        *,
        reason: str,
    ) -> None:
        """Remove the metric service container and schedule a fresh deployment.

        ``reason`` becomes the status message when scheduling succeeds. If the
        removal fails, the service is marked UNAVAILABLE and nothing is
        scheduled.
        """
        self._clear_metric_service_unhealthy_tracker()
        try:
            await self.remove_metric_service_container()
        except Exception as exc:
            LOGGER.error(
                "Failed to remove metric service container from host %s before redeploy: %s",
                self.name,
                exc,
            )
            await update_metric_service(
                db,
                metric_service,
                status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
                status_message=(
                    f"{reason} Automatic cleanup before redeploy failed: {exc}"
                ),
                clear_registration=True,
            )
            return

        deployment_state = self._ensure_metric_service_deployment_task()
        if deployment_state == METRIC_SERVICE_DEPLOYMENT_SCHEDULED:
            await update_metric_service(
                db,
                metric_service,
                status=METRIC_SERVICE_STATUS.DEPLOYING.value,
                status_message=reason,
                clear_registration=True,
            )
        elif deployment_state == METRIC_SERVICE_DEPLOYMENT_ALREADY_RUNNING:
            await update_metric_service(
                db,
                metric_service,
                status=METRIC_SERVICE_STATUS.DEPLOYING.value,
                status_message="Metric service redeployment is already in progress.",
                clear_registration=True,
            )
        else:
            await update_metric_service(
                db,
                metric_service,
                status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
                status_message=(
                    "Metric service cleanup finished, but automatic redeployment "
                    "could not be scheduled."
                ),
                clear_registration=True,
            )

    async def _check_metric_service_health(self, db: AsyncSession) -> bool:
        """Evaluate the metric service, update its status row, and self-heal.

        Flow:
        - Missing container: schedule a deployment.
        - Container not running: after the unhealthy timeout, remove and
          redeploy it.
        - Running but unregistered: redeploy once registration is stuck.
        - Registered: run the HTTP health check, and redeploy after the
          unhealthy timeout.

        Returns True only when the health check passes.
        """
        metric_service = await get_metric_service_by_host_id(db, self.id)
        if metric_service is None:
            metric_service = await get_or_create_metric_service(
                db,
                self.id,
                name=METRIC_SERVICE_DEFAULT_NAME,
                status=METRIC_SERVICE_STATUS.DEPLOYING.value,
                status_message="Metric service has not been deployed yet.",
            )

        client = get_docker_client(self, timeout=DOCKER_CONTROL_CLIENT_TIMEOUT)
        try:
            try:
                container = await asyncio.to_thread(
                    client.containers.get, self.get_metric_service_container_name()
                )
            except docker_sdk.errors.NotFound:
                deployment_state = self._ensure_metric_service_deployment_task()
                if deployment_state == METRIC_SERVICE_DEPLOYMENT_SCHEDULED:
                    await update_metric_service(
                        db,
                        metric_service,
                        status=METRIC_SERVICE_STATUS.DEPLOYING.value,
                        status_message="Metric service missing. Deploying it now.",
                        clear_registration=True,
                    )
                elif deployment_state == METRIC_SERVICE_DEPLOYMENT_ALREADY_RUNNING:
                    await update_metric_service(
                        db,
                        metric_service,
                        status=METRIC_SERVICE_STATUS.DEPLOYING.value,
                        status_message="Metric service deployment is already in progress.",
                    )
                else:
                    await update_metric_service(
                        db,
                        metric_service,
                        status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
                        status_message=(
                            "Metric service is missing and could not be redeployed "
                            "automatically."
                        ),
                    )
                return False

            await asyncio.to_thread(container.reload)
            state = container.attrs.get("State", {})

            if not state.get("Running"):
                if self._should_redeploy_unhealthy_metric_service():
                    await self._redeploy_metric_service_container(
                        db,
                        metric_service,
                        reason=(
                            "Metric service container stayed unavailable. Removing "
                            "and redeploying it."
                        ),
                    )
                    return False
                await update_metric_service(
                    db,
                    metric_service,
                    status=METRIC_SERVICE_STATUS.UNAVAILABLE.value,
                    status_message="Metric service container exists but is not running.",
                )
                return False

            if not metric_service.ip or not metric_service.port:
                # Not registered yet: registration timeouts apply instead of
                # the unhealthy timer.
                self._clear_metric_service_unhealthy_tracker()
                if self._should_redeploy_metric_service(state, metric_service):
                    await self._redeploy_metric_service_container(
                        db,
                        metric_service,
                        reason=(
                            "Metric service did not register in time. Removing and "
                            "redeploying it."
                        ),
                    )
                    return False
                if self._is_metric_service_registration_stuck(state):
                    await self._redeploy_metric_service_container(
                        db,
                        metric_service,
                        reason=(
                            "Metric service registration is stuck. Removing and redeploying the "
                            "metric service."
                        ),
                    )
                    return False
                await update_metric_service(
                    db,
                    metric_service,
                    status=METRIC_SERVICE_STATUS.REGISTERING.value,
                    status_message="Metric service is running but has not registered yet.",
                )
                return False

            is_healthy = await self._metric_service_healthcheck(
                metric_service.ip, metric_service.port
            )
            if is_healthy:
                self._clear_metric_service_unhealthy_tracker()
            elif self._should_redeploy_unhealthy_metric_service():
                await self._redeploy_metric_service_container(
                    db,
                    metric_service,
                    reason=(
                        "Metric service healthcheck kept failing. Removing and "
                        "redeploying it."
                    ),
                )
                return False
            await update_metric_service(
                db,
                metric_service,
                status=(
                    METRIC_SERVICE_STATUS.AVAILABLE.value
                    if is_healthy
                    else METRIC_SERVICE_STATUS.UNAVAILABLE.value
                ),
                status_message=(
                    "Metric service is healthy."
                    if is_healthy
                    else "Metric service healthcheck failed."
                ),
            )
            return is_healthy
        finally:
            client.close()

    def remove_metric_service_container(self):
        raise NotImplementedError  # placeholder replaced below

    async def remove_metric_service_container(self) -> None:
        """Force-remove this host's metric service container if it exists."""
        if self.id is None:
            return
        client = get_docker_client(self, timeout=DOCKER_CONTROL_CLIENT_TIMEOUT)
        try:
            try:
                container = await asyncio.to_thread(
                    client.containers.get, self.get_metric_service_container_name()
                )
            except docker_sdk.errors.NotFound:
                return
            await asyncio.to_thread(container.remove, force=True)
        finally:
            client.close()

    # ------------------------------------------------------------------
    # Host availability
    # ------------------------------------------------------------------

    async def check_host_health(self, db: AsyncSession | None = None):
        """Probe the Docker host and, when ``db`` is given, its metric service.

        Returns ``DOCKER_HOST_STATUS.AVAILABLE.value`` when the Docker API
        answers and either no session was passed or the metric service is
        healthy. Otherwise returns ``DOCKER_HOST_STATUS.UNAVAILABLE.value``.
        Exceptions raised while probing are logged and reported as
        unavailable.
        """
        try:
            if await self.is_host_reachable():
                LOGGER.debug(f"host {self.name} is reachable")
                client = get_docker_client(self, timeout=DOCKER_CONTROL_CLIENT_TIMEOUT)
                try:
                    version = await asyncio.to_thread(client.version)
                finally:
                    client.close()
                if version:
                    LOGGER.info(f"Docker Host {self.name} is reachable")
                    if db is None:
                        return DOCKER_HOST_STATUS.AVAILABLE.value
                    if await self._check_metric_service_health(db):
                        return DOCKER_HOST_STATUS.AVAILABLE.value
                    return DOCKER_HOST_STATUS.UNAVAILABLE.value
            else:
                self._clear_metric_service_unhealthy_tracker()
                LOGGER.info(f"host {self.name} is not reachable")
                await self._mark_metric_service_unavailable(
                    db, "Docker host is not reachable."
                )
        except Exception as e:
            self._clear_metric_service_unhealthy_tracker()
            LOGGER.error(e)
            await self._mark_metric_service_unavailable(
                db, f"Docker host healthcheck failed: {e}"
            )
        return DOCKER_HOST_STATUS.UNAVAILABLE.value

    async def is_host_reachable(self, timeout: float = 2.0) -> bool:
        """Return True if the Docker endpoint of this host can be reached.

        For the core host, an existing local Docker socket (env
        ``DOCKER_SOCKET_PATH``, default ``/var/run/docker.sock``) is enough.
        Otherwise a TCP connect to ``(address, docker_port)`` is attempted
        within ``timeout`` seconds.
        """
        if self.is_core_host():
            socket_path = os.getenv("DOCKER_SOCKET_PATH", "/var/run/docker.sock")
            if os.path.exists(socket_path):
                return True
            # socket absent – fall back to TCP probe
        host, port = self.get_host_and_docker_port()
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout
            )
        except Exception:
            return False
        # The connect succeeded, so the host is reachable even if the close
        # handshake itself errors out.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass
        return True

    async def update_availability(self, db: AsyncSession):
        """Re-check health and persist the status only when it changed."""
        old_availability = self.status
        new_availability = await self.check_host_health(db)
        if old_availability != new_availability:
            LOGGER.debug(
                f"Host {self.name} changed its availability from {old_availability} to {new_availability}"
            )
            await set_host_status(db, self, new_availability)
            LOGGER.debug(f"Changed status from host {self.name} to {new_availability}")
async def set_host_status(
    db: AsyncSession, host: DockerHostSystem, status: "str | DOCKER_HOST_STATUS"
):
    """Persist ``status`` on ``host``, commit, and reload the row.

    ``DockerHostSystem.update_availability`` passes the plain string returned
    by ``check_host_health`` (a ``DOCKER_HOST_STATUS.<member>.value``). An
    enum member is also accepted and stored via its ``.value``, so the
    ``String`` column always receives the underlying value.
    """
    # Strings have no ``.value`` attribute and pass through unchanged.
    host.status = getattr(status, "value", status)
    await db.commit()
    await db.refresh(host)
async def get_host_by_id(db: AsyncSession, id: int):
    """Return the ``DockerHostSystem`` with primary key ``id``, or None."""
    query = select(DockerHostSystem).where(DockerHostSystem.id == id)
    return (await db.execute(query)).scalar_one_or_none()
async def get_all_hosts(db: AsyncSession):
    """Return every ``DockerHostSystem`` row as a sequence."""
    rows = await db.execute(select(DockerHostSystem))
    return rows.scalars().all()
async def add_host_system(db: AsyncSession, host: DockerHostSystem):
    """Insert ``host``, then store its initial health status.

    The host is committed first so it has an id before the health check
    (which may deploy its metric service) runs.
    """

    async def commit_and_reload() -> None:
        await db.commit()
        await db.refresh(host)

    db.add(host)
    await commit_and_reload()
    host.status = await host.check_host_health(db)
    await commit_and_reload()
async def remove_host(db: AsyncSession, host_id: int):
    """Delete a host and its metric service container.

    Returns False if no host has ``host_id``, True after deletion.

    Raises:
        RuntimeError: if the metric service container could not be removed;
            the host row is kept in that case.
    """
    host: DockerHostSystem = await get_host_by_id(db, host_id)
    if host is None:
        return False

    # Stop an in-flight background deployment first; otherwise it could
    # create/start the container again after we removed it.
    # NOTE: cancellation is best effort — a Docker call already running in a
    # worker thread (asyncio.to_thread) still finishes.
    pending_deployment = _metric_service_deployment_tasks.pop(host_id, None)
    if pending_deployment is not None and not pending_deployment.done():
        pending_deployment.cancel()
        # return_exceptions=True waits for the task to unwind without
        # re-raising its CancelledError in this coroutine.
        await asyncio.gather(pending_deployment, return_exceptions=True)

    try:
        await host.remove_metric_service_container()
    except Exception as exc:
        LOGGER.error(
            f"Failed to remove metric service container from host {host.name}: {exc}"
        )
        raise RuntimeError(
            f"Could not remove metric service container from host {host.name}."
        ) from exc

    # Drop the stale unhealthy timestamp so a later host reusing this id
    # does not inherit it.
    _metric_service_unhealthy_since.pop(host_id, None)
    await db.delete(host)
    await db.commit()
    return True