wip reimplementation of xp gain loop

This commit is contained in:
Toastie 2025-02-06 12:34:22 +13:00
parent fd464731d5
commit 96c4ea0637
Signed by: toastie_t0ast
GPG key ID: 0861BE54AD481DC7
2 changed files with 154 additions and 257 deletions
src/EllieBot
Db/Models
Modules/Xp

View file

@ -1,17 +1,17 @@
#nullable disable
namespace EllieBot.Db.Models;
// FUTURE remove LastLevelUp from here and UserXpStats
public class DiscordUser : DbEntity
{
public const string DEFAULT_USERNAME = "??Unknown";
public ulong UserId { get; set; }
public string Username { get; set; }
// public string Discriminator { get; set; }
public string AvatarId { get; set; }
public string? Username { get; set; }
public string? AvatarId { get; set; }
public int? ClubId { get; set; }
public ClubInfo Club { get; set; }
public ClubInfo? Club { get; set; }
public bool IsClubAdmin { get; set; }
public long TotalXp { get; set; }

View file

@ -1,4 +1,5 @@
#nullable disable warnings
using System.ComponentModel.DataAnnotations;
using LinqToDB;
using Microsoft.EntityFrameworkCore;
using EllieBot.Common.ModuleBehaviors;
@ -11,10 +12,12 @@ using SixLabors.ImageSharp.Formats;
using SixLabors.ImageSharp.PixelFormats;
using SixLabors.ImageSharp.Processing;
using System.Threading.Channels;
using LinqToDB.Data;
using LinqToDB.EntityFrameworkCore;
using LinqToDB.Tools;
using EllieBot.Modules.Administration;
using EllieBot.Modules.Patronage;
using ArgumentOutOfRangeException = System.ArgumentOutOfRangeException;
using Color = SixLabors.ImageSharp.Color;
using Exception = System.Exception;
using Image = SixLabors.ImageSharp.Image;
@ -36,7 +39,7 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
private readonly ConcurrentDictionary<ulong, ConcurrentHashSet<ulong>> _excludedChannels = new();
private readonly ConcurrentHashSet<ulong> _excludedServers;
private XpTemplate _template;
private XpTemplate _template = new();
private readonly DiscordSocketClient _client;
private readonly TypedKey<bool> _xpTemplateReloadKey;
@ -44,9 +47,7 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
private readonly IBotCache _c;
private readonly QueueRunner _levelUpQueue = new(0, 50);
private readonly Channel<UserXpGainData> _xpGainQueue = Channel.CreateUnbounded<UserXpGainData>();
private readonly IMessageSenderService _sender;
private readonly INotifySubscriber _notifySub;
private readonly ShardData _shardData;
@ -62,7 +63,6 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
XpConfigService xpConfig,
IPubSub pubSub,
IPatronageService ps,
IMessageSenderService sender,
INotifySubscriber notifySub,
ShardData shardData)
{
@ -74,7 +74,6 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
_httpFactory = http;
_xpConfig = xpConfig;
_pubSub = pubSub;
_sender = sender;
_notifySub = notifySub;
_shardData = shardData;
_excludedServers = new();
@ -109,9 +108,9 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
public async Task OnReadyAsync()
{
_ = Task.Run(() => _levelUpQueue.RunAsync());
// initialize ignored
ArgumentOutOfRangeException.ThrowIfLessThan(_xpConfig.Data.MessageXpCooldown, 1,
nameof(_xpConfig.Data.MessageXpCooldown));
await using (var ctx = _db.GetDbContext())
{
@ -135,173 +134,69 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
}
}
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
await Task.WhenAll(UpdateTimer(), PeriodClearTimer());
return;
async Task PeriodClearTimer()
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(_xpConfig.Data.MessageXpCooldown));
while (true)
{
await timer.WaitForNextTickAsync();
_usersGainedInPeriod.Clear();
}
}
async Task UpdateTimer()
{
// todo a bigger loop that runs once every XpTimer
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(3));
while (await timer.WaitForNextTickAsync())
{
await UpdateXp();
}
}
}
/// <summary>
/// The current batch of users that will gain xp
/// </summary>
private readonly ConcurrentHashSet<IGuildUser> _usersBatch = new();
private async Task UpdateXp()
{
try
{
var reader = _xpGainQueue.Reader;
var xpAmount = _xpConfig.Data.XpPerMessage;
var currentBatch = _usersBatch.ToArray();
_usersBatch.Clear();
// sum up all gains into a single UserCacheItem
var globalToAdd = new Dictionary<ulong, UserXpGainData>();
var guildToAdd = new Dictionary<ulong, Dictionary<ulong, UserXpGainData>>();
while (reader.TryRead(out var item))
{
// add global xp to these users
if (!globalToAdd.TryGetValue(item.User.Id, out var ci))
globalToAdd[item.User.Id] = item.Clone();
else
ci.XpAmount += item.XpAmount;
await using var ctx = _db.GetDbContext();
await using var lctx = ctx.CreateLinqToDBConnection();
await using var batchTable = await lctx.CreateTempTableAsync<UserXpBatch>();
// ad guild xp in these guilds to these users
if (!guildToAdd.TryGetValue(item.Guild.Id, out var users))
users = guildToAdd[item.Guild.Id] = new();
await batchTable.BulkCopyAsync(currentBatch.Select(x => new UserXpBatch()
{
GuildId = x.GuildId,
UserId = x.Id,
UserName = x.Username,
AvatarId = x.DisplayAvatarId,
if (!users.TryGetValue(item.User.Id, out ci))
users[item.User.Id] = item.Clone();
else
ci.XpAmount += item.XpAmount;
}
}));
var dus = new List<DiscordUser>(globalToAdd.Count);
var gxps = new List<UserXpStats>(globalToAdd.Count);
var conf = _xpConfig.Data;
await using (var ctx = _db.GetDbContext())
{
if (conf.CurrencyPerXp > 0)
{
foreach (var user in globalToAdd)
{
var amount = (long)(user.Value.XpAmount * conf.CurrencyPerXp);
if (amount > 0)
await _cs.AddAsync(user.Key, amount, null);
}
}
await lctx.ExecuteAsync(
$"""
INSERT INTO ${nameof(DiscordUser)}
SELECT ${nameof(UserXpBatch.GuildId)}, ${nameof(UserXpBatch.UserId)}, ${nameof(UserXpBatch.UserName)}, ${nameof(UserXpBatch.AvatarId)}, ${xpAmount}
FROM ${nameof(UserXpBatch)}
ON CONFLICT(${nameof(DiscordUser.UserId)}, ${nameof(DiscordUser.UserId)}) DO UPDATE SET
${nameof(DiscordUser.Username)} = ${nameof(UserXpBatch)}.${nameof(UserXpBatch.UserName)}
${nameof(DiscordUser.AvatarId)} = ${nameof(UserXpBatch)}.${nameof(UserXpBatch.AvatarId)}
${nameof(DiscordUser.TotalXp)} = ${nameof(DiscordUser.TotalXp)} + ${xpAmount}
RETURNING *;
""");
// update global user xp in batches
// group by xp amount and update the same amounts at the same time
foreach (var group in globalToAdd.GroupBy(x => x.Value.XpAmount, x => x.Key))
{
var items = await ctx.Set<DiscordUser>()
.Where(x => group.Contains(x.UserId))
.UpdateWithOutputAsync(old => new()
{
TotalXp = old.TotalXp + group.Key
},
(_, n) => n);
await ctx.Set<ClubInfo>()
.Where(x => x.Members.Any(m => group.Contains(m.UserId)))
.UpdateAsync(old => new()
{
Xp = old.Xp + (group.Key * old.Members.Count(m => group.Contains(m.UserId)))
});
dus.AddRange(items);
}
// update guild user xp in batches
foreach (var (guildId, toAdd) in guildToAdd)
{
foreach (var group in toAdd.GroupBy(x => x.Value.XpAmount, x => x.Key))
{
var items = await ctx
.Set<UserXpStats>()
.Where(x => x.GuildId == guildId)
.Where(x => group.Contains(x.UserId))
.UpdateWithOutputAsync(old => new()
{
Xp = old.Xp + group.Key
},
(_, n) => n);
gxps.AddRange(items);
var missingUserIds = group.Where(userId => !items.Any(x => x.UserId == userId)).ToArray();
foreach (var userId in missingUserIds)
{
await ctx
.Set<UserXpStats>()
.ToLinqToDBTable()
.InsertOrUpdateAsync(() => new UserXpStats()
{
UserId = userId,
GuildId = guildId,
Xp = group.Key,
DateAdded = DateTime.UtcNow,
},
_ => new()
{
},
() => new()
{
UserId = userId
});
}
if (missingUserIds.Length > 0)
{
var missingItems = await ctx.Set<UserXpStats>()
.ToLinqToDBTable()
.Where(x => missingUserIds.Contains(x.UserId))
.ToArrayAsyncLinqToDB();
gxps.AddRange(missingItems);
}
}
}
}
foreach (var du in dus)
{
var oldLevel = new LevelStats(du.TotalXp - globalToAdd[du.UserId].XpAmount);
var newLevel = new LevelStats(du.TotalXp);
if (oldLevel.Level != newLevel.Level)
{
var item = globalToAdd[du.UserId];
await _levelUpQueue.EnqueueAsync(
NotifyUser(item.Guild.Id,
item.Channel.Id,
du.UserId,
false,
oldLevel.Level,
newLevel.Level));
}
}
foreach (var du in gxps)
{
if (guildToAdd.TryGetValue(du.GuildId, out var users)
&& users.TryGetValue(du.UserId, out var xpGainData))
{
var oldLevel = new LevelStats(du.Xp - xpGainData.XpAmount);
var newLevel = new LevelStats(du.Xp);
if (oldLevel.Level < newLevel.Level)
{
await _levelUpQueue.EnqueueAsync(
NotifyUser(xpGainData.Guild.Id,
xpGainData.Channel.Id,
du.UserId,
true,
oldLevel.Level,
newLevel.Level));
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error In the XP update loop");
}
// todo send notifications
}
private Func<Task> NotifyUser(
@ -804,13 +699,7 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
if (!await SetUserRewardedAsync(user.Id))
return;
await _xpGainQueue.Writer.WriteAsync(new()
{
Guild = user.Guild,
Channel = arg.Channel,
User = user,
XpAmount = xp
});
_usersBatch.Add(user);
});
return Task.CompletedTask;
}
@ -1618,3 +1507,11 @@ public class XpService : IEService, IReadyExecutor, IExecNoCommand
});
}
}
public sealed class UserXpBatch
{
[Key] public ulong UserId { get; set; }
public ulong GuildId { get; set; }
public string UserName { get; set; } = string.Empty;
public string AvatarId { get; set; } = string.Empty;
}