P2P fixes
This commit is contained in:
@@ -651,6 +651,18 @@ public class P2PNode : IDisposable
|
|||||||
foreach (var node in staleNodes)
|
foreach (var node in staleNodes)
|
||||||
{
|
{
|
||||||
_knownNodes.Remove(node.NodeId);
|
_knownNodes.Remove(node.NodeId);
|
||||||
|
|
||||||
|
// Update cluster state
|
||||||
|
lock (_clusterLock)
|
||||||
|
{
|
||||||
|
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == node.NodeId);
|
||||||
|
if (clusterNode != null)
|
||||||
|
{
|
||||||
|
ClusterState.Nodes.Remove(clusterNode);
|
||||||
|
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
NodeLeft?.Invoke(this, node);
|
NodeLeft?.Invoke(this, node);
|
||||||
_logger?.LogInformation("Removed stale node {NodeId}", node.NodeId);
|
_logger?.LogInformation("Removed stale node {NodeId}", node.NodeId);
|
||||||
}
|
}
|
||||||
@@ -711,10 +723,10 @@ public class P2PNode : IDisposable
|
|||||||
switch (messageType)
|
switch (messageType)
|
||||||
{
|
{
|
||||||
case "heartbeat":
|
case "heartbeat":
|
||||||
await ProcessHeartbeatAsync(data);
|
await ProcessHeartbeatAsync(data, remoteEndpoint);
|
||||||
break;
|
break;
|
||||||
case "discovery":
|
case "discovery":
|
||||||
await ProcessDiscoveryAsync(data);
|
await ProcessDiscoveryAsync(data, remoteEndpoint);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
_logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType);
|
_logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType);
|
||||||
@@ -761,10 +773,11 @@ public class P2PNode : IDisposable
|
|||||||
switch (messageType)
|
switch (messageType)
|
||||||
{
|
{
|
||||||
case "heartbeat":
|
case "heartbeat":
|
||||||
await ProcessHeartbeatAsync(data);
|
await ProcessHeartbeatAsync(data, null); // No remote endpoint for TCP
|
||||||
break;
|
break;
|
||||||
case "discovery":
|
case "discovery":
|
||||||
await ProcessDiscoveryAsync(data);
|
// Discovery messages are typically UDP, but handle gracefully
|
||||||
|
_logger?.LogWarning("Received discovery message via TCP - unexpected");
|
||||||
break;
|
break;
|
||||||
case "election_request":
|
case "election_request":
|
||||||
await ProcessElectionRequestAsync(data, writer);
|
await ProcessElectionRequestAsync(data, writer);
|
||||||
@@ -783,7 +796,7 @@ public class P2PNode : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ProcessHeartbeatAsync(JsonElement data)
|
private async Task ProcessHeartbeatAsync(JsonElement data, IPEndPoint? remoteEndpoint = null)
|
||||||
{
|
{
|
||||||
var nodeId = data.GetProperty("nodeId").GetString()!;
|
var nodeId = data.GetProperty("nodeId").GetString()!;
|
||||||
var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!);
|
var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!);
|
||||||
@@ -803,15 +816,42 @@ public class P2PNode : IDisposable
|
|||||||
_lastHeartbeat = timestamp;
|
_lastHeartbeat = timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update node info
|
// Update node info in known nodes
|
||||||
if (_knownNodes.TryGetValue(nodeId, out var node))
|
if (_knownNodes.TryGetValue(nodeId, out var node))
|
||||||
{
|
{
|
||||||
node.LastSeen = DateTime.UtcNow;
|
node.LastSeen = DateTime.UtcNow;
|
||||||
node.Role = role;
|
node.Role = role;
|
||||||
|
|
||||||
|
// Update IP address if we received it from UDP
|
||||||
|
if (remoteEndpoint != null && node.IpAddress == IPAddress.Any)
|
||||||
|
{
|
||||||
|
node.IpAddress = remoteEndpoint.Address;
|
||||||
|
node.Port = remoteEndpoint.Port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update cluster state
|
||||||
|
lock (_clusterLock)
|
||||||
|
{
|
||||||
|
var clusterNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
|
||||||
|
if (clusterNode != null)
|
||||||
|
{
|
||||||
|
clusterNode.LastSeen = DateTime.UtcNow;
|
||||||
|
clusterNode.Role = role;
|
||||||
|
|
||||||
|
// Update IP address if we received it from UDP
|
||||||
|
if (remoteEndpoint != null && clusterNode.IpAddress == IPAddress.Any)
|
||||||
|
{
|
||||||
|
clusterNode.IpAddress = remoteEndpoint.Address;
|
||||||
|
clusterNode.Port = remoteEndpoint.Port;
|
||||||
|
}
|
||||||
|
|
||||||
|
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ProcessDiscoveryAsync(JsonElement data)
|
private async Task ProcessDiscoveryAsync(JsonElement data, IPEndPoint remoteEndpoint)
|
||||||
{
|
{
|
||||||
var nodeId = data.GetProperty("nodeId").GetString()!;
|
var nodeId = data.GetProperty("nodeId").GetString()!;
|
||||||
|
|
||||||
@@ -821,8 +861,8 @@ public class P2PNode : IDisposable
|
|||||||
{
|
{
|
||||||
NodeId = nodeId,
|
NodeId = nodeId,
|
||||||
Hostname = Environment.MachineName, // We'll get this from heartbeat
|
Hostname = Environment.MachineName, // We'll get this from heartbeat
|
||||||
IpAddress = IPAddress.Any, // We'll get this from heartbeat
|
IpAddress = remoteEndpoint.Address, // Use the actual sender IP
|
||||||
Port = _port,
|
Port = remoteEndpoint.Port,
|
||||||
Role = NodeRole.Follower,
|
Role = NodeRole.Follower,
|
||||||
State = NodeState.Running,
|
State = NodeState.Running,
|
||||||
LastSeen = DateTime.UtcNow,
|
LastSeen = DateTime.UtcNow,
|
||||||
@@ -830,13 +870,32 @@ public class P2PNode : IDisposable
|
|||||||
};
|
};
|
||||||
|
|
||||||
_knownNodes[nodeId] = newNode;
|
_knownNodes[nodeId] = newNode;
|
||||||
|
|
||||||
|
// Update cluster state
|
||||||
|
lock (_clusterLock)
|
||||||
|
{
|
||||||
|
ClusterState.Nodes.Add(newNode);
|
||||||
|
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
|
||||||
NodeJoined?.Invoke(this, newNode);
|
NodeJoined?.Invoke(this, newNode);
|
||||||
_logger?.LogInformation("Discovered new node {NodeId}", nodeId);
|
_logger?.LogInformation("Discovered new node {NodeId} at {IpAddress}:{Port}", nodeId, remoteEndpoint.Address, remoteEndpoint.Port);
|
||||||
}
|
}
|
||||||
else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId))
|
else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId))
|
||||||
{
|
{
|
||||||
// Update last seen for existing node
|
// Update last seen for existing node
|
||||||
_knownNodes[nodeId].LastSeen = DateTime.UtcNow;
|
_knownNodes[nodeId].LastSeen = DateTime.UtcNow;
|
||||||
|
|
||||||
|
// Update cluster state
|
||||||
|
lock (_clusterLock)
|
||||||
|
{
|
||||||
|
var existingNode = ClusterState.Nodes.FirstOrDefault(n => n.NodeId == nodeId);
|
||||||
|
if (existingNode != null)
|
||||||
|
{
|
||||||
|
existingNode.LastSeen = DateTime.UtcNow;
|
||||||
|
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user