From 9a379044d987238700158ca40a27111e7874964c Mon Sep 17 00:00:00 2001 From: Emotion Date: Tue, 11 Jul 2023 20:32:10 +1200 Subject: [PATCH] Added Ellie.Coordinator --- Ellie.sln | 18 +- src/Ellie.Coordinator/CoordStartup.cs | 48 ++ .../Ellie.Coordinator.csproj | 20 + src/Ellie.Coordinator/LogSetup.cs | 43 ++ src/Ellie.Coordinator/Program.cs | 20 + .../Properties/launchSettings.json | 13 + .../Protos/coordinator.proto | 127 +++++ src/Ellie.Coordinator/README.md | 11 + .../Services/CoordinatorRunner.cs | 457 ++++++++++++++++++ .../Services/CoordinatorService.cs | 144 ++++++ src/Ellie.Coordinator/Shared/Config.cs | 21 + src/Ellie.Coordinator/Shared/CoordState.cs | 9 + .../Shared/JsonStatusObject.cs | 9 + src/Ellie.Coordinator/Shared/ShardStatus.cs | 15 + .../appsettings.Development.json | 9 + src/Ellie.Coordinator/appsettings.json | 20 + src/Ellie.Coordinator/coord.yml | 12 + 17 files changed, 994 insertions(+), 2 deletions(-) create mode 100644 src/Ellie.Coordinator/CoordStartup.cs create mode 100644 src/Ellie.Coordinator/Ellie.Coordinator.csproj create mode 100644 src/Ellie.Coordinator/LogSetup.cs create mode 100644 src/Ellie.Coordinator/Program.cs create mode 100644 src/Ellie.Coordinator/Properties/launchSettings.json create mode 100644 src/Ellie.Coordinator/Protos/coordinator.proto create mode 100644 src/Ellie.Coordinator/README.md create mode 100644 src/Ellie.Coordinator/Services/CoordinatorRunner.cs create mode 100644 src/Ellie.Coordinator/Services/CoordinatorService.cs create mode 100644 src/Ellie.Coordinator/Shared/Config.cs create mode 100644 src/Ellie.Coordinator/Shared/CoordState.cs create mode 100644 src/Ellie.Coordinator/Shared/JsonStatusObject.cs create mode 100644 src/Ellie.Coordinator/Shared/ShardStatus.cs create mode 100644 src/Ellie.Coordinator/appsettings.Development.json create mode 100644 src/Ellie.Coordinator/appsettings.json create mode 100644 src/Ellie.Coordinator/coord.yml diff --git a/Ellie.sln b/Ellie.sln index 25f5aa9..5c73b79 100644 --- 
a/Ellie.sln +++ b/Ellie.sln @@ -11,9 +11,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ayu", "ayu", "{5284415D-A43 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ayu.Discord.Voice", "src\ayu\Ayu.Discord.Voice\Ayu.Discord.Voice.csproj", "{34E6D136-B151-4B6E-A8E7-7A2FB7B06CA3}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ellie.Tests", "src\Ellie.Tests\Ellie.Tests.csproj", "{6A8CE149-3808-474F-A2E6-B89825BB5DC2}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ellie.Tests", "src\Ellie.Tests\Ellie.Tests.csproj", "{6A8CE149-3808-474F-A2E6-B89825BB5DC2}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ellie.Marmalade", "src\Ellie.Marmalade\Ellie.Marmalade.csproj", "{D6CF9ABE-205E-4699-90CA-0F18ED236490}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ellie.Marmalade", "src\Ellie.Marmalade\Ellie.Marmalade.csproj", "{D6CF9ABE-205E-4699-90CA-0F18ED236490}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ellie.Coordinator", "src\Ellie.Coordinator\Ellie.Coordinator.csproj", "{44BE7271-BABE-46BE-BB41-A5B6F1116C21}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ellie.Bot.Generators.Strings", "src\Ellie.Bot.Generators.Strings\Ellie.Bot.Generators.Strings.csproj", "{11DE9EB6-2793-4540-BE66-701D2D02903A}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -37,6 +41,14 @@ Global {D6CF9ABE-205E-4699-90CA-0F18ED236490}.Debug|Any CPU.Build.0 = Debug|Any CPU {D6CF9ABE-205E-4699-90CA-0F18ED236490}.Release|Any CPU.ActiveCfg = Release|Any CPU {D6CF9ABE-205E-4699-90CA-0F18ED236490}.Release|Any CPU.Build.0 = Release|Any CPU + {44BE7271-BABE-46BE-BB41-A5B6F1116C21}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {44BE7271-BABE-46BE-BB41-A5B6F1116C21}.Debug|Any CPU.Build.0 = Debug|Any CPU + {44BE7271-BABE-46BE-BB41-A5B6F1116C21}.Release|Any CPU.ActiveCfg = Release|Any CPU + {44BE7271-BABE-46BE-BB41-A5B6F1116C21}.Release|Any CPU.Build.0 = Release|Any 
CPU
+        {11DE9EB6-2793-4540-BE66-701D2D02903A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+        {11DE9EB6-2793-4540-BE66-701D2D02903A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+        {11DE9EB6-2793-4540-BE66-701D2D02903A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+        {11DE9EB6-2793-4540-BE66-701D2D02903A}.Release|Any CPU.Build.0 = Release|Any CPU
     EndGlobalSection
     GlobalSection(SolutionProperties) = preSolution
         HideSolutionNode = FALSE
@@ -47,6 +59,8 @@ Global
         {34E6D136-B151-4B6E-A8E7-7A2FB7B06CA3} = {5284415D-A43F-4539-9483-410124199743}
         {6A8CE149-3808-474F-A2E6-B89825BB5DC2} = {C5E3EF2E-72CF-41BB-B0C5-EB4C08403E67}
         {D6CF9ABE-205E-4699-90CA-0F18ED236490} = {C5E3EF2E-72CF-41BB-B0C5-EB4C08403E67}
+        {44BE7271-BABE-46BE-BB41-A5B6F1116C21} = {C5E3EF2E-72CF-41BB-B0C5-EB4C08403E67}
+        {11DE9EB6-2793-4540-BE66-701D2D02903A} = {C5E3EF2E-72CF-41BB-B0C5-EB4C08403E67}
     EndGlobalSection
     GlobalSection(ExtensibilityGlobals) = postSolution
         SolutionGuid = {878761F1-C7B5-4D38-A00D-3377D703EBBA}
diff --git a/src/Ellie.Coordinator/CoordStartup.cs b/src/Ellie.Coordinator/CoordStartup.cs
new file mode 100644
index 0000000..42fcf6a
--- /dev/null
+++ b/src/Ellie.Coordinator/CoordStartup.cs
@@ -0,0 +1,48 @@
+using Ellie.Coordinator;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.AspNetCore.Http;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace Ellie.Coordinator
+{
+    /// <summary>
+    /// ASP.NET Core startup class for the coordinator: registers gRPC and the
+    /// <see cref="CoordinatorRunner"/>, and maps the gRPC endpoint.
+    /// </summary>
+    public class CoordStartup
+    {
+        public IConfiguration Configuration { get; }
+
+        public CoordStartup(IConfiguration config)
+            => Configuration = config;
+
+        /// <summary>Registers gRPC and the coordinator runner in the DI container.</summary>
+        public void ConfigureServices(IServiceCollection services)
+        {
+            services.AddGrpc();
+            // CoordinatorService takes a CoordinatorRunner in its constructor, so the
+            // runner must be resolvable by its concrete type.
+            services.AddSingleton<CoordinatorRunner>();
+            // NOTE(review): the type arguments were missing in this patch; registering the
+            // same singleton as IHostedService is what lets the host run ExecuteAsync - confirm.
+            services.AddSingleton<IHostedService, CoordinatorRunner>(
+                serviceProvider => serviceProvider.GetRequiredService<CoordinatorRunner>());
+        }
+
+        /// <summary>Builds the request pipeline: routing, the gRPC service and a plain-text root page.</summary>
+        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
+        {
+            if (env.IsDevelopment())
+            {
+                app.UseDeveloperExceptionPage();
+            }
+
+            app.UseRouting();
+
+            app.UseEndpoints(endpoints =>
+            {
+                endpoints.MapGrpcService<CoordinatorService>();
+
+                endpoints.MapGet("/",
+                    async context =>
+                    {
+                        await context.Response.WriteAsync(
+                            "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
+                    });
+            });
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Ellie.Coordinator.csproj b/src/Ellie.Coordinator/Ellie.Coordinator.csproj
new file mode 100644
index 0000000..765dca0
--- /dev/null
+++ b/src/Ellie.Coordinator/Ellie.Coordinator.csproj
@@ -0,0 +1,20 @@
+
+
+
+    net7.0
+    CS8981
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Ellie.Coordinator/LogSetup.cs b/src/Ellie.Coordinator/LogSetup.cs
new file mode 100644
index 0000000..4f6cac0
--- /dev/null
+++ b/src/Ellie.Coordinator/LogSetup.cs
@@ -0,0 +1,43 @@
+using System;
+using System.Text;
+using Serilog;
+using Serilog.Events;
+using Serilog.Sinks.SystemConsole.Themes;
+
+namespace Ellie.Coordinator
+{
+    /// <summary>Configures the global Serilog logger used by the coordinator.</summary>
+    public static class LogSetup
+    {
+        /// <summary>
+        /// Creates the global logger (rolling file "coord.log" plus console) and switches
+        /// console output to UTF-8.
+        /// </summary>
+        /// <param name="source">Value attached to every event as the "LogSource" property.</param>
+        public static void SetupLogger(object source)
+        {
+            Log.Logger = new LoggerConfiguration()
+                .MinimumLevel.Override("Microsoft", LogEventLevel.Information)
+                .MinimumLevel.Override("System", LogEventLevel.Information)
+                .MinimumLevel.Override("Microsoft.AspNetCore", LogEventLevel.Warning)
+                .Enrich.FromLogContext()
+                .WriteTo.File("coord.log", LogEventLevel.Information,
+                    rollOnFileSizeLimit: true,
+                    fileSizeLimitBytes: 10_000_000)
+                .WriteTo.Console(LogEventLevel.Information,
+                    theme: GetTheme(),
+                    outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] | #{LogSource} | {Message:lj}{NewLine}{Exception}")
+                .Enrich.WithProperty("LogSource", source)
+                .CreateLogger();
+
+            Console.OutputEncoding = Encoding.UTF8;
+        }
+
+        // ANSI colours on Unix and in DEBUG builds; no theme for other release builds.
+        private static ConsoleTheme GetTheme()
+        {
+            if (Environment.OSVersion.Platform == PlatformID.Unix)
+                return AnsiConsoleTheme.Code;
+
+
+#if DEBUG
+            return AnsiConsoleTheme.Code;
+#else
+            return ConsoleTheme.None;
+#endif
+        }
+    }
+}
diff --git a/src/Ellie.Coordinator/Program.cs b/src/Ellie.Coordinator/Program.cs
new file mode 100644
index 0000000..c170211
--- /dev/null
+++ b/src/Ellie.Coordinator/Program.cs
@@ -0,0 +1,20 @@
+using System;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.Extensions.Hosting;
+using Ellie.Coordinator;
+using Ellie.Services;
+using Serilog;
+
+// Additional configuration is required to successfully run gRPC on macOS.
+// For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
+static IHostBuilder CreateHostBuilder(string[] args) =>
+    Host.CreateDefaultBuilder(args)
+        .ConfigureWebHostDefaults(webBuilder =>
+        {
+            webBuilder.UseStartup<CoordStartup>();
+        });
+
+LogSetup.SetupLogger("coord");
+Log.Information("Starting coordinator... Pid: {ProcessId}", Environment.ProcessId);
+
+CreateHostBuilder(args).Build().Run();
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Properties/launchSettings.json b/src/Ellie.Coordinator/Properties/launchSettings.json
new file mode 100644
index 0000000..3e9de4f
--- /dev/null
+++ b/src/Ellie.Coordinator/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+  "profiles": {
+    "Ellie.Coordinator": {
+      "commandName": "Project",
+      "dotnetRunMessages": "true",
+      "launchBrowser": false,
+      "applicationUrl": "http://localhost:3442;https://localhost:3443",
+      "environmentVariables": {
+        "ASPNETCORE_ENVIRONMENT": "Development"
+      }
+    }
+  }
+}
diff --git a/src/Ellie.Coordinator/Protos/coordinator.proto b/src/Ellie.Coordinator/Protos/coordinator.proto
new file mode 100644
index 0000000..f096778
--- /dev/null
+++ b/src/Ellie.Coordinator/Protos/coordinator.proto
@@ -0,0 +1,127 @@
+syntax = "proto3";
+import "google/protobuf/timestamp.proto";
+
+option csharp_namespace = "Ellie.Coordinator";
+
+package ellie;
+
+service Coordinator {
+  // sends update to coordinator to let it know that the shard is alive
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatReply);
+  // restarts a shard given the id
+  rpc RestartShard(RestartShardRequest) returns (RestartShardReply);
+  // reshards given the new number of shards
+  rpc Reshard(ReshardRequest) returns (ReshardReply);
+  // Reload config
+  rpc Reload(ReloadRequest) returns (ReloadReply);
+  // Gets status of a single shard
+  rpc GetStatus(GetStatusRequest) returns (GetStatusReply);
+  // Get status of all shards
+  rpc GetAllStatuses(GetAllStatusesRequest) returns (GetAllStatusesReply);
+  // Restarts all shards. Queues them to be restarted at a normal rate. Setting Nuke to true will kill all shards right
+  // away
+  rpc RestartAllShards(RestartAllRequest) returns (RestartAllReply);
+
+  // kill coordinator (and all shards as a consequence)
+  rpc Die(DieRequest) returns (DieReply);
+
+  rpc SetConfigText(SetConfigTextRequest) returns (SetConfigTextReply);
+
+  rpc GetConfigText(GetConfigTextRequest) returns (GetConfigTextReply);
+}
+
+enum ConnState {
+  Disconnected = 0;
+  Connecting = 1;
+  Connected = 2;
+}
+
+message HeartbeatRequest {
+  int32 shardId = 1;
+  int32 guildCount = 2;
+  ConnState state = 3;
+}
+
+message HeartbeatReply {
+  bool gracefulImminent = 1;
+}
+
+message RestartShardRequest {
+  int32 shardId = 1;
+  // should it be queued for restart, set false to kill it and restart immediately with priority
+  bool queue = 2;
+}
+
+message RestartShardReply {
+
+}
+
+message ReshardRequest {
+  int32 shards = 1;
+}
+
+message ReshardReply {
+
+}
+
+message ReloadRequest {
+
+}
+
+message ReloadReply {
+
+}
+
+message GetStatusRequest {
+  int32 shardId = 1;
+}
+
+message GetStatusReply {
+  int32 shardId = 1;
+  ConnState state = 2;
+  int32 guildCount = 3;
+  google.protobuf.Timestamp lastUpdate = 4;
+  bool scheduledForRestart = 5;
+  google.protobuf.Timestamp startedAt = 6;
+}
+
+message GetAllStatusesRequest {
+
+}
+
+message GetAllStatusesReply {
+  repeated GetStatusReply Statuses = 1;
+}
+
+message RestartAllRequest {
+  bool nuke = 1;
+}
+
+message RestartAllReply {
+
+}
+
+message DieRequest {
+  bool graceful = 1;
+}
+
+message DieReply {
+
+}
+
+message GetConfigTextRequest {
+
+}
+
+message GetConfigTextReply {
+  string configYml = 1;
+}
+
+message SetConfigTextRequest {
+  string configYml = 1;
+}
+
+message SetConfigTextReply {
+  bool success = 1;
+  string error = 2;
+}
diff --git a/src/Ellie.Coordinator/README.md b/src/Ellie.Coordinator/README.md
new file mode 100644
index 0000000..594585e
--- /dev/null
+++ b/src/Ellie.Coordinator/README.md
@@ -0,0 +1,11 @@
+# Coordinator project
+
+Grpc-based coordinator useful for sharded NadekoBot. Its purpose is controlling the lifetime and checking status of the shards it creates.
+
+
+### Supports
+- Checking status
+- Individual shard restarts
+- Full shard restarts
+- Graceful coordinator restarts (restart/update coordinator without killing shards)
+- Kill/Stop
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Services/CoordinatorRunner.cs b/src/Ellie.Coordinator/Services/CoordinatorRunner.cs
new file mode 100644
index 0000000..007fd48
--- /dev/null
+++ b/src/Ellie.Coordinator/Services/CoordinatorRunner.cs
@@ -0,0 +1,457 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+using Serilog;
+using YamlDotNet.Serialization;
+
+namespace Ellie.Coordinator
+{
+    /// <summary>
+    /// Background service that owns the shard processes. It starts them, restarts the ones
+    /// that are scheduled, unresponsive, stuck or dead, and keeps one
+    /// <see cref="ShardStatus"/> per shard. All shared state is guarded by a single lock.
+    /// </summary>
+    public sealed class CoordinatorRunner : BackgroundService
+    {
+        private const string CONFIG_PATH = "coord.yml";
+
+        private const string GRACEFUL_STATE_PATH = "graceful.json";
+        private const string GRACEFUL_STATE_BACKUP_PATH = "graceful_old.json";
+
+        private readonly Serializer _serializer;
+        private readonly Deserializer _deserializer;
+
+        private Config _config;
+        private ShardStatus[] _shardStatuses;
+
+        private readonly object locker = new object();
+        private readonly Random _rng;
+        private bool _gracefulImminent;
+
+        /// <summary>
+        /// Loads coord.yml, then either re-attaches to the shards recorded in graceful.json
+        /// or creates a fresh status table.
+        /// </summary>
+        public CoordinatorRunner()
+        {
+            _serializer = new();
+            _deserializer = new();
+            _config = LoadConfig();
+            _rng = new Random();
+
+            if (!TryRestoreOldState())
+                InitAll();
+        }
+
+        // Reads and parses coord.yml.
+        private Config LoadConfig()
+        {
+            lock (locker)
+            {
+                return _deserializer.Deserialize<Config>(File.ReadAllText(CONFIG_PATH));
+            }
+        }
+
+        // Serializes the given config and overwrites coord.yml. Does not touch _config.
+        private void SaveConfig(in Config config)
+        {
+            lock (locker)
+            {
+                var output = _serializer.Serialize(config);
+                File.WriteAllText(CONFIG_PATH, output);
+            }
+        }
+
+        /// <summary>
+        /// Re-reads coord.yml. When the shard count changed, all shard processes are killed
+        /// and the status table is rebuilt for the new count.
+        /// </summary>
+        public void ReloadConfig()
+        {
+            lock (locker)
+            {
+                var oldConfig = _config;
+                var newConfig = LoadConfig();
+                var shardCountChanged = oldConfig.TotalShards != newConfig.TotalShards;
+
+                // kill with the old table, re-init with the new config
+                if (shardCountChanged)
+                    KillAll();
+
+                _config = newConfig;
+
+                if (shardCountChanged)
+                    InitAll();
+            }
+        }
+
+        /// <summary>
+        /// Supervision loop. Each pass (re)starts at most one shard, then waits 5 seconds,
+        /// plus <c>RecheckIntervalMs</c> when a shard was started.
+        /// </summary>
+        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+        {
+            while (!stoppingToken.IsCancellationRequested)
+            {
+                try
+                {
+                    bool hadAction = false;
+                    int recheckIntervalMs;
+                    lock (locker)
+                    {
+                        recheckIntervalMs = _config.RecheckIntervalMs;
+
+                        var shardIds = Enumerable.Range(0, 1) // shard 0 is always first
+                            .Append((int)((117523346618318850 >> 22) % _config.TotalShards)) // then nadeko server shard
+                            .Concat(Enumerable.Range(1, _config.TotalShards - 1)
+                                .OrderBy(_ => _rng.Next())) // then all other shards in a random order
+                            .Distinct()
+                            .ToList();
+
+                        foreach (var shardId in shardIds)
+                        {
+                            if (stoppingToken.IsCancellationRequested)
+                                break;
+
+                            var status = _shardStatuses[shardId];
+
+                            if (status.ShouldRestart)
+                            {
+                                Log.Warning("Shard {ShardId} is restarting (scheduled)...", shardId);
+                                hadAction = true;
+                                StartShard(shardId);
+                                break;
+                            }
+
+                            if (DateTime.UtcNow - status.LastUpdate >
+                                TimeSpan.FromSeconds(_config.UnresponsiveSec))
+                            {
+                                Log.Warning("Shard {ShardId} is restarting (unresponsive)...", shardId);
+                                hadAction = true;
+                                StartShard(shardId);
+                                break;
+                            }
+
+                            // more than 8 heartbeats in a row in the same non-connected state
+                            if (status.StateCounter > 8 && status.State != ConnState.Connected)
+                            {
+                                Log.Warning("Shard {ShardId} is restarting (stuck)...", shardId);
+                                hadAction = true;
+                                StartShard(shardId);
+                                break;
+                            }
+
+                            try
+                            {
+                                if (status.Process is null or { HasExited: true })
+                                {
+                                    Log.Warning("Shard {ShardId} is starting (process)...", shardId);
+                                    hadAction = true;
+                                    StartShard(shardId);
+                                    break;
+                                }
+                            }
+                            catch (InvalidOperationException)
+                            {
+                                Log.Warning("Process for shard {ShardId} is bugged... ", shardId);
+                                hadAction = true;
+                                StartShard(shardId);
+                                break;
+                            }
+                        }
+                    }
+
+                    if (hadAction)
+                    {
+                        await Task.Delay(recheckIntervalMs, stoppingToken).ConfigureAwait(false);
+                    }
+                }
+                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+                {
+                    // normal shutdown, not an error
+                    break;
+                }
+                catch (Exception ex)
+                {
+                    Log.Error(ex, "Error in coordinator: {Message}", ex.Message);
+                }
+
+                await Task.Delay(5000, stoppingToken).ConfigureAwait(false);
+            }
+        }
+
+        // Kills the shard's current process (whole tree), starts a new one and resets its status.
+        // Callers must hold the lock.
+        private void StartShard(int shardId)
+        {
+            var status = _shardStatuses[shardId];
+            try
+            {
+                status.Process?.Kill(true);
+            }
+            catch
+            {
+                // best effort: the process may already be gone
+            }
+            try
+            {
+                status.Process?.Dispose();
+            }
+            catch
+            {
+                // best effort
+            }
+
+            var proc = StartShardProcess(shardId);
+            _shardStatuses[shardId] = status with
+            {
+                Process = proc,
+                LastUpdate = DateTime.UtcNow,
+                State = ConnState.Disconnected,
+                ShouldRestart = false,
+                StateCounter = 0,
+            };
+        }
+
+        // Launches the configured command; {0} in ShardStartArgs is the shard id, {1} the shard count.
+        private Process StartShardProcess(int shardId)
+            => Process.Start(new ProcessStartInfo()
+            {
+                FileName = _config.ShardStartCommand,
+                Arguments = string.Format(_config.ShardStartArgs,
+                    shardId,
+                    _config.TotalShards),
+                EnvironmentVariables =
+                {
+                    {"ELLIE_IS_COORDINATED", "1"}
+                }
+                // CreateNoWindow = true,
+                // UseShellExecute = false,
+            });
+
+        /// <summary>Records a heartbeat from a shard.</summary>
+        /// <returns>True when a graceful coordinator shutdown has been announced.</returns>
+        /// <exception cref="ArgumentOutOfRangeException">The shard id is not in the status table.</exception>
+        public bool Heartbeat(int shardId, int guildCount, ConnState state)
+        {
+            lock (locker)
+            {
+                if (shardId < 0 || shardId >= _shardStatuses.Length)
+                    throw new ArgumentOutOfRangeException(nameof(shardId));
+
+                var status = _shardStatuses[shardId];
+                status = _shardStatuses[shardId] = status with
+                {
+                    GuildCount = guildCount,
+                    State = state,
+                    LastUpdate = DateTime.UtcNow,
+                    // counts consecutive heartbeats that report the same state
+                    StateCounter = status.State == state
+                        ? status.StateCounter + 1
+                        : 1
+                };
+                if (status.StateCounter > 1 && status.State == ConnState.Disconnected)
+                {
+                    Log.Warning("Shard {ShardId} is in DISCONNECTED state! ({StateCounter})",
+                        status.ShardId,
+                        status.StateCounter);
+                }
+
+                return _gracefulImminent;
+            }
+        }
+
+        /// <summary>
+        /// Writes a new shard count to coord.yml. The running configuration is not changed
+        /// until <see cref="ReloadConfig"/> is called.
+        /// </summary>
+        /// <exception cref="ArgumentOutOfRangeException">The count is less than 1.</exception>
+        public void SetShardCount(int totalShards)
+        {
+            // a count below 1 would make the supervision loop throw on "% TotalShards"
+            if (totalShards < 1)
+                throw new ArgumentOutOfRangeException(nameof(totalShards));
+
+            lock (locker)
+            {
+                SaveConfig(new Config(
+                    totalShards,
+                    _config.RecheckIntervalMs,
+                    _config.ShardStartCommand,
+                    _config.ShardStartArgs,
+                    _config.UnresponsiveSec));
+            }
+        }
+
+        /// <summary>Restarts one shard.</summary>
+        /// <param name="shardId">Shard to restart.</param>
+        /// <param name="queue">
+        /// True to flag the shard so the supervision loop restarts it; false to kill and
+        /// restart it right away.
+        /// </param>
+        /// <exception cref="ArgumentOutOfRangeException">The shard id is not in the status table.</exception>
+        public void RestartShard(int shardId, bool queue)
+        {
+            lock (locker)
+            {
+                if (shardId < 0 || shardId >= _shardStatuses.Length)
+                    throw new ArgumentOutOfRangeException(nameof(shardId));
+
+                if (!queue)
+                {
+                    Log.Warning("Shard {ShardId} is restarting (requested)...", shardId);
+                    StartShard(shardId);
+                    return;
+                }
+
+                _shardStatuses[shardId] = _shardStatuses[shardId] with
+                {
+                    ShouldRestart = true,
+                    StateCounter = 0,
+                };
+            }
+        }
+
+        /// <summary>Flags every shard for restart; with <paramref name="nuke"/> the processes are killed first.</summary>
+        public void RestartAll(bool nuke)
+        {
+            lock (locker)
+            {
+                if (nuke)
+                {
+                    KillAll();
+                }
+
+                QueueAll();
+            }
+        }
+
+        // Kills every shard that has a process and flags it for restart.
+        private void KillAll()
+        {
+            lock (locker)
+            {
+                for (var shardId = 0; shardId < _shardStatuses.Length; shardId++)
+                {
+                    var status = _shardStatuses[shardId];
+                    if (status.Process is Process p)
+                    {
+                        // kill the whole tree, same as StartShard
+                        try { p.Kill(true); } catch { }
+                        try { p.Dispose(); } catch { }
+                        _shardStatuses[shardId] = status with
+                        {
+                            Process = null,
+                            ShouldRestart = true,
+                            LastUpdate = DateTime.UtcNow,
+                            State = ConnState.Disconnected,
+                            StateCounter = 0,
+                        };
+                    }
+                }
+            }
+        }
+
+        /// <summary>Writes pid, connection state and guild count of every shard to graceful.json.</summary>
+        public void SaveState()
+        {
+            string jsonState;
+            lock (locker)
+            {
+                var coordState = new CoordState()
+                {
+                    StatusObjects = _shardStatuses
+                        .Select(x => new JsonStatusObject()
+                        {
+                            Pid = x.Process?.Id,
+                            ConnectionState = x.State,
+                            GuildCount = x.GuildCount,
+                        })
+                        .ToList()
+                };
+                jsonState = JsonSerializer.Serialize(coordState, new JsonSerializerOptions()
+                {
+                    WriteIndented = true,
+                });
+            }
+
+            File.WriteAllText(GRACEFUL_STATE_PATH, jsonState);
+        }
+
+        // Rebuilds the status table from graceful.json. Returns false when the file is missing,
+        // unreadable or does not match the configured shard count. The file is always moved to
+        // graceful_old.json once it has been looked at.
+        private bool TryRestoreOldState()
+        {
+            lock (locker)
+            {
+                if (!File.Exists(GRACEFUL_STATE_PATH))
+                    return false;
+
+                Log.Information("Restoring old coordinator state...");
+
+                CoordState savedState;
+                try
+                {
+                    savedState = JsonSerializer.Deserialize<CoordState>(File.ReadAllText(GRACEFUL_STATE_PATH));
+
+                    // a file without StatusObjects is as unusable as a null document
+                    if (savedState?.StatusObjects is null)
+                        throw new InvalidDataException("Old state is null?!");
+                }
+                catch (Exception ex)
+                {
+                    Log.Error(ex, "Error deserializing old state: {Message}", ex.Message);
+                    File.Move(GRACEFUL_STATE_PATH, GRACEFUL_STATE_BACKUP_PATH, overwrite: true);
+                    return false;
+                }
+
+                if (savedState.StatusObjects.Count != _config.TotalShards)
+                {
+                    Log.Error("Unable to restore old state because shard count doesn't match");
+                    File.Move(GRACEFUL_STATE_PATH, GRACEFUL_STATE_BACKUP_PATH, overwrite: true);
+                    return false;
+                }
+
+                _shardStatuses = new ShardStatus[_config.TotalShards];
+
+                for (int shardId = 0; shardId < _shardStatuses.Length; shardId++)
+                {
+                    var statusObj = savedState.StatusObjects[shardId];
+                    Process p = null;
+                    if (statusObj.Pid is { } pid)
+                    {
+                        try
+                        {
+                            // NOTE(review): the pid is not checked to still belong to a shard
+                            p = Process.GetProcessById(pid);
+                        }
+                        catch (Exception ex)
+                        {
+                            Log.Warning(ex, "Process for shard {ShardId} is not runnning", shardId);
+                        }
+                    }
+
+                    // a shard without a live process is flagged for restart
+                    _shardStatuses[shardId] = new(
+                        shardId,
+                        DateTime.UtcNow,
+                        statusObj.GuildCount,
+                        statusObj.ConnectionState,
+                        p is null,
+                        p);
+                }
+
+                File.Move(GRACEFUL_STATE_PATH, GRACEFUL_STATE_BACKUP_PATH, overwrite: true);
+                Log.Information("Old state restored!");
+                return true;
+            }
+        }
+
+        // Creates a fresh status table with one default entry per configured shard.
+        private void InitAll()
+        {
+            lock (locker)
+            {
+                _shardStatuses = new ShardStatus[_config.TotalShards];
+                for (var shardId = 0; shardId < _shardStatuses.Length; shardId++)
+                {
+                    _shardStatuses[shardId] = new ShardStatus(shardId, DateTime.UtcNow);
+                }
+            }
+        }
+
+        // Flags every shard for restart.
+        private void QueueAll()
+        {
+            lock (locker)
+            {
+                for (var shardId = 0; shardId < _shardStatuses.Length; shardId++)
+                {
+                    _shardStatuses[shardId] = _shardStatuses[shardId] with
+                    {
+                        ShouldRestart = true
+                    };
+                }
+            }
+        }
+
+
+        /// <summary>Returns the current status of one shard.</summary>
+        /// <exception cref="ArgumentOutOfRangeException">The shard id is not in the status table.</exception>
+        public ShardStatus GetShardStatus(int shardId)
+        {
+            lock (locker)
+            {
+                if (shardId < 0 || shardId >= _shardStatuses.Length)
+                    throw new ArgumentOutOfRangeException(nameof(shardId));
+
+                return _shardStatuses[shardId];
+            }
+        }
+
+        /// <summary>Returns a copy of the status table.</summary>
+        public List<ShardStatus> GetAllStatuses()
+        {
+            lock (locker)
+            {
+                var toReturn = new List<ShardStatus>(_shardStatuses.Length);
+                toReturn.AddRange(_shardStatuses);
+                return toReturn;
+            }
+        }
+
+        /// <summary>Makes subsequent heartbeats report that a graceful shutdown is imminent.</summary>
+        public void PrepareGracefulShutdown()
+        {
+            lock (locker)
+            {
+                _gracefulImminent = true;
+            }
+        }
+
+        /// <summary>Returns the raw text of coord.yml.</summary>
+        public string GetConfigText()
+            => File.ReadAllText(CONFIG_PATH);
+
+        /// <summary>Parses the given YAML, writes it to coord.yml and reloads the configuration.</summary>
+        /// <exception cref="ArgumentNullException">The text is null, empty or whitespace.</exception>
+        public void SetConfigText(string text)
+        {
+            if (string.IsNullOrWhiteSpace(text))
+                throw new ArgumentNullException(nameof(text), "coord.yml can't be empty");
+            // deserializing first rejects text that is not a valid config before anything is written
+            var config = _deserializer.Deserialize<Config>(text);
+            SaveConfig(in config);
+            ReloadConfig();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Services/CoordinatorService.cs b/src/Ellie.Coordinator/Services/CoordinatorService.cs
new file mode 100644
index 0000000..675621d
--- /dev/null
+++ b/src/Ellie.Coordinator/Services/CoordinatorService.cs
@@ -0,0 +1,144 @@
+using System;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+
+namespace Ellie.Coordinator
+{
+    /// <summary>gRPC front end that forwards every call to the <see cref="CoordinatorRunner"/>.</summary>
+    public sealed class CoordinatorService : Coordinator.CoordinatorBase
+    {
+        private readonly CoordinatorRunner _runner;
+
+        public CoordinatorService(CoordinatorRunner runner)
+            => _runner = runner;
+
+        public override Task<HeartbeatReply> Heartbeat(HeartbeatRequest request, ServerCallContext context)
+        {
+            var gracefulImminent = _runner.Heartbeat(request.ShardId, request.GuildCount, request.State);
+            return Task.FromResult(new HeartbeatReply()
+            {
+                GracefulImminent = gracefulImminent
+            });
+        }
+
+        public override Task<ReshardReply> Reshard(ReshardRequest request, ServerCallContext context)
+        {
+            _runner.SetShardCount(request.Shards);
+            return Task.FromResult(new ReshardReply());
+        }
+
+        public override Task<RestartShardReply> RestartShard(RestartShardRequest request, ServerCallContext context)
+        {
+            _runner.RestartShard(request.ShardId, request.Queue);
+            return Task.FromResult(new RestartShardReply());
+        }
+
+
public override Task<ReloadReply> Reload(ReloadRequest request, ServerCallContext context)
+        {
+            _runner.ReloadConfig();
+            return Task.FromResult(new ReloadReply());
+        }
+
+        public override Task<GetStatusReply> GetStatus(GetStatusRequest request, ServerCallContext context)
+        {
+            var status = _runner.GetShardStatus(request.ShardId);
+
+
+            return Task.FromResult(StatusToStatusReply(status));
+        }
+
+        public override Task<GetAllStatusesReply> GetAllStatuses(GetAllStatusesRequest request,
+            ServerCallContext context)
+        {
+            var statuses = _runner
+                .GetAllStatuses();
+
+            var reply = new GetAllStatusesReply();
+            foreach (var status in statuses)
+                reply.Statuses.Add(StatusToStatusReply(status));
+
+            return Task.FromResult(reply);
+        }
+
+        // Converts the runner's status record into the wire message. StartedAt falls back to
+        // DateTime.MinValue when there is no live process or its start time cannot be read.
+        private static GetStatusReply StatusToStatusReply(ShardStatus status)
+        {
+            DateTime startTime;
+            try
+            {
+                startTime = status.Process is null or { HasExited: true }
+                    ? DateTime.MinValue.ToUniversalTime()
+                    : status.Process.StartTime.ToUniversalTime();
+            }
+            catch
+            {
+                startTime = DateTime.MinValue.ToUniversalTime();
+            }
+
+            var reply = new GetStatusReply()
+            {
+                State = status.State,
+                GuildCount = status.GuildCount,
+                ShardId = status.ShardId,
+                LastUpdate = Timestamp.FromDateTime(status.LastUpdate),
+                ScheduledForRestart = status.ShouldRestart,
+                StartedAt = Timestamp.FromDateTime(startTime)
+            };
+
+            return reply;
+        }
+
+        public override Task<RestartAllReply> RestartAllShards(RestartAllRequest request, ServerCallContext context)
+        {
+            _runner.RestartAll(request.Nuke);
+            return Task.FromResult(new RestartAllReply());
+        }
+
+        /// <summary>
+        /// Saves the shard state and exits the coordinator process. A graceful request first
+        /// announces the shutdown through heartbeat replies and waits 10 seconds.
+        /// </summary>
+        public override async Task<DieReply> Die(DieRequest request, ServerCallContext context)
+        {
+            if (request.Graceful)
+            {
+                _runner.PrepareGracefulShutdown();
+                await Task.Delay(10_000);
+            }
+
+            _runner.SaveState();
+            // exit shortly after returning so the reply can still be sent
+            _ = Task.Run(async () =>
+            {
+                await Task.Delay(250);
+                Environment.Exit(0);
+            });
+
+            return new DieReply();
+        }
+
+        // Any failure is reported in the reply (Success = false, Error = message) instead of being thrown.
+        public override Task<SetConfigTextReply> SetConfigText(SetConfigTextRequest request, ServerCallContext context)
+        {
+            var error = string.Empty;
+            var success = true;
+            try
+            {
+                _runner.SetConfigText(request.ConfigYml);
+            }
+            catch (Exception ex)
+            {
+                error = ex.Message;
+                success = false;
+            }
+
+            return Task.FromResult<SetConfigTextReply>(new()
+            {
+                Success = success,
+                Error = error
+            });
+        }
+
+        public override Task<GetConfigTextReply> GetConfigText(GetConfigTextRequest request, ServerCallContext context)
+        {
+            var text = _runner.GetConfigText();
+            return Task.FromResult(new GetConfigTextReply()
+            {
+                ConfigYml = text,
+            });
+        }
+    }
+}
diff --git a/src/Ellie.Coordinator/Shared/Config.cs b/src/Ellie.Coordinator/Shared/Config.cs
new file mode 100644
index 0000000..21d0037
--- /dev/null
+++ b/src/Ellie.Coordinator/Shared/Config.cs
@@ -0,0 +1,21 @@
+namespace Ellie.Coordinator
+{
+    /// <summary>Coordinator settings as stored in coord.yml.</summary>
+    public readonly struct Config
+    {
+        public int TotalShards { get; init; }
+        public int RecheckIntervalMs { get; init; }
+        public string ShardStartCommand { get; init; }
+        public string ShardStartArgs { get; init; }
+        public double UnresponsiveSec { get; init; }
+
+        public Config(int totalShards, int recheckIntervalMs, string shardStartCommand, string shardStartArgs, double unresponsiveSec)
+        {
+            TotalShards = totalShards;
+            RecheckIntervalMs = recheckIntervalMs;
+            ShardStartCommand = shardStartCommand;
+            ShardStartArgs = shardStartArgs;
+            UnresponsiveSec = unresponsiveSec;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Shared/CoordState.cs b/src/Ellie.Coordinator/Shared/CoordState.cs
new file mode 100644
index 0000000..7299f57
--- /dev/null
+++ b/src/Ellie.Coordinator/Shared/CoordState.cs
@@ -0,0 +1,9 @@
+using System.Collections.Generic;
+
+namespace Ellie.Coordinator
+{
+    /// <summary>Root object of the saved coordinator state: one entry per shard, indexed by shard id.</summary>
+    public class CoordState
+    {
+        public List<JsonStatusObject> StatusObjects { get; init; }
+    }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Shared/JsonStatusObject.cs b/src/Ellie.Coordinator/Shared/JsonStatusObject.cs
new file mode 100644
index 0000000..14c990c
--- /dev/null
+++ b/src/Ellie.Coordinator/Shared/JsonStatusObject.cs
@@ -0,0 +1,9 @@
+namespace Ellie.Coordinator
+{
+    public class
JsonStatusObject
+    {
+        // Process id of the shard; null when the shard had no process when the state was saved.
+        public int? Pid { get; init; }
+        public int GuildCount { get; init; }
+        public ConnState ConnectionState { get; init; }
+    }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/Shared/ShardStatus.cs b/src/Ellie.Coordinator/Shared/ShardStatus.cs
new file mode 100644
index 0000000..ce092e4
--- /dev/null
+++ b/src/Ellie.Coordinator/Shared/ShardStatus.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Diagnostics;
+
+namespace Ellie.Coordinator
+{
+    // Snapshot of what the coordinator tracks for one shard. Updated copies are made
+    // with "with" expressions.
+    //   LastUpdate    - UTC time of the last heartbeat or (re)start
+    //   ShouldRestart - shard is flagged to be restarted by the supervision loop
+    //   Process       - OS process of the shard, null when none is attached
+    //   StateCounter  - number of consecutive heartbeats that reported the same State
+    public sealed record ShardStatus(
+        int ShardId,
+        DateTime LastUpdate,
+        int GuildCount = 0,
+        ConnState State = ConnState.Disconnected,
+        bool ShouldRestart = false,
+        Process Process = null,
+        int StateCounter = 0
+    );
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/appsettings.Development.json b/src/Ellie.Coordinator/appsettings.Development.json
new file mode 100644
index 0000000..45fe774
--- /dev/null
+++ b/src/Ellie.Coordinator/appsettings.Development.json
@@ -0,0 +1,9 @@
+{
+  "Logging": {
+    "LogLevel": {
+      "Default": "Information",
+      "Microsoft": "Warning",
+      "Microsoft.Hosting.Lifetime": "Information"
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/appsettings.json b/src/Ellie.Coordinator/appsettings.json
new file mode 100644
index 0000000..989c443
--- /dev/null
+++ b/src/Ellie.Coordinator/appsettings.json
@@ -0,0 +1,20 @@
+{
+  "Logging": {
+    "LogLevel": {
+      "Default": "Information",
+      "Microsoft": "Warning",
+      "Microsoft.Hosting.Lifetime": "Information"
+    }
+  },
+  "AllowedHosts": "*",
+  "Kestrel": {
+    "EndpointDefaults": {
+      "Protocols": "Http2"
+    },
+    "Endpoints": {
+      "Http": {
+        "Url": "http://localhost:3442"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/Ellie.Coordinator/coord.yml b/src/Ellie.Coordinator/coord.yml
new file mode 100644
index 0000000..dccd607
--- /dev/null
+++ b/src/Ellie.Coordinator/coord.yml
@@ -0,0 +1,12 @@
+# total number of shards
+TotalShards: 3
+# How often (in milliseconds) do shards ping their state back to the coordinator.
+# The coordinator also waits this long after it (re)starts a shard before it checks the shards again
+RecheckIntervalMs: 5000
+# Command to run the shard
+ShardStartCommand: dotnet
+# Arguments to run the shard
+# {0} = shard id
+# {1} = total number of shards
+ShardStartArgs: run -p "..\Ellie\Ellie.csproj" --no-build -- {0} {1}
+# How long (in seconds) a shard may go without reporting its state before it is forcefully restarted
+UnresponsiveSec: 30