From 0960a7a141bb9a1e499e9ee357f1cb5b0144f86e Mon Sep 17 00:00:00 2001 From: David Alves Date: Wed, 5 Nov 2025 11:54:34 +0000 Subject: [PATCH] Add ExitNodeProcess and unit tests --- main/src/main/java/sd/ExitNodeProcess.java | 382 ++++++++++++++++++ .../src/test/java/sd/ExitNodeProcessTest.java | 327 +++++++++++++++ 2 files changed, 709 insertions(+) create mode 100644 main/src/main/java/sd/ExitNodeProcess.java create mode 100644 main/src/test/java/sd/ExitNodeProcessTest.java diff --git a/main/src/main/java/sd/ExitNodeProcess.java b/main/src/main/java/sd/ExitNodeProcess.java new file mode 100644 index 0000000..c2a4f2d --- /dev/null +++ b/main/src/main/java/sd/ExitNodeProcess.java @@ -0,0 +1,382 @@ +package sd; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import sd.config.SimulationConfig; +import sd.coordinator.SocketClient; +import sd.model.Message; +import sd.model.MessageType; +import sd.model.Vehicle; +import sd.model.VehicleType; +import sd.protocol.MessageProtocol; +import sd.protocol.SocketConnection; +import sd.serialization.SerializationException; + +/** + * Processo responsável pelo nó de saída do sistema de simulação de tráfego distribuído. + * + * Este processo representa o ponto final ("S") onde os veículos completam as suas rotas. + * As suas principais responsabilidades são: + * - Receber veículos que terminam a sua rota vindos das interseções + * - Calcular e agregar estatísticas finais dos veículos + * - Enviar estatísticas periódicas para o dashboard + * - Gerar relatórios finais ao terminar a simulação + */ +public class ExitNodeProcess { + + private final SimulationConfig config; + private ServerSocket serverSocket; + private final ExecutorService connectionHandlerPool; + + /** Flag para controlar a execução do processo (volatile para visibilidade entre threads) */ + private volatile boolean running; + + /** Counter de veículos que completaram a rota */ + private int totalVehiclesReceived; + + /** Soma dos tempos no sistema de todos os veículos */ + private double totalSystemTime; + + /** Soma dos tempos de espera de todos os veículos */ + private double totalWaitingTime; + + /** Soma dos tempos de travessia de todos os veículos */ + private double totalCrossingTime; + + /** Contagem de veículos por tipo */ + private final Map vehicleTypeCount; + + /** Tempo total de espera acumulado por tipo de veículo */ + private final Map vehicleTypeWaitTime; + + /** Socket para comunicação com o dashboard */ + private SocketClient dashboardClient; + + /** + * Método para iniciar o processo + * + * @param args Argumentos da linha de comandos. Se fornecido, args[0] deve ser + * o caminho para um ficheiro de configuração personalizado. + */ + public static void main(String[] args) { + System.out.println("=".repeat(60)); + System.out.println("EXIT NODE PROCESS"); + System.out.println("=".repeat(60)); + + try { + String configFile = args.length > 0 ? args[0] : "src/main/resources/simulation.properties"; + System.out.println("Loading configuration from: " + configFile); + + SimulationConfig config = new SimulationConfig(configFile); + ExitNodeProcess exitNode = new ExitNodeProcess(config); + + System.out.println("\n" + "=".repeat(60)); + exitNode.initialize(); + + System.out.println("\n" + "=".repeat(60)); + exitNode.start(); + + } catch (IOException e) { + System.err.println("Failed to start exit node: " + e.getMessage()); + System.exit(1); + } catch (Exception e) { + System.err.println("Exit node error: " + e.getMessage()); + System.exit(1); + } + } + + /** + * Constrói um novo processo de nó de saída. + * + * Inicializa todas as estruturas de dados necessárias para recolher estatísticas + * e configura o pool de threads para processar as ligações concorrentes. + * + * @param config Configuração da simulação contendo portas e endereços dos serviços + */ + public ExitNodeProcess(SimulationConfig config) { + this.config = config; + this.connectionHandlerPool = Executors.newCachedThreadPool(); + this.running = false; + + this.totalVehiclesReceived = 0; + this.totalSystemTime = 0.0; + this.totalWaitingTime = 0.0; + this.totalCrossingTime = 0.0; + this.vehicleTypeCount = new HashMap<>(); + this.vehicleTypeWaitTime = new HashMap<>(); + + // Inicializa os counters para cada tipo de veículo + for (VehicleType type : VehicleType.values()) { + vehicleTypeCount.put(type, 0); + vehicleTypeWaitTime.put(type, 0.0); + } + + System.out.println("Exit node initialized"); + System.out.println(" - Exit port: " + config.getExitPort()); + System.out.println(" - Dashboard: " + config.getDashboardHost() + ":" + config.getDashboardPort()); + } + + /** + * Inicializa o processo de ligação ao dashboard. + * + * Tenta conectar-se ao dashboard. Se a ligação falhar, o processo + * continua a funcionar normalmente, mas sem enviar estatísticas. + * + */ + public void initialize() { + System.out.println("Connecting to dashboard..."); + + try { + String host = config.getDashboardHost(); + int port = config.getDashboardPort(); + + dashboardClient = new SocketClient("Dashboard", host, port); + dashboardClient.connect(); + + System.out.println("Successfully connected to dashboard"); + } catch (IOException e) { + System.err.println("WARNING: Failed to connect to dashboard: " + e.getMessage()); + System.err.println("Exit node will continue without dashboard connection"); + } + } + + /** + * Inicia o socket e começa a aceitar ligações. + * + * Este é o loop principal do processo que: + * 1. Cria um socket na porta definida + * 2. Aguarda pelas ligações das interseções + * 3. Delega cada ligação a uma thread da pool para processamento assíncrono + * + * @throws IOException Se o socket não puder ser criado ou houver erro na aceitação + */ + public void start() throws IOException { + int port = config.getExitPort(); + serverSocket = new ServerSocket(port); + running = true; + + System.out.println("Exit node started on port " + port); + System.out.println("Waiting for vehicles...\n"); + + while (running) { + try { + Socket clientSocket = serverSocket.accept(); + connectionHandlerPool.submit(() -> handleIncomingConnection(clientSocket)); + } catch (IOException e) { + if (running) { + System.err.println("Error accepting connection: " + e.getMessage()); + } + } + } + } + + /** + * Processa uma ligação recebida de uma interseção. + * + * Mantém a ligação aberta e processa continuamente mensagens do tipo + * VEHICLE_TRANSFER. Cada mensagem representa um veículo que chegou ao nó de saída. + * + * @param clientSocket Socket da ligação estabelecida com a interseção + */ + private void handleIncomingConnection(Socket clientSocket) { + try (SocketConnection connection = new SocketConnection(clientSocket)) { + + System.out.println("New connection accepted from " + + clientSocket.getInetAddress().getHostAddress()); + + while (running && connection.isConnected()) { + try { + MessageProtocol message = connection.receiveMessage(); + + if (message.getType() == MessageType.VEHICLE_TRANSFER) { + Vehicle vehicle = (Vehicle) message.getPayload(); + processExitingVehicle(vehicle); + } + + } catch (ClassNotFoundException e) { + System.err.println("Unknown message type received: " + e.getMessage()); + } + } + + } catch (IOException e) { + if (running) { + System.err.println("Connection error: " + e.getMessage()); + } + } + } + + /** + * Processa um veículo que chegou ao nó de saída. + * + * Método sincronizado para garantir thread-safety ao atualizar as estatísticas. + * Calcula as métricas finais do veículo e atualiza: + * - Counters globais; + * - Estatísticas por tipo de veículo; + * - Faz update ao dashboard a cada 10 veículos. + * + * @param vehicle Veículo que completou a sua rota + */ + private synchronized void processExitingVehicle(Vehicle vehicle) { + totalVehiclesReceived++; + + double systemTime = vehicle.getTotalTravelTime(getCurrentTime()); + double waitTime = vehicle.getTotalWaitingTime(); + double crossingTime = vehicle.getTotalCrossingTime(); + + totalSystemTime += systemTime; + totalWaitingTime += waitTime; + totalCrossingTime += crossingTime; + + VehicleType type = vehicle.getType(); + vehicleTypeCount.put(type, vehicleTypeCount.get(type) + 1); + vehicleTypeWaitTime.put(type, vehicleTypeWaitTime.get(type) + waitTime); + + System.out.printf("[Exit] Vehicle %s completed (type=%s, system_time=%.2fs, wait=%.2fs)%n", + vehicle.getId(), vehicle.getType(), systemTime, waitTime); + + if (totalVehiclesReceived % 10 == 0) { + sendStatsToDashboard(); + } + } + + /** + * Obtém o tempo atual da simulação em segundos. + * + * @return Tempo atual em segundos desde "epoch" + * + * "Epoch" é um ponto de referência temporal Unix (1 de janeiro de 1970). + * Este método retorna os segundos decorridos desde esse momento. + */ + private double getCurrentTime() { + return System.currentTimeMillis() / 1000.0; + } + + /** + * Envia as estatísticas para o dashboard. + * + * Prepara e envia uma mensagem STATS_UPDATE com: + * - O total de veículos processados; + * - A média dos tempos (sistema, espera, travessia); + * - As contagens e médias por cada tipo de veículo. + * + */ + private void sendStatsToDashboard() { + if (dashboardClient == null || !dashboardClient.isConnected()) { + return; + } + + try { + Map stats = new HashMap<>(); + stats.put("totalVehicles", totalVehiclesReceived); + stats.put("avgSystemTime", totalVehiclesReceived > 0 ? totalSystemTime / totalVehiclesReceived : 0.0); + stats.put("avgWaitingTime", totalVehiclesReceived > 0 ? totalWaitingTime / totalVehiclesReceived : 0.0); + stats.put("avgCrossingTime", totalVehiclesReceived > 0 ? totalCrossingTime / totalVehiclesReceived : 0.0); + + Map typeCounts = new HashMap<>(); + Map typeAvgWait = new HashMap<>(); + for (VehicleType type : VehicleType.values()) { + int count = vehicleTypeCount.get(type); + typeCounts.put(type.name(), count); + if (count > 0) { + typeAvgWait.put(type.name(), vehicleTypeWaitTime.get(type) / count); + } + } + stats.put("vehicleTypeCounts", typeCounts); + stats.put("vehicleTypeAvgWait", typeAvgWait); + + Message message = new Message(MessageType.STATS_UPDATE, "ExitNode", "Dashboard", stats); + dashboardClient.send(message); + + System.out.printf("[Exit] Sent stats to dashboard (total=%d, avg_wait=%.2fs)%n", + totalVehiclesReceived, totalWaitingTime / totalVehiclesReceived); + + } catch (SerializationException | IOException e) { + System.err.println("Failed to send stats to dashboard: " + e.getMessage()); + } + } + + /** + * Termina o processo + * + * Executa a seguinte sequência: + * Imprime as estatísticas finais no terminal; + * Envia a última atualização de estatísticas ao dashboard; + * Fecha o socket; + * Aguarda pela finalização das threads; + * Fecha a ligação com o dashboard; + */ + public void shutdown() { + System.out.println("\n[Exit] Shutting down..."); + running = false; + + printFinalStatistics(); + + sendStatsToDashboard(); + + try { + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + } catch (IOException e) { + System.err.println("Error closing server socket: " + e.getMessage()); + } + + connectionHandlerPool.shutdown(); + try { + if (!connectionHandlerPool.awaitTermination(5, TimeUnit.SECONDS)) { + connectionHandlerPool.shutdownNow(); + } + } catch (InterruptedException e) { + connectionHandlerPool.shutdownNow(); + } + + if (dashboardClient != null) { + dashboardClient.close(); + } + + System.out.println("[Exit] Shutdown complete."); + System.out.println("=".repeat(60)); + } + + /** + * Imprime as estatísticas finais detalhadas no terminal + * + * Gera um relatório com: + * Total de veículos que completaram a rota; + * Médias de tempo no sistema, espera e travessia; + * Distribuição e médias pelo tipo de veículo (BIKE, LIGHT, HEAVY); + * + * Este método é chamado durante o shutdown para fornecer um resumo + * da simulação antes de terminar o processo. + */ + private void printFinalStatistics() { + System.out.println("\n=== EXIT NODE STATISTICS ==="); + System.out.printf("Total Vehicles Completed: %d%n", totalVehiclesReceived); + + if (totalVehiclesReceived > 0) { + System.out.printf("%nAVERAGE METRICS:%n"); + System.out.printf(" System Time: %.2f seconds%n", totalSystemTime / totalVehiclesReceived); + System.out.printf(" Waiting Time: %.2f seconds%n", totalWaitingTime / totalVehiclesReceived); + System.out.printf(" Crossing Time: %.2f seconds%n", totalCrossingTime / totalVehiclesReceived); + } + + System.out.println("\nVEHICLE TYPE DISTRIBUTION:"); + for (VehicleType type : VehicleType.values()) { + int count = vehicleTypeCount.get(type); + if (count > 0) { + double percentage = (count * 100.0) / totalVehiclesReceived; + double avgWait = vehicleTypeWaitTime.get(type) / count; + System.out.printf(" %s: %d (%.1f%%), Avg Wait: %.2fs%n", + type, count, percentage, avgWait); + } + } + } + +} diff --git a/main/src/test/java/sd/ExitNodeProcessTest.java b/main/src/test/java/sd/ExitNodeProcessTest.java new file mode 100644 index 0000000..56e9df9 --- /dev/null +++ b/main/src/test/java/sd/ExitNodeProcessTest.java @@ -0,0 +1,327 @@ +package sd; + +import java.io.IOException; +import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +import sd.config.SimulationConfig; + +/** + * Testes unitários para a classe ExitNodeProcess. + * + * Esta classe de testes verifica: + * - Construção e inicialização do processo + * - Criação e aceitação de conexões do servidor socket + * - Gestão do ciclo de vida (start/shutdown) + * - Processamento concorrente de múltiplas conexões + * - Impressão de estatísticas finais + * + * Os testes utilizam configurações temporárias e portas dedicadas (19001) + * para evitar conflitos com outros testes ou processos em execução. + */ +public class ExitNodeProcessTest { + + @TempDir + Path tempDir; + + private Path configFile; + private ExitNodeProcess exitNodeProcess; + private Thread exitNodeThread; + + /** + * Configura o ambiente de teste antes de cada teste. + * Cria um ficheiro de configuração temporário com as definições necessárias. + */ + @BeforeEach + public void setUp() throws IOException { + configFile = tempDir.resolve("test-simulation.properties"); + + String configContent = """ + # Test Exit Node Configuration + + # Exit Configuration + exit.host=localhost + exit.port=19001 + + # Dashboard Configuration (will not be running in tests) + dashboard.host=localhost + dashboard.port=19000 + + # Vehicle Crossing Times + vehicle.bike.crossingTime=2.0 + vehicle.light.crossingTime=3.0 + vehicle.heavy.crossingTime=5.0 + + # Simulation Duration + simulation.duration=60.0 + """; + + Files.writeString(configFile, configContent); + } + + /** + * Limpa os recursos após cada teste. + * Garante que o processo e threads são terminados corretamente. + */ + @AfterEach + public void tearDown() { + if (exitNodeProcess != null) { + exitNodeProcess.shutdown(); + } + if (exitNodeThread != null && exitNodeThread.isAlive()) { + exitNodeThread.interrupt(); + try { + exitNodeThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Testa a construção bem-sucedida do ExitNodeProcess com configuração válida. + */ + @Test + public void testConstructor_Success() throws IOException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + assertNotNull(exitNodeProcess); + } + + /** + * Testa que uma exceção é lançada quando a configuração é inválida. + */ + @Test + public void testConstructor_InvalidConfig() { + Exception exception = assertThrows(IOException.class, () -> { + new SimulationConfig("non-existent-config.properties"); + }); + assertNotNull(exception); + } + + /** + * Testa a inicialização sem dashboard disponível. + * Verifica que o processo continua a funcionar mesmo sem conexão ao dashboard. + */ + @Test + public void testInitialize_WithoutDashboard() throws IOException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + assertDoesNotThrow(() -> exitNodeProcess.initialize()); + } + + /** + * Testa que o servidor socket é criado corretamente na porta configurada. + * Verifica que é possível estabelecer uma conexão ao socket do servidor. + */ + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + public void testStart_ServerSocketCreated() throws IOException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + exitNodeProcess.initialize(); + + CountDownLatch latch = new CountDownLatch(1); + + exitNodeThread = new Thread(() -> { + try { + latch.countDown(); + exitNodeProcess.start(); + } catch (IOException e) { + // expected when shutdown + } + }); + + exitNodeThread.start(); + + try { + assertTrue(latch.await(2, TimeUnit.SECONDS), "Exit node should start within timeout"); + Thread.sleep(100); + + assertDoesNotThrow(() -> { + try (Socket testSocket = new Socket("localhost", 19001)) { + assertTrue(testSocket.isConnected()); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Testa que o servidor aceita conexões de clientes. + */ + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + public void testStart_AcceptsConnection() throws IOException, InterruptedException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + exitNodeProcess.initialize(); + + CountDownLatch latch = new CountDownLatch(1); + + exitNodeThread = new Thread(() -> { + try { + latch.countDown(); + exitNodeProcess.start(); + } catch (IOException e) { + // expected + } + }); + + exitNodeThread.start(); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + Thread.sleep(200); + + assertDoesNotThrow(() -> { + try (Socket socket = new Socket("localhost", 19001)) { + assertTrue(socket.isConnected()); + } + }); + } + + /** + * Testa múltiplas inicializações e encerramentos do processo. + * Verifica que o processo pode ser iniciado e parado múltiplas vezes, + * permitindo reutilização da porta. + */ + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + public void testMultipleStartStop() throws IOException, InterruptedException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + exitNodeProcess.initialize(); + + CountDownLatch latch = new CountDownLatch(1); + + exitNodeThread = new Thread(() -> { + try { + latch.countDown(); + exitNodeProcess.start(); + } catch (IOException e) { + // expected + } + }); + + exitNodeThread.start(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + Thread.sleep(100); + + exitNodeProcess.shutdown(); + Thread.sleep(100); + + assertDoesNotThrow(() -> { + SimulationConfig config2 = new SimulationConfig(configFile.toString()); + ExitNodeProcess exitNode2 = new ExitNodeProcess(config2); + exitNode2.initialize(); + exitNode2.shutdown(); + }); + } + + /** + * Testa que o shutdown fecha corretamente o servidor socket. + * Após o shutdown, novas conexões ao socket devem falhar. + */ + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + public void testShutdown_ClosesServerSocket() throws IOException, InterruptedException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + exitNodeProcess.initialize(); + + CountDownLatch startLatch = new CountDownLatch(1); + + exitNodeThread = new Thread(() -> { + try { + startLatch.countDown(); + exitNodeProcess.start(); + } catch (IOException e) { + // expected + } + }); + + exitNodeThread.start(); + assertTrue(startLatch.await(2, TimeUnit.SECONDS)); + Thread.sleep(200); + + exitNodeProcess.shutdown(); + Thread.sleep(200); + + assertThrows(IOException.class, () -> { + Socket socket = new Socket("localhost", 19001); + socket.close(); + }); + } + + /** + * Testa que as estatísticas finais são impressas corretamente durante o shutdown. + * Verifica que o método não lança exceções mesmo sem dados processados. + */ + @Test + public void testPrintFinalStatistics() throws IOException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + exitNodeProcess.initialize(); + + assertDoesNotThrow(() -> exitNodeProcess.shutdown()); + } + + /** + * Testa o processamento de múltiplas conexões concorrentes. + * Verifica que o servidor consegue lidar com vários clientes simultaneamente + * usando o pool de threads. + */ + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + public void testMultipleConcurrentConnections() throws IOException, InterruptedException { + SimulationConfig config = new SimulationConfig(configFile.toString()); + exitNodeProcess = new ExitNodeProcess(config); + exitNodeProcess.initialize(); + + CountDownLatch latch = new CountDownLatch(1); + + exitNodeThread = new Thread(() -> { + try { + latch.countDown(); + exitNodeProcess.start(); + } catch (IOException e) { + // expected + } + }); + + exitNodeThread.start(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + Thread.sleep(200); + + Thread[] clients = new Thread[3]; + for (int i = 0; i < 3; i++) { + clients[i] = new Thread(() -> { + try (Socket socket = new Socket("localhost", 19001)) { + assertTrue(socket.isConnected()); + Thread.sleep(100); + } catch (IOException | InterruptedException e) { + // ignore + } + }); + clients[i].start(); + } + + for (Thread client : clients) { + client.join(1000); + } + } +}