[#24] Client: Implement pool part1

first iteration - base classes and methods

Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
Pavel Gross 2024-10-21 10:48:00 +03:00
parent d1271df207
commit c9a75ea025
72 changed files with 2786 additions and 468 deletions

View file

@ -0,0 +1,651 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Frostfs.V2.Ape;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
public partial class Pool : IFrostFSClient
{
const int defaultSessionTokenExpirationDuration = 100; // in epochs
const int defaultErrorThreshold = 100;
const int defaultGracefulCloseOnSwitchTimeout = 10; //Seconds;
const int defaultRebalanceInterval = 15; //Seconds;
const int defaultHealthcheckTimeout = 4; //Seconds;
const int defaultDialTimeout = 5; //Seconds;
const int defaultStreamTimeout = 10; //Seconds;
private readonly object _lock = new();
private InnerPool[]? InnerPools { get; set; }
private ECDsa Key { get; set; }
private byte[] PublicKey { get; }
private OwnerID? _ownerId;
private FrostFsOwner? _owner;
private FrostFsOwner Owner
{
get
{
_owner ??= new FrostFsOwner(Key.PublicKey().PublicKeyToAddress());
return _owner;
}
}
private OwnerID OwnerId
{
get
{
if (_ownerId == null)
{
_owner = new FrostFsOwner(Key.PublicKey().PublicKeyToAddress());
_ownerId = _owner.ToMessage();
}
return _ownerId;
}
}
internal CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
private SessionCache Cache { get; set; }
private ulong SessionTokenDuration { get; set; }
private RebalanceParameters RebalanceParams { get; set; }
private Func<string, ClientWrapper> ClientBuilder;
private bool disposedValue;
private ILogger? Logger { get; set; }
private ulong MaxObjectSize { get; set; }
public IClientStatus? ClientStatus { get; }
// NewPool creates connection pool using parameters.
public Pool(InitParameters options)
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
if (options.Key == null)
{
throw new FrostFsException($"Missed required parameter {nameof(options.Key)}");
}
var nodesParams = AdjustNodeParams(options.NodeParams);
var cache = new SessionCache(options.SessionExpirationDuration);
FillDefaultInitParams(options, cache);
Key = options.Key;
PublicKey = Key.PublicKey();
Cache = cache;
Logger = options.Logger;
SessionTokenDuration = options.SessionExpirationDuration;
RebalanceParams = new RebalanceParameters(
nodesParams.ToArray(),
options.HealthcheckTimeout,
options.ClientRebalanceInterval,
options.SessionExpirationDuration);
ClientBuilder = options.ClientBuilder!;
}
private void SetupContext(CallContext ctx)
{
if (ctx == null)
{
throw new ArgumentNullException(nameof(ctx));
}
ctx.Key ??= Key;
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
public async Task<string?> Dial(CallContext ctx)
{
SetupContext(ctx);
var inner = new InnerPool[RebalanceParams.NodesParams.Length];
bool atLeastOneHealthy = false;
int i = 0;
foreach (var nodeParams in RebalanceParams.NodesParams)
{
var clients = new ClientWrapper[nodeParams.Weights.Count];
for (int j = 0; j < nodeParams.Addresses.Count; j++)
{
var client = ClientBuilder(nodeParams.Addresses[j]);
clients[j] = client;
var error = await client.Dial(ctx).ConfigureAwait(false);
if (!string.IsNullOrEmpty(error))
{
Logger?.LogWarning("Failed to build client. Address {Address}, {Error})", client.WrapperPrm.Address, error);
continue;
}
try
{
var token = await InitSessionForDuration(ctx, client, RebalanceParams.SessionExpirationDuration, Key, false)
.ConfigureAwait(false);
var key = FormCacheKey(nodeParams.Addresses[j], Key, false);
_ = Cache.Cache[key] = token;
}
catch (FrostFsException ex)
{
client.StatusMonitor.SetUnhealthy();
Logger?.LogWarning("Failed to create frostfs session token for client. Address {Address}, {Error})",
client.WrapperPrm.Address, ex.Message);
continue;
}
atLeastOneHealthy = true;
}
var sampler = new Sampler(nodeParams.Weights.ToArray());
inner[i] = new InnerPool(sampler, clients);
}
if (!atLeastOneHealthy)
return "at least one node must be healthy";
InnerPools = inner;
var res = await GetNetworkSettingsAsync(new PrmNetworkSettings { Context = ctx }).ConfigureAwait(false);
MaxObjectSize = res.MaxObjectSize;
StartRebalance(ctx);
return null;
}
private static IEnumerable<NodesParam> AdjustNodeParams(NodeParam[]? nodeParams)
{
if (nodeParams == null || nodeParams.Length == 0)
{
throw new ArgumentException("No FrostFS peers configured");
}
Dictionary<int, NodesParam> nodesParamsDict = new(nodeParams.Length);
foreach (var nodeParam in nodeParams)
{
if (!nodesParamsDict.TryGetValue(nodeParam.Priority, out var nodes))
{
nodes = new NodesParam(nodeParam.Priority);
nodesParamsDict[nodeParam.Priority] = nodes;
}
nodes.Addresses.Add(nodeParam.Address);
nodes.Weights.Add(nodeParam.Weight);
}
var nodesParams = new List<NodesParam>(nodesParamsDict.Count);
foreach (var key in nodesParamsDict.Keys)
{
var nodes = nodesParamsDict[key];
var newWeights = AdjustWeights([.. nodes.Weights]);
nodes.Weights.Clear();
foreach (var weight in newWeights)
{
nodes.Weights.Add(weight);
}
nodesParams.Add(nodes);
}
return nodesParams.OrderBy(n => n.Priority);
}
private static double[] AdjustWeights(double[] weights)
{
var adjusted = new double[weights.Length];
var sum = weights.Sum();
if (sum > 0)
{
for (int i = 0; i < weights.Length; i++)
{
adjusted[i] = weights[i] / sum;
}
}
return adjusted;
}
private static void FillDefaultInitParams(InitParameters parameters, SessionCache cache)
{
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
if (parameters.ErrorThreshold == 0)
parameters.ErrorThreshold = defaultErrorThreshold;
if (parameters.ClientRebalanceInterval <= 0)
parameters.ClientRebalanceInterval = defaultRebalanceInterval;
if (parameters.GracefulCloseOnSwitchTimeout <= 0)
parameters.GracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout;
if (parameters.HealthcheckTimeout <= 0)
parameters.HealthcheckTimeout = defaultHealthcheckTimeout;
if (parameters.NodeDialTimeout <= 0)
parameters.NodeDialTimeout = defaultDialTimeout;
if (parameters.NodeStreamTimeout <= 0)
parameters.NodeStreamTimeout = defaultStreamTimeout;
if (cache.TokenDuration == 0)
cache.TokenDuration = defaultSessionTokenExpirationDuration;
parameters.ClientBuilder ??= new Func<string, ClientWrapper>((address) =>
{
var wrapperPrm = new WrapperPrm
{
Address = address,
Key = parameters.Key,
Logger = parameters.Logger,
DialTimeout = parameters.NodeDialTimeout,
StreamTimeout = parameters.NodeStreamTimeout,
ErrorThreshold = parameters.ErrorThreshold,
GracefulCloseOnSwitchTimeout = parameters.GracefulCloseOnSwitchTimeout
};
return new ClientWrapper(wrapperPrm);
}
);
}
private FrostFSClient? Сonnection()
{
foreach (var pool in InnerPools!)
{
var client = pool.Connection();
if (client != null)
{
return client.Client;
}
}
return null;
}
private static async Task<FrostFsSessionToken?> InitSessionForDuration(CallContext ctx, ClientWrapper cw, ulong duration, ECDsa key, bool clientCut)
{
var client = cw.Client;
var networkInfo = await client!.GetNetworkSettingsAsync(new PrmNetworkSettings { Context = ctx }).ConfigureAwait(false);
var epoch = networkInfo.Epoch;
ulong exp = ulong.MaxValue - epoch < duration
? ulong.MaxValue
: epoch + duration;
var prmSessionCreate = new PrmSessionCreate(exp) { Context = ctx };
return await client.CreateSessionAsync(prmSessionCreate).ConfigureAwait(false);
}
private static string FormCacheKey(string address, ECDsa key, bool clientCut)
{
var k = key.PrivateKey;
var stype = clientCut ? "client" : "server";
return $"{address}{stype}{k}";
}
public void Close()
{
CancellationTokenSource.Cancel();
if (InnerPools != null)
{
// close all clients
foreach (var innerPool in InnerPools)
foreach (var client in innerPool.Clients)
if (client.StatusMonitor.IsDialed())
client.Client?.Close();
}
}
// startRebalance runs loop to monitor connection healthy status.
internal void StartRebalance(CallContext ctx)
{
var buffers = new double[RebalanceParams.NodesParams.Length][];
for (int i = 0; i < RebalanceParams.NodesParams.Length; i++)
{
var parameters = this.RebalanceParams.NodesParams[i];
buffers[i] = new double[parameters.Weights.Count];
Task.Run(async () =>
{
await Task.Delay((int)RebalanceParams.ClientRebalanceInterval).ConfigureAwait(false);
UpdateNodesHealth(ctx, buffers);
});
}
}
private void UpdateNodesHealth(CallContext ctx, double[][] buffers)
{
var tasks = new Task[InnerPools!.Length];
for (int i = 0; i < InnerPools.Length; i++)
{
var bufferWeights = buffers[i];
tasks[i] = Task.Run(() => UpdateInnerNodesHealth(ctx, i, bufferWeights));
}
Task.WaitAll(tasks);
}
private async ValueTask UpdateInnerNodesHealth(CallContext ctx, int poolIndex, double[] bufferWeights)
{
if (poolIndex > InnerPools!.Length - 1)
{
return;
}
var pool = InnerPools[poolIndex];
var options = RebalanceParams;
int healthyChanged = 0;
var tasks = new Task[pool.Clients.Length];
for (int j = 0; j < pool.Clients.Length; j++)
{
var client = pool.Clients[j];
var healthy = false;
string? error = null;
var changed = false;
try
{
// check timeout settings
changed = await client.RestartIfUnhealthy(ctx).ConfigureAwait(false);
healthy = true;
bufferWeights[j] = options.NodesParams[poolIndex].Weights[j];
}
catch (FrostFsException e)
{
error = e.Message;
bufferWeights[j] = 0;
Cache.DeleteByPrefix(client.StatusMonitor.Address);
}
if (changed)
{
StringBuilder fields = new($"address {client.StatusMonitor.Address}, healthy {healthy}");
if (string.IsNullOrEmpty(error))
{
fields.Append($", reason {error}");
Logger?.Log(LogLevel.Warning, "Health has changed: {Fields}", fields.ToString());
Interlocked.Exchange(ref healthyChanged, 1);
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
if (Interlocked.CompareExchange(ref healthyChanged, -1, -1) == 1)
{
var probabilities = AdjustWeights(bufferWeights);
lock (_lock)
{
pool.Sampler = new Sampler(probabilities);
}
}
}
}
private bool CheckSessionTokenErr(Exception error, string address)
{
if (error == null)
{
return false;
}
if (error is SessionNotFoundException || error is SessionExpiredException)
{
this.Cache.DeleteByPrefix(address);
return true;
}
return false;
}
public Statistic Statistic()
{
if (InnerPools == null)
{
throw new InvalidObjectException(nameof(Pool));
}
var statistics = new Statistic();
foreach (var inner in InnerPools)
{
int nodeIndex = 0;
int valueIndex = 0;
var nodes = new string[inner.Clients.Length];
lock (_lock)
{
foreach (var client in inner.Clients)
{
if (client.StatusMonitor.IsHealthy())
{
nodes[valueIndex++] = client.StatusMonitor.Address;
}
var node = new NodeStatistic
{
Address = client.StatusMonitor.Address,
Methods = client.StatusMonitor.MethodsStatus(),
OverallErrors = client.StatusMonitor.GetOverallErrorRate(),
CurrentErrors = client.StatusMonitor.GetCurrentErrorRate()
};
statistics.Nodes[nodeIndex++] = node;
statistics.OverallErrors += node.OverallErrors;
}
if (statistics.CurrentNodes == null || statistics.CurrentNodes.Length == 0)
{
statistics.CurrentNodes = nodes;
}
}
}
return statistics;
}
public async Task<FrostFsNetmapSnapshot> GetNetmapSnapshotAsync(PrmNetmapSnapshot? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetNetmapSnapshotAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsNodeInfo> GetNodeInfoAsync(PrmNodeInfo? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetNodeInfoAsync(args).ConfigureAwait(false);
}
public async Task<NetworkSettings> GetNetworkSettingsAsync(PrmNetworkSettings? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetNetworkSettingsAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.CreateSessionAsync(args).ConfigureAwait(false);
}
public async Task<byte[]> AddChainAsync(PrmApeChainAdd args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.AddChainAsync(args).ConfigureAwait(false);
}
public async Task RemoveChainAsync(PrmApeChainRemove args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
await client.RemoveChainAsync(args).ConfigureAwait(false);
}
public async Task<Chain[]> ListChainAsync(PrmApeChainList args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.ListChainAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsContainerInfo> GetContainerAsync(PrmContainerGet args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetContainerAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsContainerId> ListContainersAsync(PrmContainerGetAll? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return client.ListContainersAsync(args);
}
public async Task<FrostFsContainerId> CreateContainerAsync(PrmContainerCreate args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.CreateContainerAsync(args).ConfigureAwait(false);
}
public async Task DeleteContainerAsync(PrmContainerDelete args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
await client.DeleteContainerAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetObjectHeadAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.PutObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.PutSingleObjectAsync(args).ConfigureAwait(false);
}
public async Task DeleteObjectAsync(PrmObjectDelete args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
await client.DeleteObjectAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return client.SearchObjectsAsync(args);
}
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args = null)
{
var client = Сonnection() ?? throw new FrostFsException("Cannot find alive client");
return await client.GetBalanceAsync(args).ConfigureAwait(false);
}
public bool RestartIfUnhealthy(CallContext ctx)
{
throw new NotImplementedException();
}
public bool IsHealthy()
{
throw new NotImplementedException();
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
Close();
}
disposedValue = true;
}
}
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
throw new NotImplementedException();
}
}