Compare commits

...

2 Commits

Author SHA1 Message Date
602af77291 P2P fixes 2025-08-31 14:20:31 -04:00
8a26ddcb3a Typo fix 2025-08-31 14:14:17 -04:00
2 changed files with 70 additions and 11 deletions

View File

@@ -544,7 +544,7 @@ public class P2PConsole
System.Console.WriteLine($"{"Node ID",-20} {"IP Address",-15} {"Port",-8} {"Role",-10} {"Last Seen"}");
System.Console.WriteLine(new string('-', 80));
foreach (var node in cluster.Nodes.Values)
foreach (var node in cluster.Nodes)
{
var lastSeen = node.LastSeen.ToString("HH:mm:ss");
System.Console.WriteLine($"{node.NodeId,-20} {node.IpAddress,-15} {node.Port,-8} {node.Role,-10} {lastSeen}");

View File

@@ -651,6 +651,18 @@ public class P2PNode : IDisposable
foreach (var node in staleNodes)
{
_knownNodes.Remove(node.NodeId);
// Update cluster state
lock (_clusterLock)
{
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == node.NodeId);
if (clusterNode != null)
{
ClusterState.Nodes.Remove(clusterNode);
ClusterState.LastUpdated = DateTime.UtcNow;
}
}
NodeLeft?.Invoke(this, node);
_logger?.LogInformation("Removed stale node {NodeId}", node.NodeId);
}
@@ -711,10 +723,10 @@ public class P2PNode : IDisposable
switch (messageType)
{
case "heartbeat":
await ProcessHeartbeatAsync(data);
await ProcessHeartbeatAsync(data, remoteEndpoint);
break;
case "discovery":
await ProcessDiscoveryAsync(data);
await ProcessDiscoveryAsync(data, remoteEndpoint);
break;
default:
_logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType);
@@ -761,10 +773,11 @@ public class P2PNode : IDisposable
switch (messageType)
{
case "heartbeat":
await ProcessHeartbeatAsync(data);
await ProcessHeartbeatAsync(data, null); // No remote endpoint for TCP
break;
case "discovery":
await ProcessDiscoveryAsync(data);
// Discovery messages are typically UDP, but handle gracefully
_logger?.LogWarning("Received discovery message via TCP - unexpected");
break;
case "election_request":
await ProcessElectionRequestAsync(data, writer);
@@ -783,7 +796,7 @@ public class P2PNode : IDisposable
}
}
private async Task ProcessHeartbeatAsync(JsonElement data)
private async Task ProcessHeartbeatAsync(JsonElement data, IPEndPoint? remoteEndpoint = null)
{
var nodeId = data.GetProperty("nodeId").GetString()!;
var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!);
@@ -803,15 +816,42 @@ public class P2PNode : IDisposable
_lastHeartbeat = timestamp;
}
// Update node info
// Update node info in known nodes
if (_knownNodes.TryGetValue(nodeId, out var node))
{
node.LastSeen = DateTime.UtcNow;
node.Role = role;
// Update IP address if we received it from UDP
if (remoteEndpoint != null && node.IpAddress == IPAddress.Any)
{
node.IpAddress = remoteEndpoint.Address;
node.Port = remoteEndpoint.Port;
}
}
// Update cluster state
lock (_clusterLock)
{
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
if (clusterNode != null)
{
clusterNode.LastSeen = DateTime.UtcNow;
clusterNode.Role = role;
// Update IP address if we received it from UDP
if (remoteEndpoint != null && clusterNode.IpAddress == IPAddress.Any)
{
clusterNode.IpAddress = remoteEndpoint.Address;
clusterNode.Port = remoteEndpoint.Port;
}
ClusterState.LastUpdated = DateTime.UtcNow;
}
}
}
private async Task ProcessDiscoveryAsync(JsonElement data)
private async Task ProcessDiscoveryAsync(JsonElement data, IPEndPoint remoteEndpoint)
{
var nodeId = data.GetProperty("nodeId").GetString()!;
@@ -821,8 +861,8 @@ public class P2PNode : IDisposable
{
NodeId = nodeId,
Hostname = Environment.MachineName, // We'll get this from heartbeat
IpAddress = IPAddress.Any, // We'll get this from heartbeat
Port = _port,
IpAddress = remoteEndpoint.Address, // Use the actual sender IP
Port = remoteEndpoint.Port,
Role = NodeRole.Follower,
State = NodeState.Running,
LastSeen = DateTime.UtcNow,
@@ -830,13 +870,32 @@ public class P2PNode : IDisposable
};
_knownNodes[nodeId] = newNode;
// Update cluster state
lock (_clusterLock)
{
ClusterState.Nodes.Add(newNode);
ClusterState.LastUpdated = DateTime.UtcNow;
}
NodeJoined?.Invoke(this, newNode);
_logger?.LogInformation("Discovered new node {NodeId}", nodeId);
_logger?.LogInformation("Discovered new node {NodeId} at {IpAddress}:{Port}", nodeId, remoteEndpoint.Address, remoteEndpoint.Port);
}
else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId))
{
// Update last seen for existing node
_knownNodes[nodeId].LastSeen = DateTime.UtcNow;
// Update cluster state
lock (_clusterLock)
{
var existingNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
if (existingNode != null)
{
existingNode.LastSeen = DateTime.UtcNow;
ClusterState.LastUpdated = DateTime.UtcNow;
}
}
}
}