Files
SD/main/src/main/java/sd/dashboard/DashboardClientHandler.java

180 lines
7.6 KiB
Java

package sd.dashboard;
import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import sd.model.MessageType;
import sd.protocol.MessageProtocol;
import sd.protocol.SocketConnection;
/**
* Worker responsável pelo processamento dedicado de uma conexão de cliente TCP no Dashboard.
* <p>
* Esta classe implementa o padrão <i>Thread-per-Client</i>. Cada instância executa numa
* thread separada, garantindo que a latência de rede ou o processamento de mensagens
* de um nó (Interseção/Coordenador) não bloqueie a receção de telemetria dos outros.
* <p>
* As suas principais funções são:
* <ol>
* <li>Manter a conexão persistente com o nó remoto.</li>
* <li>Desserializar mensagens de protocolo recebidas.</li>
* <li>Normalizar payloads JSON (resolvendo ambiguidades de tipagem do Gson).</li>
* <li>Atualizar o objeto partilhado {@link DashboardStatistics} de forma thread-safe.</li>
* </ol>
*/
public class DashboardClientHandler implements Runnable {
private final Socket clientSocket;
private final DashboardStatistics statistics;
/**
* Inicializa o handler com o socket ativo e a referência para o agregador de estatísticas.
*
* @param clientSocket O socket TCP conectado ao nó remoto.
* @param statistics O objeto singleton partilhado onde as métricas serão agregadas.
*/
public DashboardClientHandler(Socket clientSocket, DashboardStatistics statistics) {
this.clientSocket = clientSocket;
this.statistics = statistics;
}
/**
* Ciclo de vida da conexão.
* <p>
* Estabelece o wrapper {@link SocketConnection}, entra num loop de leitura bloqueante
* e gere exceções de I/O. Garante o fecho limpo do socket em caso de desconexão ou erro.
*/
@Override
public void run() {
String clientInfo = clientSocket.getInetAddress().getHostAddress() + ":" + clientSocket.getPort();
try (SocketConnection connection = new SocketConnection(clientSocket)) {
System.out.println("[Handler] Started handling client: " + clientInfo);
while (!Thread.currentThread().isInterrupted()) {
try {
MessageProtocol message = connection.receiveMessage();
if (message == null) {
System.out.println("[Handler] Client disconnected: " + clientInfo);
break;
}
processMessage(message);
} catch (ClassNotFoundException e) {
System.err.println("[Handler] Unknown message class from " + clientInfo + ": " + e.getMessage());
} catch (IOException e) {
System.out.println("[Handler] Connection error with " + clientInfo + ": " + e.getMessage());
break;
}
}
} catch (IOException e) {
System.err.println("[Handler] Error initializing connection with " + clientInfo + ": " + e.getMessage());
} finally {
try {
if (!clientSocket.isClosed()) {
clientSocket.close();
}
} catch (IOException e) {
System.err.println("[Handler] Error closing socket for " + clientInfo + ": " + e.getMessage());
}
}
}
/**
* Valida e extrai os dados estatísticos da mensagem.
* <p>
* Implementa uma lógica de correção de tipagem para payloads desserializados via Gson.
* Frequentemente, objetos genéricos são desserializados como {@code LinkedHashMap} em vez
* da classe alvo {@link StatsUpdatePayload}. Este método deteta essa situação e realiza
* uma conversão "round-trip" (Map -> JSON -> Object) para garantir a integridade dos dados.
*
* @param message A mensagem recebida da rede.
*/
private void processMessage(MessageProtocol message) {
if (message.getType() != MessageType.STATS_UPDATE) {
System.out.println("[Handler] Ignoring non-statistics message type: " + message.getType());
return;
}
String senderId = message.getSourceNode();
Object payload = message.getPayload();
System.out.println("[Handler] Received STATS_UPDATE from: " + senderId);
// Handle both direct StatsUpdatePayload and Gson-deserialized Map
StatsUpdatePayload stats;
if (payload instanceof StatsUpdatePayload) {
stats = (StatsUpdatePayload) payload;
} else if (payload instanceof java.util.Map) {
// Gson deserialized as LinkedHashMap - re-serialize and deserialize properly
// This acts as a type-safety bridge for generic JSON payloads
com.google.gson.Gson gson = new com.google.gson.Gson();
String json = gson.toJson(payload);
stats = gson.fromJson(json, StatsUpdatePayload.class);
} else {
System.err.println("[Handler] Unknown payload type: " +
(payload != null ? payload.getClass().getName() : "null"));
return;
}
updateStatistics(senderId, stats);
}
/**
* Aplica os dados recebidos ao modelo global de estatísticas.
* <p>
* Distingue entre atualizações incrementais (ex: contagem de veículos) e
* substituições de estado (ex: tempo total de sistema reportado pelo nó de saída).
*
* @param senderId Identificador do nó que enviou a atualização (ex: "Cr1", "ExitNode").
* @param stats O objeto DTO contendo as métricas normalizadas.
*/
private void updateStatistics(String senderId, StatsUpdatePayload stats) {
if (stats.getTotalVehiclesGenerated() >= 0) {
statistics.updateVehiclesGenerated(stats.getTotalVehiclesGenerated());
}
if (stats.getTotalVehiclesCompleted() >= 0) {
statistics.updateVehiclesCompleted(stats.getTotalVehiclesCompleted());
}
// Exit Node sends cumulative totals, so we SET rather than ADD
if (stats.getTotalSystemTime() >= 0) {
statistics.setTotalSystemTime(stats.getTotalSystemTime());
}
if (stats.getTotalWaitingTime() >= 0) {
statistics.setTotalWaitingTime(stats.getTotalWaitingTime());
}
// Process vehicle type statistics (from Exit Node)
if (stats.getVehicleTypeCounts() != null && !stats.getVehicleTypeCounts().isEmpty()) {
Map<sd.model.VehicleType, Integer> counts = stats.getVehicleTypeCounts();
Map<sd.model.VehicleType, Long> waitTimes = stats.getVehicleTypeWaitTimes();
for (var entry : counts.entrySet()) {
sd.model.VehicleType type = entry.getKey();
int count = entry.getValue();
long waitTime = (waitTimes != null && waitTimes.containsKey(type))
? waitTimes.get(type) : 0L;
statistics.updateVehicleTypeStats(type, count, waitTime);
}
}
// Process intersection statistics (from Intersection processes)
if (senderId.startsWith("Cr") || senderId.startsWith("E")) {
statistics.updateIntersectionStats(
senderId,
stats.getIntersectionArrivals(),
stats.getIntersectionDepartures(),
stats.getIntersectionQueueSize()
);
}
System.out.println("[Handler] Successfully updated statistics from: " + senderId);
}
}