Fix P2P discovery and networking
This commit is contained in:
@@ -273,15 +273,23 @@ public class P2PNode : IDisposable
|
||||
private async Task InitializeNetworkAsync()
|
||||
{
|
||||
// Initialize UDP client for discovery and heartbeats
|
||||
_udpClient = new UdpClient(_port);
|
||||
_udpClient = new UdpClient();
|
||||
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
|
||||
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, true);
|
||||
_udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, _port));
|
||||
|
||||
// Initialize TCP listener for direct communication
|
||||
_tcpListener = new TcpListener(IPAddress.Any, _port);
|
||||
// Initialize TCP listener for direct communication on a different port
|
||||
var tcpPort = _port + 1; // Use next port for TCP
|
||||
_tcpListener = new TcpListener(IPAddress.Any, tcpPort);
|
||||
_tcpListener.Start();
|
||||
|
||||
_logger?.LogInformation("Network initialized - UDP: {UdpPort}, TCP: {TcpPort}", _port, tcpPort);
|
||||
|
||||
// Start listening for incoming connections
|
||||
_ = Task.Run(() => ListenForConnectionsAsync(_cancellationTokenSource!.Token));
|
||||
|
||||
// Start listening for UDP messages
|
||||
_ = Task.Run(() => ListenForUdpMessagesAsync(_cancellationTokenSource!.Token));
|
||||
}
|
||||
|
||||
private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
|
||||
@@ -389,8 +397,29 @@ public class P2PNode : IDisposable
|
||||
});
|
||||
|
||||
var data = System.Text.Encoding.UTF8.GetBytes(discoveryMessage);
|
||||
await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(IPAddress.Broadcast, _port));
|
||||
|
||||
// Try multiple broadcast addresses
|
||||
var broadcastAddresses = new[]
|
||||
{
|
||||
IPAddress.Broadcast,
|
||||
IPAddress.Parse("255.255.255.255"),
|
||||
GetLocalBroadcastAddress()
|
||||
};
|
||||
|
||||
foreach (var broadcastAddr in broadcastAddresses)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(broadcastAddr, _port));
|
||||
_logger?.LogDebug("Sent discovery message to {BroadcastAddress}", broadcastAddr);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogDebug(ex, "Failed to send discovery to {BroadcastAddress}", broadcastAddr);
|
||||
}
|
||||
}
|
||||
|
||||
_logger?.LogInformation("Discovery cycle completed. Known nodes: {NodeCount}", _knownNodes.Count);
|
||||
await Task.Delay(30000, cancellationToken); // Send discovery every 30 seconds
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
@@ -579,6 +608,7 @@ public class P2PNode : IDisposable
|
||||
var message = JsonSerializer.Serialize(heartbeat);
|
||||
var data = System.Text.Encoding.UTF8.GetBytes(message);
|
||||
|
||||
// Send to known nodes
|
||||
foreach (var node in _knownNodes.Values)
|
||||
{
|
||||
try
|
||||
@@ -590,6 +620,27 @@ public class P2PNode : IDisposable
|
||||
_logger?.LogWarning(ex, "Failed to send heartbeat to node {NodeId}", node.NodeId);
|
||||
}
|
||||
}
|
||||
|
||||
// Also broadcast to discover new nodes
|
||||
var broadcastAddresses = new[]
|
||||
{
|
||||
IPAddress.Broadcast,
|
||||
IPAddress.Parse("255.255.255.255"),
|
||||
GetLocalBroadcastAddress()
|
||||
};
|
||||
|
||||
foreach (var broadcastAddr in broadcastAddresses)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(broadcastAddr, _port));
|
||||
_logger?.LogDebug("Broadcasted heartbeat to {BroadcastAddress}", broadcastAddr);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogDebug(ex, "Failed to broadcast heartbeat to {BroadcastAddress}", broadcastAddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CleanupStaleNodesAsync()
|
||||
@@ -625,6 +676,57 @@ public class P2PNode : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ListenForUdpMessagesAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = await _udpClient!.ReceiveAsync(cancellationToken);
|
||||
var message = System.Text.Encoding.UTF8.GetString(result.Buffer);
|
||||
|
||||
_logger?.LogDebug("Received UDP message from {Endpoint}: {Message}",
|
||||
result.RemoteEndPoint, message);
|
||||
|
||||
await ProcessUdpMessageAsync(message, result.RemoteEndPoint);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error receiving UDP message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessUdpMessageAsync(string message, IPEndPoint remoteEndpoint)
|
||||
{
|
||||
try
|
||||
{
|
||||
var data = JsonSerializer.Deserialize<JsonElement>(message);
|
||||
var messageType = data.GetProperty("type").GetString();
|
||||
|
||||
switch (messageType)
|
||||
{
|
||||
case "heartbeat":
|
||||
await ProcessHeartbeatAsync(data);
|
||||
break;
|
||||
case "discovery":
|
||||
await ProcessDiscoveryAsync(data);
|
||||
break;
|
||||
default:
|
||||
_logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error processing UDP message: {Message}", message);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleClientAsync(TcpClient client, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
@@ -718,13 +820,24 @@ public class P2PNode : IDisposable
|
||||
var newNode = new NodeInfo
|
||||
{
|
||||
NodeId = nodeId,
|
||||
LastSeen = DateTime.UtcNow
|
||||
Hostname = Environment.MachineName, // We'll get this from heartbeat
|
||||
IpAddress = IPAddress.Any, // We'll get this from heartbeat
|
||||
Port = _port,
|
||||
Role = NodeRole.Follower,
|
||||
State = NodeState.Running,
|
||||
LastSeen = DateTime.UtcNow,
|
||||
SystemInfo = new SystemInfo() // We'll get this from heartbeat
|
||||
};
|
||||
|
||||
_knownNodes[nodeId] = newNode;
|
||||
NodeJoined?.Invoke(this, newNode);
|
||||
_logger?.LogInformation("Discovered new node {NodeId}", nodeId);
|
||||
}
|
||||
else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId))
|
||||
{
|
||||
// Update last seen for existing node
|
||||
_knownNodes[nodeId].LastSeen = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessElectionRequestAsync(JsonElement data, StreamWriter writer)
|
||||
@@ -808,6 +921,29 @@ public class P2PNode : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private IPAddress GetLocalBroadcastAddress()
|
||||
{
|
||||
try
|
||||
{
|
||||
var localIp = GetLocalIpAddress();
|
||||
if (localIp != null && localIp != IPAddress.Any)
|
||||
{
|
||||
var ipBytes = localIp.GetAddressBytes();
|
||||
// Set all host bits to 1 for broadcast
|
||||
for (int i = 3; i >= 0; i--)
|
||||
{
|
||||
ipBytes[i] = 255;
|
||||
}
|
||||
return new IPAddress(ipBytes);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogWarning(ex, "Failed to get local broadcast address");
|
||||
}
|
||||
return IPAddress.Broadcast;
|
||||
}
|
||||
|
||||
private SystemInfo GetSystemInfo()
|
||||
{
|
||||
return new SystemInfo
|
||||
|
Reference in New Issue
Block a user