diff --git a/QemuVmManager.Console/P2PConsole.cs b/QemuVmManager.Console/P2PConsole.cs index c3ae440..d48cae5 100644 --- a/QemuVmManager.Console/P2PConsole.cs +++ b/QemuVmManager.Console/P2PConsole.cs @@ -109,6 +109,9 @@ public class P2PConsole case "upnp": await ShowUPnPStatusAsync(); break; + case "discovery": + await ShowDiscoveryInfoAsync(); + break; case "exit": case "quit": System.Console.WriteLine("Stopping P2P node..."); @@ -144,6 +147,7 @@ public class P2PConsole System.Console.WriteLine(" migrate - Migrate VM to different node"); System.Console.WriteLine(" forward - Forward port for VM (master only)"); System.Console.WriteLine(" upnp - Show UPnP status"); + System.Console.WriteLine(" discovery - Show network discovery information"); System.Console.WriteLine(" help - Show this help"); System.Console.WriteLine(" exit/quit - Exit the application"); } @@ -518,4 +522,51 @@ public class P2PConsole var input = System.Console.ReadLine()?.Trim(); return string.IsNullOrEmpty(input) ? defaultValue : input; } + + private async Task ShowDiscoveryInfoAsync() + { + System.Console.WriteLine("=== Network Discovery Information ==="); + System.Console.WriteLine($"Current Node ID: {_p2pNode.CurrentNode.NodeId}"); + System.Console.WriteLine($"Current Node IP: {_p2pNode.CurrentNode.IpAddress}"); + System.Console.WriteLine($"Current Node Port: {_p2pNode.CurrentNode.Port}"); + System.Console.WriteLine($"Current Role: {_p2pNode.CurrentNode.Role}"); + System.Console.WriteLine($"Is Master: {_p2pNode.IsMaster}"); + System.Console.WriteLine(); + + // Get cluster state to show known nodes + var cluster = _p2pNode.ClusterState; + System.Console.WriteLine($"Known Nodes: {cluster.Nodes.Count}"); + + if (cluster.Nodes.Count > 0) + { + System.Console.WriteLine(); + System.Console.WriteLine("=== Known Nodes ==="); + System.Console.WriteLine($"{"Node ID",-20} {"IP Address",-15} {"Port",-8} {"Role",-10} {"Last Seen"}"); + System.Console.WriteLine(new string('-', 80)); + + foreach (var node in cluster.Nodes.Values) + { + var lastSeen = node.LastSeen.ToString("HH:mm:ss"); + System.Console.WriteLine($"{node.NodeId,-20} {node.IpAddress,-15} {node.Port,-8} {node.Role,-10} {lastSeen}"); + } + } + else + { + System.Console.WriteLine("No other nodes discovered yet."); + System.Console.WriteLine(); + System.Console.WriteLine("Discovery troubleshooting:"); + System.Console.WriteLine("1. Make sure other nodes are running on the same network"); + System.Console.WriteLine("2. Check if firewall is blocking UDP port 8080"); + System.Console.WriteLine("3. Try running multiple instances on different machines"); + System.Console.WriteLine("4. Check network connectivity between nodes"); + } + + System.Console.WriteLine(); + System.Console.WriteLine("=== Network Configuration ==="); + System.Console.WriteLine("UDP Discovery: Enabled (port 8080)"); + System.Console.WriteLine("TCP Communication: Enabled (port 8081)"); + System.Console.WriteLine("Heartbeat Interval: 5 seconds"); + System.Console.WriteLine("Discovery Interval: 30 seconds"); + System.Console.WriteLine("Node Timeout: 30 seconds"); + } } diff --git a/QemuVmManager.Core/P2PNode.cs b/QemuVmManager.Core/P2PNode.cs index b15b9eb..363cfb7 100644 --- a/QemuVmManager.Core/P2PNode.cs +++ b/QemuVmManager.Core/P2PNode.cs @@ -273,15 +273,23 @@ public class P2PNode : IDisposable private async Task InitializeNetworkAsync() { // Initialize UDP client for discovery and heartbeats - _udpClient = new UdpClient(_port); + _udpClient = new UdpClient(); _udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, true); + _udpClient.Client.Bind(new IPEndPoint(IPAddress.Any, _port)); - // Initialize TCP listener for direct communication - _tcpListener = new TcpListener(IPAddress.Any, _port); + // Initialize TCP listener for direct communication on a different port + var tcpPort = _port + 1; // Use next port for TCP + _tcpListener = new TcpListener(IPAddress.Any, tcpPort); _tcpListener.Start(); + _logger?.LogInformation("Network initialized - UDP: {UdpPort}, TCP: {TcpPort}", _port, tcpPort); + // Start listening for incoming connections _ = Task.Run(() => ListenForConnectionsAsync(_cancellationTokenSource!.Token)); + + // Start listening for UDP messages + _ = Task.Run(() => ListenForUdpMessagesAsync(_cancellationTokenSource!.Token)); } private async Task HeartbeatLoopAsync(CancellationToken cancellationToken) @@ -389,8 +397,29 @@ public class P2PNode : IDisposable }); var data = System.Text.Encoding.UTF8.GetBytes(discoveryMessage); - await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(IPAddress.Broadcast, _port)); + // Try multiple broadcast addresses + var broadcastAddresses = new[] + { + IPAddress.Broadcast, + IPAddress.Parse("255.255.255.255"), + GetLocalBroadcastAddress() + }; + + foreach (var broadcastAddr in broadcastAddresses) + { + try + { + await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(broadcastAddr, _port)); + _logger?.LogDebug("Sent discovery message to {BroadcastAddress}", broadcastAddr); + } + catch (Exception ex) + { + _logger?.LogDebug(ex, "Failed to send discovery to {BroadcastAddress}", broadcastAddr); + } + } + + _logger?.LogInformation("Discovery cycle completed. Known nodes: {NodeCount}", _knownNodes.Count); await Task.Delay(30000, cancellationToken); // Send discovery every 30 seconds } catch (OperationCanceledException) @@ -579,6 +608,7 @@ public class P2PNode : IDisposable var message = JsonSerializer.Serialize(heartbeat); var data = System.Text.Encoding.UTF8.GetBytes(message); + // Send to known nodes foreach (var node in _knownNodes.Values) { try @@ -590,6 +620,27 @@ public class P2PNode : IDisposable _logger?.LogWarning(ex, "Failed to send heartbeat to node {NodeId}", node.NodeId); } } + + // Also broadcast to discover new nodes + var broadcastAddresses = new[] + { + IPAddress.Broadcast, + IPAddress.Parse("255.255.255.255"), + GetLocalBroadcastAddress() + }; + + foreach (var broadcastAddr in broadcastAddresses) + { + try + { + await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(broadcastAddr, _port)); + _logger?.LogDebug("Broadcasted heartbeat to {BroadcastAddress}", broadcastAddr); + } + catch (Exception ex) + { + _logger?.LogDebug(ex, "Failed to broadcast heartbeat to {BroadcastAddress}", broadcastAddr); + } + } } private async Task CleanupStaleNodesAsync() @@ -625,6 +676,57 @@ public class P2PNode : IDisposable } } + private async Task ListenForUdpMessagesAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + var result = await _udpClient!.ReceiveAsync(cancellationToken); + var message = System.Text.Encoding.UTF8.GetString(result.Buffer); + + _logger?.LogDebug("Received UDP message from {Endpoint}: {Message}", + result.RemoteEndPoint, message); + + await ProcessUdpMessageAsync(message, result.RemoteEndPoint); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + _logger?.LogError(ex, "Error receiving UDP message"); + } + } + } + + private async Task ProcessUdpMessageAsync(string message, IPEndPoint remoteEndpoint) + { + try + { + var data = JsonSerializer.Deserialize(message); + var messageType = data.GetProperty("type").GetString(); + + switch (messageType) + { + case "heartbeat": + await ProcessHeartbeatAsync(data); + break; + case "discovery": + await ProcessDiscoveryAsync(data); + break; + default: + _logger?.LogWarning("Unknown UDP message type: {MessageType}", messageType); + break; + } + } + catch (Exception ex) + { + _logger?.LogError(ex, "Error processing UDP message: {Message}", message); + } + } + private async Task HandleClientAsync(TcpClient client, CancellationToken cancellationToken) { try @@ -718,13 +820,24 @@ public class P2PNode : IDisposable var newNode = new NodeInfo { NodeId = nodeId, - LastSeen = DateTime.UtcNow + Hostname = Environment.MachineName, // We'll get this from heartbeat + IpAddress = IPAddress.Any, // We'll get this from heartbeat + Port = _port, + Role = NodeRole.Follower, + State = NodeState.Running, + LastSeen = DateTime.UtcNow, + SystemInfo = new SystemInfo() // We'll get this from heartbeat }; _knownNodes[nodeId] = newNode; NodeJoined?.Invoke(this, newNode); _logger?.LogInformation("Discovered new node {NodeId}", nodeId); } + else if (nodeId != _nodeId && _knownNodes.ContainsKey(nodeId)) + { + // Update last seen for existing node + _knownNodes[nodeId].LastSeen = DateTime.UtcNow; + } } private async Task ProcessElectionRequestAsync(JsonElement data, StreamWriter writer) @@ -808,6 +921,29 @@ public class P2PNode : IDisposable } } + private IPAddress GetLocalBroadcastAddress() + { + try + { + var localIp = GetLocalIpAddress(); + if (localIp != null && localIp != IPAddress.Any) + { + var ipBytes = localIp.GetAddressBytes(); + // Set all host bits to 1 for broadcast + for (int i = 3; i >= 0; i--) + { + ipBytes[i] = 255; + } + return new IPAddress(ipBytes); + } + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to get local broadcast address"); + } + return IPAddress.Broadcast; + } + private SystemInfo GetSystemInfo() { return new SystemInfo