[#24] Client: Implement pool part2

Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
Pavel Gross 2024-11-01 10:30:28 +03:00
parent c9a75ea025
commit ee20798379
63 changed files with 801 additions and 526 deletions

View file

@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -13,6 +12,8 @@ using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using Grpc.Core;
using Microsoft.Extensions.Logging;
@ -36,7 +37,7 @@ public partial class Pool : IFrostFSClient
private ECDsa Key { get; set; }
private byte[] PublicKey { get; }
private string PublicKey { get; }
private OwnerID? _ownerId;
private FrostFsOwner? _owner;
@ -65,7 +66,7 @@ public partial class Pool : IFrostFSClient
internal CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
private SessionCache Cache { get; set; }
internal SessionCache SessionCache { get; set; }
private ulong SessionTokenDuration { get; set; }
@ -75,7 +76,7 @@ public partial class Pool : IFrostFSClient
private bool disposedValue;
private ILogger? Logger { get; set; }
private ILogger? logger { get; set; }
private ulong MaxObjectSize { get; set; }
@ -91,20 +92,20 @@ public partial class Pool : IFrostFSClient
if (options.Key == null)
{
throw new FrostFsException($"Missed required parameter {nameof(options.Key)}");
throw new ArgumentException($"Missed required parameter {nameof(options.Key)}");
}
var nodesParams = AdjustNodeParams(options.NodeParams);
var cache = new SessionCache(options.SessionExpirationDuration);
FillDefaultInitParams(options, cache);
FillDefaultInitParams(options, this);
Key = options.Key;
PublicKey = Key.PublicKey();
PublicKey = $"{Key.PublicKey()}";
Cache = cache;
Logger = options.Logger;
SessionCache = cache;
logger = options.Logger;
SessionTokenDuration = options.SessionExpirationDuration;
RebalanceParams = new RebalanceParameters(
@ -148,47 +149,54 @@ public partial class Pool : IFrostFSClient
for (int j = 0; j < nodeParams.Addresses.Count; j++)
{
var client = ClientBuilder(nodeParams.Addresses[j]);
clients[j] = client;
var error = await client.Dial(ctx).ConfigureAwait(false);
if (!string.IsNullOrEmpty(error))
{
Logger?.LogWarning("Failed to build client. Address {Address}, {Error})", client.WrapperPrm.Address, error);
continue;
}
ClientWrapper? client = null;
bool dialed = false;
try
{
client = clients[j] = ClientBuilder(nodeParams.Addresses[j]);
await client.Dial(ctx).ConfigureAwait(false);
dialed = true;
var token = await InitSessionForDuration(ctx, client, RebalanceParams.SessionExpirationDuration, Key, false)
.ConfigureAwait(false);
var key = FormCacheKey(nodeParams.Addresses[j], Key, false);
_ = Cache.Cache[key] = token;
var key = FormCacheKey(nodeParams.Addresses[j], Key.PrivateKey().ToString());
_ = SessionCache.Cache[key] = token;
atLeastOneHealthy = true;
}
catch (FrostFsException ex)
catch (RpcException ex)
{
client.StatusMonitor.SetUnhealthy();
Logger?.LogWarning("Failed to create frostfs session token for client. Address {Address}, {Error})",
client.WrapperPrm.Address, ex.Message);
if (!dialed)
client!.SetUnhealthyOnDial();
else
client!.SetUnhealthy();
continue;
if (logger != null)
{
FrostFsMessages.SessionCreationError(logger, client!.WrapperPrm.Address, ex.Message);
}
}
catch (FrostFsInvalidObjectException)
{
break;
}
atLeastOneHealthy = true;
}
var sampler = new Sampler(nodeParams.Weights.ToArray());
inner[i] = new InnerPool(sampler, clients);
i++;
}
if (!atLeastOneHealthy)
return "at least one node must be healthy";
return "At least one node must be healthy";
InnerPools = inner;
var res = await GetNetworkSettingsAsync(new PrmNetworkSettings { Context = ctx }).ConfigureAwait(false);
var res = await GetNetworkSettingsAsync(new PrmNetworkSettings(ctx)).ConfigureAwait(false);
MaxObjectSize = res.MaxObjectSize;
@ -252,7 +260,7 @@ public partial class Pool : IFrostFSClient
return adjusted;
}
private static void FillDefaultInitParams(InitParameters parameters, SessionCache cache)
private static void FillDefaultInitParams(InitParameters parameters, Pool pool)
{
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
@ -275,8 +283,8 @@ public partial class Pool : IFrostFSClient
if (parameters.NodeStreamTimeout <= 0)
parameters.NodeStreamTimeout = defaultStreamTimeout;
if (cache.TokenDuration == 0)
cache.TokenDuration = defaultSessionTokenExpirationDuration;
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
parameters.ClientBuilder ??= new Func<string, ClientWrapper>((address) =>
{
@ -291,29 +299,29 @@ public partial class Pool : IFrostFSClient
GracefulCloseOnSwitchTimeout = parameters.GracefulCloseOnSwitchTimeout
};
return new ClientWrapper(wrapperPrm);
return new ClientWrapper(wrapperPrm, pool);
}
);
}
private FrostFSClient? Сonnection()
private ClientWrapper Сonnection()
{
foreach (var pool in InnerPools!)
{
var client = pool.Connection();
if (client != null)
{
return client.Client;
return client;
}
}
return null;
throw new FrostFsException("Cannot find alive client");
}
private static async Task<FrostFsSessionToken?> InitSessionForDuration(CallContext ctx, ClientWrapper cw, ulong duration, ECDsa key, bool clientCut)
{
var client = cw.Client;
var networkInfo = await client!.GetNetworkSettingsAsync(new PrmNetworkSettings { Context = ctx }).ConfigureAwait(false);
var networkInfo = await client!.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx)).ConfigureAwait(false);
var epoch = networkInfo.Epoch;
@ -321,17 +329,14 @@ public partial class Pool : IFrostFSClient
? ulong.MaxValue
: epoch + duration;
var prmSessionCreate = new PrmSessionCreate(exp) { Context = ctx };
var prmSessionCreate = new PrmSessionCreate(exp, ctx);
return await client.CreateSessionAsync(prmSessionCreate).ConfigureAwait(false);
}
private static string FormCacheKey(string address, ECDsa key, bool clientCut)
internal static string FormCacheKey(string address, string key)
{
var k = key.PrivateKey;
var stype = clientCut ? "client" : "server";
return $"{address}{stype}{k}";
return $"{address}{key}";
}
public void Close()
@ -343,7 +348,7 @@ public partial class Pool : IFrostFSClient
// close all clients
foreach (var innerPool in InnerPools)
foreach (var client in innerPool.Clients)
if (client.StatusMonitor.IsDialed())
if (client.IsDialed())
client.Client?.Close();
}
}
@ -355,7 +360,7 @@ public partial class Pool : IFrostFSClient
for (int i = 0; i < RebalanceParams.NodesParams.Length; i++)
{
var parameters = this.RebalanceParams.NodesParams[i];
var parameters = RebalanceParams.NodesParams[i];
buffers[i] = new double[parameters.Weights.Count];
Task.Run(async () =>
@ -405,25 +410,27 @@ public partial class Pool : IFrostFSClient
try
{
// check timeout settings
changed = await client.RestartIfUnhealthy(ctx).ConfigureAwait(false);
changed = await client!.RestartIfUnhealthy(ctx).ConfigureAwait(false);
healthy = true;
bufferWeights[j] = options.NodesParams[poolIndex].Weights[j];
}
// TODO: specify
catch (FrostFsException e)
{
error = e.Message;
bufferWeights[j] = 0;
Cache.DeleteByPrefix(client.StatusMonitor.Address);
SessionCache.DeleteByPrefix(client.Address);
}
if (changed)
{
StringBuilder fields = new($"address {client.StatusMonitor.Address}, healthy {healthy}");
if (string.IsNullOrEmpty(error))
if (!string.IsNullOrEmpty(error))
{
fields.Append($", reason {error}");
Logger?.Log(LogLevel.Warning, "Health has changed: {Fields}", fields.ToString());
if (logger != null)
{
FrostFsMessages.HealthChanged(logger, client.Address, healthy, error!);
}
Interlocked.Exchange(ref healthyChanged, 1);
}
@ -443,6 +450,8 @@ public partial class Pool : IFrostFSClient
}
}
// TODO: remove
private bool CheckSessionTokenErr(Exception error, string address)
{
if (error == null)
@ -452,7 +461,7 @@ public partial class Pool : IFrostFSClient
if (error is SessionNotFoundException || error is SessionExpiredException)
{
this.Cache.DeleteByPrefix(address);
this.SessionCache.DeleteByPrefix(address);
return true;
}
@ -463,14 +472,13 @@ public partial class Pool : IFrostFSClient
{
if (InnerPools == null)
{
throw new InvalidObjectException(nameof(Pool));
throw new FrostFsInvalidObjectException(nameof(Pool));
}
var statistics = new Statistic();
foreach (var inner in InnerPools)
{
int nodeIndex = 0;
int valueIndex = 0;
var nodes = new string[inner.Clients.Length];
@ -478,20 +486,22 @@ public partial class Pool : IFrostFSClient
{
foreach (var client in inner.Clients)
{
if (client.StatusMonitor.IsHealthy())
if (client.IsHealthy())
{
nodes[valueIndex++] = client.StatusMonitor.Address;
nodes[valueIndex] = client.Address;
}
var node = new NodeStatistic
{
Address = client.StatusMonitor.Address,
Methods = client.StatusMonitor.MethodsStatus(),
OverallErrors = client.StatusMonitor.GetOverallErrorRate(),
CurrentErrors = client.StatusMonitor.GetCurrentErrorRate()
Address = client.Address,
Methods = client.MethodsStatus(),
OverallErrors = client.GetOverallErrorRate(),
CurrentErrors = client.GetCurrentErrorRate()
};
statistics.Nodes[nodeIndex++] = node;
statistics.Nodes.Add(node);
valueIndex++;
statistics.OverallErrors += node.OverallErrors;
}
@ -508,120 +518,234 @@ public partial class Pool : IFrostFSClient
public async Task<FrostFsNetmapSnapshot> GetNetmapSnapshotAsync(PrmNetmapSnapshot? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetNetmapSnapshotAsync(args).ConfigureAwait(false);
var client = Сonnection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNetmapSnapshotAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsNodeInfo> GetNodeInfoAsync(PrmNodeInfo? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetNodeInfoAsync(args).ConfigureAwait(false);
var client = Сonnection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNodeInfoAsync(args).ConfigureAwait(false);
}
public async Task<NetworkSettings> GetNetworkSettingsAsync(PrmNetworkSettings? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetNetworkSettingsAsync(args).ConfigureAwait(false);
var client = Сonnection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNetworkSettingsAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.CreateSessionAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.CreateSessionAsync(args).ConfigureAwait(false);
}
public async Task<byte[]> AddChainAsync(PrmApeChainAdd args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.AddChainAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.AddChainAsync(args).ConfigureAwait(false);
}
public async Task RemoveChainAsync(PrmApeChainRemove args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
await client.RemoveChainAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.RemoveChainAsync(args).ConfigureAwait(false);
}
public async Task<Chain[]> ListChainAsync(PrmApeChainList args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.ListChainAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.ListChainAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsContainerInfo> GetContainerAsync(PrmContainerGet args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetContainerAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetContainerAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsContainerId> ListContainersAsync(PrmContainerGetAll? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return client.ListContainersAsync(args);
var client = Сonnection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return client.Client!.ListContainersAsync(args);
}
public async Task<FrostFsContainerId> CreateContainerAsync(PrmContainerCreate args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.CreateContainerAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.CreateContainerAsync(args).ConfigureAwait(false);
}
public async Task DeleteContainerAsync(PrmContainerDelete args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
await client.DeleteContainerAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.DeleteContainerAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetObjectHeadAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetObjectHeadAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetObjectAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.PutObjectAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.PutObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.PutSingleObjectAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.PutSingleObjectAsync(args).ConfigureAwait(false);
}
public async Task DeleteObjectAsync(PrmObjectDelete args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
await client.DeleteObjectAsync(args).ConfigureAwait(false);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.DeleteObjectAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return client.SearchObjectsAsync(args);
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Сonnection();
args.Context.PoolErrorHandler = client.HandleError;
return client.Client!.SearchObjectsAsync(args);
}
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args = null)
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetBalanceAsync(args).ConfigureAwait(false);
}
var client = Сonnection();
public bool RestartIfUnhealthy(CallContext ctx)
{
throw new NotImplementedException();
}
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
public bool IsHealthy()
{
throw new NotImplementedException();
return await client.Client!.GetBalanceAsync(args).ConfigureAwait(false);
}
protected virtual void Dispose(bool disposing)