shutdown and teardown fixes + incoming connection handler

This commit is contained in:
2025-11-11 17:28:44 +00:00
parent 84cba39597
commit 6b94d727e2
2 changed files with 64 additions and 147 deletions

View File

@@ -12,10 +12,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import sd.config.SimulationConfig; import sd.config.SimulationConfig;
import sd.engine.TrafficLightThread;
import sd.model.Intersection; import sd.model.Intersection;
import sd.model.MessageType; import sd.model.MessageType;
import sd.model.TrafficLight; import sd.model.TrafficLight;
import sd.model.TrafficLightState;
import sd.model.Vehicle; import sd.model.Vehicle;
import sd.protocol.MessageProtocol; import sd.protocol.MessageProtocol;
import sd.protocol.SocketConnection; import sd.protocol.SocketConnection;
@@ -178,128 +178,15 @@ public class IntersectionProcess {
System.out.println("\n[" + intersectionId + "] Starting traffic light threads..."); System.out.println("\n[" + intersectionId + "] Starting traffic light threads...");
for (TrafficLight light : intersection.getTrafficLights()) { for (TrafficLight light : intersection.getTrafficLights()) {
trafficLightPool.submit(() -> runTrafficLightCycle(light));
TrafficLightThread lightTask = new TrafficLightThread(light, this, config);
trafficLightPool.submit(lightTask);
System.out.println(" Started thread for: " + light.getDirection()); System.out.println(" Started thread for: " + light.getDirection());
} }
} }
/**
* The main loop for a traffic light thread.
* Continuously cycles between green and red states.
*
* only one traffic light can be green at any given time in this intersection.
*
* @param light The traffic light to control.
*/
private void runTrafficLightCycle(TrafficLight light) {
System.out.println("[" + light.getId() + "] Traffic light thread started.");
while (running) {
try {
// Acquire coordination lock to become green
trafficCoordinationLock.lock();
try {
// Wait until no other direction is green
while (currentGreenDirection != null && running) {
trafficCoordinationLock.unlock();
Thread.sleep(100); // Brief wait before retrying
trafficCoordinationLock.lock();
}
if (!running) {
break; // Exit if shutting down
}
// Mark this direction as the current green light
currentGreenDirection = light.getDirection();
light.changeState(TrafficLightState.GREEN);
System.out.println("[" + light.getId() + "] State: GREEN");
} finally {
trafficCoordinationLock.unlock();
}
// Process vehicles while green
processGreenLight(light);
// Wait for green duration
Thread.sleep((long) (light.getGreenTime() * 1000));
// Release coordination lock (turn red)
trafficCoordinationLock.lock();
try {
light.changeState(TrafficLightState.RED);
currentGreenDirection = null; // Release exclusive access
System.out.println("[" + light.getId() + "] State: RED (RELEASED ACCESS)");
} finally {
trafficCoordinationLock.unlock();
}
// Wait for red duration
Thread.sleep((long) (light.getRedTime() * 1000));
} catch (InterruptedException e) {
System.out.println("[" + light.getId() + "] Traffic light thread interrupted.");
break;
}
}
System.out.println("[" + light.getId() + "] Traffic light thread stopped.");
}
/**
* Processes vehicles when a traffic light is GREEN.
* Dequeues vehicles and sends them to their next destination.
*
* @param light The traffic light that is currently green.
*/
private void processGreenLight(TrafficLight light) {
while (light.getState() == TrafficLightState.GREEN && light.getQueueSize() > 0) {
Vehicle vehicle = light.removeVehicle();
if (vehicle != null) {
// Get crossing time based on vehicle type
double crossingTime = getCrossingTimeForVehicle(vehicle);
// Simulate crossing time
try {
Thread.sleep((long) (crossingTime * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
// Update vehicle statistics
vehicle.addCrossingTime(crossingTime);
// Update intersection statistics
intersection.incrementVehiclesSent();
// Send vehicle to next destination
sendVehicleToNextDestination(vehicle);
}
}
}
/**
* Gets the crossing time for a vehicle based on its type.
*
* @param vehicle The vehicle.
* @return The crossing time in seconds.
*/
private double getCrossingTimeForVehicle(Vehicle vehicle) {
switch (vehicle.getType()) {
case BIKE:
return config.getBikeVehicleCrossingTime();
case LIGHT:
return config.getLightVehicleCrossingTime();
case HEAVY:
return config.getHeavyVehicleCrossingTime();
default:
return config.getLightVehicleCrossingTime();
}
}
/** /**
* Sends a vehicle to its next destination via socket connection. * Sends a vehicle to its next destination via socket connection.
* *
@@ -410,12 +297,21 @@ public class IntersectionProcess {
System.out.println("[" + intersectionId + "] New connection accepted from " + System.out.println("[" + intersectionId + "] New connection accepted from " +
clientSocket.getInetAddress().getHostAddress()); clientSocket.getInetAddress().getHostAddress());
// Check running flag again before handling - prevents accepting during shutdown // Check running flag again before handling
if (!running) { if (!running) {
clientSocket.close(); clientSocket.close();
break; break;
} }
// **Set timeout before submitting to handler**
try {
clientSocket.setSoTimeout(1000);
} catch (java.net.SocketException e) {
System.err.println("[" + intersectionId + "] Failed to set timeout: " + e.getMessage());
clientSocket.close();
continue;
}
// Handle each connection in a separate thread // Handle each connection in a separate thread
connectionHandlerPool.submit(() -> handleIncomingConnection(clientSocket)); connectionHandlerPool.submit(() -> handleIncomingConnection(clientSocket));
@@ -424,7 +320,6 @@ public class IntersectionProcess {
if (!running) { if (!running) {
break; // Normal shutdown break; // Normal shutdown
} }
// Unexpected error during normal operation
System.err.println("[" + intersectionId + "] Error accepting connection: " + System.err.println("[" + intersectionId + "] Error accepting connection: " +
e.getMessage()); e.getMessage());
} }
@@ -438,11 +333,16 @@ public class IntersectionProcess {
* @param clientSocket The accepted socket connection. * @param clientSocket The accepted socket connection.
*/ */
private void handleIncomingConnection(Socket clientSocket) { private void handleIncomingConnection(Socket clientSocket) {
try (SocketConnection connection = new SocketConnection(clientSocket)) { try {
// Set socket timeout so receiveMessage() won't block forever
clientSocket.setSoTimeout(1000); // 1 second timeout clientSocket.setSoTimeout(1000); // 1 second timeout
} catch (java.net.SocketException e) {
System.err.println("[" + intersectionId + "] Failed to set socket timeout: " + e.getMessage());
return;
}
try (SocketConnection connection = new SocketConnection(clientSocket)) {
System.out.println("[" + intersectionId + "] New connection accepted from " + System.out.println("[" + intersectionId + "] New connection accepted from " +
clientSocket.getInetAddress().getHostAddress()); clientSocket.getInetAddress().getHostAddress());
@@ -462,7 +362,7 @@ public class IntersectionProcess {
} }
} catch (java.net.SocketTimeoutException e) { } catch (java.net.SocketTimeoutException e) {
// Timeout is expected - just check running flag and continue // Timeout - check running flag and continue
if (!running) { if (!running) {
break; break;
} }
@@ -487,46 +387,57 @@ public class IntersectionProcess {
* Shuts down all threads and closes all connections. * Shuts down all threads and closes all connections.
*/ */
public void shutdown() { public void shutdown() {
// Check if already shutdown
if (!running) {
return; // Already shutdown, do nothing
}
System.out.println("\n[" + intersectionId + "] Shutting down..."); System.out.println("\n[" + intersectionId + "] Shutting down...");
running = false; running = false;
// 1. Close ServerSocket first
if (serverSocket != null && !serverSocket.isClosed()) { if (serverSocket != null && !serverSocket.isClosed()) {
try { try {
serverSocket.close(); serverSocket.close();
} catch (IOException e) { } catch (IOException e) {
System.err.println("[" + intersectionId + "] Error closing server socket: " + // Expected
e.getMessage());
} }
} }
// 2. Shutdown thread pools with force
if (trafficLightPool != null && !trafficLightPool.isShutdown()) {
trafficLightPool.shutdownNow();
}
if (connectionHandlerPool != null && !connectionHandlerPool.isShutdown()) {
connectionHandlerPool.shutdownNow();
}
// 3. Wait briefly for termination (don't block forever)
try {
if (trafficLightPool != null) {
trafficLightPool.awaitTermination(1, TimeUnit.SECONDS);
}
if (connectionHandlerPool != null) {
connectionHandlerPool.awaitTermination(1, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 4. Close outgoing connections
synchronized (outgoingConnections) { synchronized (outgoingConnections) {
for (SocketConnection conn : outgoingConnections.values()) { for (SocketConnection conn : outgoingConnections.values()) {
try { try {
conn.close(); conn.close();
} catch (Exception e) { } catch (Exception e) {
// Ignore errors during shutdown // Ignore
} }
} }
outgoingConnections.clear(); outgoingConnections.clear();
} }
trafficLightPool.shutdown();
connectionHandlerPool.shutdownNow(); // Use shutdownNow() to interrupt running tasks
try {
if (!trafficLightPool.awaitTermination(2, TimeUnit.SECONDS)) {
trafficLightPool.shutdownNow();
}
if (!connectionHandlerPool.awaitTermination(2, TimeUnit.SECONDS)) {
connectionHandlerPool.shutdownNow();
}
} catch (InterruptedException e) {
trafficLightPool.shutdownNow();
connectionHandlerPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("[" + intersectionId + "] Shutdown complete."); System.out.println("[" + intersectionId + "] Shutdown complete.");
System.out.println("=".repeat(60) + "\n"); System.out.println("============================================================\n");
} }
/** /**

View File

@@ -97,11 +97,17 @@ public class IntersectionProcessTest {
Files.writeString(configFile, configContent); Files.writeString(configFile, configContent);
} }
// cleanup after tests
@AfterEach @AfterEach
public void tearDown() { public void tearDown() {
if (intersectionProcess != null) { if (intersectionProcess != null) {
try {
// Only shutdown if still running
intersectionProcess.shutdown(); intersectionProcess.shutdown();
} catch (Exception e) {
System.err.println("Error in tearDown: " + e.getMessage());
} finally {
intersectionProcess = null;
}
} }
} }