Compare commits
2 Commits
b5d78b2bd9
...
master
Author | SHA1 | Date | |
---|---|---|---|
602af77291 | |||
8a26ddcb3a |
@@ -544,7 +544,7 @@ public class P2PConsole
|
||||
System.Console.WriteLine($"{"Node ID",-20} {"IP Address",-15} {"Port",-8} {"Role",-10} {"Last Seen"}");
|
||||
System.Console.WriteLine(new string('-', 80));
|
||||
|
||||
foreach (var node in cluster.Nodes.Values)
|
||||
foreach (var node in cluster.Nodes)
|
||||
{
|
||||
var lastSeen = node.LastSeen.ToString("HH:mm:ss");
|
||||
System.Console.WriteLine($"{node.NodeId,-20} {node.IpAddress,-15} {node.Port,-8} {node.Role,-10} {lastSeen}");
|
||||
|
@@ -651,6 +651,18 @@ public class P2PNode : IDisposable
|
||||
foreach (var node in staleNodes)
|
||||
{
|
||||
_knownNodes.Remove(node.NodeId);
|
||||
|
||||
// Update cluster state
|
||||
lock (_clusterLock)
|
||||
{
|
||||
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == node.NodeId);
|
||||
if (clusterNode != null)
|
||||
{
|
||||
ClusterState.Nodes.Remove(clusterNode);
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
|
||||
NodeLeft?.Invoke(this, node);
|
||||
_logger?.LogInformation("Removed stale node {NodeId}", node.NodeId);
|
||||
}
|
||||
@@ -711,10 +723,10 @@ public class P2PNode : IDisposable
|
||||
switch (messageType)
|
||||
{
|
||||
case "heartbeat":
|
||||
await ProcessHeartbeatAsync(data);
|
||||
await ProcessHeartbeatAsync(data, remoteEndpoint);
|
||||
break;
|
||||
case "discovery":
|
||||
await ProcessDiscoveryAsync(data);
|
||||
await ProcessDiscoveryAsync(data, remoteEndpoint);
|
||||
break;
|
||||
default:
|
||||
_logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType);
|
||||
@@ -761,10 +773,11 @@ public class P2PNode : IDisposable
|
||||
switch (messageType)
|
||||
{
|
||||
case "heartbeat":
|
||||
await ProcessHeartbeatAsync(data);
|
||||
await ProcessHeartbeatAsync(data, null); // No remote endpoint for TCP
|
||||
break;
|
||||
case "discovery":
|
||||
await ProcessDiscoveryAsync(data);
|
||||
// Discovery messages are typically UDP, but handle gracefully
|
||||
_logger?.LogWarning("Received discovery message via TCP - unexpected");
|
||||
break;
|
||||
case "election_request":
|
||||
await ProcessElectionRequestAsync(data, writer);
|
||||
@@ -783,7 +796,7 @@ public class P2PNode : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessHeartbeatAsync(JsonElement data)
|
||||
private async Task ProcessHeartbeatAsync(JsonElement data, IPEndPoint? remoteEndpoint = null)
|
||||
{
|
||||
var nodeId = data.GetProperty("nodeId").GetString()!;
|
||||
var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!);
|
||||
@@ -803,15 +816,42 @@ public class P2PNode : IDisposable
|
||||
_lastHeartbeat = timestamp;
|
||||
}
|
||||
|
||||
// Update node info
|
||||
// Update node info in known nodes
|
||||
if (_knownNodes.TryGetValue(nodeId, out var node))
|
||||
{
|
||||
node.LastSeen = DateTime.UtcNow;
|
||||
node.Role = role;
|
||||
|
||||
// Update IP address if we received it from UDP
|
||||
if (remoteEndpoint != null && node.IpAddress == IPAddress.Any)
|
||||
{
|
||||
node.IpAddress = remoteEndpoint.Address;
|
||||
node.Port = remoteEndpoint.Port;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessDiscoveryAsync(JsonElement data)
|
||||
// Update cluster state
|
||||
lock (_clusterLock)
|
||||
{
|
||||
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
|
||||
if (clusterNode != null)
|
||||
{
|
||||
clusterNode.LastSeen = DateTime.UtcNow;
|
||||
clusterNode.Role = role;
|
||||
|
||||
// Update IP address if we received it from UDP
|
||||
if (remoteEndpoint != null && clusterNode.IpAddress == IPAddress.Any)
|
||||
{
|
||||
clusterNode.IpAddress = remoteEndpoint.Address;
|
||||
clusterNode.Port = remoteEndpoint.Port;
|
||||
}
|
||||
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessDiscoveryAsync(JsonElement data, IPEndPoint remoteEndpoint)
|
||||
{
|
||||
var nodeId = data.GetProperty("nodeId").GetString()!;
|
||||
|
||||
@@ -821,8 +861,8 @@ public class P2PNode : IDisposable
|
||||
{
|
||||
NodeId = nodeId,
|
||||
Hostname = Environment.MachineName, // We'll get this from heartbeat
|
||||
IpAddress = IPAddress.Any, // We'll get this from heartbeat
|
||||
Port = _port,
|
||||
IpAddress = remoteEndpoint.Address, // Use the actual sender IP
|
||||
Port = remoteEndpoint.Port,
|
||||
Role = NodeRole.Follower,
|
||||
State = NodeState.Running,
|
||||
LastSeen = DateTime.UtcNow,
|
||||
@@ -830,13 +870,32 @@ public class P2PNode : IDisposable
|
||||
};
|
||||
|
||||
_knownNodes[nodeId] = newNode;
|
||||
|
||||
// Update cluster state
|
||||
lock (_clusterLock)
|
||||
{
|
||||
ClusterState.Nodes.Add(newNode);
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
NodeJoined?.Invoke(this, newNode);
|
||||
_logger?.LogInformation("Discovered new node {NodeId}", nodeId);
|
||||
_logger?.LogInformation("Discovered new node {NodeId} at {IpAddress}:{Port}", nodeId, remoteEndpoint.Address, remoteEndpoint.Port);
|
||||
}
|
||||
else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId))
|
||||
{
|
||||
// Update last seen for existing node
|
||||
_knownNodes[nodeId].LastSeen = DateTime.UtcNow;
|
||||
|
||||
// Update cluster state
|
||||
lock (_clusterLock)
|
||||
{
|
||||
var existingNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
|
||||
if (existingNode != null)
|
||||
{
|
||||
existingNode.LastSeen = DateTime.UtcNow;
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user