15 Commits

Author SHA1 Message Date
Gaa56
a31ac61a9a Start traffic light threads on process initialization 2025-11-07 12:14:52 +00:00
5dc1b40c88 Merge pull request #32 from davidalves04/14-create-trafficlightthread-class
14 create trafficlightthread class
2025-11-06 13:53:12 +00:00
3117bdf332 Merge branch 'dev' into 14-create-trafficlightthread-class 2025-11-06 13:53:01 +00:00
1140c3ca48 Merge pull request #30 from davidalves04/13-create-exit-node-process
13 create exit node process
2025-11-06 13:49:21 +00:00
Gaa56
484cba1eee Update TrafficLightThread 2025-11-05 13:21:10 +00:00
Gaa56
0e5526c3f6 Merge pull request #31 from davidalves04/dev
Dev
2025-11-05 12:37:48 +00:00
David Alves
cf88db4297 Add traffic light coordination and tests
Sorry to add this on this branch ahah
2025-11-05 12:09:32 +00:00
David Alves
0960a7a141 Add ExitNodeProcess and unit tests 2025-11-05 11:54:34 +00:00
David Alves
3b4f968a59 Merge pull request #29 from davidalves04/12-implement-coordinatorgenerator-process
Coordinator Process Implementation
2025-11-03 00:02:56 +00:00
0c256ad6f5 Fix Intersection Destination - Doubled Advance 2025-11-02 23:56:54 +00:00
340e436063 Merge branch 'dev' into 12-implement-coordinatorgenerator-process 2025-11-02 23:21:36 +00:00
1684a6713e Implementation of the Coordinator Process 2025-11-02 23:17:15 +00:00
22a7081ade Merge pull request #28 from davidalves04/10-create-network-communication-classes
Fix Serialization
2025-11-02 22:39:38 +00:00
Gaa56
4710c96450 Create TrafficLightThread Class 2025-10-30 18:06:02 +00:00
f9644bd18c Merge pull request #26 from davidalves04/dev
#12 Req.
2025-10-30 16:09:04 +00:00
10 changed files with 1939 additions and 95 deletions

View File

@@ -0,0 +1,382 @@
package sd;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import sd.config.SimulationConfig;
import sd.coordinator.SocketClient;
import sd.model.Message;
import sd.model.MessageType;
import sd.model.Vehicle;
import sd.model.VehicleType;
import sd.protocol.MessageProtocol;
import sd.protocol.SocketConnection;
import sd.serialization.SerializationException;
/**
* Processo responsável pelo nó de saída do sistema de simulação de tráfego distribuído.
*
* Este processo representa o ponto final ("S") onde os veículos completam as suas rotas.
* As suas principais responsabilidades são:
* - Receber veículos que terminam a sua rota vindos das interseções
* - Calcular e agregar estatísticas finais dos veículos
* - Enviar estatísticas periódicas para o dashboard
* - Gerar relatórios finais ao terminar a simulação
*/
public class ExitNodeProcess {
private final SimulationConfig config;
private ServerSocket serverSocket;
private final ExecutorService connectionHandlerPool;
/** Flag para controlar a execução do processo (volatile para visibilidade entre threads) */
private volatile boolean running;
/** Counter de veículos que completaram a rota */
private int totalVehiclesReceived;
/** Soma dos tempos no sistema de todos os veículos */
private double totalSystemTime;
/** Soma dos tempos de espera de todos os veículos */
private double totalWaitingTime;
/** Soma dos tempos de travessia de todos os veículos */
private double totalCrossingTime;
/** Contagem de veículos por tipo */
private final Map<VehicleType, Integer> vehicleTypeCount;
/** Tempo total de espera acumulado por tipo de veículo */
private final Map<VehicleType, Double> vehicleTypeWaitTime;
/** Socket para comunicação com o dashboard */
private SocketClient dashboardClient;
/**
* Método para iniciar o processo
*
* @param args Argumentos da linha de comandos. Se fornecido, args[0] deve ser
* o caminho para um ficheiro de configuração personalizado.
*/
public static void main(String[] args) {
System.out.println("=".repeat(60));
System.out.println("EXIT NODE PROCESS");
System.out.println("=".repeat(60));
try {
String configFile = args.length > 0 ? args[0] : "src/main/resources/simulation.properties";
System.out.println("Loading configuration from: " + configFile);
SimulationConfig config = new SimulationConfig(configFile);
ExitNodeProcess exitNode = new ExitNodeProcess(config);
System.out.println("\n" + "=".repeat(60));
exitNode.initialize();
System.out.println("\n" + "=".repeat(60));
exitNode.start();
} catch (IOException e) {
System.err.println("Failed to start exit node: " + e.getMessage());
System.exit(1);
} catch (Exception e) {
System.err.println("Exit node error: " + e.getMessage());
System.exit(1);
}
}
/**
* Constrói um novo processo de nó de saída.
*
* Inicializa todas as estruturas de dados necessárias para recolher estatísticas
* e configura o pool de threads para processar as ligações concorrentes.
*
* @param config Configuração da simulação contendo portas e endereços dos serviços
*/
public ExitNodeProcess(SimulationConfig config) {
this.config = config;
this.connectionHandlerPool = Executors.newCachedThreadPool();
this.running = false;
this.totalVehiclesReceived = 0;
this.totalSystemTime = 0.0;
this.totalWaitingTime = 0.0;
this.totalCrossingTime = 0.0;
this.vehicleTypeCount = new HashMap<>();
this.vehicleTypeWaitTime = new HashMap<>();
// Inicializa os counters para cada tipo de veículo
for (VehicleType type : VehicleType.values()) {
vehicleTypeCount.put(type, 0);
vehicleTypeWaitTime.put(type, 0.0);
}
System.out.println("Exit node initialized");
System.out.println(" - Exit port: " + config.getExitPort());
System.out.println(" - Dashboard: " + config.getDashboardHost() + ":" + config.getDashboardPort());
}
/**
* Inicializa o processo de ligação ao dashboard.
*
* Tenta conectar-se ao dashboard. Se a ligação falhar, o processo
* continua a funcionar normalmente, mas sem enviar estatísticas.
*
*/
public void initialize() {
System.out.println("Connecting to dashboard...");
try {
String host = config.getDashboardHost();
int port = config.getDashboardPort();
dashboardClient = new SocketClient("Dashboard", host, port);
dashboardClient.connect();
System.out.println("Successfully connected to dashboard");
} catch (IOException e) {
System.err.println("WARNING: Failed to connect to dashboard: " + e.getMessage());
System.err.println("Exit node will continue without dashboard connection");
}
}
/**
* Inicia o socket e começa a aceitar ligações.
*
* Este é o loop principal do processo que:
* 1. Cria um socket na porta definida
* 2. Aguarda pelas ligações das interseções
* 3. Delega cada ligação a uma thread da pool para processamento assíncrono
*
* @throws IOException Se o socket não puder ser criado ou houver erro na aceitação
*/
public void start() throws IOException {
int port = config.getExitPort();
serverSocket = new ServerSocket(port);
running = true;
System.out.println("Exit node started on port " + port);
System.out.println("Waiting for vehicles...\n");
while (running) {
try {
Socket clientSocket = serverSocket.accept();
connectionHandlerPool.submit(() -> handleIncomingConnection(clientSocket));
} catch (IOException e) {
if (running) {
System.err.println("Error accepting connection: " + e.getMessage());
}
}
}
}
/**
* Processa uma ligação recebida de uma interseção.
*
* Mantém a ligação aberta e processa continuamente mensagens do tipo
* VEHICLE_TRANSFER. Cada mensagem representa um veículo que chegou ao nó de saída.
*
* @param clientSocket Socket da ligação estabelecida com a interseção
*/
private void handleIncomingConnection(Socket clientSocket) {
try (SocketConnection connection = new SocketConnection(clientSocket)) {
System.out.println("New connection accepted from " +
clientSocket.getInetAddress().getHostAddress());
while (running && connection.isConnected()) {
try {
MessageProtocol message = connection.receiveMessage();
if (message.getType() == MessageType.VEHICLE_TRANSFER) {
Vehicle vehicle = (Vehicle) message.getPayload();
processExitingVehicle(vehicle);
}
} catch (ClassNotFoundException e) {
System.err.println("Unknown message type received: " + e.getMessage());
}
}
} catch (IOException e) {
if (running) {
System.err.println("Connection error: " + e.getMessage());
}
}
}
/**
* Processa um veículo que chegou ao nó de saída.
*
* Método sincronizado para garantir thread-safety ao atualizar as estatísticas.
* Calcula as métricas finais do veículo e atualiza:
* - Counters globais;
* - Estatísticas por tipo de veículo;
* - Faz update ao dashboard a cada 10 veículos.
*
* @param vehicle Veículo que completou a sua rota
*/
private synchronized void processExitingVehicle(Vehicle vehicle) {
totalVehiclesReceived++;
double systemTime = vehicle.getTotalTravelTime(getCurrentTime());
double waitTime = vehicle.getTotalWaitingTime();
double crossingTime = vehicle.getTotalCrossingTime();
totalSystemTime += systemTime;
totalWaitingTime += waitTime;
totalCrossingTime += crossingTime;
VehicleType type = vehicle.getType();
vehicleTypeCount.put(type, vehicleTypeCount.get(type) + 1);
vehicleTypeWaitTime.put(type, vehicleTypeWaitTime.get(type) + waitTime);
System.out.printf("[Exit] Vehicle %s completed (type=%s, system_time=%.2fs, wait=%.2fs)%n",
vehicle.getId(), vehicle.getType(), systemTime, waitTime);
if (totalVehiclesReceived % 10 == 0) {
sendStatsToDashboard();
}
}
/**
* Obtém o tempo atual da simulação em segundos.
*
* @return Tempo atual em segundos desde "epoch"
*
* "Epoch" é um ponto de referência temporal Unix (1 de janeiro de 1970).
* Este método retorna os segundos decorridos desde esse momento.
*/
private double getCurrentTime() {
return System.currentTimeMillis() / 1000.0;
}
/**
* Envia as estatísticas para o dashboard.
*
* Prepara e envia uma mensagem STATS_UPDATE com:
* - O total de veículos processados;
* - A média dos tempos (sistema, espera, travessia);
* - As contagens e médias por cada tipo de veículo.
*
*/
private void sendStatsToDashboard() {
if (dashboardClient == null || !dashboardClient.isConnected()) {
return;
}
try {
Map<String, Object> stats = new HashMap<>();
stats.put("totalVehicles", totalVehiclesReceived);
stats.put("avgSystemTime", totalVehiclesReceived > 0 ? totalSystemTime / totalVehiclesReceived : 0.0);
stats.put("avgWaitingTime", totalVehiclesReceived > 0 ? totalWaitingTime / totalVehiclesReceived : 0.0);
stats.put("avgCrossingTime", totalVehiclesReceived > 0 ? totalCrossingTime / totalVehiclesReceived : 0.0);
Map<String, Integer> typeCounts = new HashMap<>();
Map<String, Double> typeAvgWait = new HashMap<>();
for (VehicleType type : VehicleType.values()) {
int count = vehicleTypeCount.get(type);
typeCounts.put(type.name(), count);
if (count > 0) {
typeAvgWait.put(type.name(), vehicleTypeWaitTime.get(type) / count);
}
}
stats.put("vehicleTypeCounts", typeCounts);
stats.put("vehicleTypeAvgWait", typeAvgWait);
Message message = new Message(MessageType.STATS_UPDATE, "ExitNode", "Dashboard", stats);
dashboardClient.send(message);
System.out.printf("[Exit] Sent stats to dashboard (total=%d, avg_wait=%.2fs)%n",
totalVehiclesReceived, totalWaitingTime / totalVehiclesReceived);
} catch (SerializationException | IOException e) {
System.err.println("Failed to send stats to dashboard: " + e.getMessage());
}
}
/**
* Termina o processo
*
* Executa a seguinte sequência:
* Imprime as estatísticas finais no terminal;
* Envia a última atualização de estatísticas ao dashboard;
* Fecha o socket;
* Aguarda pela finalização das threads;
* Fecha a ligação com o dashboard;
*/
public void shutdown() {
System.out.println("\n[Exit] Shutting down...");
running = false;
printFinalStatistics();
sendStatsToDashboard();
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
} catch (IOException e) {
System.err.println("Error closing server socket: " + e.getMessage());
}
connectionHandlerPool.shutdown();
try {
if (!connectionHandlerPool.awaitTermination(5, TimeUnit.SECONDS)) {
connectionHandlerPool.shutdownNow();
}
} catch (InterruptedException e) {
connectionHandlerPool.shutdownNow();
}
if (dashboardClient != null) {
dashboardClient.close();
}
System.out.println("[Exit] Shutdown complete.");
System.out.println("=".repeat(60));
}
/**
* Imprime as estatísticas finais detalhadas no terminal
*
* Gera um relatório com:
* Total de veículos que completaram a rota;
* Médias de tempo no sistema, espera e travessia;
* Distribuição e médias pelo tipo de veículo (BIKE, LIGHT, HEAVY);
*
* Este método é chamado durante o shutdown para fornecer um resumo
* da simulação antes de terminar o processo.
*/
private void printFinalStatistics() {
System.out.println("\n=== EXIT NODE STATISTICS ===");
System.out.printf("Total Vehicles Completed: %d%n", totalVehiclesReceived);
if (totalVehiclesReceived > 0) {
System.out.printf("%nAVERAGE METRICS:%n");
System.out.printf(" System Time: %.2f seconds%n", totalSystemTime / totalVehiclesReceived);
System.out.printf(" Waiting Time: %.2f seconds%n", totalWaitingTime / totalVehiclesReceived);
System.out.printf(" Crossing Time: %.2f seconds%n", totalCrossingTime / totalVehiclesReceived);
}
System.out.println("\nVEHICLE TYPE DISTRIBUTION:");
for (VehicleType type : VehicleType.values()) {
int count = vehicleTypeCount.get(type);
if (count > 0) {
double percentage = (count * 100.0) / totalVehiclesReceived;
double avgWait = vehicleTypeWaitTime.get(type) / count;
System.out.printf(" %s: %d (%.1f%%), Avg Wait: %.2fs%n",
type, count, percentage, avgWait);
}
}
}
}

View File

@@ -8,12 +8,14 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import sd.config.SimulationConfig;
import sd.engine.TrafficLightThread;
import sd.model.Intersection;
import sd.model.MessageType;
import sd.model.TrafficLight;
import sd.model.TrafficLightState;
import sd.model.Vehicle;
import sd.protocol.MessageProtocol;
import sd.protocol.SocketConnection;
@@ -42,6 +44,19 @@ public class IntersectionProcess {
private volatile boolean running; //Quando uma thread escreve um valor volatile, todas as outras
//threads veem a mudança imediatamente.
// Traffic Light Coordination
/**
* Lock to ensure mutual exclusion between traffic lights.
* Only one traffic light can be green at any given time within this intersection.
*/
private final Lock trafficCoordinationLock;
/**
* Tracks which direction currently has the green light.
* null means no direction is currently green (all are red).
*/
private volatile String currentGreenDirection;
/**
* Constructs a new IntersectionProcess.
*
@@ -57,6 +72,8 @@ public class IntersectionProcess {
this.connectionHandlerPool = Executors.newCachedThreadPool();
this.trafficLightPool = Executors.newFixedThreadPool(4); // Max 4 directions
this.running = false;
this.trafficCoordinationLock = new ReentrantLock();
this.currentGreenDirection = null;
System.out.println("=".repeat(60));
System.out.println("INTERSECTION PROCESS: " + intersectionId);
@@ -70,8 +87,6 @@ public class IntersectionProcess {
configureRouting();
startTrafficLights();
System.out.println("[" + intersectionId + "] Initialization complete.");
}
@@ -161,107 +176,25 @@ public class IntersectionProcess {
System.out.println("\n[" + intersectionId + "] Starting traffic light threads...");
for (TrafficLight light : intersection.getTrafficLights()) {
trafficLightPool.submit(() -> runTrafficLightCycle(light));
TrafficLightThread lightTask = new TrafficLightThread(light, this, config);
trafficLightPool.submit(lightTask);
System.out.println(" Started thread for: " + light.getDirection());
}
}
/**
* The main loop for a traffic light thread.
* Continuously cycles between GREEN and RED states.
*
* @param light The traffic light to control.
*/
private void runTrafficLightCycle(TrafficLight light) {
System.out.println("[" + light.getId() + "] Traffic light thread started.");
while (running) {
try {
// Green state
light.changeState(TrafficLightState.GREEN);
System.out.println("[" + light.getId() + "] State: GREEN");
// Process vehicles while green
processGreenLight(light);
// Wait for green duration
Thread.sleep((long) (light.getGreenTime() * 1000));
// RED state
light.changeState(TrafficLightState.RED);
System.out.println("[" + light.getId() + "] State: RED");
// Wait for red duration
Thread.sleep((long) (light.getRedTime() * 1000));
} catch (InterruptedException e) {
System.out.println("[" + light.getId() + "] Traffic light thread interrupted.");
break;
}
}
System.out.println("[" + light.getId() + "] Traffic light thread stopped.");
}
/**
* Processes vehicles when a traffic light is GREEN.
* Dequeues vehicles and sends them to their next destination.
*
* @param light The traffic light that is currently green.
*/
private void processGreenLight(TrafficLight light) {
while (light.getState() == TrafficLightState.GREEN && light.getQueueSize() > 0) {
Vehicle vehicle = light.removeVehicle();
if (vehicle != null) {
// Get crossing time based on vehicle type
double crossingTime = getCrossingTimeForVehicle(vehicle);
// Simulate crossing time
try {
Thread.sleep((long) (crossingTime * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
// Update vehicle statistics
vehicle.addCrossingTime(crossingTime);
// Update intersection statistics
intersection.incrementVehiclesSent();
// Send vehicle to next destination
sendVehicleToNextDestination(vehicle);
}
}
}
/**
* Gets the crossing time for a vehicle based on its type.
*
* @param vehicle The vehicle.
* @return The crossing time in seconds.
*/
private double getCrossingTimeForVehicle(Vehicle vehicle) {
switch (vehicle.getType()) {
case BIKE:
return config.getBikeVehicleCrossingTime();
case LIGHT:
return config.getLightVehicleCrossingTime();
case HEAVY:
return config.getHeavyVehicleCrossingTime();
default:
return config.getLightVehicleCrossingTime();
}
}
/**
* Sends a vehicle to its next destination via socket connection.
*
* @param vehicle The vehicle that has crossed this intersection.
*/
private void sendVehicleToNextDestination(Vehicle vehicle) {
public void sendVehicleToNextDestination(Vehicle vehicle) {
String nextDestination = vehicle.getCurrentDestination();
try {
@@ -353,6 +286,10 @@ public class IntersectionProcess {
running = true;
System.out.println("\n[" + intersectionId + "] Server started on port " + port);
// Start traffic light threads when running is true
startTrafficLights();
System.out.println("[" + intersectionId + "] Waiting for incoming connections...\n");
// Main accept loop
@@ -460,6 +397,16 @@ public class IntersectionProcess {
System.out.println("=".repeat(60));
}
/**
* Gets the Intersection object managed by this process.
* Useful for testing and monitoring.
*
* @return The Intersection object.
*/
public Intersection getIntersection() {
return intersection;
}
// --- Inner class for Vehicle Transfer Messages ---
/**

View File

@@ -0,0 +1,204 @@
package sd.coordinator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import sd.config.SimulationConfig;
import sd.model.Message;
import sd.model.MessageType;
import sd.model.Vehicle;
import sd.serialization.SerializationException;
import sd.util.VehicleGenerator;
/**
* Coordinator process responsible for:
* 1. Vehicle generation (using VehicleGenerator)
* 2. Distributing vehicles to intersection processes via sockets
* 3. Managing simulation timing and shutdown
*
* This is the main entry point for the distributed simulation architecture.
*/
public class CoordinatorProcess {
private final SimulationConfig config;
private final VehicleGenerator vehicleGenerator;
private final Map<String, SocketClient> intersectionClients;
private double currentTime;
private int vehicleCounter;
private boolean running;
private double nextGenerationTime;
public static void main(String[] args) {
System.out.println("=".repeat(60));
System.out.println("COORDINATOR PROCESS - DISTRIBUTED TRAFFIC SIMULATION");
System.out.println("=".repeat(60));
try {
// 1. Load configuration
String configFile = args.length > 0 ? args[0] : "src/main/resources/simulation.properties";
System.out.println("Loading configuration from: " + configFile);
SimulationConfig config = new SimulationConfig(configFile);
CoordinatorProcess coordinator = new CoordinatorProcess(config);
// 2. Connect to intersection processes
System.out.println("\n" + "=".repeat(60));
coordinator.initialize();
// 3. Run the sim
System.out.println("\n" + "=".repeat(60));
coordinator.run();
} catch (IOException e) {
System.err.println("Failed to load configuration: " + e.getMessage());
System.exit(1);
} catch (Exception e) {
System.err.println("Coordinator error: " + e.getMessage());
System.exit(1);
}
}
public CoordinatorProcess(SimulationConfig config) {
this.config = config;
this.vehicleGenerator = new VehicleGenerator(config);
this.intersectionClients = new HashMap<>();
this.currentTime = 0.0;
this.vehicleCounter = 0;
this.running = false;
this.nextGenerationTime = 0.0;
System.out.println("Coordinator initialized with configuration:");
System.out.println(" - Simulation duration: " + config.getSimulationDuration() + "s");
System.out.println(" - Arrival model: " + config.getArrivalModel());
System.out.println(" - Arrival rate: " + config.getArrivalRate() + " vehicles/s");
}
public void initialize() {
System.out.println("Connecting to intersection processes...");
String[] intersectionIds = {"Cr1", "Cr2", "Cr3", "Cr4", "Cr5"};
for (String intersectionId : intersectionIds) {
try {
String host = config.getIntersectionHost(intersectionId);
int port = config.getIntersectionPort(intersectionId);
SocketClient client = new SocketClient(intersectionId, host, port);
client.connect();
intersectionClients.put(intersectionId, client);
} catch (IOException e) {
System.err.println("Failed to connect to " + intersectionId + ": " + e.getMessage());
}
}
System.out.println("Successfully connected to " + intersectionClients.size() + " intersection(s)");
if (intersectionClients.isEmpty()) {
System.err.println("WARNING: No intersections connected. Simulation cannot proceed.");
}
}
public void run() {
double duration = config.getSimulationDuration();
running = true;
System.out.println("Starting vehicle generation simulation...");
System.out.println("Duration: " + duration + " seconds");
System.out.println();
nextGenerationTime = vehicleGenerator.getNextArrivalTime(currentTime);
final double TIME_STEP = 0.1;
while (running && currentTime < duration) {
if (currentTime >= nextGenerationTime) {
generateAndSendVehicle();
nextGenerationTime = vehicleGenerator.getNextArrivalTime(currentTime);
}
currentTime += TIME_STEP;
}
System.out.println();
System.out.println("Simulation complete at t=" + String.format("%.2f", currentTime) + "s");
System.out.println("Total vehicles generated: " + vehicleCounter);
shutdown();
}
private void generateAndSendVehicle() {
Vehicle vehicle = vehicleGenerator.generateVehicle("V" + (++vehicleCounter), currentTime);
System.out.printf("[t=%.2f] Vehicle %s generated (type=%s, route=%s)%n",
currentTime, vehicle.getId(), vehicle.getType(), vehicle.getRoute());
if (vehicle.getRoute().isEmpty()) {
System.err.println("ERROR: Vehicle " + vehicle.getId() + " has empty route!");
return;
}
String entryIntersection = vehicle.getRoute().get(0);
sendVehicleToIntersection(vehicle, entryIntersection);
}
private void sendVehicleToIntersection(Vehicle vehicle, String intersectionId) {
SocketClient client = intersectionClients.get(intersectionId);
if (client == null || !client.isConnected()) {
System.err.println("ERROR: No connection to " + intersectionId + " for vehicle " + vehicle.getId());
return;
}
try {
Message message = new Message(
MessageType.VEHICLE_SPAWN,
"COORDINATOR",
intersectionId,
vehicle
);
client.send(message);
System.out.printf("->Sent to %s%n", intersectionId);
} catch (SerializationException | IOException e) {
System.err.println("ERROR: Failed to send vehicle " + vehicle.getId() + " to " + intersectionId);
System.err.println("Reason: " + e.getMessage());
}
}
public void shutdown() {
System.out.println();
System.out.println("=".repeat(60));
System.out.println("Shutting down coordinator...");
for (Map.Entry<String, SocketClient> entry : intersectionClients.entrySet()) {
String intersectionId = entry.getKey();
SocketClient client = entry.getValue();
try {
if (client.isConnected()) {
Message personalizedShutdown = new Message(
MessageType.SHUTDOWN,
"COORDINATOR",
intersectionId,
"Simulation complete"
);
client.send(personalizedShutdown);
System.out.println("Sent shutdown message to " + intersectionId);
}
} catch (SerializationException | IOException e) {
System.err.println("Error sending shutdown to " + intersectionId + ": " + e.getMessage());
} finally {
client.close();
}
}
System.out.println("Coordinator shutdown complete");
System.out.println("=".repeat(60));
}
public void stop() {
System.out.println("\nStop signal received...");
running = false;
}
}

View File

@@ -0,0 +1,124 @@
package sd.coordinator;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import sd.model.Message;
import sd.serialization.MessageSerializer;
import sd.serialization.SerializationException;
import sd.serialization.SerializerFactory;
/**
* Socket client for communication with a single intersection process.
*
* Handles a persistent TCP connection to one intersection,
* providing a simple way to send serialized messages.
*/
public class SocketClient {
private final String intersectionId;
private final String host;
private final int port;
private Socket socket;
private OutputStream outputStream;
private MessageSerializer serializer;
/**
* Creates a new SocketClient for a given intersection.
*
* @param intersectionId Intersection ID (ex. "Cr1")
* @param host Host address (ex. "localhost")
* @param port Port number
*/
public SocketClient(String intersectionId, String host, int port) {
this.intersectionId = intersectionId;
this.host = host;
this.port = port;
this.serializer = SerializerFactory.createDefault();
}
/**
* Connects to the intersection process via TCP.
*
* @throws IOException if the connection cannot be established
*/
public void connect() throws IOException {
try {
socket = new Socket(host, port);
outputStream = socket.getOutputStream();
System.out.println("Connected to " + intersectionId + " at " + host + ":" + port);
} catch (IOException e) {
System.err.println("Failed to connect to " + intersectionId + " at " + host + ":" + port);
throw e;
}
}
/**
* Sends a message to the connected intersection.
* The message is serialized and written over the socket.
*
* @param message The message to send
* @throws SerializationException if serialization fails
* @throws IOException if the socket write fails
*/
public void send(Message message) throws SerializationException, IOException {
if (socket == null || socket.isClosed()) {
throw new IOException("Socket is not connected to " + intersectionId);
}
try {
byte[] data = serializer.serialize(message);
// Prefix with message length (so receiver knows how much to read)
int length = data.length;
outputStream.write((length >> 24) & 0xFF);
outputStream.write((length >> 16) & 0xFF);
outputStream.write((length >> 8) & 0xFF);
outputStream.write(length & 0xFF);
outputStream.write(data);
outputStream.flush();
} catch (SerializationException | IOException e) {
System.err.println("Error sending message to " + intersectionId + ": " + e.getMessage());
throw e;
}
}
/**
* Closes the socket connection safely.
* Calling it multiple times wont cause issues.
*/
public void close() {
try {
if (outputStream != null) {
outputStream.close();
}
if (socket != null && !socket.isClosed()) {
socket.close();
System.out.println("Closed connection to " + intersectionId);
}
} catch (IOException e) {
System.err.println("Error closing connection to " + intersectionId + ": " + e.getMessage());
}
}
/**
* @return true if connected and socket is open, false otherwise
*/
public boolean isConnected() {
return socket != null && socket.isConnected() && !socket.isClosed();
}
public String getIntersectionId() {
return intersectionId;
}
@Override
public String toString() {
return String.format("SocketClient[intersection=%s, host=%s, port=%d, connected=%s]",
intersectionId, host, port, isConnected());
}
}

View File

@@ -0,0 +1,158 @@
package sd.engine;
import sd.IntersectionProcess;
import sd.config.SimulationConfig;
import sd.model.TrafficLight;
import sd.model.TrafficLightState;
import sd.model.Vehicle;
/**
* Implements the control logic for a single TrafficLight
* as a Runnable task that runs in its own Thread.
*
*/
public class TrafficLightThread implements Runnable {
/**
* The TrafficLight object (the *model*) that this thread controls.
* Contains the queue and the state.
*/
private final TrafficLight light;
/**
* The IntersectionProcess (the Process) that "owns" this thread.
* Used to call methods on the process, such as sendVehicleToNextDestination().
*/
private final IntersectionProcess process;
/**
* The simulation configuration, used to get timings (e.g., crossing time).
*/
private final SimulationConfig config;
/**
* Volatile flag to control the graceful shutdown mechanism.
* When set to 'false', the 'run()' loop terminates.
*/
private volatile boolean running;
/**
* Constructor for the Traffic Light Thread.
*
* @param light The TrafficLight object (model) to be controlled.
* @param process The parent IntersectionProcess (for callbacks).
* @param config The simulation configuration (to get timings).
*/
public TrafficLightThread(TrafficLight light, IntersectionProcess process, SimulationConfig config) {
this.light = light;
this.process = process;
this.config = config;
this.running = false; // Starts as 'stopped'
}
/**
* The main entry point for the thread.
* Implements the GREEN/RED cycle logic extracted from IntersectionProcess.
*
*/
@Override
public void run() {
this.running = true;
System.out.println("[" + light.getId() + "] Traffic light thread started.");
try {
// Main thread loop, continues while 'running' is true
// This 'running' flag is controlled by the parent IntersectionProcess
while (running) {
// --- GREEN Phase ---
light.changeState(TrafficLightState.GREEN); //
System.out.println("[" + light.getId() + "] State: GREEN");
// Process vehicles in the queue
processGreenLightQueue();
// Wait for green duration
Thread.sleep((long) (light.getGreenTime() * 1000)); //
if (!running) break; // Check flag after sleep
// --- RED Phase ---
light.changeState(TrafficLightState.RED); //
System.out.println("[" + light.getId() + "] State: RED");
// Wait for red duration
Thread.sleep((long) (light.getRedTime() * 1000)); //
}
} catch (InterruptedException e) {
// Apanha a InterruptedException (outra forma de parar a thread)
System.out.println("[" + light.getId() + "] Traffic light thread interrupted.");
this.running = false; // Garante que o loop termina
}
System.out.println("[" + light.getId() + "] Traffic light thread stopped.");
}
/**
* Processes vehicles in the queue while the traffic light is GREEN.
* Logic extracted from IntersectionProcess.processGreenLight()
*
*/
private void processGreenLightQueue() throws InterruptedException {
//
while (running && light.getState() == TrafficLightState.GREEN && light.getQueueSize() > 0) {
Vehicle vehicle = light.removeVehicle(); //
if (vehicle != null) {
// 1. Get the crossing time (t_sem)
double crossingTime = getCrossingTimeForVehicle(vehicle); //
// 2. Simulate the time the vehicle takes to cross
Thread.sleep((long) (crossingTime * 1000)); //
// 3. Update vehicle statistics
vehicle.addCrossingTime(crossingTime); //
// 4. Update intersection statistics
process.getIntersection().incrementVehiclesSent(); //
// 5. Call the parent Process to send the vehicle
process.sendVehicleToNextDestination(vehicle); //
}
}
}
/**
* Gets the crossing time for a vehicle based on its type.
* Logic extracted from IntersectionProcess.getCrossingTimeForVehicle()
*
*
* @param vehicle The vehicle.
* @return The crossing time in seconds.
*/
private double getCrossingTimeForVehicle(Vehicle vehicle) {
switch (vehicle.getType()) { //
case BIKE:
return config.getBikeVehicleCrossingTime(); //
case LIGHT:
return config.getLightVehicleCrossingTime(); //
case HEAVY:
return config.getHeavyVehicleCrossingTime(); //
default:
return config.getLightVehicleCrossingTime(); //
}
}
/**
* Requests the thread to stop gracefully (graceful shutdown).
* Sets the 'running' flag to false. The thread will finish
* its current sleep cycle and exit the 'run()' loop.
*/
public void shutdown() {
this.running = false;
}
}

View File

@@ -114,8 +114,8 @@ public class Intersection {
public void receiveVehicle(Vehicle vehicle) {
totalVehiclesReceived++;
// Advance route since vehicle just arrived at this intersection
vehicle.advanceRoute();
// Note: Route advancement is handled by SimulationEngine.handleVehicleArrival()
// before calling this method, so we don't advance here.
String nextDestination = vehicle.getCurrentDestination();

View File

@@ -0,0 +1,327 @@
package sd;
import java.io.IOException;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import sd.config.SimulationConfig;
/**
* Testes unitários para a classe ExitNodeProcess.
*
* Esta classe de testes verifica:
* - Construção e inicialização do processo
* - Criação e aceitação de conexões do servidor socket
* - Gestão do ciclo de vida (start/shutdown)
* - Processamento concorrente de múltiplas conexões
* - Impressão de estatísticas finais
*
* Os testes utilizam configurações temporárias e portas dedicadas (19001)
* para evitar conflitos com outros testes ou processos em execução.
*/
public class ExitNodeProcessTest {
@TempDir
Path tempDir;
private Path configFile;
private ExitNodeProcess exitNodeProcess;
private Thread exitNodeThread;
/**
* Configura o ambiente de teste antes de cada teste.
* Cria um ficheiro de configuração temporário com as definições necessárias.
*/
@BeforeEach
public void setUp() throws IOException {
configFile = tempDir.resolve("test-simulation.properties");
String configContent = """
# Test Exit Node Configuration
# Exit Configuration
exit.host=localhost
exit.port=19001
# Dashboard Configuration (will not be running in tests)
dashboard.host=localhost
dashboard.port=19000
# Vehicle Crossing Times
vehicle.bike.crossingTime=2.0
vehicle.light.crossingTime=3.0
vehicle.heavy.crossingTime=5.0
# Simulation Duration
simulation.duration=60.0
""";
Files.writeString(configFile, configContent);
}
/**
* Limpa os recursos após cada teste.
* Garante que o processo e threads são terminados corretamente.
*/
@AfterEach
public void tearDown() {
if (exitNodeProcess != null) {
exitNodeProcess.shutdown();
}
if (exitNodeThread != null && exitNodeThread.isAlive()) {
exitNodeThread.interrupt();
try {
exitNodeThread.join(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* Testa a construção bem-sucedida do ExitNodeProcess com configuração válida.
*/
@Test
public void testConstructor_Success() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
assertNotNull(exitNodeProcess);
}
/**
* Testa que uma exceção é lançada quando a configuração é inválida.
*/
@Test
public void testConstructor_InvalidConfig() {
Exception exception = assertThrows(IOException.class, () -> {
new SimulationConfig("non-existent-config.properties");
});
assertNotNull(exception);
}
/**
* Testa a inicialização sem dashboard disponível.
* Verifica que o processo continua a funcionar mesmo sem conexão ao dashboard.
*/
@Test
public void testInitialize_WithoutDashboard() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
assertDoesNotThrow(() -> exitNodeProcess.initialize());
}
/**
* Testa que o servidor socket é criado corretamente na porta configurada.
* Verifica que é possível estabelecer uma conexão ao socket do servidor.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testStart_ServerSocketCreated() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected when shutdown
}
});
exitNodeThread.start();
try {
assertTrue(latch.await(2, TimeUnit.SECONDS), "Exit node should start within timeout");
Thread.sleep(100);
assertDoesNotThrow(() -> {
try (Socket testSocket = new Socket("localhost", 19001)) {
assertTrue(testSocket.isConnected());
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Testa que o servidor aceita conexões de clientes.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testStart_AcceptsConnection() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(latch.await(2, TimeUnit.SECONDS));
Thread.sleep(200);
assertDoesNotThrow(() -> {
try (Socket socket = new Socket("localhost", 19001)) {
assertTrue(socket.isConnected());
}
});
}
/**
* Testa múltiplas inicializações e encerramentos do processo.
* Verifica que o processo pode ser iniciado e parado múltiplas vezes,
* permitindo reutilização da porta.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testMultipleStartStop() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(latch.await(2, TimeUnit.SECONDS));
Thread.sleep(100);
exitNodeProcess.shutdown();
Thread.sleep(100);
assertDoesNotThrow(() -> {
SimulationConfig config2 = new SimulationConfig(configFile.toString());
ExitNodeProcess exitNode2 = new ExitNodeProcess(config2);
exitNode2.initialize();
exitNode2.shutdown();
});
}
/**
* Testa que o shutdown fecha corretamente o servidor socket.
* Após o shutdown, novas conexões ao socket devem falhar.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testShutdown_ClosesServerSocket() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch startLatch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
startLatch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(startLatch.await(2, TimeUnit.SECONDS));
Thread.sleep(200);
exitNodeProcess.shutdown();
Thread.sleep(200);
assertThrows(IOException.class, () -> {
Socket socket = new Socket("localhost", 19001);
socket.close();
});
}
/**
* Testa que as estatísticas finais são impressas corretamente durante o shutdown.
* Verifica que o método não lança exceções mesmo sem dados processados.
*/
@Test
public void testPrintFinalStatistics() throws IOException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
assertDoesNotThrow(() -> exitNodeProcess.shutdown());
}
/**
* Testa o processamento de múltiplas conexões concorrentes.
* Verifica que o servidor consegue lidar com vários clientes simultaneamente
* usando o pool de threads.
*/
@Test
@Timeout(value = 3, unit = TimeUnit.SECONDS)
public void testMultipleConcurrentConnections() throws IOException, InterruptedException {
SimulationConfig config = new SimulationConfig(configFile.toString());
exitNodeProcess = new ExitNodeProcess(config);
exitNodeProcess.initialize();
CountDownLatch latch = new CountDownLatch(1);
exitNodeThread = new Thread(() -> {
try {
latch.countDown();
exitNodeProcess.start();
} catch (IOException e) {
// expected
}
});
exitNodeThread.start();
assertTrue(latch.await(2, TimeUnit.SECONDS));
Thread.sleep(200);
Thread[] clients = new Thread[3];
for (int i = 0; i < 3; i++) {
clients[i] = new Thread(() -> {
try (Socket socket = new Socket("localhost", 19001)) {
assertTrue(socket.isConnected());
Thread.sleep(100);
} catch (IOException | InterruptedException e) {
// ignore
}
});
clients[i].start();
}
for (Thread client : clients) {
client.join(1000);
}
}
}

View File

@@ -0,0 +1,206 @@
package sd;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import sd.model.TrafficLight;
import sd.model.TrafficLightState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
/**
* Test class to verify traffic light coordination within an intersection.
* Ensures that only ONE traffic light can be GREEN at any given time.
*/
public class TrafficLightCoordinationTest {
private IntersectionProcess intersectionProcess;
@BeforeEach
public void setUp() throws IOException {
// Create an intersection with multiple traffic lights
intersectionProcess = new IntersectionProcess("Cr2", "src/main/resources/simulation.properties");
intersectionProcess.initialize();
}
@AfterEach
public void tearDown() throws InterruptedException {
if (intersectionProcess != null) {
intersectionProcess.shutdown();
}
}
/**
* Test that verifies mutual exclusion between traffic lights.
* Monitors all traffic lights for 10 seconds and ensures that
* at most ONE light is GREEN at any point in time.
*/
@Test
public void testOnlyOneGreenLightAtATime() throws InterruptedException {
System.out.println("\n=== Testing Traffic Light Mutual Exclusion ===");
// Start the intersection
Thread intersectionThread = new Thread(() -> {
try {
intersectionProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
});
intersectionThread.start();
// Monitor traffic lights for violations
AtomicInteger maxGreenSimultaneously = new AtomicInteger(0);
AtomicInteger violationCount = new AtomicInteger(0);
List<String> violations = new ArrayList<>();
// Monitor for 10 seconds
long endTime = System.currentTimeMillis() + 10000;
while (System.currentTimeMillis() < endTime) {
int greenCount = 0;
StringBuilder currentState = new StringBuilder("States: ");
for (TrafficLight light : intersectionProcess.getIntersection().getTrafficLights()) {
TrafficLightState state = light.getState();
currentState.append(light.getDirection()).append("=").append(state).append(" ");
if (state == TrafficLightState.GREEN) {
greenCount++;
}
}
// Update maximum simultaneous green lights
if (greenCount > maxGreenSimultaneously.get()) {
maxGreenSimultaneously.set(greenCount);
}
// Check for violations (more than one green)
if (greenCount > 1) {
violationCount.incrementAndGet();
String violation = String.format("[VIOLATION] %d lights GREEN simultaneously: %s",
greenCount, currentState.toString());
violations.add(violation);
System.err.println(violation);
}
Thread.sleep(50); // Check every 50ms
}
System.out.println("\n=== Test Results ===");
System.out.println("Maximum simultaneous GREEN lights: " + maxGreenSimultaneously.get());
System.out.println("Total violations detected: " + violationCount.get());
if (!violations.isEmpty()) {
System.err.println("\nViolation details:");
violations.forEach(System.err::println);
}
// Assert that we never had more than one green light
assertEquals(0, violationCount.get(),
"Traffic light coordination violated! Multiple lights were GREEN simultaneously.");
assertTrue(maxGreenSimultaneously.get() <= 1,
"At most ONE light should be GREEN at any time. Found: " + maxGreenSimultaneously.get());
System.out.println("\n✅ Traffic light coordination working correctly!");
}
/**
* Test that verifies all traffic lights get a chance to be GREEN.
* Ensures fairness in the coordination mechanism.
*/
@Test
public void testAllLightsGetGreenTime() throws InterruptedException {
System.out.println("\n=== Testing Traffic Light Fairness ===");
// Start the intersection
Thread intersectionThread = new Thread(() -> {
try {
intersectionProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
});
intersectionThread.start();
// Track which lights have been green
List<TrafficLight> lights = intersectionProcess.getIntersection().getTrafficLights();
boolean[] hasBeenGreen = new boolean[lights.size()];
// Monitor for 15 seconds (enough time for all lights to cycle)
long endTime = System.currentTimeMillis() + 15000;
while (System.currentTimeMillis() < endTime) {
for (int i = 0; i < lights.size(); i++) {
if (lights.get(i).getState() == TrafficLightState.GREEN) {
hasBeenGreen[i] = true;
System.out.println("" + lights.get(i).getDirection() + " has been GREEN");
}
}
Thread.sleep(100);
}
// Check if all lights got green time
int greenCount = 0;
System.out.println("\n=== Fairness Results ===");
for (int i = 0; i < lights.size(); i++) {
String status = hasBeenGreen[i] ? "✓ YES" : "✗ NO";
System.out.println(lights.get(i).getDirection() + " got GREEN time: " + status);
if (hasBeenGreen[i]) greenCount++;
}
assertTrue(greenCount > 0, "At least one light should have been GREEN during the test");
System.out.println("\n" + greenCount + "/" + lights.size() + " lights were GREEN during test period");
}
/**
* Test that verifies the state transitions are consistent.
*/
@Test
public void testStateTransitionsAreConsistent() throws InterruptedException {
System.out.println("\n=== Testing State Transition Consistency ===");
Thread intersectionThread = new Thread(() -> {
try {
intersectionProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
});
intersectionThread.start();
List<TrafficLight> lights = intersectionProcess.getIntersection().getTrafficLights();
TrafficLightState[] previousStates = new TrafficLightState[lights.size()];
// Initialize previous states
for (int i = 0; i < lights.size(); i++) {
previousStates[i] = lights.get(i).getState();
}
int transitionCount = 0;
long endTime = System.currentTimeMillis() + 8000;
while (System.currentTimeMillis() < endTime) {
for (int i = 0; i < lights.size(); i++) {
TrafficLightState currentState = lights.get(i).getState();
if (currentState != previousStates[i]) {
transitionCount++;
System.out.println(lights.get(i).getDirection() + " transitioned: " +
previousStates[i] + "" + currentState);
previousStates[i] = currentState;
}
}
Thread.sleep(100);
}
System.out.println("\nTotal state transitions observed: " + transitionCount);
assertTrue(transitionCount > 0, "There should be state transitions during the test period");
}
}

View File

@@ -0,0 +1,302 @@
package sd.coordinator;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.jupiter.api.AfterEach;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import sd.model.Message;
import sd.model.MessageType;
import sd.model.Vehicle;
import sd.serialization.MessageSerializer;
import sd.serialization.SerializerFactory;
/**
* Integration tests for the Coordinator-side networking.
*
* What were checking here:
* 1. A SocketClient can actually connect to something listening
* 2. Messages go over the wire and can be deserialized
* 3. Vehicle payloads survive the trip
* 4. Shutdown messages can be broadcast to multiple intersections
*
* We do this by spinning up a tiny mock intersection server in-process.
*/
class CoordinatorIntegrationTest {
private List<MockIntersectionServer> mockServers;
private static final int BASE_PORT = 9001; // keep clear of real ports
@BeforeEach
void setUp() {
mockServers = new ArrayList<>();
}
@AfterEach
void tearDown() {
// Stop all mock servers
for (MockIntersectionServer server : mockServers) {
server.stop();
}
mockServers.clear();
}
/**
* Can the client open a TCP connection to our fake intersection?
*/
@Test
@Timeout(5)
void testSocketClientConnection() throws IOException, InterruptedException {
MockIntersectionServer server = new MockIntersectionServer("Cr1", BASE_PORT);
server.start();
mockServers.add(server);
// tiny pause to let the server bind
Thread.sleep(100);
SocketClient client = new SocketClient("Cr1", "localhost", BASE_PORT);
client.connect();
assertTrue(client.isConnected(), "Client should be connected to mock intersection");
client.close();
}
/**
* End-to-end: send a message, make sure the server actually receives it.
*/
@Test
@Timeout(5)
void testMessageTransmission() throws Exception {
MockIntersectionServer server = new MockIntersectionServer("Cr1", BASE_PORT);
server.start();
mockServers.add(server);
Thread.sleep(100);
SocketClient client = new SocketClient("Cr1", "localhost", BASE_PORT);
client.connect();
Message testMessage = new Message(
MessageType.VEHICLE_SPAWN,
"COORDINATOR",
"Cr1",
"Test payload"
);
client.send(testMessage);
// give the server a moment to read and deserialize
Thread.sleep(200);
assertFalse(
server.getReceivedMessages().isEmpty(),
"Mock server should have received at least one message"
);
Message receivedMsg = server.getReceivedMessages().poll();
assertNotNull(receivedMsg, "Server should have actually received a message");
assertEquals(MessageType.VEHICLE_SPAWN, receivedMsg.getType(), "Message type should match what we sent");
assertEquals("COORDINATOR", receivedMsg.getSenderId(), "Sender ID should be preserved");
assertEquals("Cr1", receivedMsg.getDestinationId(), "Destination ID should be preserved");
client.close();
}
/**
* Make sure vehicle payloads survive the trip and arrive non-null.
*/
@Test
@Timeout(5)
void testVehicleSpawnMessage() throws Exception {
MockIntersectionServer server = new MockIntersectionServer("Cr1", BASE_PORT);
server.start();
mockServers.add(server);
Thread.sleep(100);
SocketClient client = new SocketClient("Cr1", "localhost", BASE_PORT);
client.connect();
// fake a vehicle like the coordinator would send
List<String> route = List.of("Cr1", "Cr4", "Cr5", "S");
Vehicle vehicle = new Vehicle("V1", sd.model.VehicleType.LIGHT, 0.0, route);
Message spawnMessage = new Message(
MessageType.VEHICLE_SPAWN,
"COORDINATOR",
"Cr1",
vehicle
);
client.send(spawnMessage);
Thread.sleep(200);
Message receivedMsg = server.getReceivedMessages().poll();
assertNotNull(receivedMsg, "Mock server should receive the spawn message");
assertEquals(MessageType.VEHICLE_SPAWN, receivedMsg.getType(), "Message should be of type VEHICLE_SPAWN");
assertNotNull(receivedMsg.getPayload(), "Payload should not be null (vehicle must arrive)");
client.close();
}
/**
* Broadcast shutdown to multiple mock intersections and see if all of them get it.
*/
@Test
@Timeout(5)
void testShutdownMessageBroadcast() throws Exception {
// Start a couple of fake intersections
for (int i = 1; i <= 3; i++) {
MockIntersectionServer server = new MockIntersectionServer("Cr" + i, BASE_PORT + i - 1);
server.start();
mockServers.add(server);
}
Thread.sleep(200);
// Connect to all of them
List<SocketClient> clients = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
SocketClient client = new SocketClient("Cr" + i, "localhost", BASE_PORT + i - 1);
client.connect();
clients.add(client);
}
Message shutdownMessage = new Message(
MessageType.SHUTDOWN,
"COORDINATOR",
"ALL",
"Simulation complete"
);
for (SocketClient client : clients) {
client.send(shutdownMessage);
}
Thread.sleep(200);
for (MockIntersectionServer server : mockServers) {
assertFalse(
server.getReceivedMessages().isEmpty(),
"Server " + server.getIntersectionId() + " should have received the shutdown message"
);
Message msg = server.getReceivedMessages().poll();
assertEquals(MessageType.SHUTDOWN, msg.getType(), "Server should receive a SHUTDOWN message");
}
for (SocketClient client : clients) {
client.close();
}
}
/**
* Tiny TCP server that pretends to be an intersection.
* It:
* - listens on a port
* - accepts connections
* - reads length-prefixed messages
* - deserializes them and stores them for the test to inspect
*/
private static class MockIntersectionServer {
private final String intersectionId;
private final int port;
private ServerSocket serverSocket;
private Thread serverThread;
private volatile boolean running;
private final ConcurrentLinkedQueue<Message> receivedMessages;
private final MessageSerializer serializer;
public MockIntersectionServer(String intersectionId, int port) {
this.intersectionId = intersectionId;
this.port = port;
this.receivedMessages = new ConcurrentLinkedQueue<>();
this.serializer = SerializerFactory.createDefault();
this.running = false;
}
public void start() throws IOException {
serverSocket = new ServerSocket(port);
running = true;
System.out.printf("Mock %s listening on port %d%n", intersectionId, port);
serverThread = new Thread(() -> {
try {
while (running) {
Socket clientSocket = serverSocket.accept();
handleClient(clientSocket);
}
} catch (IOException e) {
if (running) {
System.err.println("Mock " + intersectionId + " server error: " + e.getMessage());
}
}
}, "mock-" + intersectionId + "-listener");
serverThread.start();
}
private void handleClient(Socket clientSocket) {
new Thread(() -> {
try (DataInputStream input = new DataInputStream(clientSocket.getInputStream())) {
while (running) {
// Read length prefix (4 bytes, big-endian)
int length = input.readInt();
byte[] data = new byte[length];
input.readFully(data);
Message message = serializer.deserialize(data, Message.class);
receivedMessages.offer(message);
System.out.println("Mock " + intersectionId + " received: " + message.getType());
}
} catch (IOException e) {
if (running) {
System.err.println("Mock " + intersectionId + " client handler error: " + e.getMessage());
}
} catch (Exception e) {
System.err.println("Mock " + intersectionId + " deserialization error: " + e.getMessage());
}
}, "mock-" + intersectionId + "-client").start();
}
public void stop() {
running = false;
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
if (serverThread != null) {
serverThread.interrupt();
serverThread.join(1000);
}
System.out.printf("Mock %s stopped%n", intersectionId);
} catch (IOException | InterruptedException e) {
System.err.println("Error stopping mock server " + intersectionId + ": " + e.getMessage());
}
}
public ConcurrentLinkedQueue<Message> getReceivedMessages() {
return receivedMessages;
}
public String getIntersectionId() {
return intersectionId;
}
}
}

View File

@@ -0,0 +1,194 @@
package sd.coordinator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import sd.config.SimulationConfig;
import sd.model.Vehicle;
import sd.util.VehicleGenerator;
/**
* Tests for the Coordinator/vehicle-generation layer.
*
* What were checking here:
* 1. Coordinator can be created with a valid config
* 2. Vehicle arrival times are monotonic and sane
* 3. Vehicle IDs are created in the format we expect (V1, V2, ...)
* 4. Generated vehicles have proper routes (start at CrX, end at S)
* 5. Config actually has intersection info
* 6. Duration in config is not something crazy
*/
class CoordinatorProcessTest {
private SimulationConfig config;
private static final String TEST_CONFIG = "src/main/resources/simulation.properties";
@BeforeEach
void setUp() throws IOException {
config = new SimulationConfig(TEST_CONFIG);
}
@AfterEach
void tearDown() {
config = null;
}
/**
* Basic smoke test: can we build a coordinator with this config?
*/
@Test
void testCoordinatorInitialization() {
CoordinatorProcess coordinator = new CoordinatorProcess(config);
assertNotNull(coordinator, "Coordinator should be created with a valid config");
}
/**
* Make sure the VehicleGenerator is giving us increasing arrival times,
* i.e. time doesnt go backwards and intervals look reasonable.
*/
@Test
void testVehicleGenerationTiming() {
VehicleGenerator generator = new VehicleGenerator(config);
double currentTime = 0.0;
List<Double> arrivalTimes = new ArrayList<>();
// generate a small batch to inspect
for (int i = 0; i < 10; i++) {
double nextArrival = generator.getNextArrivalTime(currentTime);
arrivalTimes.add(nextArrival);
currentTime = nextArrival;
}
// times should strictly increase
for (int i = 1; i < arrivalTimes.size(); i++) {
assertTrue(
arrivalTimes.get(i) > arrivalTimes.get(i - 1),
"Arrival times must increase — got " + arrivalTimes.get(i - 1) + " then " + arrivalTimes.get(i)
);
}
// and they shouldn't be nonsense
for (double time : arrivalTimes) {
assertTrue(time >= 0, "Arrival time should not be negative (got " + time + ")");
assertTrue(time < 1000, "Arrival time looks suspiciously large: " + time);
}
}
/**
* We generate V1..V5 manually and make sure the IDs are exactly those.
*/
@Test
void testVehicleIdGeneration() {
VehicleGenerator generator = new VehicleGenerator(config);
List<Vehicle> vehicles = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
Vehicle v = generator.generateVehicle("V" + i, 0.0);
vehicles.add(v);
assertEquals("V" + i, v.getId(), "Vehicle ID should be 'V" + i + "' but got " + v.getId());
}
// just to be safe, no duplicates in that small set
long distinctCount = vehicles.stream().map(Vehicle::getId).distinct().count();
assertEquals(5, distinctCount, "Vehicle IDs in this batch should all be unique");
}
/**
* A generated vehicle should:
* - have a non-empty route
* - start in a known intersection (Cr1..Cr5)
* - end in S (exit)
*/
@Test
void testVehicleRouteValidity() {
VehicleGenerator generator = new VehicleGenerator(config);
for (int i = 0; i < 20; i++) {
Vehicle vehicle = generator.generateVehicle("V" + i, 0.0);
assertNotNull(vehicle.getRoute(), "Vehicle route should not be null");
assertFalse(vehicle.getRoute().isEmpty(), "Vehicle route should not be empty");
String firstHop = vehicle.getRoute().get(0);
assertTrue(
firstHop.matches("Cr[1-5]"),
"First hop should be a valid intersection (Cr1..Cr5), got: " + firstHop
);
String lastHop = vehicle.getRoute().get(vehicle.getRoute().size() - 1);
assertEquals("S", lastHop, "Last hop should be exit 'S' but got: " + lastHop);
}
}
/**
* Whatever is in simulation.properties should give us a sane duration.
*/
@Test
void testSimulationDuration() {
double duration = config.getSimulationDuration();
assertTrue(duration > 0, "Simulation duration must be positive");
assertTrue(duration >= 1.0, "Simulation should run at least 1 second (got " + duration + ")");
assertTrue(duration <= 86400.0, "Simulation should not run more than a day (got " + duration + ")");
}
/**
* Check that the 5 intersections defined in the architecture
* actually exist in the config and have valid network data.
*/
@Test
void testIntersectionConfiguration() {
String[] intersectionIds = {"Cr1", "Cr2", "Cr3", "Cr4", "Cr5"};
for (String id : intersectionIds) {
String host = config.getIntersectionHost(id);
int port = config.getIntersectionPort(id);
assertNotNull(host, "Host should not be null for " + id);
assertFalse(host.isEmpty(), "Host should not be empty for " + id);
assertTrue(port > 0, "Port should be > 0 for " + id + " (got " + port + ")");
assertTrue(port < 65536, "Port should be a valid TCP port for " + id + " (got " + port + ")");
}
}
/**
* Quick sanity check: over a bunch of generated vehicles,
* we should eventually see the different vehicle types appear.
*
* Note: this is probabilistic, so we're not being super strict.
*/
@Test
void testVehicleTypeDistribution() {
VehicleGenerator generator = new VehicleGenerator(config);
boolean hasBike = false;
boolean hasLight = false;
boolean hasHeavy = false;
// 50 is enough for a "we're probably fine" test
for (int i = 0; i < 50; i++) {
Vehicle vehicle = generator.generateVehicle("V" + i, 0.0);
switch (vehicle.getType()) {
case BIKE -> hasBike = true;
case LIGHT -> hasLight = true;
case HEAVY -> hasHeavy = true;
}
}
// at least one of them should have shown up — if not, RNG is cursed
assertTrue(
hasBike || hasLight || hasHeavy,
"Expected to see at least one vehicle type after 50 generations"
);
}
}