P2P abilities
This commit is contained in:
834
QemuVmManager.Core/P2PNode.cs
Normal file
834
QemuVmManager.Core/P2PNode.cs
Normal file
@@ -0,0 +1,834 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using QemuVmManager.Models;
|
||||
|
||||
namespace QemuVmManager.Core;
|
||||
|
||||
public class P2PNode : IDisposable
|
||||
{
|
||||
private readonly string _nodeId;
|
||||
private readonly int _port;
|
||||
private readonly IUPnPManager _upnpManager;
|
||||
private readonly QemuProcessManager _qemuManager;
|
||||
private readonly ILogger<P2PNode>? _logger;
|
||||
|
||||
private readonly Dictionary<string, NodeInfo> _knownNodes = new();
|
||||
private readonly Dictionary<string, VmInstance> _localVms = new();
|
||||
private readonly object _clusterLock = new();
|
||||
|
||||
private UdpClient? _udpClient;
|
||||
private TcpListener? _tcpListener;
|
||||
private CancellationTokenSource? _cancellationTokenSource;
|
||||
private Task? _heartbeatTask;
|
||||
private Task? _electionTask;
|
||||
private Task? _discoveryTask;
|
||||
|
||||
private NodeRole _currentRole = NodeRole.Follower;
|
||||
private long _currentTerm = 0;
|
||||
private string? _votedFor;
|
||||
private DateTime _lastHeartbeat = DateTime.UtcNow;
|
||||
private bool _isRunning = false;
|
||||
|
||||
public event EventHandler<NodeRole>? RoleChanged;
|
||||
public event EventHandler<NodeInfo>? NodeJoined;
|
||||
public event EventHandler<NodeInfo>? NodeLeft;
|
||||
public event EventHandler<VmInstance>? VmStarted;
|
||||
public event EventHandler<VmInstance>? VmStopped;
|
||||
|
||||
public NodeInfo CurrentNode { get; private set; }
|
||||
public ClusterState ClusterState { get; private set; } = new();
|
||||
public bool IsMaster => _currentRole == NodeRole.Master;
|
||||
|
||||
public P2PNode(string nodeId, int port = 8080, IUPnPManager? upnpManager = null,
|
||||
QemuProcessManager? qemuManager = null, ILogger<P2PNode>? logger = null)
|
||||
{
|
||||
_nodeId = nodeId;
|
||||
_port = port;
|
||||
_upnpManager = upnpManager ?? new UPnPManager();
|
||||
_qemuManager = qemuManager ?? new QemuProcessManager();
|
||||
_logger = logger;
|
||||
|
||||
CurrentNode = new NodeInfo
|
||||
{
|
||||
NodeId = _nodeId,
|
||||
Hostname = Environment.MachineName,
|
||||
IpAddress = GetLocalIpAddress(),
|
||||
Port = _port,
|
||||
Role = NodeRole.Follower,
|
||||
State = NodeState.Stopped,
|
||||
SystemInfo = GetSystemInfo()
|
||||
};
|
||||
}
|
||||
|
||||
public async Task StartAsync()
|
||||
{
|
||||
if (_isRunning)
|
||||
return;
|
||||
|
||||
_logger?.LogInformation("Starting P2P node {NodeId} on port {Port}", _nodeId, _port);
|
||||
|
||||
_cancellationTokenSource = new CancellationTokenSource();
|
||||
_isRunning = true;
|
||||
CurrentNode.State = NodeState.Starting;
|
||||
|
||||
try
|
||||
{
|
||||
// Initialize network components
|
||||
await InitializeNetworkAsync();
|
||||
|
||||
// Start background tasks
|
||||
_heartbeatTask = Task.Run(() => HeartbeatLoopAsync(_cancellationTokenSource.Token));
|
||||
_electionTask = Task.Run(() => ElectionLoopAsync(_cancellationTokenSource.Token));
|
||||
_discoveryTask = Task.Run(() => DiscoveryLoopAsync(_cancellationTokenSource.Token));
|
||||
|
||||
CurrentNode.State = NodeState.Running;
|
||||
_logger?.LogInformation("P2P node {NodeId} started successfully", _nodeId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Failed to start P2P node {NodeId}", _nodeId);
|
||||
CurrentNode.State = NodeState.Error;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task StopAsync()
|
||||
{
|
||||
if (!_isRunning)
|
||||
return;
|
||||
|
||||
_logger?.LogInformation("Stopping P2P node {NodeId}", _nodeId);
|
||||
|
||||
_isRunning = false;
|
||||
CurrentNode.State = NodeState.Stopping;
|
||||
|
||||
_cancellationTokenSource?.Cancel();
|
||||
|
||||
// Stop all local VMs
|
||||
foreach (var vm in _localVms.Values.ToList())
|
||||
{
|
||||
await StopLocalVmAsync(vm.VmId);
|
||||
}
|
||||
|
||||
// Clean up network resources
|
||||
_udpClient?.Close();
|
||||
_tcpListener?.Stop();
|
||||
|
||||
CurrentNode.State = NodeState.Stopped;
|
||||
_logger?.LogInformation("P2P node {NodeId} stopped", _nodeId);
|
||||
}
|
||||
|
||||
public async Task<VmInstance> StartVmAsync(VmConfiguration config, string? targetNodeId = null)
|
||||
{
|
||||
var vmId = Guid.NewGuid().ToString();
|
||||
var targetNode = targetNodeId ?? _nodeId;
|
||||
|
||||
if (targetNode == _nodeId)
|
||||
{
|
||||
// Start VM locally
|
||||
return await StartLocalVmAsync(vmId, config);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Request remote node to start VM
|
||||
return await RequestRemoteVmStartAsync(vmId, config, targetNode);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> StopVmAsync(string vmId)
|
||||
{
|
||||
if (_localVms.ContainsKey(vmId))
|
||||
{
|
||||
return await StopLocalVmAsync(vmId);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Find which node has this VM and request stop
|
||||
var vm = ClusterState.DistributedVms.GetValueOrDefault(vmId);
|
||||
if (vm != null)
|
||||
{
|
||||
return await RequestRemoteVmStopAsync(vmId, vm.NodeId);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task<PortForwardingResponse> RequestPortForwardingAsync(string vmId, int privatePort, int? publicPort = null)
|
||||
{
|
||||
if (!IsMaster)
|
||||
{
|
||||
throw new InvalidOperationException("Only the master node can request port forwarding");
|
||||
}
|
||||
|
||||
var vm = ClusterState.DistributedVms.GetValueOrDefault(vmId);
|
||||
if (vm == null)
|
||||
{
|
||||
return new PortForwardingResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = false,
|
||||
ErrorMessage = "VM not found"
|
||||
};
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var actualPublicPort = publicPort ?? await GetAvailablePortAsync();
|
||||
var success = await _upnpManager.AddPortMappingAsync(actualPublicPort, privatePort, $"QEMU VM {vmId}");
|
||||
|
||||
if (success)
|
||||
{
|
||||
var externalIp = await _upnpManager.GetExternalIpAddressAsync();
|
||||
vm.PublicEndpoint = new NetworkEndpoint
|
||||
{
|
||||
PublicIp = externalIp ?? IPAddress.Any,
|
||||
PublicPort = actualPublicPort,
|
||||
PrivateIp = vm.Configuration.Network.Interfaces.FirstOrDefault()?.Mac != null ?
|
||||
IPAddress.Parse("192.168.1.100") : IPAddress.Any, // Simplified
|
||||
PrivatePort = privatePort,
|
||||
Protocol = "TCP",
|
||||
Description = $"QEMU VM {vmId}"
|
||||
};
|
||||
|
||||
return new PortForwardingResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = true,
|
||||
PublicIp = externalIp,
|
||||
PublicPort = actualPublicPort
|
||||
};
|
||||
}
|
||||
|
||||
return new PortForwardingResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = false,
|
||||
ErrorMessage = "Failed to create port mapping"
|
||||
};
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Failed to request port forwarding for VM {VmId}", vmId);
|
||||
return new PortForwardingResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = false,
|
||||
ErrorMessage = ex.Message
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<VmMigrationResponse> MigrateVmAsync(string vmId, string targetNodeId)
|
||||
{
|
||||
var vm = ClusterState.DistributedVms.GetValueOrDefault(vmId);
|
||||
if (vm == null)
|
||||
{
|
||||
return new VmMigrationResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = false,
|
||||
ErrorMessage = "VM not found"
|
||||
};
|
||||
}
|
||||
|
||||
if (vm.NodeId == targetNodeId)
|
||||
{
|
||||
return new VmMigrationResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = true,
|
||||
ErrorMessage = "VM is already on target node"
|
||||
};
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Stop VM on source node
|
||||
await StopVmAsync(vmId);
|
||||
|
||||
// Start VM on target node
|
||||
var newVm = await StartVmAsync(vm.Configuration, targetNodeId);
|
||||
|
||||
return new VmMigrationResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = true
|
||||
};
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Failed to migrate VM {VmId} to node {TargetNodeId}", vmId, targetNodeId);
|
||||
return new VmMigrationResponse
|
||||
{
|
||||
VmId = vmId,
|
||||
Success = false,
|
||||
ErrorMessage = ex.Message
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private async Task InitializeNetworkAsync()
|
||||
{
|
||||
// Initialize UDP client for discovery and heartbeats
|
||||
_udpClient = new UdpClient(_port);
|
||||
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
|
||||
|
||||
// Initialize TCP listener for direct communication
|
||||
_tcpListener = new TcpListener(IPAddress.Any, _port);
|
||||
_tcpListener.Start();
|
||||
|
||||
// Start listening for incoming connections
|
||||
_ = Task.Run(() => ListenForConnectionsAsync(_cancellationTokenSource!.Token));
|
||||
}
|
||||
|
||||
private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var heartbeat = new HeartbeatMessage
|
||||
{
|
||||
NodeId = _nodeId,
|
||||
Role = _currentRole,
|
||||
Term = _currentTerm,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
Metadata = new Dictionary<string, object>
|
||||
{
|
||||
["running_vms"] = _localVms.Count,
|
||||
["cpu_usage"] = await GetCpuUsageAsync(),
|
||||
["memory_usage"] = await GetMemoryUsageAsync()
|
||||
}
|
||||
};
|
||||
|
||||
await BroadcastHeartbeatAsync(heartbeat);
|
||||
|
||||
// Check for stale nodes
|
||||
await CleanupStaleNodesAsync();
|
||||
|
||||
await Task.Delay(5000, cancellationToken); // Send heartbeat every 5 seconds
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error in heartbeat loop");
|
||||
await Task.Delay(1000, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ElectionLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var electionTimeout = TimeSpan.FromSeconds(10 + Random.Shared.Next(10)); // 10-20 seconds
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_currentRole == NodeRole.Follower)
|
||||
{
|
||||
// Wait for heartbeat from master
|
||||
var timeout = DateTime.UtcNow.Add(electionTimeout);
|
||||
while (DateTime.UtcNow < timeout && !cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
if (_lastHeartbeat > DateTime.UtcNow.AddSeconds(-5))
|
||||
{
|
||||
await Task.Delay(1000, cancellationToken);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// No heartbeat received, start election
|
||||
await StartElectionAsync();
|
||||
}
|
||||
}
|
||||
else if (_currentRole == NodeRole.Candidate)
|
||||
{
|
||||
// Wait for election results
|
||||
await Task.Delay(5000, cancellationToken);
|
||||
}
|
||||
else if (_currentRole == NodeRole.Master)
|
||||
{
|
||||
// Master continues to send heartbeats
|
||||
await Task.Delay(1000, cancellationToken);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error in election loop");
|
||||
await Task.Delay(1000, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DiscoveryLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Send discovery message
|
||||
var discoveryMessage = JsonSerializer.Serialize(new
|
||||
{
|
||||
type = "discovery",
|
||||
nodeId = _nodeId,
|
||||
timestamp = DateTime.UtcNow
|
||||
});
|
||||
|
||||
var data = System.Text.Encoding.UTF8.GetBytes(discoveryMessage);
|
||||
await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(IPAddress.Broadcast, _port));
|
||||
|
||||
await Task.Delay(30000, cancellationToken); // Send discovery every 30 seconds
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error in discovery loop");
|
||||
await Task.Delay(5000, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StartElectionAsync()
|
||||
{
|
||||
_logger?.LogInformation("Starting election for term {Term}", _currentTerm + 1);
|
||||
|
||||
_currentTerm++;
|
||||
_currentRole = NodeRole.Candidate;
|
||||
_votedFor = _nodeId;
|
||||
|
||||
var electionRequest = new ElectionRequest
|
||||
{
|
||||
CandidateId = _nodeId,
|
||||
Term = _currentTerm,
|
||||
Timestamp = DateTime.UtcNow
|
||||
};
|
||||
|
||||
var votes = 1; // Vote for self
|
||||
var requiredVotes = (_knownNodes.Count + 1) / 2 + 1; // Majority
|
||||
|
||||
// Request votes from other nodes
|
||||
foreach (var node in _knownNodes.Values)
|
||||
{
|
||||
try
|
||||
{
|
||||
var response = await RequestVoteAsync(node, electionRequest);
|
||||
if (response.VoteGranted && response.Term == _currentTerm)
|
||||
{
|
||||
votes++;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogWarning(ex, "Failed to request vote from node {NodeId}", node.NodeId);
|
||||
}
|
||||
}
|
||||
|
||||
if (votes >= requiredVotes)
|
||||
{
|
||||
await BecomeMasterAsync();
|
||||
}
|
||||
else
|
||||
{
|
||||
_currentRole = NodeRole.Follower;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task BecomeMasterAsync()
|
||||
{
|
||||
_logger?.LogInformation("Becoming master for term {Term}", _currentTerm);
|
||||
|
||||
_currentRole = NodeRole.Master;
|
||||
CurrentNode.Role = NodeRole.Master;
|
||||
|
||||
// Update cluster state
|
||||
lock (_clusterLock)
|
||||
{
|
||||
ClusterState.MasterNode = CurrentNode;
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
RoleChanged?.Invoke(this, NodeRole.Master);
|
||||
|
||||
// Start master-specific tasks
|
||||
if (IsMaster)
|
||||
{
|
||||
await StartMasterTasksAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StartMasterTasksAsync()
|
||||
{
|
||||
// Master-specific initialization
|
||||
_logger?.LogInformation("Starting master tasks");
|
||||
|
||||
// Check UPnP availability
|
||||
var upnpAvailable = await _upnpManager.IsUPnPAvailableAsync();
|
||||
_logger?.LogInformation("UPnP available: {Available}", upnpAvailable);
|
||||
}
|
||||
|
||||
private async Task<VmInstance> StartLocalVmAsync(string vmId, VmConfiguration config)
|
||||
{
|
||||
try
|
||||
{
|
||||
var success = await _qemuManager.StartVmAsync(config);
|
||||
if (!success)
|
||||
{
|
||||
throw new InvalidOperationException("Failed to start QEMU VM");
|
||||
}
|
||||
|
||||
var vmInstance = new VmInstance
|
||||
{
|
||||
VmId = vmId,
|
||||
VmName = config.Name,
|
||||
NodeId = _nodeId,
|
||||
State = VmState.Running,
|
||||
StartedAt = DateTime.UtcNow,
|
||||
Configuration = config
|
||||
};
|
||||
|
||||
_localVms[vmId] = vmInstance;
|
||||
|
||||
lock (_clusterLock)
|
||||
{
|
||||
ClusterState.DistributedVms[vmId] = vmInstance;
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
VmStarted?.Invoke(this, vmInstance);
|
||||
_logger?.LogInformation("Started local VM {VmId} ({VmName})", vmId, config.Name);
|
||||
|
||||
return vmInstance;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Failed to start local VM {VmId}", vmId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<bool> StopLocalVmAsync(string vmId)
|
||||
{
|
||||
if (!_localVms.TryGetValue(vmId, out var vm))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var success = await _qemuManager.StopVmAsync(vm.VmName, true);
|
||||
if (success)
|
||||
{
|
||||
_localVms.Remove(vmId);
|
||||
|
||||
lock (_clusterLock)
|
||||
{
|
||||
ClusterState.DistributedVms.Remove(vmId);
|
||||
ClusterState.LastUpdated = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
VmStopped?.Invoke(this, vm);
|
||||
_logger?.LogInformation("Stopped local VM {VmId}", vmId);
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Failed to stop local VM {VmId}", vmId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<VmInstance> RequestRemoteVmStartAsync(string vmId, VmConfiguration config, string targetNodeId)
|
||||
{
|
||||
// This would implement RPC to remote node
|
||||
throw new NotImplementedException("Remote VM start not yet implemented");
|
||||
}
|
||||
|
||||
private async Task<bool> RequestRemoteVmStopAsync(string vmId, string targetNodeId)
|
||||
{
|
||||
// This would implement RPC to remote node
|
||||
throw new NotImplementedException("Remote VM stop not yet implemented");
|
||||
}
|
||||
|
||||
private async Task<ElectionResponse> RequestVoteAsync(NodeInfo node, ElectionRequest request)
|
||||
{
|
||||
// This would implement RPC to request vote
|
||||
throw new NotImplementedException("Vote request not yet implemented");
|
||||
}
|
||||
|
||||
private async Task BroadcastHeartbeatAsync(HeartbeatMessage heartbeat)
|
||||
{
|
||||
var message = JsonSerializer.Serialize(heartbeat);
|
||||
var data = System.Text.Encoding.UTF8.GetBytes(message);
|
||||
|
||||
foreach (var node in _knownNodes.Values)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _udpClient!.SendAsync(data, data.Length, new IPEndPoint(node.IpAddress, node.Port));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogWarning(ex, "Failed to send heartbeat to node {NodeId}", node.NodeId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CleanupStaleNodesAsync()
|
||||
{
|
||||
var staleThreshold = DateTime.UtcNow.AddSeconds(-30);
|
||||
var staleNodes = _knownNodes.Values.Where(n => n.LastSeen < staleThreshold).ToList();
|
||||
|
||||
foreach (var node in staleNodes)
|
||||
{
|
||||
_knownNodes.Remove(node.NodeId);
|
||||
NodeLeft?.Invoke(this, node);
|
||||
_logger?.LogInformation("Removed stale node {NodeId}", node.NodeId);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ListenForConnectionsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var client = await _tcpListener!.AcceptTcpClientAsync(cancellationToken);
|
||||
_ = Task.Run(() => HandleClientAsync(client, cancellationToken));
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error accepting client connection");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleClientAsync(TcpClient client, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var stream = client.GetStream();
|
||||
using var reader = new StreamReader(stream);
|
||||
using var writer = new StreamWriter(stream);
|
||||
|
||||
var message = await reader.ReadLineAsync();
|
||||
if (message != null)
|
||||
{
|
||||
await ProcessMessageAsync(message, writer);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error handling client connection");
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.Close();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessMessageAsync(string message, StreamWriter writer)
|
||||
{
|
||||
try
|
||||
{
|
||||
var data = JsonSerializer.Deserialize<JsonElement>(message);
|
||||
var messageType = data.GetProperty("type").GetString();
|
||||
|
||||
switch (messageType)
|
||||
{
|
||||
case "heartbeat":
|
||||
await ProcessHeartbeatAsync(data);
|
||||
break;
|
||||
case "discovery":
|
||||
await ProcessDiscoveryAsync(data);
|
||||
break;
|
||||
case "election_request":
|
||||
await ProcessElectionRequestAsync(data, writer);
|
||||
break;
|
||||
case "election_response":
|
||||
await ProcessElectionResponseAsync(data);
|
||||
break;
|
||||
default:
|
||||
_logger?.LogWarning("Unknown message type: {MessageType}", messageType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogError(ex, "Error processing message: {Message}", message);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessHeartbeatAsync(JsonElement data)
|
||||
{
|
||||
var nodeId = data.GetProperty("nodeId").GetString()!;
|
||||
var role = Enum.Parse<NodeRole>(data.GetProperty("role").GetString()!);
|
||||
var term = data.GetProperty("term").GetInt64();
|
||||
var timestamp = data.GetProperty("timestamp").GetDateTime();
|
||||
|
||||
if (term > _currentTerm)
|
||||
{
|
||||
_currentTerm = term;
|
||||
_currentRole = NodeRole.Follower;
|
||||
_votedFor = null;
|
||||
}
|
||||
|
||||
if (role == NodeRole.Master && term >= _currentTerm)
|
||||
{
|
||||
_currentRole = NodeRole.Follower;
|
||||
_lastHeartbeat = timestamp;
|
||||
}
|
||||
|
||||
// Update node info
|
||||
if (_knownNodes.TryGetValue(nodeId, out var node))
|
||||
{
|
||||
node.LastSeen = DateTime.UtcNow;
|
||||
node.Role = role;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessDiscoveryAsync(JsonElement data)
|
||||
{
|
||||
var nodeId = data.GetProperty("nodeId").GetString()!;
|
||||
|
||||
if (nodeId != _nodeId && !_knownNodes.ContainsKey(nodeId))
|
||||
{
|
||||
var newNode = new NodeInfo
|
||||
{
|
||||
NodeId = nodeId,
|
||||
LastSeen = DateTime.UtcNow
|
||||
};
|
||||
|
||||
_knownNodes[nodeId] = newNode;
|
||||
NodeJoined?.Invoke(this, newNode);
|
||||
_logger?.LogInformation("Discovered new node {NodeId}", nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessElectionRequestAsync(JsonElement data, StreamWriter writer)
|
||||
{
|
||||
var candidateId = data.GetProperty("candidateId").GetString()!;
|
||||
var term = data.GetProperty("term").GetInt64();
|
||||
|
||||
var response = new ElectionResponse
|
||||
{
|
||||
VoterId = _nodeId,
|
||||
Term = _currentTerm,
|
||||
VoteGranted = false
|
||||
};
|
||||
|
||||
if (term > _currentTerm)
|
||||
{
|
||||
_currentTerm = term;
|
||||
_currentRole = NodeRole.Follower;
|
||||
_votedFor = null;
|
||||
}
|
||||
|
||||
if (term == _currentTerm && (_votedFor == null || _votedFor == candidateId))
|
||||
{
|
||||
_votedFor = candidateId;
|
||||
response.VoteGranted = true;
|
||||
}
|
||||
|
||||
var responseJson = JsonSerializer.Serialize(response);
|
||||
await writer.WriteLineAsync(responseJson);
|
||||
await writer.FlushAsync();
|
||||
}
|
||||
|
||||
private async Task ProcessElectionResponseAsync(JsonElement data)
|
||||
{
|
||||
// Handle election response
|
||||
var voterId = data.GetProperty("voterId").GetString()!;
|
||||
var voteGranted = data.GetProperty("voteGranted").GetBoolean();
|
||||
var term = data.GetProperty("term").GetInt64();
|
||||
|
||||
if (voteGranted && term == _currentTerm && _currentRole == NodeRole.Candidate)
|
||||
{
|
||||
// Count votes and potentially become master
|
||||
// This is simplified - in a real implementation you'd track votes
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<int> GetAvailablePortAsync()
|
||||
{
|
||||
// Find an available port for UPnP mapping
|
||||
using var listener = new TcpListener(IPAddress.Any, 0);
|
||||
listener.Start();
|
||||
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
listener.Stop();
|
||||
return port;
|
||||
}
|
||||
|
||||
private async Task<double> GetCpuUsageAsync()
|
||||
{
|
||||
// Simplified CPU usage calculation
|
||||
return Environment.ProcessorCount * 0.1; // 10% per core
|
||||
}
|
||||
|
||||
private async Task<double> GetMemoryUsageAsync()
|
||||
{
|
||||
// Simplified memory usage calculation
|
||||
return GC.GetTotalMemory(false) / (1024.0 * 1024.0); // MB
|
||||
}
|
||||
|
||||
private IPAddress GetLocalIpAddress()
|
||||
{
|
||||
try
|
||||
{
|
||||
var host = Dns.GetHostEntry(Dns.GetHostName());
|
||||
return host.AddressList.FirstOrDefault(ip =>
|
||||
ip.AddressFamily == AddressFamily.InterNetwork &&
|
||||
!IPAddress.IsLoopback(ip)) ?? IPAddress.Any;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return IPAddress.Any;
|
||||
}
|
||||
}
|
||||
|
||||
private SystemInfo GetSystemInfo()
|
||||
{
|
||||
return new SystemInfo
|
||||
{
|
||||
OsName = Environment.OSVersion.Platform.ToString(),
|
||||
OsVersion = Environment.OSVersion.VersionString,
|
||||
Architecture = Environment.GetEnvironmentVariable("PROCESSOR_ARCHITECTURE") ?? "Unknown",
|
||||
CpuCores = Environment.ProcessorCount,
|
||||
TotalMemory = GC.GetGCMemoryInfo().TotalAvailableMemoryBytes / (1024 * 1024), // MB
|
||||
AvailableMemory = GC.GetTotalMemory(false) / (1024 * 1024), // MB
|
||||
AvailableVirtualization = _qemuManager.GetAvailableVirtualization(),
|
||||
QemuInstalled = _qemuManager.IsQemuInstalled(),
|
||||
QemuVersion = _qemuManager.GetQemuVersion()
|
||||
};
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
StopAsync().Wait();
|
||||
_udpClient?.Dispose();
|
||||
_tcpListener?.Stop();
|
||||
_cancellationTokenSource?.Dispose();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user