using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;

using Frostfs.V2.Ape;

using FrostFS.Refs;
using FrostFS.SDK.Client.Interfaces;
using FrostFS.SDK.Client.Mappers.GRPC;
using FrostFS.SDK.Cryptography;

using Grpc.Core;

using Microsoft.Extensions.Logging;

namespace FrostFS.SDK.Client;

/// <summary>
/// Client-side connection pool. Node parameters are grouped by priority into inner pools and
/// every <see cref="IFrostFSClient"/> call is forwarded to a client picked from those pools.
/// </summary>
public partial class Pool : IFrostFSClient
{
    // Fallback values used when the corresponding init parameter is left unset (zero or negative).
    const int defaultSessionTokenExpirationDuration = 100; // in epochs

    const int defaultErrorThreshold = 100;

    const int defaultGracefulCloseOnSwitchTimeout = 10; // seconds
    const int defaultRebalanceInterval = 15; // seconds
    const int defaultHealthcheckTimeout = 4; // seconds
    const int defaultDialTimeout = 5; // seconds
    const int defaultStreamTimeout = 10; // seconds

    // Guards replacement of an inner pool's sampler and the statistics snapshot.
    private readonly object _lock = new();

    // One inner pool per priority group; stays null until Dial succeeds.
    private InnerPool[]? InnerPools { get; set; }

    private ClientKey Key { get; set; }

    // Lazily populated caches behind Owner / OwnerId.
    private OwnerID? _ownerId;

    private FrostFsOwner? _owner;

    // Owner derived from the pool key's public-key address; computed on first access.
    private FrostFsOwner Owner => _owner ??= new FrostFsOwner(Key.ECDsaKey.PublicKey().PublicKeyToAddress());

    // Wire representation of the owner. On first access this also (re)assigns the cached
    // _owner from Key.Owner.
    private OwnerID OwnerId
    {
        get
        {
            if (_ownerId == null)
            {
                _owner = Key.Owner;
                _ownerId = _owner.ToMessage();
            }

            return _ownerId;
        }
    }

    // Cancelled by Close().
    internal CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();

    internal SessionCache SessionCache { get; set; }

    private ulong SessionTokenDuration { get; set; }

    private RebalanceParameters RebalanceParams { get; set; }

    // Factory producing a client wrapper for a node address. The type arguments were missing in
    // the reviewed copy; they are restored from usage: it is invoked with a node address and its
    // result is stored as a ClientWrapper.
    private Func<string, ClientWrapper> ClientBuilder;

    private bool disposedValue;

    private ILogger? logger { get; set; }

    private ulong MaxObjectSize { get; set; }

    public IClientStatus? ClientStatus { get; }

    // NewPool creates connection pool using parameters.
#region Construction and connection management

    /// <summary>
    /// Creates a pool from <paramref name="options"/>. No connection is opened here;
    /// call <see cref="Dial"/> afterwards.
    /// </summary>
    /// <param name="options">Pool settings. Unset values are replaced by defaults in place.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    /// <exception cref="ArgumentException">The key is missing or no node parameters were given.</exception>
    public Pool(InitParameters options)
    {
        if (options is null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (options.Key == null)
        {
            throw new ArgumentException($"Missed required parameter {nameof(options.Key)}");
        }

        var nodesParams = AdjustNodeParams(options.NodeParams);

        // Apply defaults before any option is consumed. Previously the session cache was created
        // first and therefore received an unset (zero) SessionExpirationDuration.
        FillDefaultInitParams(options, this);

        Key = new ClientKey(options.Key);
        SessionCache = new SessionCache(options.SessionExpirationDuration);
        logger = options.Logger;
        SessionTokenDuration = options.SessionExpirationDuration;

        RebalanceParams = new RebalanceParameters(
            nodesParams.ToArray(),
            options.HealthcheckTimeout,
            options.ClientRebalanceInterval,
            options.SessionExpirationDuration);

        ClientBuilder = options.ClientBuilder!;

        // ClientContext.PoolErrorHandler = client.HandleError;
    }

    /// <summary>
    /// Dials every configured node, creates a session per node and caches its token, then
    /// reads the network settings and starts the health check via <see cref="StartRebalance"/>.
    /// If this fails, the pool SHOULD NOT be used.
    /// </summary>
    /// <param name="ctx">Call context forwarded to every request made while dialing.</param>
    /// <returns>
    /// <c>null</c> on success, otherwise a message describing the failure
    /// (currently only "At least one node must be healthy").
    /// </returns>
    public async Task<string?> Dial(CallContext ctx)
    {
        var inner = new InnerPool[RebalanceParams.NodesParams.Length];
        bool atLeastOneHealthy = false;
        int i = 0;

        foreach (var nodeParams in RebalanceParams.NodesParams)
        {
            var wrappers = new ClientWrapper[nodeParams.Weights.Count];

            for (int j = 0; j < nodeParams.Addresses.Count; j++)
            {
                ClientWrapper? wrapper = null;
                bool dialed = false;

                try
                {
                    wrapper = wrappers[j] = ClientBuilder(nodeParams.Addresses[j]);

                    await wrapper.Dial(ctx).ConfigureAwait(false);
                    dialed = true;

                    var token = await InitSessionForDuration(ctx, wrapper, RebalanceParams.SessionExpirationDuration, Key.ECDsaKey, false)
                        .ConfigureAwait(false);

                    var key = FormCacheKey(nodeParams.Addresses[j], Key.PublicKey);
                    SessionCache.SetValue(key, token);

                    atLeastOneHealthy = true;
                }
                catch (RpcException ex)
                {
                    // The wrapper is still null when the builder itself threw, so it must not
                    // be dereferenced unconditionally here.
                    if (!dialed)
                    {
                        wrapper?.SetUnhealthyOnDial();
                    }
                    else
                    {
                        wrapper?.SetUnhealthy();
                    }

                    if (logger != null)
                    {
                        var address = wrapper?.WrapperPrm.Address ?? nodeParams.Addresses[j];
                        FrostFsMessages.SessionCreationError(logger, address, ex.Message);
                    }
                }
                catch (FrostFsInvalidObjectException)
                {
                    // Stops dialing the remaining addresses of this priority group;
                    // their wrapper slots stay null.
                    break;
                }
            }

            var sampler = new Sampler(nodeParams.Weights.ToArray());

            inner[i] = new InnerPool(sampler, wrappers);

            i++;
        }

        if (!atLeastOneHealthy)
        {
            return "At least one node must be healthy";
        }

        InnerPools = inner;

        // Use the caller's context (it was previously replaced by `default`).
        var res = await GetNetworkSettingsAsync(ctx).ConfigureAwait(false);

        MaxObjectSize = res.MaxObjectSize;

        StartRebalance(ctx);

        return null;
    }

    /// <summary>
    /// Groups node parameters by priority, normalises the weights inside each group so that
    /// they sum to 1, and returns the groups ordered by ascending priority.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="nodeParams"/> is null or empty.</exception>
    private static IEnumerable<NodesParam> AdjustNodeParams(NodeParam[]? nodeParams)
    {
        if (nodeParams == null || nodeParams.Length == 0)
        {
            throw new ArgumentException("No FrostFS peers configured");
        }

        var nodesParams = new List<NodesParam>();

        foreach (var group in nodeParams.GroupBy(p => p.Priority))
        {
            var nodes = new NodesParam(group.Key);

            foreach (var nodeParam in group)
            {
                nodes.Addresses.Add(nodeParam.Address);
                nodes.Weights.Add(nodeParam.Weight);
            }

            // Replace the raw weights by their normalised counterparts.
            var normalized = AdjustWeights([.. nodes.Weights]);
            nodes.Weights.Clear();

            foreach (var weight in normalized)
            {
                nodes.Weights.Add(weight);
            }

            nodesParams.Add(nodes);
        }

        return nodesParams.OrderBy(n => n.Priority);
    }

    /// <summary>
    /// Returns the weights divided by their sum. When the sum is not positive
    /// the result contains only zeros.
    /// </summary>
    private static double[] AdjustWeights(double[] weights)
    {
        var adjusted = new double[weights.Length];

        var sum = weights.Sum();

        if (sum > 0)
        {
            for (int i = 0; i < weights.Length; i++)
            {
                adjusted[i] = weights[i] / sum;
            }
        }

        return adjusted;
    }

    /// <summary>
    /// Replaces unset values in <paramref name="parameters"/> by the pool defaults and, when no
    /// client builder was supplied, installs one that creates a <see cref="ClientWrapper"/>
    /// bound to <paramref name="pool"/>.
    /// </summary>
    private static void FillDefaultInitParams(InitParameters parameters, Pool pool)
    {
        if (parameters.SessionExpirationDuration == 0)
        {
            parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
        }

        if (parameters.ErrorThreshold == 0)
        {
            parameters.ErrorThreshold = defaultErrorThreshold;
        }

        if (parameters.ClientRebalanceInterval <= 0)
        {
            parameters.ClientRebalanceInterval = defaultRebalanceInterval;
        }

        if (parameters.GracefulCloseOnSwitchTimeout <= 0)
        {
            parameters.GracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout;
        }

        if (parameters.HealthcheckTimeout <= 0)
        {
            parameters.HealthcheckTimeout = defaultHealthcheckTimeout;
        }

        if (parameters.NodeDialTimeout <= 0)
        {
            parameters.NodeDialTimeout = defaultDialTimeout;
        }

        if (parameters.NodeStreamTimeout <= 0)
        {
            parameters.NodeStreamTimeout = defaultStreamTimeout;
        }

        // The lambda reads `parameters` when it is invoked, i.e. after all defaults above are set.
        parameters.ClientBuilder ??= address =>
        {
            var wrapperPrm = new WrapperPrm()
            {
                Address = address,
                Key = parameters.Key!,
                Logger = parameters.Logger,
                DialTimeout = parameters.NodeDialTimeout,
                StreamTimeout = parameters.NodeStreamTimeout,
                ErrorThreshold = parameters.ErrorThreshold,
                GracefulCloseOnSwitchTimeout = parameters.GracefulCloseOnSwitchTimeout,
                Callback = parameters.Callback,
                Interceptors = parameters.Interceptors,
                GrpcChannelFactory = parameters.GrpcChannelFactory
            };

            return new ClientWrapper(wrapperPrm, pool);
        };
    }

    /// <summary>
    /// Returns the first client offered by the inner pools, asking them in array order.
    /// </summary>
    /// <exception cref="FrostFsException">No inner pool returned a client.</exception>
    private ClientWrapper Connection()
    {
        foreach (var pool in InnerPools!)
        {
            var client = pool.Connection();
            if (client != null)
            {
                return client;
            }
        }

        throw new FrostFsException("Cannot find alive client");
    }

    /// <summary>
    /// Creates a session on the given client that expires <paramref name="duration"/> epochs
    /// after the current network epoch (saturating at <see cref="ulong.MaxValue"/>).
    /// <paramref name="key"/> and <paramref name="clientCut"/> are not used by this method.
    /// </summary>
    // NOTE(review): the generic argument of the return type is missing in the reviewed copy of
    // this file. The method returns whatever client.CreateSessionAsync yields; restore the type
    // from that declaration, it cannot be determined from this file alone.
    private static async Task InitSessionForDuration(CallContext ctx, ClientWrapper cw, ulong duration, ECDsa key, bool clientCut)
    {
        var client = cw.Client;
        var networkInfo = await client!.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);

        var epoch = networkInfo.Epoch;

        // Guard against overflow of epoch + duration.
        ulong exp = ulong.MaxValue - epoch < duration
            ? ulong.MaxValue
            : epoch + duration;

        var prmSessionCreate = new PrmSessionCreate(exp);

        return await client.CreateSessionAsync(prmSessionCreate, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Builds the session cache key: the node address immediately followed by the key string.
    /// </summary>
    internal static string FormCacheKey(string address, string key)
    {
        return $"{address}{key}";
    }

    /// <summary>
    /// Signals cancellation through <see cref="CancellationTokenSource"/>.
    /// </summary>
    public void Close()
    {
        CancellationTokenSource.Cancel();

        // TODO: close every dialed client of every inner pool; the loop that did this
        // (client.Client?.Close() for each client with IsDialed()) is currently disabled.
    }

#endregion

    // startRebalance runs loop to monitor connection healthy status.
#region Rebalancing, statistics and IFrostFSClient forwarding

    /// <summary>
    /// Allocates one weight buffer per priority group and schedules a background health check
    /// of all nodes after the configured rebalance interval.
    /// </summary>
    /// <param name="ctx">Call context passed on to the health-check requests.</param>
    // NOTE(review): the check runs once, not periodically, and the interval is handed to
    // Task.Delay as milliseconds although the default constants are annotated as seconds.
    // Both are kept as found; confirm the intended unit and whether this should loop until
    // CancellationTokenSource is cancelled.
    internal void StartRebalance(CallContext ctx)
    {
        var nodesParams = RebalanceParams.NodesParams;
        var buffers = new double[nodesParams.Length][];

        for (int i = 0; i < nodesParams.Length; i++)
        {
            buffers[i] = new double[nodesParams[i].Weights.Count];
        }

        var cancellation = CancellationTokenSource.Token;

        // A single background pass covers every priority group. Starting one identical task per
        // group (as before) only made several passes race on the same buffers.
        _ = Task.Run(async () =>
        {
            // Cancelled by Close(): in that case the health pass is skipped.
            await Task.Delay((int)RebalanceParams.ClientRebalanceInterval, cancellation).ConfigureAwait(false);
            UpdateNodesHealth(ctx, buffers);
        });
    }

    /// <summary>
    /// Runs the health check of every inner pool in parallel and blocks until all have finished.
    /// </summary>
    private void UpdateNodesHealth(CallContext ctx, double[][] buffers)
    {
        var tasks = new Task[InnerPools!.Length];

        for (int i = 0; i < InnerPools.Length; i++)
        {
            // Copy the loop variable: a lambda capturing `i` itself would observe the value
            // the loop has advanced to by the time the task actually runs.
            int poolIndex = i;
            var bufferWeights = buffers[i];

            // AsTask() makes Task.Run return a task that completes when the check completes,
            // instead of a Task<ValueTask> that completes as soon as the check is started.
            tasks[i] = Task.Run(() => UpdateInnerNodesHealth(ctx, poolIndex, bufferWeights).AsTask());
        }

        Task.WaitAll(tasks);
    }

    /// <summary>
    /// Checks every client of one inner pool, writes its current weight into
    /// <paramref name="bufferWeights"/> (0 when the check fails) and, if any client reported a
    /// status change, replaces the pool's sampler with one built from the normalised weights.
    /// </summary>
    private async ValueTask UpdateInnerNodesHealth(CallContext ctx, int poolIndex, double[] bufferWeights)
    {
        if (poolIndex > InnerPools!.Length - 1)
        {
            return;
        }

        var pool = InnerPools[poolIndex];
        var options = RebalanceParams;

        bool healthyChanged = false;

        for (int j = 0; j < pool.Clients.Length; j++)
        {
            var client = pool.Clients[j];

            // Dial can leave slots empty when it stops early on a priority group.
            if (client is null)
            {
                bufferWeights[j] = 0;
                continue;
            }

            var healthy = false;
            string? error = null;
            var changed = false;

            try
            {
                // check timeout settings
                changed = await client.RestartIfUnhealthy(ctx).ConfigureAwait(false);
                healthy = true;
                bufferWeights[j] = options.NodesParams[poolIndex].Weights[j];
            }
            // TODO: specify
            catch (FrostFsException e)
            {
                error = e.Message;
                bufferWeights[j] = 0;

                SessionCache.DeleteByPrefix(client.Address);
            }

            if (changed)
            {
                if (logger != null)
                {
                    FrostFsMessages.HealthChanged(logger, client.Address, healthy, error ?? string.Empty);
                }

                // Previously this was set only when an error was present as well, which cannot
                // coincide with `changed`, so the sampler was never rebuilt.
                healthyChanged = true;
            }
        }

        // Rebuild the sampler once, after all weights of this pool are up to date.
        if (healthyChanged)
        {
            var probabilities = AdjustWeights(bufferWeights);

            lock (_lock)
            {
                pool.Sampler = new Sampler(probabilities);
            }
        }
    }

    // TODO: remove
    /// <summary>
    /// Returns true, after dropping the cached sessions for <paramref name="address"/>, when
    /// <paramref name="error"/> reports a missing or expired session; false otherwise.
    /// </summary>
    private bool CheckSessionTokenErr(Exception error, string address)
    {
        if (error is SessionNotFoundException or SessionExpiredException)
        {
            this.SessionCache.DeleteByPrefix(address);
            return true;
        }

        return false;
    }

    /// <summary>
    /// Collects per-node statistics of all inner pools. <c>CurrentNodes</c> receives the
    /// addresses of the healthy clients of the first inner pool that has any.
    /// </summary>
    /// <exception cref="FrostFsInvalidObjectException">The pool has not been dialed.</exception>
    public Statistic Statistic()
    {
        if (InnerPools == null)
        {
            throw new FrostFsInvalidObjectException(nameof(Pool));
        }

        var statistics = new Statistic();

        foreach (var inner in InnerPools)
        {
            // Only healthy addresses are collected, so the result has no null entries and an
            // all-unhealthy pool yields an empty list that lets the next pool be considered.
            var healthyNodes = new List<string>(inner.Clients.Length);

            lock (_lock)
            {
                foreach (var client in inner.Clients)
                {
                    // Dial can leave slots empty when it stops early on a priority group.
                    if (client is null)
                    {
                        continue;
                    }

                    if (client.IsHealthy())
                    {
                        healthyNodes.Add(client.Address);
                    }

                    var node = new NodeStatistic
                    {
                        Address = client.Address,
                        Methods = client.MethodsStatus(),
                        OverallErrors = client.GetOverallErrorRate(),
                        CurrentErrors = client.GetCurrentErrorRate()
                    };

                    statistics.Nodes.Add(node);

                    statistics.OverallErrors += node.OverallErrors;
                }

                if (statistics.CurrentNodes == null || statistics.CurrentNodes.Length == 0)
                {
                    statistics.CurrentNodes = healthyNodes.ToArray();
                }
            }
        }

        return statistics;
    }

    // IFrostFSClient forwarding: every method below picks a client via Connection() and
    // delegates the call to it unchanged.
    //
    // NOTE(review): the generic type arguments of the return types below are missing in the
    // reviewed copy of this file (for example "Task>" and "Task[]>"). They are left exactly as
    // found because the IFrostFSClient declarations are not visible here; restore them from the
    // interface before building.

    public async Task GetNetmapSnapshotAsync(CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetNetmapSnapshotAsync(ctx).ConfigureAwait(false);
    }

    public async Task GetNodeInfoAsync(CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetNodeInfoAsync(ctx).ConfigureAwait(false);
    }

    public async Task GetNetworkSettingsAsync(CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
    }

    public async Task CreateSessionAsync(PrmSessionCreate args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.CreateSessionAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task> AddChainAsync(PrmApeChainAdd args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.AddChainAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task RemoveChainAsync(PrmApeChainRemove args, CallContext ctx)
    {
        var client = Connection();
        await client.Client!.RemoveChainAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task ListChainAsync(PrmApeChainList args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.ListChainAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task GetContainerAsync(PrmContainerGet args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetContainerAsync(args, ctx).ConfigureAwait(false);
    }

    public IAsyncEnumerable ListContainersAsync(PrmContainerGetAll args, CallContext ctx)
    {
        var client = Connection();
        return client.Client!.ListContainersAsync(args, ctx);
    }

    public async Task CreateContainerAsync(PrmContainerCreate args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.CreateContainerAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task DeleteContainerAsync(PrmContainerDelete args, CallContext ctx)
    {
        var client = Connection();
        await client.Client!.DeleteContainerAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task GetObjectHeadAsync(PrmObjectHeadGet args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetObjectHeadAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task GetObjectAsync(PrmObjectGet args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task PutObjectAsync(PrmObjectPut args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.PutObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.PutClientCutObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.PutSingleObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.PatchObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task GetRangeAsync(PrmRangeGet args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetRangeAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task[]> GetRangeHashAsync(PrmRangeHashGet args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetRangeHashAsync(args, ctx).ConfigureAwait(false);
    }

    // Same forwarding target as PatchObjectAsync.
    public async Task PatchAsync(PrmObjectPatch args, CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.PatchObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public async Task DeleteObjectAsync(PrmObjectDelete args, CallContext ctx)
    {
        var client = Connection();
        await client.Client!.DeleteObjectAsync(args, ctx).ConfigureAwait(false);
    }

    public IAsyncEnumerable SearchObjectsAsync(PrmObjectSearch args, CallContext ctx)
    {
        var client = Connection();
        return client.Client!.SearchObjectsAsync(args, ctx);
    }

    public async Task GetBalanceAsync(CallContext ctx)
    {
        var client = Connection();
        return await client.Client!.GetBalanceAsync(ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Closes the pool on the first disposing call; later calls do nothing.
    /// </summary>
    protected virtual void Dispose(bool disposing)
    {
        if (disposedValue)
        {
            return;
        }

        if (disposing)
        {
            Close();
        }

        disposedValue = true;
    }

    /// <summary>
    /// Disposes the pool, see <see cref="Dispose(bool)"/>.
    /// </summary>
    public void Dispose()
    {
        Dispose(disposing: true);

        // Standard dispose pattern for an unsealed type: a derived class that adds a finalizer
        // does not need it to run once the pool has been disposed explicitly.
        GC.SuppressFinalize(this);
    }

#endregion
}