diff --git a/QemuVmManager.Core/P2PNode.cs b/QemuVmManager.Core/P2PNode.cs index 363cfb7..40f15a8 100644 --- a/QemuVmManager.Core/P2PNode.cs +++ b/QemuVmManager.Core/P2PNode.cs @@ -651,6 +651,18 @@ public class P2PNode : IDisposable foreach (var node in staleNodes) { _knownNodes.Remove(node.NodeId); + + // Update cluster state + lock (_clusterLock) + { + var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == node.NodeId); + if (clusterNode != null) + { + ClusterState.Nodes.Remove(clusterNode); + ClusterState.LastUpdated = DateTime.UtcNow; + } + } + NodeLeft?.Invoke(this, node); _logger?.LogInformation("Removed stale node {NodeId}", node.NodeId); } @@ -711,10 +723,10 @@ public class P2PNode : IDisposable switch (messageType) { case "heartbeat": - await ProcessHeartbeatAsync(data); + await ProcessHeartbeatAsync(data, remoteEndpoint); break; case "discovery": - await ProcessDiscoveryAsync(data); + await ProcessDiscoveryAsync(data, remoteEndpoint); break; default: _logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType); @@ -761,10 +773,11 @@ public class P2PNode : IDisposable switch (messageType) { case "heartbeat": - await ProcessHeartbeatAsync(data); + await ProcessHeartbeatAsync(data, null); // No remote endpoint for TCP break; case "discovery": - await ProcessDiscoveryAsync(data); + // Discovery messages are typically UDP, but handle gracefully + _logger?.LogWarning("Received discovery message via TCP - unexpected"); break; case "election_request": await ProcessElectionRequestAsync(data, writer); @@ -783,7 +796,7 @@ public class P2PNode : IDisposable } } - private async Task ProcessHeartbeatAsync(JsonElement data) + private async Task ProcessHeartbeatAsync(JsonElement data, IPEndPoint? remoteEndpoint = null) { var nodeId = data.GetProperty("nodeId").GetString()!; var role = Enum.Parse(data.GetProperty("role").GetString()!); @@ -803,15 +816,42 @@ public class P2PNode : IDisposable _lastHeartbeat = timestamp; } - // Update node info + // Update node info in known nodes if (_knownNodes.TryGetValue(nodeId, out var node)) { node.LastSeen = DateTime.UtcNow; node.Role = role; + + // Update IP address if we received it from UDP + if (remoteEndpoint != null && node.IpAddress == IPAddress.Any) + { + node.IpAddress = remoteEndpoint.Address; + node.Port = remoteEndpoint.Port; + } + } + + // Update cluster state + lock (_clusterLock) + { + var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId); + if (clusterNode != null) + { + clusterNode.LastSeen = DateTime.UtcNow; + clusterNode.Role = role; + + // Update IP address if we received it from UDP + if (remoteEndpoint != null && clusterNode.IpAddress == IPAddress.Any) + { + clusterNode.IpAddress = remoteEndpoint.Address; + clusterNode.Port = remoteEndpoint.Port; + } + + ClusterState.LastUpdated = DateTime.UtcNow; + } } } - private async Task ProcessDiscoveryAsync(JsonElement data) + private async Task ProcessDiscoveryAsync(JsonElement data, IPEndPoint remoteEndpoint) { var nodeId = data.GetProperty("nodeId").GetString()!; @@ -821,8 +861,8 @@ public class P2PNode : IDisposable { NodeId = nodeId, Hostname = Environment.MachineName, // We'll get this from heartbeat - IpAddress = IPAddress.Any, // We'll get this from heartbeat - Port = _port, + IpAddress = remoteEndpoint.Address, // Use the actual sender IP + Port = remoteEndpoint.Port, Role = NodeRole.Follower, State = NodeState.Running, LastSeen = DateTime.UtcNow, @@ -830,13 +870,32 @@ public class P2PNode : IDisposable }; _knownNodes[nodeId] = newNode; + + // Update cluster state + lock (_clusterLock) + { + ClusterState.Nodes.Add(newNode); + ClusterState.LastUpdated = DateTime.UtcNow; + } + NodeJoined?.Invoke(this, newNode); - _logger?.LogInformation("Discovered new node {NodeId}", nodeId); + _logger?.LogInformation("Discovered new node {NodeId} at {IpAddress}:{Port}", nodeId, remoteEndpoint.Address, remoteEndpoint.Port); } else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId)) { // Update last seen for existing node _knownNodes[nodeId].LastSeen = DateTime.UtcNow; + + // Update cluster state + lock (_clusterLock) + { + var existingNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId); + if (existingNode != null) + { + existingNode.LastSeen = DateTime.UtcNow; + ClusterState.LastUpdated = DateTime.UtcNow; + } + } } }