Add ExitNodeProcess and unit tests

This commit is contained in:
David Alves
2025-11-05 11:54:34 +00:00
parent 3b4f968a59
commit 0960a7a141
2 changed files with 709 additions and 0 deletions

View File

@@ -0,0 +1,382 @@
package sd;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import sd.config.SimulationConfig;
import sd.coordinator.SocketClient;
import sd.model.Message;
import sd.model.MessageType;
import sd.model.Vehicle;
import sd.model.VehicleType;
import sd.protocol.MessageProtocol;
import sd.protocol.SocketConnection;
import sd.serialization.SerializationException;
/**
* Processo responsável pelo nó de saída do sistema de simulação de tráfego distribuído.
*
* Este processo representa o ponto final ("S") onde os veículos completam as suas rotas.
* As suas principais responsabilidades são:
* - Receber veículos que terminam a sua rota vindos das interseções
* - Calcular e agregar estatísticas finais dos veículos
* - Enviar estatísticas periódicas para o dashboard
* - Gerar relatórios finais ao terminar a simulação
*/
public class ExitNodeProcess {
private final SimulationConfig config;
private ServerSocket serverSocket;
private final ExecutorService connectionHandlerPool;
/** Flag para controlar a execução do processo (volatile para visibilidade entre threads) */
private volatile boolean running;
/** Counter de veículos que completaram a rota */
private int totalVehiclesReceived;
/** Soma dos tempos no sistema de todos os veículos */
private double totalSystemTime;
/** Soma dos tempos de espera de todos os veículos */
private double totalWaitingTime;
/** Soma dos tempos de travessia de todos os veículos */
private double totalCrossingTime;
/** Contagem de veículos por tipo */
private final Map<VehicleType, Integer> vehicleTypeCount;
/** Tempo total de espera acumulado por tipo de veículo */
private final Map<VehicleType, Double> vehicleTypeWaitTime;
/** Socket para comunicação com o dashboard */
private SocketClient dashboardClient;
/**
* Método para iniciar o processo
*
* @param args Argumentos da linha de comandos. Se fornecido, args[0] deve ser
* o caminho para um ficheiro de configuração personalizado.
*/
public static void main(String[] args) {
System.out.println("=".repeat(60));
System.out.println("EXIT NODE PROCESS");
System.out.println("=".repeat(60));
try {
String configFile = args.length > 0 ? args[0] : "src/main/resources/simulation.properties";
System.out.println("Loading configuration from: " + configFile);
SimulationConfig config = new SimulationConfig(configFile);
ExitNodeProcess exitNode = new ExitNodeProcess(config);
System.out.println("\n" + "=".repeat(60));
exitNode.initialize();
System.out.println("\n" + "=".repeat(60));
exitNode.start();
} catch (IOException e) {
System.err.println("Failed to start exit node: " + e.getMessage());
System.exit(1);
} catch (Exception e) {
System.err.println("Exit node error: " + e.getMessage());
System.exit(1);
}
}
/**
* Constrói um novo processo de nó de saída.
*
* Inicializa todas as estruturas de dados necessárias para recolher estatísticas
* e configura o pool de threads para processar as ligações concorrentes.
*
* @param config Configuração da simulação contendo portas e endereços dos serviços
*/
public ExitNodeProcess(SimulationConfig config) {
this.config = config;
this.connectionHandlerPool = Executors.newCachedThreadPool();
this.running = false;
this.totalVehiclesReceived = 0;
this.totalSystemTime = 0.0;
this.totalWaitingTime = 0.0;
this.totalCrossingTime = 0.0;
this.vehicleTypeCount = new HashMap<>();
this.vehicleTypeWaitTime = new HashMap<>();
// Inicializa os counters para cada tipo de veículo
for (VehicleType type : VehicleType.values()) {
vehicleTypeCount.put(type, 0);
vehicleTypeWaitTime.put(type, 0.0);
}
System.out.println("Exit node initialized");
System.out.println(" - Exit port: " + config.getExitPort());
System.out.println(" - Dashboard: " + config.getDashboardHost() + ":" + config.getDashboardPort());
}
/**
* Inicializa o processo de ligação ao dashboard.
*
* Tenta conectar-se ao dashboard. Se a ligação falhar, o processo
* continua a funcionar normalmente, mas sem enviar estatísticas.
*
*/
public void initialize() {
System.out.println("Connecting to dashboard...");
try {
String host = config.getDashboardHost();
int port = config.getDashboardPort();
dashboardClient = new SocketClient("Dashboard", host, port);
dashboardClient.connect();
System.out.println("Successfully connected to dashboard");
} catch (IOException e) {
System.err.println("WARNING: Failed to connect to dashboard: " + e.getMessage());
System.err.println("Exit node will continue without dashboard connection");
}
}
/**
* Inicia o socket e começa a aceitar ligações.
*
* Este é o loop principal do processo que:
* 1. Cria um socket na porta definida
* 2. Aguarda pelas ligações das interseções
* 3. Delega cada ligação a uma thread da pool para processamento assíncrono
*
* @throws IOException Se o socket não puder ser criado ou houver erro na aceitação
*/
public void start() throws IOException {
int port = config.getExitPort();
serverSocket = new ServerSocket(port);
running = true;
System.out.println("Exit node started on port " + port);
System.out.println("Waiting for vehicles...\n");
while (running) {
try {
Socket clientSocket = serverSocket.accept();
connectionHandlerPool.submit(() -> handleIncomingConnection(clientSocket));
} catch (IOException e) {
if (running) {
System.err.println("Error accepting connection: " + e.getMessage());
}
}
}
}
/**
* Processa uma ligação recebida de uma interseção.
*
* Mantém a ligação aberta e processa continuamente mensagens do tipo
* VEHICLE_TRANSFER. Cada mensagem representa um veículo que chegou ao nó de saída.
*
* @param clientSocket Socket da ligação estabelecida com a interseção
*/
private void handleIncomingConnection(Socket clientSocket) {
try (SocketConnection connection = new SocketConnection(clientSocket)) {
System.out.println("New connection accepted from " +
clientSocket.getInetAddress().getHostAddress());
while (running && connection.isConnected()) {
try {
MessageProtocol message = connection.receiveMessage();
if (message.getType() == MessageType.VEHICLE_TRANSFER) {
Vehicle vehicle = (Vehicle) message.getPayload();
processExitingVehicle(vehicle);
}
} catch (ClassNotFoundException e) {
System.err.println("Unknown message type received: " + e.getMessage());
}
}
} catch (IOException e) {
if (running) {
System.err.println("Connection error: " + e.getMessage());
}
}
}
/**
* Processa um veículo que chegou ao nó de saída.
*
* Método sincronizado para garantir thread-safety ao atualizar as estatísticas.
* Calcula as métricas finais do veículo e atualiza:
* - Counters globais;
* - Estatísticas por tipo de veículo;
* - Faz update ao dashboard a cada 10 veículos.
*
* @param vehicle Veículo que completou a sua rota
*/
private synchronized void processExitingVehicle(Vehicle vehicle) {
totalVehiclesReceived++;
double systemTime = vehicle.getTotalTravelTime(getCurrentTime());
double waitTime = vehicle.getTotalWaitingTime();
double crossingTime = vehicle.getTotalCrossingTime();
totalSystemTime += systemTime;
totalWaitingTime += waitTime;
totalCrossingTime += crossingTime;
VehicleType type = vehicle.getType();
vehicleTypeCount.put(type, vehicleTypeCount.get(type) + 1);
vehicleTypeWaitTime.put(type, vehicleTypeWaitTime.get(type) + waitTime);
System.out.printf("[Exit] Vehicle %s completed (type=%s, system_time=%.2fs, wait=%.2fs)%n",
vehicle.getId(), vehicle.getType(), systemTime, waitTime);
if (totalVehiclesReceived % 10 == 0) {
sendStatsToDashboard();
}
}
/**
* Obtém o tempo atual da simulação em segundos.
*
* @return Tempo atual em segundos desde "epoch"
*
* "Epoch" é um ponto de referência temporal Unix (1 de janeiro de 1970).
* Este método retorna os segundos decorridos desde esse momento.
*/
private double getCurrentTime() {
return System.currentTimeMillis() / 1000.0;
}
/**
* Envia as estatísticas para o dashboard.
*
* Prepara e envia uma mensagem STATS_UPDATE com:
* - O total de veículos processados;
* - A média dos tempos (sistema, espera, travessia);
* - As contagens e médias por cada tipo de veículo.
*
*/
private void sendStatsToDashboard() {
if (dashboardClient == null || !dashboardClient.isConnected()) {
return;
}
try {
Map<String, Object> stats = new HashMap<>();
stats.put("totalVehicles", totalVehiclesReceived);
stats.put("avgSystemTime", totalVehiclesReceived > 0 ? totalSystemTime / totalVehiclesReceived : 0.0);
stats.put("avgWaitingTime", totalVehiclesReceived > 0 ? totalWaitingTime / totalVehiclesReceived : 0.0);
stats.put("avgCrossingTime", totalVehiclesReceived > 0 ? totalCrossingTime / totalVehiclesReceived : 0.0);
Map<String, Integer> typeCounts = new HashMap<>();
Map<String, Double> typeAvgWait = new HashMap<>();
for (VehicleType type : VehicleType.values()) {
int count = vehicleTypeCount.get(type);
typeCounts.put(type.name(), count);
if (count > 0) {
typeAvgWait.put(type.name(), vehicleTypeWaitTime.get(type) / count);
}
}
stats.put("vehicleTypeCounts", typeCounts);
stats.put("vehicleTypeAvgWait", typeAvgWait);
Message message = new Message(MessageType.STATS_UPDATE, "ExitNode", "Dashboard", stats);
dashboardClient.send(message);
System.out.printf("[Exit] Sent stats to dashboard (total=%d, avg_wait=%.2fs)%n",
totalVehiclesReceived, totalWaitingTime / totalVehiclesReceived);
} catch (SerializationException | IOException e) {
System.err.println("Failed to send stats to dashboard: " + e.getMessage());
}
}
/**
* Termina o processo
*
* Executa a seguinte sequência:
* Imprime as estatísticas finais no terminal;
* Envia a última atualização de estatísticas ao dashboard;
* Fecha o socket;
* Aguarda pela finalização das threads;
* Fecha a ligação com o dashboard;
*/
public void shutdown() {
System.out.println("\n[Exit] Shutting down...");
running = false;
printFinalStatistics();
sendStatsToDashboard();
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
} catch (IOException e) {
System.err.println("Error closing server socket: " + e.getMessage());
}
connectionHandlerPool.shutdown();
try {
if (!connectionHandlerPool.awaitTermination(5, TimeUnit.SECONDS)) {
connectionHandlerPool.shutdownNow();
}
} catch (InterruptedException e) {
connectionHandlerPool.shutdownNow();
}
if (dashboardClient != null) {
dashboardClient.close();
}
System.out.println("[Exit] Shutdown complete.");
System.out.println("=".repeat(60));
}
/**
* Imprime as estatísticas finais detalhadas no terminal
*
* Gera um relatório com:
* Total de veículos que completaram a rota;
* Médias de tempo no sistema, espera e travessia;
* Distribuição e médias pelo tipo de veículo (BIKE, LIGHT, HEAVY);
*
* Este método é chamado durante o shutdown para fornecer um resumo
* da simulação antes de terminar o processo.
*/
private void printFinalStatistics() {
System.out.println("\n=== EXIT NODE STATISTICS ===");
System.out.printf("Total Vehicles Completed: %d%n", totalVehiclesReceived);
if (totalVehiclesReceived > 0) {
System.out.printf("%nAVERAGE METRICS:%n");
System.out.printf(" System Time: %.2f seconds%n", totalSystemTime / totalVehiclesReceived);
System.out.printf(" Waiting Time: %.2f seconds%n", totalWaitingTime / totalVehiclesReceived);
System.out.printf(" Crossing Time: %.2f seconds%n", totalCrossingTime / totalVehiclesReceived);
}
System.out.println("\nVEHICLE TYPE DISTRIBUTION:");
for (VehicleType type : VehicleType.values()) {
int count = vehicleTypeCount.get(type);
if (count > 0) {
double percentage = (count * 100.0) / totalVehiclesReceived;
double avgWait = vehicleTypeWaitTime.get(type) / count;
System.out.printf(" %s: %d (%.1f%%), Avg Wait: %.2fs%n",
type, count, percentage, avgWait);
}
}
}
}

View File

@@ -0,0 +1,327 @@
package sd;
import java.io.IOException;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import sd.config.SimulationConfig;
/**
* Testes unitários para a classe ExitNodeProcess.
*
* Esta classe de testes verifica:
* - Construção e inicialização do processo
* - Criação e aceitação de conexões do servidor socket
* - Gestão do ciclo de vida (start/shutdown)
* - Processamento concorrente de múltiplas conexões
* - Impressão de estatísticas finais
*
* Os testes utilizam configurações temporárias e portas dedicadas (19001)
* para evitar conflitos com outros testes ou processos em execução.
*/
public class ExitNodeProcessTest {
@TempDir
Path tempDir;
private Path configFile;
private ExitNodeProcess exitNodeProcess;
private Thread exitNodeThread;
/**
* Configura o ambiente de teste antes de cada teste.
* Cria um ficheiro de configuração temporário com as definições necessárias.
*/
@BeforeEach
public void setUp() throws IOException {
configFile = tempDir.resolve("test-simulation.properties");
String configContent = """
# Test Exit Node Configuration
# Exit Configuration
exit.host=localhost
exit.port=19001
# Dashboard Configuration (will not be running in tests)
dashboard.host=localhost
dashboard.port=19000
# Vehicle Crossing Times
vehicle.bike.crossingTime=2.0
vehicle.light.crossingTime=3.0
vehicle.heavy.crossingTime=5.0
# Simulation Duration
simulation.duration=60.0
""";
Files.writeString(configFile, configContent);
}
/**
* Limpa os recursos após cada teste.
* Garante que o processo e threads são terminados corretamente.
*/
@AfterEach
public void tearDown() {
if (exitNodeProcess != null) {
exitNodeProcess.shutdown();
}
if (exitNodeThread != null && exitNodeThread.isAlive()) {
exitNodeThread.interrupt();
try {
exitNodeThread.join(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* Testa a construção bem-sucedida do ExitNodeProcess com configuração válida.
*/
@Test
public void testConstructor_Success() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
assertNotNull(exitNodeProcess);
}
/**
* Testa que uma exceção é lançada quando a configuração é inválida.
*/
@Test
public void testConstructor_InvalidConfig() {
Exception exception = assertThrows(IOException.class, () -> {
new SimulationConfig("non-existent-config.properties");
});
assertNotNull(exception);
}
/**
* Testa a inicialização sem dashboard disponível.
* Verifica que o processo continua a funcionar mesmo sem conexão ao dashboard.
*/
@Test
public void testInitialize_WithoutDashboard() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
assertDoesNotThrow(() -> exitNodeProcess.initialize());
}
/**
* Testa que o servidor socket é criado corretamente na porta configurada.
* Verifica que é possível estabelecer uma conexão ao socket do servidor.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testStart_ServerSocketCreated() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected when shutdown
}
});
exitNodeThread.start();
try {
assertTrue(latch.await(2, TimeUnit.SECONDS), "Exit node should start within timeout");
Thread.sleep(100);
assertDoesNotThrow(() -> {
try (Socket testSocket = new Socket("localhost", 19001)) {
assertTrue(testSocket.isConnected());
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Testa que o servidor aceita conexões de clientes.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testStart_AcceptsConnection() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(latch.await(2, TimeUnit.SECONDS));
Thread.sleep(200);
assertDoesNotThrow(() -> {
try (Socket socket = new Socket("localhost", 19001)) {
assertTrue(socket.isConnected());
}
});
}
/**
* Testa múltiplas inicializações e encerramentos do processo.
* Verifica que o processo pode ser iniciado e parado múltiplas vezes,
* permitindo reutilização da porta.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testMultipleStartStop() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(latch.await(2, TimeUnit.SECONDS));
Thread.sleep(100);
exitNodeProcess.shutdown();
Thread.sleep(100);
assertDoesNotThrow(() -> {
SimulationConfig config2 = new SimulationConfig(configFile.toString());
ExitNodeProcess exitNode2 = new ExitNodeProcess(config2);
exitNode2.initialize();
exitNode2.shutdown();
});
}
/**
* Testa que o shutdown fecha corretamente o servidor socket.
* Após o shutdown, novas conexões ao socket devem falhar.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testShutdown_ClosesServerSocket() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch startLatch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
startLatch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(startLatch.await(2, TimeUnit.SECONDS));
Thread.sleep(200);
exitNodeProcess.shutdown();
Thread.sleep(200);
assertThrows(IOException.class, () -> {
Socket socket = new Socket("localhost", 19001);
socket.close();
});
}
/**
* Testa que as estatísticas finais são impressas corretamente durante o shutdown.
* Verifica que o método não lança exceções mesmo sem dados processados.
*/
@Test
public void testPrintFinalStatistics() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
assertDoesNotThrow(() -> exitNodeProcess.shutdown());
}
/**
* Testa o processamento de múltiplas conexões concorrentes.
* Verifica que o servidor consegue lidar com vários clientes simultaneamente
* usando o pool de threads.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testMultipleConcurrentConnections() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(latch.await(2, TimeUnit.SECONDS));
Thread.sleep(200);
Thread[] clients = new Thread[3];
for (int i = 0; i < 3; i++) {
clients[i] = new Thread(() -> {
try (Socket socket = new Socket("localhost", 19001)) {
assertTrue(socket.isConnected());
Thread.sleep(100);
} catch (IOException | InterruptedException e) {
// ignore
}
});
clients[i].start();
}
for (Thread client : clients) {
client.join(1000);
}
}
}