Source code for app.bicep_utils.tests.test_routes

import pytest
import os
import asyncio
import psutil
import subprocess
import json
from unittest.mock import AsyncMock, patch, MagicMock
from starlette.datastructures import UploadFile
from BICEP_Utils.validation.models import NetworkAnalysisData 
from BICEP_Utils.general_utilities import (
    save_file,
    get_env_variable,
    execute_command_async,
    stop_process,
    wait_for_process_completion,
    create_and_activate_network_interface,
    mirror_network_traffic_to_interface,
    remove_network_interface
)
from fastapi import status
from BICEP_Utils.fastapi.routes import *
from BICEP_Utils.fastapi.dependencies import get_ids_instance
from BICEP_Utils.models.ids_base import IDSBase

[docs] @pytest.fixture def mock_ids(): mock = AsyncMock(spec=IDSBase) mock.container_id = 1 mock.container_name = None mock.ensemble_id = None mock.configure = AsyncMock(return_value="Succesfully configured") mock.start_network_analysis = AsyncMock(return_value="Started Network Analysis") mock.configure_ruleset = AsyncMock(return_value = "Succesfully configured Ruleset") return mock
[docs] @pytest.mark.asyncio async def test_healthcheck(): response = await healthcheck() response_json = json.loads(response.body.decode()) assert response.status_code == 200 assert response_json == {"message": "healthy"}
[docs] @patch("BICEP_Utils.fastapi.routes.save_file") @pytest.mark.asyncio async def test_configuration(save_file_mock, mock_ids): container_id = "1" mock_file = MagicMock(spec=UploadFile) response = await configure(container_id=container_id,container_name=mock_ids.container_name,file=mock_file,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 200 assert response_json == {'message': mock_ids.configure.return_value}
[docs] @pytest.mark.asyncio async def test_configuration_file_is_none(mock_ids): container_id="1" mock_file = None response = await configure(container_id=container_id,container_name=mock_ids.container_name,file=mock_file,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 400 assert response_json == {"error": "No file provided"}
[docs] @patch("BICEP_Utils.fastapi.routes.save_file") @pytest.mark.asyncio async def test_ruleset(save_file_mock, mock_ids): mock_file = MagicMock(spec=UploadFile) response = await ruleset(file=mock_file,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 200 assert response_json == {'message': mock_ids.configure_ruleset.return_value}
[docs] @pytest.mark.asyncio async def test_ruleset_file_is_none(mock_ids): mock_file = None response = await ruleset(file=mock_file,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 400 assert response_json == {"error": "No file provided"}
[docs] @pytest.mark.asyncio async def test_add_to_ensemble_with_incorrect_id(mock_ids): response = await add_to_ensemble(ensemble_id=mock_ids.ensemble_id, ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 500 assert response_json == {"error": "Ensemble ID was None!"}
[docs] @pytest.mark.asyncio async def test_add_to_ensemble(mock_ids): mock_ids.ensemble_id = 1 response = await add_to_ensemble(ensemble_id=mock_ids.ensemble_id, ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 200 assert response_json == {"message": f"Added IDS to ensemble {mock_ids.ensemble_id}"}
[docs] @pytest.mark.asyncio async def test_remove_from_ensemble(mock_ids): mock_ids.ensemble_id = 1 response = await remove_from_ensemble(mock_ids) assert response.status_code == 200 assert mock_ids.ensemble_id == None
[docs] @patch("BICEP_Utils.fastapi.routes.save_file") @patch("shutil.copyfileobj") @pytest.mark.asyncio async def test_static_analysis(copy_mock, save_file_mock, mock_ids): dataset_id = "1" dataset = MagicMock(spec=UploadFile) dataset.file = True response = await static_analysis(ensemble_id=mock_ids.ensemble_id, dataset_id=dataset_id, container_id=mock_ids.container_id, dataset=dataset,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 200 assert response_json == {"message": f"Started analysis for container {mock_ids.container_id}"}
[docs] @patch("BICEP_Utils.fastapi.routes.save_file") @pytest.mark.asyncio async def test_static_analysis_no_file_provided(save_file_mock, mock_ids): dataset_id = "1" dataset = None response = await static_analysis(ensemble_id=mock_ids.ensemble_id, dataset_id=dataset_id, container_id=mock_ids.container_id, dataset=dataset,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 400 assert response_json == {"error": "No file provided"}
[docs] @pytest.mark.asyncio async def test_network_analysis(mock_ids): network_analysis_data= NetworkAnalysisData( container_id = 1, ensemble_id=None ) response = await network_analysis(network_analysis_data=network_analysis_data,ids=mock_ids) print(response) response_json = json.loads(response.body.decode()) print(response_json) assert response.status_code == 200 assert response_json == {"message": mock_ids.start_network_analysis.return_value}
[docs] @pytest.mark.asyncio async def test_network_analysis_for_ensemble(mock_ids): network_analysis_data= NetworkAnalysisData( container_id = 1, ensemble_id=1 ) response = await network_analysis(network_analysis_data=network_analysis_data,ids=mock_ids) response_json = json.loads(response.body.decode()) assert response.status_code == 200 assert response_json == {"message": mock_ids.start_network_analysis.return_value} assert mock_ids.ensemble_id == network_analysis_data.ensemble_id
[docs] @pytest.mark.asyncio async def test_stop_analysis(mock_ids): mock_ids.dataset_id = 2 mock_ids.ensemble_id = 3 response = await stop_analysis(ids=mock_ids) resposne_json = json.loads(response.body.decode()) assert response.status_code == 200 assert resposne_json == {'message': 'successfully stopped analysis'} assert mock_ids.dataset_id == None assert mock_ids.ensemble_id == None