Compare commits

..

2 Commits

Author SHA1 Message Date
602af77291 P2P fixes 2025-08-31 14:20:31 -04:00
8a26ddcb3a Typo fix 2025-08-31 14:14:17 -04:00
2 changed files with 70 additions and 11 deletions

View File

@@ -544,7 +544,7 @@ public class P2PConsole
System.Console.WriteLine($"{"Node ID",-20} {"IP Address",-15} {"Port",-8} {"Role",-10} {"Last Seen"}"); System.Console.WriteLine($"{"Node ID",-20} {"IP Address",-15} {"Port",-8} {"Role",-10} {"Last Seen"}");
System.Console.WriteLine(new string('-', 80)); System.Console.WriteLine(new string('-', 80));
foreach (var node in cluster.Nodes.Values) foreach (var node in cluster.Nodes)
{ {
var lastSeen = node.LastSeen.ToString("HH:mm:ss"); var lastSeen = node.LastSeen.ToString("HH:mm:ss");
System.Console.WriteLine($"{node.NodeId,-20} {node.IpAddress,-15} {node.Port,-8} {node.Role,-10} {lastSeen}"); System.Console.WriteLine($"{node.NodeId,-20} {node.IpAddress,-15} {node.Port,-8} {node.Role,-10} {lastSeen}");

View File

@@ -651,6 +651,18 @@ public class P2PNode : IDisposable
foreach (var node in staleNodes) foreach (var node in staleNodes)
{ {
_knownNodes.Remove(node.NodeId); _knownNodes.Remove(node.NodeId);
// Update cluster state
lock (_clusterLock)
{
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == node.NodeId);
if (clusterNode != null)
{
ClusterState.Nodes.Remove(clusterNode);
ClusterState.LastUpdated = DateTime.UtcNow;
}
}
NodeLeft?.Invoke(this, node); NodeLeft?.Invoke(this, node);
_logger?.LogInformation("Removed stale node {NodeId}", node.NodeId); _logger?.LogInformation("Removed stale node {NodeId}", node.NodeId);
} }
@@ -711,10 +723,10 @@ public class P2PNode : IDisposable
switch (messageType) switch (messageType)
{ {
case "heartbeat": case "heartbeat":
await ProcessHeartbeatAsync(data); await ProcessHeartbeatAsync(data, remoteEndpoint);
break; break;
case "discovery": case "discovery":
await ProcessDiscoveryAsync(data); await ProcessDiscoveryAsync(data, remoteEndpoint);
break; break;
default: default:
_logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType); _logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType);
@@ -761,10 +773,11 @@ public class P2PNode : IDisposable
switch (messageType) switch (messageType)
{ {
case "heartbeat": case "heartbeat":
await ProcessHeartbeatAsync(data); await ProcessHeartbeatAsync(data, null); // No remote endpoint for TCP
break; break;
case "discovery": case "discovery":
await ProcessDiscoveryAsync(data); // Discovery messages are typically UDP, but handle gracefully
_logger?.LogWarning("Received discovery message via TCP - unexpected");
break; break;
case "election_request": case "election_request":
await ProcessElectionRequestAsync(data, writer); await ProcessElectionRequestAsync(data, writer);
@@ -783,7 +796,7 @@ public class P2PNode : IDisposable
} }
} }
private async Task ProcessHeartbeatAsync(JsonElement data) private async Task ProcessHeartbeatAsync(JsonElement data, IPEndPoint? remoteEndpoint = null)
{ {
var nodeId = data.GetProperty("nodeId").GetString()!; var nodeId = data.GetProperty("nodeId").GetString()!;
var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!); var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!);
@@ -803,15 +816,42 @@ public class P2PNode : IDisposable
_lastHeartbeat = timestamp; _lastHeartbeat = timestamp;
} }
// Update node info // Update node info in known nodes
if (_knownNodes.TryGetValue(nodeId, out var node)) if (_knownNodes.TryGetValue(nodeId, out var node))
{ {
node.LastSeen = DateTime.UtcNow; node.LastSeen = DateTime.UtcNow;
node.Role = role; node.Role = role;
// Update IP address if we received it from UDP
if (remoteEndpoint != null && node.IpAddress == IPAddress.Any)
{
node.IpAddress = remoteEndpoint.Address;
node.Port = remoteEndpoint.Port;
}
}
// Update cluster state
lock (_clusterLock)
{
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
if (clusterNode != null)
{
clusterNode.LastSeen = DateTime.UtcNow;
clusterNode.Role = role;
// Update IP address if we received it from UDP
if (remoteEndpoint != null && clusterNode.IpAddress == IPAddress.Any)
{
clusterNode.IpAddress = remoteEndpoint.Address;
clusterNode.Port = remoteEndpoint.Port;
}
ClusterState.LastUpdated = DateTime.UtcNow;
}
} }
} }
private async Task ProcessDiscoveryAsync(JsonElement data) private async Task ProcessDiscoveryAsync(JsonElement data, IPEndPoint remoteEndpoint)
{ {
var nodeId = data.GetProperty("nodeId").GetString()!; var nodeId = data.GetProperty("nodeId").GetString()!;
@@ -821,8 +861,8 @@ public class P2PNode : IDisposable
{ {
NodeId = nodeId, NodeId = nodeId,
Hostname = Environment.MachineName, // We'll get this from heartbeat Hostname = Environment.MachineName, // We'll get this from heartbeat
IpAddress = IPAddress.Any, // We'll get this from heartbeat IpAddress = remoteEndpoint.Address, // Use the actual sender IP
Port = _port, Port = remoteEndpoint.Port,
Role = NodeRole.Follower, Role = NodeRole.Follower,
State = NodeState.Running, State = NodeState.Running,
LastSeen = DateTime.UtcNow, LastSeen = DateTime.UtcNow,
@@ -830,13 +870,32 @@ public class P2PNode : IDisposable
}; };
_knownNodes[nodeId] = newNode; _knownNodes[nodeId] = newNode;
// Update cluster state
lock (_clusterLock)
{
ClusterState.Nodes.Add(newNode);
ClusterState.LastUpdated = DateTime.UtcNow;
}
NodeJoined?.Invoke(this, newNode); NodeJoined?.Invoke(this, newNode);
_logger?.LogInformation("Discovered new node {NodeId}", nodeId); _logger?.LogInformation("Discovered new node {NodeId} at {IpAddress}:{Port}", nodeId, remoteEndpoint.Address, remoteEndpoint.Port);
} }
else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId)) else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId))
{ {
// Update last seen for existing node // Update last seen for existing node
_knownNodes[nodeId].LastSeen = DateTime.UtcNow; _knownNodes[nodeId].LastSeen = DateTime.UtcNow;
// Update cluster state
lock (_clusterLock)
{
var existingNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
if (existingNode != null)
{
existingNode.LastSeen = DateTime.UtcNow;
ClusterState.LastUpdated = DateTime.UtcNow;
}
}
} }
} }