[#24] Client: Implement pool part1 #31

Merged
PavelGrossSpb merged 3 commits from PavelGrossSpb/frostfs-sdk-csharp:poll into master 2024-11-05 11:44:51 +00:00
103 changed files with 3263 additions and 669 deletions

View file

@ -1,23 +0,0 @@
using Microsoft.Extensions.Caching.Memory;
namespace FrostFS.SDK.ClientV2
{
internal static class Cache
{
private static readonly IMemoryCache _ownersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 256
});
private static readonly IMemoryCache _containersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 1024
});
internal static IMemoryCache Owners => _ownersCache;
internal static IMemoryCache Containers => _containersCache;
}
}

View file

@ -0,0 +1,22 @@
using Microsoft.Extensions.Caching.Memory;
namespace FrostFS.SDK.ClientV2;
internal static class Caches
{
private static readonly IMemoryCache _ownersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 256
});
private static readonly IMemoryCache _containersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 1024
});
internal static IMemoryCache Owners => _ownersCache;
internal static IMemoryCache Containers => _containersCache;
}

View file

@ -4,12 +4,11 @@ using FrostFS.SDK.Cryptography;
using Google.Protobuf;
namespace FrostFS.SDK.ClientV2
{
public class ClientKey(ECDsa key)
{
internal ECDsa ECDsaKey { get; } = key;
namespace FrostFS.SDK.ClientV2;
internal ByteString PublicKeyProto { get; } = ByteString.CopyFrom(key.PublicKey());
}
public class ClientKey(ECDsa key)
{
internal ECDsa ECDsaKey { get; } = key;
internal ByteString PublicKeyProto { get; } = ByteString.CopyFrom(key.PublicKey());
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class FrostFsInvalidObjectException : FrostFsException
{
public FrostFsInvalidObjectException()
{
}
public FrostFsInvalidObjectException(string message) : base(message)
{
}
public FrostFsInvalidObjectException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,25 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class FrostFsResponseException : FrostFsException
{
public FrostFsResponseStatus? Status { get; private set; }
public FrostFsResponseException()
{
}
public FrostFsResponseException(FrostFsResponseStatus status)
{
Status = status;
}
public FrostFsResponseException(string message) : base(message)
{
}
public FrostFsResponseException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class FrostFsStreamException : FrostFsException
{
public FrostFsStreamException()
{
}
public FrostFsStreamException(string message) : base(message)
{
}
public FrostFsStreamException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -1,18 +0,0 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class InvalidObjectException : Exception
{
public InvalidObjectException()
{
}
public InvalidObjectException(string message) : base(message)
{
}
public InvalidObjectException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -1,25 +0,0 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class ResponseException : Exception
{
public FrostFsResponseStatus? Status { get; private set; }
public ResponseException()
{
}
public ResponseException(FrostFsResponseStatus status)
{
Status = status;
}
public ResponseException(string message) : base(message)
{
}
public ResponseException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class SessionExpiredException : FrostFsException
{
public SessionExpiredException()
{
}
public SessionExpiredException(string message) : base(message)
{
}
public SessionExpiredException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class SessionNotFoundException : FrostFsException
{
public SessionNotFoundException()
{
}
public SessionNotFoundException(string message) : base(message)
{
}
public SessionNotFoundException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -22,10 +22,10 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageReference Include="System.Runtime.Caching" Version="8.0.0" />
<PackageReference Include="System.Runtime.Caching" Version="8.0.1" />
</ItemGroup>
<ItemGroup>

View file

@ -6,10 +6,12 @@ using System.Threading.Tasks;
using Frostfs.V2.Ape;
using Frostfs.V2.Apemanager;
using FrostFS.Accounting;
using FrostFS.Container;
using FrostFS.Netmap;
using FrostFS.Object;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Services;
using FrostFS.SDK.Cryptography;
using FrostFS.Session;
@ -35,7 +37,9 @@ public class FrostFSClient : IFrostFSClient
internal ObjectService.ObjectServiceClient? ObjectServiceClient { get; set; }
internal ClientEnvironment ClientCtx { get; set; }
internal AccountingService.AccountingServiceClient? AccountingServiceClient { get; set; }
internal ClientContext ClientCtx { get; set; }
public static IFrostFSClient GetInstance(IOptions<ClientSettings> clientOptions, GrpcChannelOptions? channelOptions = null)
{
@ -89,7 +93,7 @@ public class FrostFSClient : IFrostFSClient
var ecdsaKey = settings.Value.Key.LoadWif();
FrostFsOwner.FromKey(ecdsaKey);
ClientCtx = new ClientEnvironment(
ClientCtx = new ClientContext(
client: this,
key: ecdsaKey,
owner: FrostFsOwner.FromKey(ecdsaKey),
@ -104,13 +108,13 @@ public class FrostFSClient : IFrostFSClient
private FrostFSClient(IOptions<ClientSettings> options, GrpcChannelOptions? channelOptions)
{
var clientSettings = (options?.Value) ?? throw new ArgumentException("Options must be initialized");
var clientSettings = (options?.Value) ?? throw new ArgumentNullException(nameof(options), "Options value must be initialized");
clientSettings.Validate();
var channel = InitGrpcChannel(clientSettings.Host, channelOptions);
ClientCtx = new ClientEnvironment(
ClientCtx = new ClientContext(
this,
key: null,
owner: null,
@ -123,7 +127,7 @@ public class FrostFSClient : IFrostFSClient
private FrostFSClient(IOptions<SingleOwnerClientSettings> options, GrpcChannelOptions? channelOptions)
{
var clientSettings = (options?.Value) ?? throw new ArgumentException("Options must be initialized");
var clientSettings = (options?.Value) ?? throw new ArgumentNullException(nameof(options), "Options value must be initialized");
clientSettings.Validate();
@ -131,7 +135,7 @@ public class FrostFSClient : IFrostFSClient
var channel = InitGrpcChannel(clientSettings.Host, channelOptions);
ClientCtx = new ClientEnvironment(
ClientCtx = new ClientContext(
this,
key: ecdsaKey,
owner: FrostFsOwner.FromKey(ecdsaKey),
@ -142,6 +146,19 @@ public class FrostFSClient : IFrostFSClient
// CheckFrostFsVersionSupport(new Context { Timeout = TimeSpan.FromSeconds(20) });
}
internal FrostFSClient(WrapperPrm prm, SessionCache cache)
{
ClientCtx = new ClientContext(
client: this,
key: prm.Key,
owner: FrostFsOwner.FromKey(prm.Key!),
channel: InitGrpcChannel(prm.Address, null), //prm.GrpcChannelOptions),
version: new FrostFsVersion(2, 13))
{
SessionCache = cache
};
}
public void Dispose()
{
Dispose(true);
@ -305,16 +322,16 @@ public class FrostFSClient : IFrostFSClient
}
#endregion
#region SessionImplementation
#region Session Implementation
public async Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args)
{
if (args is null)
throw new ArgumentNullException(nameof(args));
var session = await CreateSessionInternalAsync(args).ConfigureAwait(false);
var token = session.Serialize();
return new FrostFsSessionToken(token);
var token = session.Serialize();
return new FrostFsSessionToken(token, session.Body.Id.ToUuid());
}
internal Task<SessionToken> CreateSessionInternalAsync(PrmSessionCreate args)
@ -327,8 +344,18 @@ public class FrostFSClient : IFrostFSClient
}
#endregion
#region Accounting Implementation
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args)
{
args ??= new PrmBalance();
var service = GetAccouningService(args);
return await service.GetBallance(args).ConfigureAwait(false);
}
#endregion
#region ToolsImplementation
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, Context ctx)
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
if (header == null)
throw new ArgumentNullException(nameof(header));
@ -337,12 +364,12 @@ public class FrostFSClient : IFrostFSClient
}
#endregion
private async void CheckFrostFsVersionSupport(Context? ctx = default)
private async void CheckFrostFsVersionSupport(CallContext? ctx = default)
{
var args = new PrmNodeInfo { Context = ctx };
var args = new PrmNodeInfo(ctx);
if (ctx?.Version == null)
throw new InvalidObjectException(nameof(ctx.Version));
throw new ArgumentNullException(nameof(ctx), "Version must be initialized");
var service = GetNetmapService(args);
var localNodeInfo = await service.GetLocalNodeInfoAsync(args).ConfigureAwait(false);
@ -354,18 +381,16 @@ public class FrostFSClient : IFrostFSClient
}
}
private CallInvoker? SetupEnvironment(IContext ctx)
private CallInvoker? SetupClientContext(IContext ctx)
{
if (isDisposed)
throw new InvalidObjectException("Client is disposed.");
throw new FrostFsInvalidObjectException("Client is disposed.");
ctx.Context ??= new Context();
if (ctx.Context.Key == null)
if (ctx.Context!.Key == null)
{
if (ClientCtx.Key == null)
{
throw new InvalidObjectException("Key is not initialized.");
throw new ArgumentNullException(nameof(ctx), "Key is not initialized.");
}
ctx.Context.Key = ClientCtx.Key.ECDsaKey;
@ -380,24 +405,23 @@ public class FrostFSClient : IFrostFSClient
{
if (ClientCtx.Version == null)
{
throw new InvalidObjectException("Version is not initialized.");
throw new ArgumentNullException(nameof(ctx), "Version is not initialized.");
}
ctx.Context.Version = ClientCtx.Version;
}
CallInvoker? callInvoker = null;
if (ctx.Context.Interceptors != null && ctx.Context.Interceptors.Count > 0)
{
foreach (var interceptor in ctx.Context.Interceptors)
{
callInvoker = AddInvoker(callInvoker, interceptor);
}
}
foreach (var interceptor in ctx.Context.Interceptors)
callInvoker = AddInvoker(callInvoker, interceptor);
if (ctx.Context.Callback != null)
callInvoker = AddInvoker(callInvoker, new MetricsInterceptor(ctx.Context.Callback));
if (ctx.Context.PoolErrorHandler != null)
callInvoker = AddInvoker(callInvoker, new ErrorInterceptor(ctx.Context.PoolErrorHandler));
return callInvoker;
CallInvoker AddInvoker(CallInvoker? callInvoker, Interceptor interceptor)
@ -405,7 +429,7 @@ public class FrostFSClient : IFrostFSClient
if (callInvoker == null)
callInvoker = ClientCtx.Channel.Intercept(interceptor);
else
callInvoker.Intercept(interceptor);
callInvoker = callInvoker.Intercept(interceptor);
return callInvoker;
}
@ -413,7 +437,7 @@ public class FrostFSClient : IFrostFSClient
private NetmapServiceProvider GetNetmapService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = NetmapServiceClient ?? (callInvoker != null
? new NetmapService.NetmapServiceClient(callInvoker)
: new NetmapService.NetmapServiceClient(ClientCtx.Channel));
@ -423,7 +447,7 @@ public class FrostFSClient : IFrostFSClient
private SessionServiceProvider GetSessionService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = SessionServiceClient ?? (callInvoker != null
? new SessionService.SessionServiceClient(callInvoker)
: new SessionService.SessionServiceClient(ClientCtx.Channel));
@ -433,7 +457,7 @@ public class FrostFSClient : IFrostFSClient
private ApeManagerServiceProvider GetApeManagerService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = ApeManagerServiceClient ?? (callInvoker != null
? new APEManagerService.APEManagerServiceClient(callInvoker)
: new APEManagerService.APEManagerServiceClient(ClientCtx.Channel));
@ -441,9 +465,19 @@ public class FrostFSClient : IFrostFSClient
return new ApeManagerServiceProvider(client, ClientCtx);
}
private AccountingServiceProvider GetAccouningService(IContext ctx)
{
var callInvoker = SetupClientContext(ctx);
var client = AccountingServiceClient ?? (callInvoker != null
? new AccountingService.AccountingServiceClient(callInvoker)
: new AccountingService.AccountingServiceClient(ClientCtx.Channel));
return new AccountingServiceProvider(client, ClientCtx);
}
private ContainerServiceProvider GetContainerService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = ContainerServiceClient ?? (callInvoker != null
? new ContainerService.ContainerServiceClient(callInvoker)
: new ContainerService.ContainerServiceClient(ClientCtx.Channel));
@ -453,7 +487,7 @@ public class FrostFSClient : IFrostFSClient
private ObjectServiceProvider GetObjectService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = ObjectServiceClient ?? (callInvoker != null
? new ObjectService.ObjectServiceClient(callInvoker)
: new ObjectService.ObjectServiceClient(ClientCtx.Channel));
@ -461,6 +495,16 @@ public class FrostFSClient : IFrostFSClient
return new ObjectServiceProvider(client, ClientCtx);
}
private AccountingServiceProvider GetAccountService(IContext ctx)
{
var callInvoker = SetupClientContext(ctx);
var client = AccountingServiceClient ?? (callInvoker != null
? new AccountingService.AccountingServiceClient(callInvoker)
: new AccountingService.AccountingServiceClient(ClientCtx.Channel));
return new AccountingServiceProvider(client, ClientCtx);
}
private static GrpcChannel InitGrpcChannel(string host, GrpcChannelOptions? channelOptions)
{
try
@ -480,4 +524,24 @@ public class FrostFSClient : IFrostFSClient
throw new ArgumentException($"Host '{host}' has invalid format. Error: {e.Message}");
}
}
public async Task<string?> Dial(CallContext ctx)
{
var prm = new PrmBalance(ctx);
var service = GetAccouningService(prm);
_ = await service.GetBallance(prm).ConfigureAwait(false);
return null;
}
public bool RestartIfUnhealthy(CallContext ctx)
{
dstepanov-yadro marked this conversation as resolved
Review

To tell the truth, it doesn't look pretty. Without examining the source code, it is unclear what the returned string means.

To tell the truth, it doesn't look pretty. Without examining the source code, it is unclear what the returned string means.
Review

refactored

refactored
throw new NotImplementedException();
}
public void Close()
{
Dispose();
}
}

View file

@ -0,0 +1,8 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.
using System.Diagnostics.CodeAnalysis;
[assembly: SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "<Pending>", Scope = "member", Target = "~M:FrostFS.SDK.ClientV2.Sampler.Next~System.Int32")]

View file

@ -0,0 +1,68 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Interceptors;
namespace FrostFS.SDK.ClientV2;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1062:Validate arguments of public methods",
Justification = "parameters are provided by GRPC infrastructure")]
public class ErrorInterceptor(Action<Exception> handler) : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
var call = continuation(request, context);
return new AsyncUnaryCall<TResponse>(
HandleUnaryResponse(call),
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose);
}
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
{
var call = continuation(context);
return new AsyncClientStreamingCall<TRequest, TResponse>(
call.RequestStream,
HandleStreamResponse(call),
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose);
}
private async Task<TResponse> HandleUnaryResponse<TResponse>(AsyncUnaryCall<TResponse> call)
{
try
{
return await call;
}
catch (Exception ex)
{
handler(ex);
throw;
}
}
private async Task<TResponse> HandleStreamResponse<TRequest, TResponse>(AsyncClientStreamingCall<TRequest, TResponse> call)
{
try
{
return await call;
}
catch (Exception ex)
{
handler(ex);
throw;
}
}
}

View file

@ -7,6 +7,8 @@ using Grpc.Core.Interceptors;
namespace FrostFS.SDK.ClientV2;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1062:Validate arguments of public methods",
Justification = "parameters are provided by GRPC infrastructure")]
public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
@ -14,11 +16,6 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
if (continuation is null)
{
throw new ArgumentNullException(nameof(continuation));
}
var call = continuation(request, context);
return new AsyncUnaryCall<TResponse>(
@ -33,9 +30,6 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
ClientInterceptorContext<TRequest, TResponse> context,
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
{
if (continuation is null)
throw new ArgumentNullException(nameof(continuation));
var call = continuation(context);
return new AsyncClientStreamingCall<TRequest, TResponse>(
@ -52,7 +46,7 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
var watch = new Stopwatch();
watch.Start();
var response = await call.ResponseAsync.ConfigureAwait(false);
var response = await call;
watch.Stop();
@ -68,7 +62,7 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
var watch = new Stopwatch();
watch.Start();
var response = await call.ResponseAsync.ConfigureAwait(false);
var response = await call;
watch.Stop();

View file

@ -19,7 +19,7 @@ public interface IFrostFSClient : IDisposable
Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args);
#endregion
#region ApeMAnager
#region ApeManager
Task<byte[]> AddChainAsync(PrmApeChainAdd args);
Task RemoveChainAsync(PrmApeChainRemove args);
@ -51,7 +51,15 @@ public interface IFrostFSClient : IDisposable
IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args);
#endregion
#region Tools
FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, Context ctx);
#region Account
Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args = null);
#endregion
#region Tools
FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx);
#endregion
public Task<string?> Dial(CallContext ctx);
public void Close();
}

View file

@ -0,0 +1,24 @@
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
internal static partial class FrostFsMessages
{
[LoggerMessage(100,
LogLevel.Warning,
"Failed to create frostfs session token for client. Address {address}, {error}",
EventName = nameof(SessionCreationError))]
internal static partial void SessionCreationError(ILogger logger, string address, string error);
[LoggerMessage(101,
LogLevel.Warning,
"Error threshold reached. Address {address}, threshold {threshold}",
EventName = nameof(ErrorЕhresholdReached))]
internal static partial void ErrorЕhresholdReached(ILogger logger, string address, uint threshold);
[LoggerMessage(102,
LogLevel.Warning,
"Health has changed: {address} healthy {healthy}, reason {error}",
EventName = nameof(HealthChanged))]
internal static partial void HealthChanged(ILogger logger, string address, bool healthy, string error);
}

View file

@ -24,14 +24,14 @@ public static class ContainerIdMapper
var containerId = model.GetValue() ?? throw new ArgumentNullException(nameof(model));
if (!Cache.Containers.TryGetValue(containerId, out ContainerID? message))
if (!Caches.Containers.TryGetValue(containerId, out ContainerID? message))
{
message = new ContainerID
{
Value = ByteString.CopyFrom(Base58.Decode(containerId))
};
Cache.Containers.Set(containerId, message, _oneHourExpiration);
Caches.Containers.Set(containerId, message, _oneHourExpiration);
}
return message!;

View file

@ -22,14 +22,14 @@ public static class OwnerIdMapper
throw new ArgumentNullException(nameof(model));
}
if (!Cache.Owners.TryGetValue(model, out OwnerID? message))
if (!Caches.Owners.TryGetValue(model, out OwnerID? message))
{
message = new OwnerID
{
Value = ByteString.CopyFrom(model.ToHash())
};
Cache.Owners.Set(model, message, _oneHourExpiration);
Caches.Owners.Set(model, message, _oneHourExpiration);
}
return message!;
@ -42,11 +42,11 @@ public static class OwnerIdMapper
throw new ArgumentNullException(nameof(message));
}
if (!Cache.Owners.TryGetValue(message, out FrostFsOwner? model))
if (!Caches.Owners.TryGetValue(message, out FrostFsOwner? model))
{
model = new FrostFsOwner(Base58.Encode(message.Value.ToByteArray()));
Cache.Owners.Set(message, model, _oneHourExpiration);
Caches.Owners.Set(message, model, _oneHourExpiration);
}
return model!;

View file

@ -2,62 +2,61 @@
using Frostfs.V2.Ape;
namespace FrostFS.SDK.ClientV2
namespace FrostFS.SDK.ClientV2;
public struct FrostFsChainTarget(FrostFsTargetType type, string name) : IEquatable<FrostFsChainTarget>
{
public struct FrostFsChainTarget(FrostFsTargetType type, string name) : IEquatable<FrostFsChainTarget>
private ChainTarget? chainTarget;
public FrostFsTargetType Type { get; } = type;
public string Name { get; } = name;
internal ChainTarget GetChainTarget()
{
private ChainTarget? chainTarget;
public FrostFsTargetType Type { get; } = type;
public string Name { get; } = name;
internal ChainTarget GetChainTarget()
return chainTarget ??= new ChainTarget
{
return chainTarget ??= new ChainTarget
{
Type = GetTargetType(Type),
Name = Name
};
}
Type = GetTargetType(Type),
Name = Name
};
}
private static TargetType GetTargetType(FrostFsTargetType type)
private static TargetType GetTargetType(FrostFsTargetType type)
{
return type switch
{
return type switch
{
FrostFsTargetType.Undefined => TargetType.Undefined,
FrostFsTargetType.Namespace => TargetType.Namespace,
FrostFsTargetType.Container => TargetType.Container,
FrostFsTargetType.User => TargetType.User,
FrostFsTargetType.Group => TargetType.Group,
_ => throw new ArgumentException("Unexpected value for TargetType", nameof(type)),
};
}
FrostFsTargetType.Undefined => TargetType.Undefined,
FrostFsTargetType.Namespace => TargetType.Namespace,
FrostFsTargetType.Container => TargetType.Container,
FrostFsTargetType.User => TargetType.User,
FrostFsTargetType.Group => TargetType.Group,
_ => throw new ArgumentException("Unexpected value for TargetType", nameof(type)),
};
}
public override readonly bool Equals(object obj)
{
var target = (FrostFsChainTarget)obj;
return Equals(target);
}
public override readonly bool Equals(object obj)
{
var target = (FrostFsChainTarget)obj;
return Equals(target);
}
public override readonly int GetHashCode()
{
return $"{Name}{Type}".GetHashCode();
}
public override readonly int GetHashCode()
{
return $"{Name}{Type}".GetHashCode();
}
public static bool operator ==(FrostFsChainTarget left, FrostFsChainTarget right)
{
return left.Equals(right);
}
public static bool operator ==(FrostFsChainTarget left, FrostFsChainTarget right)
{
return left.Equals(right);
}
public static bool operator !=(FrostFsChainTarget left, FrostFsChainTarget right)
{
return !(left == right);
}
public static bool operator !=(FrostFsChainTarget left, FrostFsChainTarget right)
{
return !(left == right);
}
public readonly bool Equals(FrostFsChainTarget other)
{
return Type == other.Type && Name.Equals(other.Name, StringComparison.Ordinal);
}
public readonly bool Equals(FrostFsChainTarget other)
{
return Type == other.Type && Name.Equals(other.Name, StringComparison.Ordinal);
}
}

View file

@ -1,42 +1,41 @@
using Google.Protobuf;
namespace FrostFS.SDK.ClientV2
namespace FrostFS.SDK.ClientV2;
public struct FrostFsChain(byte[] raw) : System.IEquatable<FrostFsChain>
{
public struct FrostFsChain(byte[] raw) : System.IEquatable<FrostFsChain>
private ByteString? grpcRaw;
public byte[] Raw { get; } = raw;
internal ByteString GetRaw()
{
private ByteString? grpcRaw;
return grpcRaw ??= ByteString.CopyFrom(Raw);
}
public byte[] Raw { get; } = raw;
public override readonly bool Equals(object obj)
{
var chain = (FrostFsChain)obj;
return Equals(chain);
}
internal ByteString GetRaw()
{
return grpcRaw ??= ByteString.CopyFrom(Raw);
}
public override readonly int GetHashCode()
{
return Raw.GetHashCode();
}
public override readonly bool Equals(object obj)
{
var chain = (FrostFsChain)obj;
return Equals(chain);
}
public static bool operator ==(FrostFsChain left, FrostFsChain right)
{
return left.Equals(right);
}
public override readonly int GetHashCode()
{
return Raw.GetHashCode();
}
public static bool operator !=(FrostFsChain left, FrostFsChain right)
{
return !(left == right);
}
public static bool operator ==(FrostFsChain left, FrostFsChain right)
{
return left.Equals(right);
}
public static bool operator !=(FrostFsChain left, FrostFsChain right)
{
return !(left == right);
}
public readonly bool Equals(FrostFsChain other)
{
return Raw == other.Raw;
}
public readonly bool Equals(FrostFsChain other)
{
return Raw == other.Raw;
}
}

View file

@ -1,11 +1,10 @@
namespace FrostFS.SDK.ClientV2
namespace FrostFS.SDK.ClientV2;
public enum FrostFsTargetType
{
public enum FrostFsTargetType
{
Undefined = 0,
Namespace,
Container,
User,
Group
}
Undefined = 0,
Namespace,
Container,
User,
Group
}

View file

@ -15,7 +15,7 @@ public class ClientSettings
{
var errors = CheckFields();
if (errors != null)
ThrowException(errors);
ThrowSettingsException(errors);
}
protected Collection<string>? CheckFields()
@ -29,7 +29,7 @@ public class ClientSettings
return null;
}
protected static void ThrowException(Collection<string> errors)
protected static void ThrowSettingsException(Collection<string> errors)
{
if (errors is null)
{
@ -55,7 +55,7 @@ public class SingleOwnerClientSettings : ClientSettings
{
var errors = CheckFields();
if (errors != null)
ThrowException(errors);
ThrowSettingsException(errors);
}
protected new Collection<string>? CheckFields()

View file

@ -31,7 +31,7 @@ public class FrostFsContainerId
return this.modelId;
}
throw new InvalidObjectException();
throw new FrostFsInvalidObjectException();
}
internal ContainerID ContainerID
@ -47,7 +47,7 @@ public class FrostFsContainerId
return this.containerID;
}
throw new InvalidObjectException();
throw new FrostFsInvalidObjectException();
}
}

View file

@ -88,7 +88,7 @@ public class FrostFsContainerInfo
{
if (PlacementPolicy == null)
{
throw new InvalidObjectException("PlacementPolicy is null");
throw new ArgumentNullException("PlacementPolicy is null");
}
this.container = new Container.Container()

View file

@ -1,4 +1,4 @@
using FrostFS.SDK.ClientV2;
using System;
namespace FrostFS.SDK;
@ -67,7 +67,7 @@ public class FrostFsObject
public void SetParent(FrostFsObjectHeader largeObjectHeader)
{
if (Header?.Split == null)
throw new InvalidObjectException("The object is not initialized properly");
throw new ArgumentNullException(nameof(largeObjectHeader), "Split value must not be null");
Header.Split.ParentHeader = largeObjectHeader;
}

View file

@ -1,6 +1,9 @@
using System;
namespace FrostFS.SDK;
public class FrostFsSessionToken(byte[] token)
public class FrostFsSessionToken(byte[] token, Guid id)
{
public Guid Id { get; private set; } = id;
public byte[] Token { get; private set; } = token;
}

View file

@ -11,12 +11,12 @@ using Grpc.Core.Interceptors;
namespace FrostFS.SDK.ClientV2;
public class Context()
public class CallContext()
{
private ReadOnlyCollection<Interceptor>? interceptors;
private ByteString? publicKeyCache;
internal Action<Exception>? PoolErrorHandler { get; set; }
public ECDsa? Key { get; set; }
public FrostFsOwner? OwnerId { get; set; }
@ -31,11 +31,7 @@ public class Context()
public Action<CallStatistics>? Callback { get; set; }
public ReadOnlyCollection<Interceptor>? Interceptors
{
get { return this.interceptors; }
set { this.interceptors = value; }
}
public Collection<Interceptor> Interceptors { get; } = [];
public ByteString? GetPublicKeyCache()
{

View file

@ -7,5 +7,5 @@ public interface IContext
/// callbacks, interceptors.
/// </summary>
/// <value>Additional parameters for calling the method</value>
Context? Context { get; set; }
CallContext? Context { get; }
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmApeChainList(FrostFsChainTarget target) : PrmBase
public sealed class PrmApeChainList(FrostFsChainTarget target, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsChainTarget Target { get; } = target;
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmApeChainRemove(FrostFsChainTarget target, FrostFsChain chain) : PrmBase
public sealed class PrmApeChainRemove(FrostFsChainTarget target, FrostFsChain chain, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsChainTarget Target { get; } = target;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmApeChainAdd(FrostFsChainTarget target, FrostFsChain chain) : PrmBase
public sealed class PrmApeChainAdd(FrostFsChainTarget target, FrostFsChain chain, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsChainTarget Target { get; } = target;

View file

@ -0,0 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmBalance(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -2,7 +2,7 @@
namespace FrostFS.SDK.ClientV2;
public class PrmBase(NameValueCollection? xheaders = null) : IContext
public class PrmBase(CallContext? ctx, NameValueCollection? xheaders = null) : IContext
{
/// <summary>
/// FrostFS request X-Headers
@ -10,5 +10,5 @@ public class PrmBase(NameValueCollection? xheaders = null) : IContext
public NameValueCollection XHeaders { get; } = xheaders ?? [];
/// <inheritdoc />
public Context? Context { get; set; }
public CallContext Context { get; } = ctx ?? new CallContext();
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerCreate(FrostFsContainerInfo container) : PrmBase, ISessionToken
public sealed class PrmContainerCreate(FrostFsContainerInfo container, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerInfo Container { get; set; } = container;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerDelete(FrostFsContainerId containerId) : PrmBase, ISessionToken
public sealed class PrmContainerDelete(FrostFsContainerId containerId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerGet(FrostFsContainerId container) : PrmBase
public sealed class PrmContainerGet(FrostFsContainerId container, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsContainerId Container { get; set; } = container;
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerGetAll() : PrmBase()
public sealed class PrmContainerGetAll(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmNetmapSnapshot() : PrmBase
public sealed class PrmNetmapSnapshot(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmNetworkSettings() : PrmBase
public sealed class PrmNetworkSettings(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmNodeInfo() : PrmBase
public sealed class PrmNodeInfo(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectDelete(FrostFsContainerId containerId, FrostFsObjectId objectId) : PrmBase, ISessionToken
public sealed class PrmObjectDelete(FrostFsContainerId containerId, FrostFsObjectId objectId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectGet(FrostFsContainerId containerId, FrostFsObjectId objectId) : PrmBase, ISessionToken
public sealed class PrmObjectGet(FrostFsContainerId containerId, FrostFsObjectId objectId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectHeadGet(FrostFsContainerId containerId, FrostFsObjectId objectId) : PrmBase, ISessionToken
public sealed class PrmObjectHeadGet(FrostFsContainerId containerId, FrostFsObjectId objectId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -2,7 +2,7 @@ using System.IO;
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectPut : PrmBase, ISessionToken
public sealed class PrmObjectPut(CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
/// <summary>
/// Need to provide values like <c>ContainerId</c> and <c>ObjectType</c> to create and object.

View file

@ -2,7 +2,7 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectSearch(FrostFsContainerId containerId, params IObjectFilter[] filters) : PrmBase, ISessionToken
public sealed class PrmObjectSearch(FrostFsContainerId containerId, CallContext? ctx = null, params IObjectFilter[] filters) : PrmBase(ctx), ISessionToken
{
/// <summary>
/// Defines container for the search

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmSessionCreate(ulong expiration) : PrmBase
public sealed class PrmSessionCreate(ulong expiration, CallContext? ctx = null) : PrmBase(ctx)
{
public ulong Expiration { get; set; } = expiration;
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmSingleObjectPut(FrostFsObject frostFsObject) : PrmBase, ISessionToken
public sealed class PrmSingleObjectPut(FrostFsObject frostFsObject, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsObject FrostFsObject { get; set; } = frostFsObject;

View file

@ -0,0 +1,163 @@
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
// clientStatusMonitor count error rate and other statistics for connection.
public class ClientStatusMonitor : IClientStatus
{
private static readonly MethodIndex[] MethodIndexes =
[
MethodIndex.methodBalanceGet,
MethodIndex.methodContainerPut,
MethodIndex.methodContainerGet,
MethodIndex.methodContainerList,
MethodIndex.methodContainerDelete,
MethodIndex.methodEndpointInfo,
MethodIndex.methodNetworkInfo,
MethodIndex.methodNetMapSnapshot,
MethodIndex.methodObjectPut,
MethodIndex.methodObjectDelete,
MethodIndex.methodObjectGet,
MethodIndex.methodObjectHead,
MethodIndex.methodObjectRange,
MethodIndex.methodObjectPatch,
MethodIndex.methodSessionCreate,
MethodIndex.methodAPEManagerAddChain,
MethodIndex.methodAPEManagerRemoveChain,
MethodIndex.methodAPEManagerListChains
];
public static string GetMethodName(MethodIndex index)
{
return index switch
{
MethodIndex.methodBalanceGet => "BalanceGet",
MethodIndex.methodContainerPut => "ContainerPut",
MethodIndex.methodContainerGet => "ContainerGet",
MethodIndex.methodContainerList => "ContainerList",
MethodIndex.methodContainerDelete => "ContainerDelete",
MethodIndex.methodEndpointInfo => "EndpointInfo",
MethodIndex.methodNetworkInfo => "NetworkInfo",
MethodIndex.methodNetMapSnapshot => "NetMapSnapshot",
MethodIndex.methodObjectPut => "ObjectPut",
MethodIndex.methodObjectDelete => "ObjectDelete",
MethodIndex.methodObjectGet => "ObjectGet",
MethodIndex.methodObjectHead => "ObjectHead",
MethodIndex.methodObjectRange => "ObjectRange",
MethodIndex.methodObjectPatch => "ObjectPatch",
MethodIndex.methodSessionCreate => "SessionCreate",
MethodIndex.methodAPEManagerAddChain => "APEManagerAddChain",
MethodIndex.methodAPEManagerRemoveChain => "APEManagerRemoveChain",
MethodIndex.methodAPEManagerListChains => "APEManagerListChains",
_ => throw new ArgumentException("Unknown method", nameof(index)),
};
}
private readonly object _lock = new();
private readonly ILogger? logger;
private int healthy;
public ClientStatusMonitor(ILogger? logger, string address)
{
this.logger = logger;
healthy = (int)HealthyStatus.Healthy;
Address = address;
Methods = new MethodStatus[MethodIndexes.Length];
for (int i = 0; i < MethodIndexes.Length; i++)
{
Methods[i] = new MethodStatus(GetMethodName(MethodIndexes[i]));
}
}
public string Address { get; }
internal uint ErrorThreshold { get; set; }
public uint CurrentErrorCount { get; set; }
public ulong OverallErrorCount { get; set; }
public MethodStatus[] Methods { get; private set; }
public bool IsHealthy()
{
var res = Interlocked.CompareExchange(ref healthy, -1, -1) == (int)HealthyStatus.Healthy;
return res;
}
public bool IsDialed()
{
return Interlocked.CompareExchange(ref healthy, -1, -1) != (int)HealthyStatus.UnhealthyOnDial;
}
public void SetHealthy()
{
Interlocked.Exchange(ref healthy, (int)HealthyStatus.Healthy);
}
public void SetUnhealthy()
{
Interlocked.Exchange(ref healthy, (int)HealthyStatus.UnhealthyOnRequest);
}
public void SetUnhealthyOnDial()
{
Interlocked.Exchange(ref healthy, (int)HealthyStatus.UnhealthyOnDial);
}
public void IncErrorRate()
{
bool thresholdReached;
lock (_lock)
{
CurrentErrorCount++;
OverallErrorCount++;
thresholdReached = CurrentErrorCount >= ErrorThreshold;
if (thresholdReached)
{
SetUnhealthy();
CurrentErrorCount = 0;
}
}
if (thresholdReached && logger != null)
{
FrostFsMessages.ErrorЕhresholdReached(logger, Address, ErrorThreshold);
}
}
public uint GetCurrentErrorRate()
{
lock (_lock)
{
return CurrentErrorCount;
}
}
public ulong GetOverallErrorRate()
{
lock (_lock)
{
return OverallErrorCount;
}
}
public StatusSnapshot[] MethodsStatus()
{
var result = new StatusSnapshot[Methods.Length];
for (int i = 0; i < result.Length; i++)
{
result[i] = Methods[i].Snapshot!;
}
return result;
}
}

View file

@ -0,0 +1,143 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
namespace FrostFS.SDK.ClientV2;
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
public class ClientWrapper : ClientStatusMonitor
{
private readonly object _lock = new();
private SessionCache sessionCache;
internal ClientWrapper(WrapperPrm wrapperPrm, Pool pool) : base(wrapperPrm.Logger, wrapperPrm.Address)
{
WrapperPrm = wrapperPrm;
ErrorThreshold = wrapperPrm.ErrorThreshold;
sessionCache = pool.SessionCache;
Client = new FrostFSClient(WrapperPrm, sessionCache);
}
internal FrostFSClient? Client { get; private set; }
internal WrapperPrm WrapperPrm { get; }
internal FrostFSClient? GetClient()
{
lock (_lock)
{
if (IsHealthy())
{
return Client;
}
return null;
}
}
// dial establishes a connection to the server from the FrostFS network.
// Returns an error describing failure reason. If failed, the client
// SHOULD NOT be used.
internal async Task Dial(CallContext ctx)
{
var client = GetClient() ?? throw new FrostFsInvalidObjectException("pool client unhealthy");
await client.Dial(ctx).ConfigureAwait(false);
}
internal void HandleError(Exception ex)
{
if (ex is FrostFsResponseException responseException && responseException.Status != null)
{
switch (responseException.Status.Code)
{
case FrostFsStatusCode.Internal:
case FrostFsStatusCode.WrongMagicNumber:
case FrostFsStatusCode.SignatureVerificationFailure:
case FrostFsStatusCode.NodeUnderMaintenance:
IncErrorRate();
return;
}
}
IncErrorRate();
}
private async Task ScheduleGracefulClose()
{
if (Client == null)
return;
await Task.Delay((int)WrapperPrm.GracefulCloseOnSwitchTimeout).ConfigureAwait(false);
Client.Close();
}
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
internal async Task<bool> RestartIfUnhealthy(CallContext ctx)
{
bool wasHealthy;
try
{
var prmNodeInfo = new PrmNodeInfo(ctx);
var response = await Client!.GetNodeInfoAsync(prmNodeInfo).ConfigureAwait(false);
return false;
}
catch (RpcException)
{
wasHealthy = true;
}
// if connection is dialed before, to avoid routine/connection leak,
// pool has to close it and then initialize once again.
Review

Could be resolved with

var client FrostFSClient
try {
client = new(WrapperPrm);
...
Client = client;
client = null;
}
finally {
client?.Dispose();
}
Could be resolved with ``` var client FrostFSClient try { client = new(WrapperPrm); ... Client = client; client = null; } finally { client?.Dispose(); } ```
Review

and why do I need a reference to disposed object?

and why do I need a reference to disposed object?
Review

I did not understand the question: where is the reference to disposed object?
There are 4 main steps:

  1. Create temp reference to disposable object
  2. Inside try block create an disposable object to this reference
  3. Inside try block transfer ownership of disposable object from temp reference to new owner and set null to temp reference: now Client field holds reference to disposable object, not temp reference
  4. Dispose disposable object if it is not null
I did not understand the question: where is the reference to disposed object? There are 4 main steps: 1. Create temp reference to disposable object 2. Inside `try` block create an disposable object to this reference 3. Inside `try` block transfer ownership of disposable object from temp reference to new owner and set `null` to temp reference: now `Client` field holds reference to disposable object, not temp reference 4. Dispose disposable object if it is not `null`
Review

Got it. But it looks like a strange trick to avoid dispose pattern warning that follows another analizer warning about "always null variable". The suppressing comment is a good reminder about dispose necessity and keeps the code clean and clear.

Got it. But it looks like a strange trick to avoid dispose pattern warning that follows another analizer warning about "always null variable". The suppressing comment is a good reminder about dispose necessity and keeps the code clean and clear.
Review

I don't think it is a trick, but if you ok with it, I'm ok too.

I don't think it is a trick, but if you ok with it, I'm ok too.
if (IsDialed())
{
await ScheduleGracefulClose().ConfigureAwait(false);
}
#pragma warning disable CA2000 // Dispose objects before losing scope: will be disposed manually
FrostFSClient client = new(WrapperPrm, sessionCache);
#pragma warning restore CA2000
//TODO: set additioanl params
var error = await client.Dial(ctx).ConfigureAwait(false);
if (!string.IsNullOrEmpty(error))
{
SetUnhealthyOnDial();
return wasHealthy;
}
lock (_lock)
{
Client = client;
}
try
{
var prmNodeInfo = new PrmNodeInfo(ctx);
var res = await client.GetNodeInfoAsync(prmNodeInfo).ConfigureAwait(false);
}
catch (FrostFsException)
{
SetUnhealthy();
return wasHealthy;
}
SetHealthy();
return !wasHealthy;
}
internal void IncRequests(ulong elapsed, MethodIndex method)
{
var methodStat = Methods[(int)method];
methodStat.IncRequests(elapsed);
}
}

View file

@ -0,0 +1,18 @@
namespace FrostFS.SDK.ClientV2;
// values for healthy status of clientStatusMonitor.
public enum HealthyStatus
{
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
UnhealthyOnDial,
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again.
UnhealthyOnRequest,
// statusHealthy is set when connection is ready to be used by the pool.
Healthy
}

View file

@ -0,0 +1,28 @@
namespace FrostFS.SDK.ClientV2;
public interface IClientStatus
{
// isHealthy checks if the connection can handle requests.
bool IsHealthy();
// isDialed checks if the connection was created.
bool IsDialed();
// setUnhealthy marks client as unhealthy.
void SetUnhealthy();
// address return address of endpoint.
string Address { get; }
// currentErrorRate returns current errors rate.
// After specific threshold connection is considered as unhealthy.
// Pool.startRebalance routine can make this connection healthy again.
uint GetCurrentErrorRate();
// overallErrorRate returns the number of all happened errors.
ulong GetOverallErrorRate();
// methodsStatus returns statistic for all used methods.
StatusSnapshot[] MethodsStatus();
}

View file

@ -0,0 +1,36 @@
using System;
using System.Security.Cryptography;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
// InitParameters contains values used to initialize connection Pool.
public class InitParameters
{
public ECDsa? Key { get; set; }
public ulong NodeDialTimeout { get; set; }
public ulong NodeStreamTimeout { get; set; }
public ulong HealthcheckTimeout { get; set; }
public ulong ClientRebalanceInterval { get; set; }
public ulong SessionExpirationDuration { get; set; }
public uint ErrorThreshold { get; set; }
public NodeParam[]? NodeParams { get; set; }
public GrpcChannelOptions[]? DialOptions { get; set; }
public Func<string, ClientWrapper>? ClientBuilder { get; set; }
public ulong GracefulCloseOnSwitchTimeout { get; set; }
public ILogger? Logger { get; set; }
}

View file

@ -0,0 +1,47 @@
using FrostFS.SDK.ClientV2;
internal sealed class InnerPool
{
private readonly object _lock = new();
internal InnerPool(Sampler sampler, ClientWrapper[] clients)
{
Clients = clients;
Sampler = sampler;
}
internal Sampler Sampler { get; set; }
internal ClientWrapper[] Clients { get; }
internal ClientWrapper? Connection()
{
lock (_lock)
{
if (Clients.Length == 1)
{
var client = Clients[0];
if (client.IsHealthy())
{
return client;
}
}
else
{
var attempts = 3 * Clients.Length;
for (int i = 0; i < attempts; i++)
{
int index = Sampler.Next();
if (Clients[index].IsHealthy())
{
return Clients[index];
}
}
}
return null;
}
}
}

View file

@ -0,0 +1,24 @@
namespace FrostFS.SDK.ClientV2;
public enum MethodIndex
{
methodBalanceGet,
methodContainerPut,
methodContainerGet,
methodContainerList,
methodContainerDelete,
methodEndpointInfo,
methodNetworkInfo,
methodNetMapSnapshot,
methodObjectPut,
methodObjectDelete,
methodObjectGet,
methodObjectHead,
methodObjectRange,
methodObjectPatch,
methodSessionCreate,
methodAPEManagerAddChain,
methodAPEManagerRemoveChain,
methodAPEManagerListChains,
methodLast
}

View file

@ -0,0 +1,19 @@
namespace FrostFS.SDK.ClientV2;
public class MethodStatus(string name)
{
private readonly object _lock = new();
public string Name { get; } = name;
public StatusSnapshot Snapshot { get; set; } = new StatusSnapshot();
internal void IncRequests(ulong elapsed)
{
lock (_lock)
{
Snapshot.AllTime += elapsed;
Snapshot.AllRequests++;
}
}
}

View file

@ -0,0 +1,12 @@
namespace FrostFS.SDK.ClientV2;
// NodeParam groups parameters of remote node.
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1815:Override equals and operator equals on value types", Justification = "<Pending>")]
public readonly struct NodeParam(int priority, string address, float weight)
{
public int Priority { get; } = priority;
public string Address { get; } = address;
public float Weight { get; } = weight;
}

View file

@ -0,0 +1,12 @@
namespace FrostFS.SDK.ClientV2;
public class NodeStatistic
{
public string? Address { get; internal set; }
public StatusSnapshot[]? Methods { get; internal set; }
public ulong OverallErrors { get; internal set; }
public uint CurrentErrors { get; internal set; }
}

View file

@ -0,0 +1,12 @@
using System.Collections.ObjectModel;
namespace FrostFS.SDK.ClientV2;
public class NodesParam(int priority)
{
public int Priority { get; } = priority;
public Collection<string> Addresses { get; } = [];
public Collection<double> Weights { get; } = [];
}

View file

@ -0,0 +1,776 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Frostfs.V2.Ape;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using Grpc.Core;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
public partial class Pool : IFrostFSClient
{
const int defaultSessionTokenExpirationDuration = 100; // in epochs
const int defaultErrorThreshold = 100;
const int defaultGracefulCloseOnSwitchTimeout = 10; //Seconds;
const int defaultRebalanceInterval = 15; //Seconds;
const int defaultHealthcheckTimeout = 4; //Seconds;
const int defaultDialTimeout = 5; //Seconds;
const int defaultStreamTimeout = 10; //Seconds;
private readonly object _lock = new();
private InnerPool[]? InnerPools { get; set; }
private ECDsa Key { get; set; }
private string PublicKey { get; }
private OwnerID? _ownerId;
private FrostFsOwner? _owner;
private FrostFsOwner Owner
{
get
{
_owner ??= new FrostFsOwner(Key.PublicKey().PublicKeyToAddress());
return _owner;
}
}
private OwnerID OwnerId
{
get
{
if (_ownerId == null)
{
_owner = new FrostFsOwner(Key.PublicKey().PublicKeyToAddress());
_ownerId = _owner.ToMessage();
}
return _ownerId;
}
}
internal CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
internal SessionCache SessionCache { get; set; }
private ulong SessionTokenDuration { get; set; }
private RebalanceParameters RebalanceParams { get; set; }
private Func<string, ClientWrapper> ClientBuilder;
private bool disposedValue;
private ILogger? logger { get; set; }
private ulong MaxObjectSize { get; set; }
public IClientStatus? ClientStatus { get; }
// NewPool creates connection pool using parameters.
public Pool(InitParameters options)
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
if (options.Key == null)
{
throw new ArgumentException($"Missed required parameter {nameof(options.Key)}");
}
var nodesParams = AdjustNodeParams(options.NodeParams);
var cache = new SessionCache(options.SessionExpirationDuration);
FillDefaultInitParams(options, this);
Key = options.Key;
PublicKey = $"{Key.PublicKey()}";
SessionCache = cache;
logger = options.Logger;
SessionTokenDuration = options.SessionExpirationDuration;
RebalanceParams = new RebalanceParameters(
nodesParams.ToArray(),
options.HealthcheckTimeout,
options.ClientRebalanceInterval,
options.SessionExpirationDuration);
ClientBuilder = options.ClientBuilder!;
}
private void SetupContext(CallContext ctx)
{
if (ctx == null)
{
throw new ArgumentNullException(nameof(ctx));
}
ctx.Key ??= Key;
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
public async Task<string?> Dial(CallContext ctx)
{
SetupContext(ctx);
var inner = new InnerPool[RebalanceParams.NodesParams.Length];
bool atLeastOneHealthy = false;
int i = 0;
foreach (var nodeParams in RebalanceParams.NodesParams)
{
var clients = new ClientWrapper[nodeParams.Weights.Count];
for (int j = 0; j < nodeParams.Addresses.Count; j++)
{
ClientWrapper? client = null;
bool dialed = false;
try
{
client = clients[j] = ClientBuilder(nodeParams.Addresses[j]);
await client.Dial(ctx).ConfigureAwait(false);
dialed = true;
var token = await InitSessionForDuration(ctx, client, RebalanceParams.SessionExpirationDuration, Key, false)
.ConfigureAwait(false);
var key = FormCacheKey(nodeParams.Addresses[j], Key.PrivateKey().ToString());
_ = SessionCache.Cache[key] = token;
atLeastOneHealthy = true;
}
catch (RpcException ex)
{
if (!dialed)
client!.SetUnhealthyOnDial();
else
client!.SetUnhealthy();
if (logger != null)
{
FrostFsMessages.SessionCreationError(logger, client!.WrapperPrm.Address, ex.Message);
}
}
catch (FrostFsInvalidObjectException)
{
break;
}
}
var sampler = new Sampler(nodeParams.Weights.ToArray());
inner[i] = new InnerPool(sampler, clients);
i++;
}
if (!atLeastOneHealthy)
return "At least one node must be healthy";
InnerPools = inner;
var res = await GetNetworkSettingsAsync(new PrmNetworkSettings(ctx)).ConfigureAwait(false);
MaxObjectSize = res.MaxObjectSize;
StartRebalance(ctx);
return null;
}
private static IEnumerable<NodesParam> AdjustNodeParams(NodeParam[]? nodeParams)
{
if (nodeParams == null || nodeParams.Length == 0)
{
throw new ArgumentException("No FrostFS peers configured");
}
Dictionary<int, NodesParam> nodesParamsDict = new(nodeParams.Length);
foreach (var nodeParam in nodeParams)
{
if (!nodesParamsDict.TryGetValue(nodeParam.Priority, out var nodes))
{
nodes = new NodesParam(nodeParam.Priority);
nodesParamsDict[nodeParam.Priority] = nodes;
}
nodes.Addresses.Add(nodeParam.Address);
nodes.Weights.Add(nodeParam.Weight);
}
var nodesParams = new List<NodesParam>(nodesParamsDict.Count);
foreach (var key in nodesParamsDict.Keys)
{
var nodes = nodesParamsDict[key];
var newWeights = AdjustWeights([.. nodes.Weights]);
nodes.Weights.Clear();
foreach (var weight in newWeights)
{
nodes.Weights.Add(weight);
}
nodesParams.Add(nodes);
}
return nodesParams.OrderBy(n => n.Priority);
}
private static double[] AdjustWeights(double[] weights)
{
var adjusted = new double[weights.Length];
var sum = weights.Sum();
if (sum > 0)
{
for (int i = 0; i < weights.Length; i++)
{
adjusted[i] = weights[i] / sum;
}
}
return adjusted;
}
private static void FillDefaultInitParams(InitParameters parameters, Pool pool)
{
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
if (parameters.ErrorThreshold == 0)
parameters.ErrorThreshold = defaultErrorThreshold;
if (parameters.ClientRebalanceInterval <= 0)
parameters.ClientRebalanceInterval = defaultRebalanceInterval;
if (parameters.GracefulCloseOnSwitchTimeout <= 0)
parameters.GracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout;
if (parameters.HealthcheckTimeout <= 0)
parameters.HealthcheckTimeout = defaultHealthcheckTimeout;
if (parameters.NodeDialTimeout <= 0)
parameters.NodeDialTimeout = defaultDialTimeout;
if (parameters.NodeStreamTimeout <= 0)
parameters.NodeStreamTimeout = defaultStreamTimeout;
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
parameters.ClientBuilder ??= new Func<string, ClientWrapper>((address) =>
{
var wrapperPrm = new WrapperPrm
{
Address = address,
Key = parameters.Key,
Logger = parameters.Logger,
DialTimeout = parameters.NodeDialTimeout,
StreamTimeout = parameters.NodeStreamTimeout,
ErrorThreshold = parameters.ErrorThreshold,
dstepanov-yadro marked this conversation as resolved
Review

See warning

See warning
Review

fixed

fixed
GracefulCloseOnSwitchTimeout = parameters.GracefulCloseOnSwitchTimeout
};
return new ClientWrapper(wrapperPrm, pool);
}
);
}
private ClientWrapper Connection()
{
foreach (var pool in InnerPools!)
{
var client = pool.Connection();
if (client != null)
{
return client;
}
}
throw new FrostFsException("Cannot find alive client");
}
private static async Task<FrostFsSessionToken?> InitSessionForDuration(CallContext ctx, ClientWrapper cw, ulong duration, ECDsa key, bool clientCut)
{
var client = cw.Client;
var networkInfo = await client!.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx)).ConfigureAwait(false);
var epoch = networkInfo.Epoch;
ulong exp = ulong.MaxValue - epoch < duration
? ulong.MaxValue
: epoch + duration;
var prmSessionCreate = new PrmSessionCreate(exp, ctx);
return await client.CreateSessionAsync(prmSessionCreate).ConfigureAwait(false);
}
internal static string FormCacheKey(string address, string key)
{
return $"{address}{key}";
}
public void Close()
{
CancellationTokenSource.Cancel();
if (InnerPools != null)
{
// close all clients
foreach (var innerPool in InnerPools)
foreach (var client in innerPool.Clients)
if (client.IsDialed())
client.Client?.Close();
}
}
// startRebalance runs loop to monitor connection healthy status.
internal void StartRebalance(CallContext ctx)
{
var buffers = new double[RebalanceParams.NodesParams.Length][];
for (int i = 0; i < RebalanceParams.NodesParams.Length; i++)
{
var parameters = RebalanceParams.NodesParams[i];
buffers[i] = new double[parameters.Weights.Count];
Task.Run(async () =>
{
await Task.Delay((int)RebalanceParams.ClientRebalanceInterval).ConfigureAwait(false);
UpdateNodesHealth(ctx, buffers);
});
}
}
private void UpdateNodesHealth(CallContext ctx, double[][] buffers)
{
var tasks = new Task[InnerPools!.Length];
for (int i = 0; i < InnerPools.Length; i++)
{
var bufferWeights = buffers[i];
tasks[i] = Task.Run(() => UpdateInnerNodesHealth(ctx, i, bufferWeights));
}
Task.WaitAll(tasks);
}
private async ValueTask UpdateInnerNodesHealth(CallContext ctx, int poolIndex, double[] bufferWeights)
{
if (poolIndex > InnerPools!.Length - 1)
{
return;
}
var pool = InnerPools[poolIndex];
var options = RebalanceParams;
int healthyChanged = 0;
var tasks = new Task[pool.Clients.Length];
for (int j = 0; j < pool.Clients.Length; j++)
{
var client = pool.Clients[j];
var healthy = false;
string? error = null;
var changed = false;
try
{
// check timeout settings
changed = await client!.RestartIfUnhealthy(ctx).ConfigureAwait(false);
healthy = true;
bufferWeights[j] = options.NodesParams[poolIndex].Weights[j];
}
// TODO: specify
catch (FrostFsException e)
{
error = e.Message;
bufferWeights[j] = 0;
SessionCache.DeleteByPrefix(client.Address);
}
if (changed)
{
if (!string.IsNullOrEmpty(error))
{
if (logger != null)
{
FrostFsMessages.HealthChanged(logger, client.Address, healthy, error!);
}
Interlocked.Exchange(ref healthyChanged, 1);
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
if (Interlocked.CompareExchange(ref healthyChanged, -1, -1) == 1)
{
var probabilities = AdjustWeights(bufferWeights);
lock (_lock)
{
pool.Sampler = new Sampler(probabilities);
}
}
}
}
// TODO: remove
private bool CheckSessionTokenErr(Exception error, string address)
{
if (error == null)
{
return false;
}
if (error is SessionNotFoundException || error is SessionExpiredException)
{
this.SessionCache.DeleteByPrefix(address);
return true;
}
return false;
}
public Statistic Statistic()
{
if (InnerPools == null)
{
throw new FrostFsInvalidObjectException(nameof(Pool));
}
var statistics = new Statistic();
foreach (var inner in InnerPools)
{
int valueIndex = 0;
var nodes = new string[inner.Clients.Length];
lock (_lock)
{
foreach (var client in inner.Clients)
{
if (client.IsHealthy())
{
nodes[valueIndex] = client.Address;
}
var node = new NodeStatistic
{
Address = client.Address,
Methods = client.MethodsStatus(),
OverallErrors = client.GetOverallErrorRate(),
CurrentErrors = client.GetCurrentErrorRate()
};
statistics.Nodes.Add(node);
valueIndex++;
statistics.OverallErrors += node.OverallErrors;
}
if (statistics.CurrentNodes == null || statistics.CurrentNodes.Length == 0)
{
statistics.CurrentNodes = nodes;
}
}
}
return statistics;
}
public async Task<FrostFsNetmapSnapshot> GetNetmapSnapshotAsync(PrmNetmapSnapshot? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNetmapSnapshotAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsNodeInfo> GetNodeInfoAsync(PrmNodeInfo? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNodeInfoAsync(args).ConfigureAwait(false);
}
public async Task<NetworkSettings> GetNetworkSettingsAsync(PrmNetworkSettings? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNetworkSettingsAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.CreateSessionAsync(args).ConfigureAwait(false);
}
public async Task<byte[]> AddChainAsync(PrmApeChainAdd args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.AddChainAsync(args).ConfigureAwait(false);
}
public async Task RemoveChainAsync(PrmApeChainRemove args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.RemoveChainAsync(args).ConfigureAwait(false);
}
public async Task<Chain[]> ListChainAsync(PrmApeChainList args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.ListChainAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsContainerInfo> GetContainerAsync(PrmContainerGet args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetContainerAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsContainerId> ListContainersAsync(PrmContainerGetAll? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return client.Client!.ListContainersAsync(args);
}
public async Task<FrostFsContainerId> CreateContainerAsync(PrmContainerCreate args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.CreateContainerAsync(args).ConfigureAwait(false);
}
public async Task DeleteContainerAsync(PrmContainerDelete args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.DeleteContainerAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetObjectHeadAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.PutObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.PutSingleObjectAsync(args).ConfigureAwait(false);
}
public async Task DeleteObjectAsync(PrmObjectDelete args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.DeleteObjectAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return client.Client!.SearchObjectsAsync(args);
}
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetBalanceAsync(args).ConfigureAwait(false);
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
Close();
}
disposedValue = true;
}
}
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
throw new NotImplementedException();
}
}

View file

@ -0,0 +1,16 @@
namespace FrostFS.SDK.ClientV2;
public class RebalanceParameters(
NodesParam[] nodesParams,
ulong nodeRequestTimeout,
ulong clientRebalanceInterval,
ulong sessionExpirationDuration)
{
public NodesParam[] NodesParams { get; set; } = nodesParams;
public ulong NodeRequestTimeout { get; set; } = nodeRequestTimeout;
public ulong ClientRebalanceInterval { get; set; } = clientRebalanceInterval;
public ulong SessionExpirationDuration { get; set; } = sessionExpirationDuration;
}

View file

@ -0,0 +1,14 @@
using System;
namespace FrostFS.SDK.ClientV2;
// RequestInfo groups info about pool request.
struct RequestInfo
{
public string Address { get; set; }
public MethodIndex MethodIndex { get; set; }
public TimeSpan Elapsed { get; set; }
}

View file

@ -0,0 +1,85 @@
using System;
namespace FrostFS.SDK.ClientV2;
internal sealed class Sampler
{
private readonly object _lock = new();
private Random random = new();
internal double[] Probabilities { get; set; }
internal int[] Alias { get; set; }
internal Sampler(double[] probabilities)
{
var small = new WorkList();
var large = new WorkList();
var n = probabilities.Length;
// sampler.randomGenerator = rand.New(source)
Probabilities = new double[n];
Alias = new int[n];
// Compute scaled probabilities.
var p = new double[n];
for (int i = 0; i < n; i++)
{
p[i] = probabilities[i] * n;
if (p[i] < 1)
small.Add(i);
else
large.Add(i);
}
while (small.Length > 0 && large.Length > 0)
{
var l = small.Remove();
var g = large.Remove();
Probabilities[l] = p[l];
Alias[l] = g;
p[g] = p[g] + p[l] - 1;
if (p[g] < 1)
small.Add(g);
else
large.Add(g);
}
while (large.Length > 0)
{
var g = large.Remove();
Probabilities[g] = 1;
}
while (small.Length > 0)
{
var l = small.Remove();
probabilities[l] = 1;
}
}
internal int Next()
{
var n = Alias.Length;
int i;
double f;
lock (_lock)
{
i = random.Next(0, n - 1);
f = random.NextDouble();
}
if (f < Probabilities[i])
{
return i;
}
return Alias[i];
}
}

View file

@ -0,0 +1,24 @@
using System;
using System.Collections;
namespace FrostFS.SDK.ClientV2;
internal struct SessionCache(ulong sessionExpirationDuration)
{
internal Hashtable Cache { get; } = [];
internal ulong CurrentEpoch { get; set; }
internal ulong TokenDuration { get; set; } = sessionExpirationDuration;
internal void DeleteByPrefix(string prefix)
{
foreach (var key in Cache.Keys)
{
if (((string)key).StartsWith(prefix, StringComparison.Ordinal))
{
Cache.Remove(key);
}
}
}
}

View file

@ -0,0 +1,12 @@
using System.Collections.ObjectModel;
namespace FrostFS.SDK.ClientV2;
public sealed class Statistic
{
public ulong OverallErrors { get; internal set; }
public Collection<NodeStatistic> Nodes { get; } = [];
public string[]? CurrentNodes { get; internal set; }
}

View file

@ -0,0 +1,8 @@
namespace FrostFS.SDK.ClientV2;
public class StatusSnapshot()
{
public ulong AllTime { get; internal set; }
public ulong AllRequests { get; internal set; }
}

View file

@ -0,0 +1,26 @@
using System.Collections.Generic;
using System.Linq;
namespace FrostFS.SDK.ClientV2;
internal sealed class WorkList
{
private readonly List<int> elements = [];
internal int Length
{
get { return elements.Count; }
}
internal void Add(int element)
{
elements.Add(element);
}
internal int Remove()
{
int last = elements.LastOrDefault();
elements.RemoveAt(elements.Count - 1);
return last;
}
}

View file

@ -0,0 +1,34 @@
using System;
using System.Security.Cryptography;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
// wrapperPrm is params to create clientWrapper.
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1815:Override equals and operator equals on value types", Justification = "<Pending>")]
public struct WrapperPrm
{
internal ILogger? Logger { get; set; }
internal string Address { get; set; }
internal ECDsa? Key { get; set; }
internal ulong DialTimeout { get; set; }
internal ulong StreamTimeout { get; set; }
internal uint ErrorThreshold { get; set; }
internal Action ResponseInfoCallback { get; set; }
internal Action PoolRequestInfoCallback { get; set; }
internal GrpcChannelOptions GrpcChannelOptions { get; set; }
internal ulong GracefulCloseOnSwitchTimeout { get; set; }
}

View file

@ -0,0 +1,40 @@
using System.Threading.Tasks;
using FrostFS.Accounting;
namespace FrostFS.SDK.ClientV2;
internal sealed class AccountingServiceProvider : ContextAccessor
{
private readonly AccountingService.AccountingServiceClient? _accountingServiceClient;
internal AccountingServiceProvider(
AccountingService.AccountingServiceClient? accountingServiceClient,
ClientContext context)
: base(context)
{
_accountingServiceClient = accountingServiceClient;
}
internal async Task<Decimal> GetBallance(PrmBalance args)
{
var ctx = args.Context!;
BalanceRequest request = new()
{
Body = new()
{
OwnerId = ctx.OwnerId!.OwnerID
}
};
request.AddMetaHeader(args.XHeaders);
request.Sign(ctx.Key!);
var response = await _accountingServiceClient!.BalanceAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
return response.Body.Balance;
}
}

View file

@ -1,15 +1,16 @@
using System;
using System.Threading.Tasks;
using Frostfs.V2.Ape;
using Frostfs.V2.Apemanager;
namespace FrostFS.SDK.ClientV2;
namespace FrostFS.SDK.ClientV2.Services;
internal sealed class ApeManagerServiceProvider : ContextAccessor
{
private readonly APEManagerService.APEManagerServiceClient? _apeManagerServiceClient;
internal ApeManagerServiceProvider(APEManagerService.APEManagerServiceClient? apeManagerServiceClient, ClientEnvironment context)
internal ApeManagerServiceProvider(APEManagerService.APEManagerServiceClient? apeManagerServiceClient, ClientContext context)
: base(context)
{
_apeManagerServiceClient = apeManagerServiceClient;
@ -18,10 +19,10 @@ internal sealed class ApeManagerServiceProvider : ContextAccessor
internal async Task<byte[]> AddChainAsync(PrmApeChainAdd args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
AddChainRequest request = new()
{
@ -45,10 +46,10 @@ internal sealed class ApeManagerServiceProvider : ContextAccessor
internal async Task RemoveChainAsync(PrmApeChainRemove args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
RemoveChainRequest request = new()
{
@ -70,10 +71,10 @@ internal sealed class ApeManagerServiceProvider : ContextAccessor
internal async Task<Chain[]> ListChainAsync(PrmApeChainList args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
ListChainsRequest request = new()
{

View file

@ -12,20 +12,28 @@ using FrostFS.Session;
namespace FrostFS.SDK.ClientV2;
internal sealed class ContainerServiceProvider(ContainerService.ContainerServiceClient service, ClientEnvironment context) : ContextAccessor(context), ISessionProvider
internal sealed class ContainerServiceProvider(ContainerService.ContainerServiceClient service, ClientContext clientCtx) : ContextAccessor(clientCtx), ISessionProvider
{
readonly SessionProvider sessions = new(context);
private SessionProvider? sessions;
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
{
sessions ??= new(ClientContext);
if (ClientContext.SessionCache.Cache != null &&
ClientContext.SessionCache.Cache.ContainsKey(ClientContext.SessionCacheKey))
{
return (SessionToken)ClientContext.SessionCache.Cache[ClientContext.SessionCacheKey];
}
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
}
internal async Task<FrostFsContainerInfo> GetContainerAsync(PrmContainerGet args)
{
GetRequest request = GetContainerRequest(args.Container.ContainerID, args.XHeaders, args.Context!);
GetRequest request = GetContainerRequest(args.Container.ContainerID, args.XHeaders, args.Context);
var response = await service.GetAsync(request, null, args.Context!.Deadline, args.Context.CancellationToken);
var response = await service.GetAsync(request, null, args.Context.Deadline, args.Context.CancellationToken);
Verifier.CheckResponse(response);
@ -35,13 +43,13 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
internal async IAsyncEnumerable<FrostFsContainerId> ListContainersAsync(PrmContainerGetAll args)
{
var ctx = args.Context!;
ctx.OwnerId ??= Context.Owner;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.OwnerId ??= ClientContext.Owner;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
if (ctx.OwnerId == null)
throw new InvalidObjectException(nameof(ctx.OwnerId));
throw new ArgumentException(nameof(ctx.OwnerId));
var request = new ListRequest
{
@ -74,11 +82,11 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
grpcContainer.Version ??= ctx.Version?.ToMessage();
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
if (grpcContainer.OwnerId == null)
throw new InvalidObjectException(nameof(grpcContainer.OwnerId));
throw new ArgumentException(nameof(grpcContainer.OwnerId));
if (grpcContainer.Version == null)
throw new InvalidObjectException(nameof(grpcContainer.Version));
throw new ArgumentException(nameof(grpcContainer.Version));
var request = new PutRequest
{
@ -114,7 +122,7 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new DeleteRequest
{
@ -147,10 +155,10 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
Verifier.CheckResponse(response);
}
private static GetRequest GetContainerRequest(ContainerID id, NameValueCollection? xHeaders, Context ctx)
private static GetRequest GetContainerRequest(ContainerID id, NameValueCollection? xHeaders, CallContext ctx)
{
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(ctx), "Key is null");
var request = new GetRequest
{
@ -172,7 +180,7 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
Removed
}
private async Task WaitForContainer(WaitExpects expect, ContainerID id, PrmWait? waitParams, Context ctx)
private async Task WaitForContainer(WaitExpects expect, ContainerID id, PrmWait? waitParams, CallContext ctx)
{
var request = GetContainerRequest(id, null, ctx);
@ -207,7 +215,7 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
await Task.Delay(waitParams.PollInterval).ConfigureAwait(false);
}
catch (ResponseException ex)
catch (FrostFsResponseException ex)
{
if (DateTime.UtcNow >= deadLine)
throw new TimeoutException();

View file

@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -12,27 +13,33 @@ internal sealed class NetmapServiceProvider : ContextAccessor
{
private readonly NetmapService.NetmapServiceClient netmapServiceClient;
internal NetmapServiceProvider(NetmapService.NetmapServiceClient netmapServiceClient, ClientEnvironment context)
internal NetmapServiceProvider(NetmapService.NetmapServiceClient netmapServiceClient, ClientContext context)
: base(context)
{
this.netmapServiceClient = netmapServiceClient;
}
internal async Task<NetworkSettings> GetNetworkSettingsAsync(Context ctx)
internal async Task<NetworkSettings> GetNetworkSettingsAsync(CallContext ctx)
{
if (Context.NetworkSettings != null)
return Context.NetworkSettings;
if (ClientContext.NetworkSettings != null)
return ClientContext.NetworkSettings;
var info = await GetNetworkInfoAsync(ctx).ConfigureAwait(false);
var response = await GetNetworkInfoAsync(ctx).ConfigureAwait(false);
var settings = new NetworkSettings();
foreach (var param in info.Body.NetworkInfo.NetworkConfig.Parameters)
var info = response.Body.NetworkInfo;
settings.Epoch = info.CurrentEpoch;
settings.MagicNumber = info.MagicNumber;
settings.MsPerBlock = info.MsPerBlock;
foreach (var param in info.NetworkConfig.Parameters)
{
SetNetworksParam(param, settings);
}
Context.NetworkSettings = settings;
ClientContext.NetworkSettings = settings;
return settings;
}
@ -40,10 +47,10 @@ internal sealed class NetmapServiceProvider : ContextAccessor
internal async Task<FrostFsNodeInfo> GetLocalNodeInfoAsync(PrmNodeInfo args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new LocalNodeInfoRequest
{
@ -60,12 +67,12 @@ internal sealed class NetmapServiceProvider : ContextAccessor
return response.Body.ToModel();
}
internal async Task<NetworkInfoResponse> GetNetworkInfoAsync(Context ctx)
internal async Task<NetworkInfoResponse> GetNetworkInfoAsync(CallContext ctx)
{
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(ctx), "Key is null");
var request = new NetworkInfoRequest();
@ -83,10 +90,10 @@ internal sealed class NetmapServiceProvider : ContextAccessor
internal async Task<FrostFsNetmapSnapshot> GetNetmapSnapshotAsync(PrmNetmapSnapshot args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new NetmapSnapshotRequest();

View file

@ -15,30 +15,32 @@ using Google.Protobuf;
namespace FrostFS.SDK.ClientV2;
internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx)
: ContextAccessor(clientCtx), ISessionProvider
{
private readonly SessionProvider sessions;
private ObjectService.ObjectServiceClient client;
private SessionProvider? sessions;
private readonly ObjectService.ObjectServiceClient client = client;
internal ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment env)
: base(env)
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
{
this.sessions = new(Context);
this.client = client;
}
sessions ??= new(ClientContext);
if (ClientContext.SessionCache.Cache != null &&
ClientContext.SessionCache.Cache.ContainsKey(ClientContext.SessionCacheKey))
{
return (SessionToken)ClientContext.SessionCache.Cache[ClientContext.SessionCacheKey];
}
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
{
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
}
internal async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new HeadRequest
{
@ -74,10 +76,10 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRequest
{
@ -108,10 +110,10 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
internal async Task DeleteObjectAsync(PrmObjectDelete args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new DeleteRequest
{
@ -145,7 +147,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new SearchRequest
{
@ -183,10 +185,10 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
throw new ArgumentNullException(nameof(args));
if (args.Header == null)
throw new ArgumentException(nameof(args.Header));
throw new ArgumentNullException(nameof(args), "Header is null");
if (args.Payload == null)
throw new ArgumentException(nameof(args.Payload));
throw new ArgumentNullException(nameof(args), "Payload is null");
if (args.ClientCut)
return await PutClientCutObject(args).ConfigureAwait(false);
@ -206,7 +208,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ctx);
@ -238,7 +240,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var ctx = args.Context!;
var tokenRaw = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var token = new FrostFsSessionToken(tokenRaw.Serialize());
var token = new FrostFsSessionToken(tokenRaw.Serialize(), tokenRaw.Body.Id.ToUuid());
args.SessionToken = token;
@ -254,7 +256,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
if (args.MaxObjectSizeCache == 0)
{
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings() { Context = ctx })
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx))
.ConfigureAwait(false);
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
@ -306,7 +308,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject) { Context = args.Context }).ConfigureAwait(false);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject, args.Context)).ConfigureAwait(false);
var parentHeader = args.Header.GetHeader();
@ -331,7 +333,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var payload = args.Payload!;
@ -352,7 +354,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
}
else
{
chunkBuffer = Context.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
isRentBuffer = true;
}
@ -404,12 +406,12 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
}
}
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, CallContext ctx)
{
var header = args.Header!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
header.OwnerId ??= ctx.OwnerId;
header.Version ??= ctx.Version;
@ -449,7 +451,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
}
private async Task<FrostFsObject> GetObject(GetRequest request, Context ctx)
private async Task<FrostFsObject> GetObject(GetRequest request, CallContext ctx)
{
var reader = GetObjectInit(request, ctx);
@ -461,7 +463,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return modelObject;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
private ObjectReader GetObjectInit(GetRequest initRequest, CallContext ctx)
{
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
@ -471,7 +473,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return new ObjectReader(call);
}
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, Context ctx)
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, CallContext ctx)
{
if (initRequest is null)
{
@ -485,7 +487,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return new ObjectStreamer(call);
}
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, Context ctx)
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, CallContext ctx)
{
using var stream = GetSearchReader(request, ctx);
@ -503,7 +505,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
}
}
private SearchReader GetSearchReader(SearchRequest initRequest, Context ctx)
private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx)
{
if (initRequest is null)
{

View file

@ -10,7 +10,7 @@ internal sealed class SessionServiceProvider : ContextAccessor
{
private readonly SessionService.SessionServiceClient? _sessionServiceClient;
internal SessionServiceProvider(SessionService.SessionServiceClient? sessionServiceClient, ClientEnvironment context)
internal SessionServiceProvider(SessionService.SessionServiceClient? sessionServiceClient, ClientContext context)
: base(context)
{
_sessionServiceClient = sessionServiceClient;
@ -20,7 +20,7 @@ internal sealed class SessionServiceProvider : ContextAccessor
{
var ctx = args.Context!;
ctx.OwnerId ??= Context.Owner;
ctx.OwnerId ??= ClientContext.Owner;
var request = new CreateRequest
{
@ -37,7 +37,7 @@ internal sealed class SessionServiceProvider : ContextAccessor
return await CreateSession(request, args.Context!).ConfigureAwait(false);
}
internal async Task<SessionToken> CreateSession(CreateRequest request, Context ctx)
internal async Task<SessionToken> CreateSession(CreateRequest request, CallContext ctx)
{
var response = await _sessionServiceClient!.CreateAsync(request, null, ctx.Deadline, ctx.CancellationToken);

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
internal class ContextAccessor(ClientEnvironment context)
internal class ContextAccessor(ClientContext context)
{
protected ClientEnvironment Context { get; set; } = context;
protected ClientContext ClientContext { get; set; } = context;
}

View file

@ -4,16 +4,16 @@ namespace FrostFS.SDK.ClientV2;
internal interface ISessionProvider
{
ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx);
ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx);
}
internal sealed class SessionProvider(ClientEnvironment env)
internal sealed class SessionProvider(ClientContext envCtx)
{
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
{
if (args.SessionToken is null)
{
return await env.Client.CreateSessionInternalAsync(new PrmSessionCreate(uint.MaxValue) { Context = ctx })
return await envCtx.Client.CreateSessionInternalAsync(new PrmSessionCreate(uint.MaxValue, ctx))
.ConfigureAwait(false);
}

View file

@ -2,16 +2,21 @@ using System;
using System.Buffers;
using System.Security.Cryptography;
using FrostFS.SDK.Cryptography;
using Grpc.Net.Client;
namespace FrostFS.SDK.ClientV2;
public class ClientEnvironment(FrostFSClient client, ECDsa? key, FrostFsOwner? owner, GrpcChannel channel, FrostFsVersion version) : IDisposable
public class ClientContext(FrostFSClient client, ECDsa? key, FrostFsOwner? owner, GrpcChannel channel, FrostFsVersion version) : IDisposable
{
private ArrayPool<byte>? _arrayPool;
private string? sessionKey;
internal FrostFsOwner? Owner { get; } = owner;
internal string? Address { get; } = channel.Target;
internal GrpcChannel Channel { get; private set; } = channel;
internal FrostFsVersion Version { get; } = version;
@ -22,6 +27,21 @@ public class ClientEnvironment(FrostFSClient client, ECDsa? key, FrostFsOwner? o
internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null;
internal SessionCache SessionCache { get; set; }
internal string? SessionCacheKey
{
get
{
if (sessionKey == null && Key != null && Address != null)
{
sessionKey = Pool.FormCacheKey(Address, Key.ECDsaKey.PrivateKey().ToString());
}
return sessionKey;
}
}
/// <summary>
/// Custom pool is used for predefined sizes of buffers like grpc chunk
/// </summary>

View file

@ -4,6 +4,10 @@ namespace FrostFS.SDK.ClientV2;
public class NetworkSettings
{
public ulong Epoch { get; internal set; }
public ulong MagicNumber { get; internal set; }
public long MsPerBlock { get; internal set; }
public ulong AuditFee { get; internal set; }
public ulong BasicIncomeRate { get; internal set; }
public ulong ContainerFee { get; internal set; }

View file

@ -17,13 +17,13 @@ public sealed class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : I
internal async Task<Object.Object> ReadHeader()
{
if (!await Call.ResponseStream.MoveNext().ConfigureAwait(false))
throw new InvalidOperationException("unexpected end of stream");
throw new FrostFsStreamException("unexpected end of stream");
var response = Call.ResponseStream.Current;
Verifier.CheckResponse(response);
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Init)
throw new InvalidOperationException("unexpected message type");
throw new FrostFsStreamException("unexpected message type");
return new Object.Object
{
@ -41,7 +41,7 @@ public sealed class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : I
Verifier.CheckResponse(response);
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Chunk)
throw new InvalidOperationException("unexpected message type");
throw new FrostFsStreamException("unexpected message type");
return response.Body.Chunk.Memory;
}

View file

@ -11,7 +11,7 @@ namespace FrostFS.SDK.ClientV2;
internal static class ObjectTools
{
internal static FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, Context ctx)
internal static FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
var grpcHeader = CreateHeader(header, [], ctx);
@ -21,7 +21,7 @@ internal static class ObjectTools
return new ObjectID { Value = grpcHeader.Sha256() }.ToModel();
}
internal static Object.Object CreateObject(FrostFsObject @object, Context ctx)
internal static Object.Object CreateObject(FrostFsObject @object, CallContext ctx)
{
@object.Header.OwnerId ??= ctx.OwnerId;
@object.Header.Version ??= ctx.Version;
@ -53,13 +53,13 @@ internal static class ObjectTools
return obj;
}
internal static void SetSplitValues(Header grpcHeader, FrostFsSplit split, Context ctx)
internal static void SetSplitValues(Header grpcHeader, FrostFsSplit split, CallContext ctx)
{
if (split == null)
return;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new FrostFsInvalidObjectException(nameof(ctx.Key));
grpcHeader.Split = new Header.Types.Split
{
@ -85,7 +85,7 @@ internal static class ObjectTools
grpcHeader.Split.Previous = split.Previous?.ToMessage();
}
internal static Header CreateHeader(FrostFsObjectHeader header, byte[]? payload, Context ctx)
internal static Header CreateHeader(FrostFsObjectHeader header, byte[]? payload, CallContext ctx)
{
header.OwnerId ??= ctx.OwnerId;
header.Version ??= ctx.Version;

View file

@ -76,6 +76,11 @@ public static class RequestSigner
public static byte[] SignData(this ECDsa key, byte[] data)
{
if (key is null)
{
throw new ArgumentNullException(nameof(key));
}
var hash = new byte[65];
hash[0] = 0x04;

View file

@ -122,7 +122,7 @@ public static class Verifier
var status = resp.MetaHeader.Status.ToModel();
if (status != null && !status.IsSuccess)
throw new ResponseException(status);
throw new FrostFsResponseException(status);
}
/// <summary>
@ -137,6 +137,6 @@ public static class Verifier
}
if (!request.Verify())
throw new FormatException($"invalid response, type={request.GetType()}");
throw new FrostFsResponseException($"invalid response, type={request.GetType()}");
}
}

View file

@ -0,0 +1,62 @@
using FrostFS.SDK.ProtosV2.Interfaces;
using FrostFS.Session;
using Google.Protobuf;
namespace FrostFS.Accounting;
public partial class BalanceRequest : IRequest
{
IMetaHeader IVerifiableMessage.GetMetaHeader()
{
return MetaHeader;
}
IVerificationHeader IVerifiableMessage.GetVerificationHeader()
{
return VerifyHeader;
}
void IVerifiableMessage.SetMetaHeader(IMetaHeader metaHeader)
{
MetaHeader = (RequestMetaHeader)metaHeader;
}
void IVerifiableMessage.SetVerificationHeader(IVerificationHeader verificationHeader)
{
VerifyHeader = (RequestVerificationHeader)verificationHeader;
}
public IMessage GetBody()
{
return Body;
}
}
public partial class BalanceResponse : IResponse
{
IMetaHeader IVerifiableMessage.GetMetaHeader()
{
return MetaHeader;
}
IVerificationHeader IVerifiableMessage.GetVerificationHeader()
{
return VerifyHeader;
}
void IVerifiableMessage.SetMetaHeader(IMetaHeader metaHeader)
{
MetaHeader = (ResponseMetaHeader)metaHeader;
}
void IVerifiableMessage.SetVerificationHeader(IVerificationHeader verificationHeader)
{
VerifyHeader = (ResponseVerificationHeader)verificationHeader;
}
public IMessage GetBody()
{
return Body;
}
}

View file

@ -1,11 +1,11 @@
using System.Diagnostics;
using Grpc.Core;
using Grpc.Core;
using Grpc.Core.Interceptors;
namespace FrostFS.SDK.SmokeTests;
public class MetricsInterceptor() : Interceptor
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1062:Validate arguments of public methods",
Justification = "parameters are provided by GRPC infrastructure")]
public class CallbackInterceptor(Action<string>? callback = null) : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
@ -22,17 +22,11 @@ public class MetricsInterceptor() : Interceptor
call.Dispose);
}
private static async Task<TResponse> HandleUnaryResponse<TResponse>(AsyncUnaryCall<TResponse> call)
private async Task<TResponse> HandleUnaryResponse<TResponse>(AsyncUnaryCall<TResponse> call)
{
var watch = new Stopwatch();
watch.Start();
var response = await call;
var response = await call.ResponseAsync;
watch.Stop();
// Do something with call info
// var elapsed = watch.ElapsedTicks * 1_000_000/Stopwatch.Frequency;
callback?.Invoke($"elapsed");
return response;
}

View file

@ -1,49 +1,14 @@
using System.Diagnostics.CodeAnalysis;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using Google.Protobuf;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ContainerTestsBase
{
protected readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected ContainerMocker Mocker { get; set; }
protected ContainerTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ContainerMocker(this.key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
new NetworkMocker(this.key).GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
Mocker.GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public class ContainerTest : ContainerTestsBase
{
[Fact]

View file

@ -0,0 +1,40 @@
using FrostFS.SDK.ClientV2.Interfaces;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ContainerTestsBase
{
internal readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected ContainerMocker Mocker { get; set; }
protected ContainerTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ContainerMocker(this.key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
new NetworkMocker(this.key).GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
Mocker.GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}

View file

@ -0,0 +1,6 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

View file

@ -26,8 +26,11 @@ public class AsyncStreamReaderMock(string key, FrostFsObjectHeader objectHeader)
OwnerId = objectHeader.OwnerId!.ToMessage()
};
foreach (var attr in objectHeader.Attributes)
header.Attributes.Add(attr.ToMessage());
if (objectHeader.Attributes != null)
{
foreach (var attr in objectHeader.Attributes)
header.Attributes.Add(attr.ToMessage());
}
var response = new GetResponse
{

View file

@ -1,3 +1,5 @@
using System.Collections.ObjectModel;
using FrostFS.SDK.ProtosV2.Interfaces;
using Grpc.Core;
@ -6,13 +8,15 @@ namespace FrostFS.SDK.Tests;
public class ClientStreamWriter : IClientStreamWriter<IRequest>
{
public List<IRequest> Messages { get; set; } = [];
private WriteOptions? _options;
public Collection<IRequest> Messages { get; } = [];
public bool CompletedTask { get; private set; }
public WriteOptions? WriteOptions
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
get => _options;
set => _options = value;
}
public Task CompleteAsync()

View file

@ -25,7 +25,10 @@ public abstract class ServiceBase(string key)
public static FrostFsVersion DefaultVersion { get; } = new(2, 13);
public static FrostFsPlacementPolicy DefaultPlacementPolicy { get; } = new FrostFsPlacementPolicy(true, new FrostFsReplica(1));
public Metadata Metadata { get; protected set; }
#pragma warning disable CA2227 // this is specific object, should be treated as is
public Metadata? Metadata { get; set; }
#pragma warning restore CA2227
public DateTime? DateTime { get; protected set; }
public CancellationToken CancellationToken { get; protected set; }
@ -35,6 +38,8 @@ public abstract class ServiceBase(string key)
protected ResponseVerificationHeader GetResponseVerificationHeader(IResponse response)
{
ArgumentNullException.ThrowIfNull(response);
var verifyHeader = new ResponseVerificationHeader
{
MetaSignature = new Refs.Signature

View file

@ -1,3 +1,5 @@
using System.Collections.ObjectModel;
using FrostFS.Container;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2;
@ -41,7 +43,7 @@ public class ContainerMocker(string key) : ContainerServiceBase(key)
putResponse.VerifyHeader = GetResponseVerificationHeader(putResponse);
var metadata = new Metadata();
var putContainerResponse = new AsyncUnaryCall<PutResponse>(
using var putContainerResponse = new AsyncUnaryCall<PutResponse>(
Task.FromResult(putResponse),
Task.FromResult(metadata),
() => new Grpc.Core.Status(StatusCode.OK, string.Empty),
@ -180,7 +182,7 @@ public class ContainerMocker(string key) : ContainerServiceBase(key)
public bool ReturnContainerRemoved { get; set; }
public List<byte[]> ContainerIds { get; set; } = [];
public Collection<byte[]> ContainerIds { get; } = [];
public List<RequestData<DeleteRequest>> Requests { get; set; } = [];
public Collection<RequestData<DeleteRequest>> Requests { get; } = [];
}

View file

@ -25,18 +25,17 @@ public class NetworkMocker(string key) : ServiceBase(key)
"MaintenanceModeAllowed"
];
public Dictionary<string, byte[]>? Parameters { get; set; }
public Dictionary<string, byte[]> Parameters { get; } = [];
public LocalNodeInfoResponse NodeInfoResponse { get; set; }
public LocalNodeInfoResponse? NodeInfoResponse { get; set; }
public LocalNodeInfoRequest LocalNodeInfoRequest { get; set; }
public LocalNodeInfoRequest? LocalNodeInfoRequest { get; set; }
public NetworkInfoRequest NetworkInfoRequest { get; set; }
public NetworkInfoRequest? NetworkInfoRequest { get; set; }
public NetmapSnapshotResponse NetmapSnapshotResponse { get; set; }
public NetmapSnapshotRequest NetmapSnapshotRequest { get; set; }
public NetmapSnapshotResponse? NetmapSnapshotResponse { get; set; }
public NetmapSnapshotRequest? NetmapSnapshotRequest { get; set; }
public Mock<NetmapService.NetmapServiceClient> GetMock()
{

View file

@ -1,3 +1,4 @@
using System.Collections.ObjectModel;
using System.Security.Cryptography;
using FrostFS.Object;
@ -92,7 +93,7 @@ public class ObjectMocker(string key) : ObjectServiceBase(key)
}
if (ResultObjectIds != null)
if (ResultObjectIds != null && ResultObjectIds.Count > 0)
{
PutResponse putResponse = new()
{
@ -197,14 +198,14 @@ public class ObjectMocker(string key) : ObjectServiceBase(key)
public Header? HeadResponse { get; set; }
public List<byte[]>? ResultObjectIds { get; set; }
public Collection<byte[]>? ResultObjectIds { get; } = [];
public ClientStreamWriter? ClientStreamWriter { get; private set; } = new();
public List<PutSingleRequest> PutSingleRequests { get; private set; } = [];
public Collection<PutSingleRequest> PutSingleRequests { get; private set; } = [];
public List<DeleteRequest> DeleteRequests { get; private set; } = [];
public Collection<DeleteRequest> DeleteRequests { get; private set; } = [];
public List<HeadRequest> HeadRequests { get; private set; } = [];
public Collection<HeadRequest> HeadRequests { get; private set; } = [];
}

View file

@ -8,13 +8,14 @@ using Moq;
namespace FrostFS.SDK.Tests;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "No secure purpose")]
public class SessionMocker(string key) : ServiceBase(key)
{
public byte[]? SessionId { get; set; }
public byte[]? SessionKey { get; set; }
public CreateRequest CreateSessionRequest { get; private set; }
public CreateRequest? CreateSessionRequest { get; private set; }
public Mock<SessionService.SessionServiceClient> GetMock()
{
@ -24,7 +25,7 @@ public class SessionMocker(string key) : ServiceBase(key)
if (SessionId == null)
{
SessionId = new byte[32];
SessionId = new byte[16];
rand.NextBytes(SessionId);
}

View file

@ -1,55 +1,13 @@
using System.Security.Cryptography;
using System.Diagnostics.CodeAnalysis;
using FrostFS.Netmap;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Google.Protobuf;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class NetworkTestsBase
{
protected readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsVersion Version { get; set; } = new FrostFsVersion(2, 13);
protected ECDsa ECDsaKey { get; set; }
protected FrostFsOwner OwnerId { get; set; }
protected NetworkMocker Mocker { get; set; }
protected NetworkTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
ECDsaKey = key.LoadWif();
OwnerId = FrostFsOwner.FromKey(ECDsaKey);
Mocker = new NetworkMocker(this.key);
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
Mocker.GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
new ContainerMocker(this.key).GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public class NetworkTest : NetworkTestsBase
{
[Theory]
@ -57,35 +15,29 @@ public class NetworkTest : NetworkTestsBase
[InlineData(true)]
public async void NetworkSettingsTest(bool useContext)
{
Mocker.Parameters = new Dictionary<string, byte[]>
{
{ "AuditFee", [1] },
{ "BasicIncomeRate", [2] },
{ "ContainerFee", [3] },
{ "ContainerAliasFee", [4] },
{ "EpochDuration", [5] },
{ "InnerRingCandidateFee", [6] },
{ "MaxECDataCount", [7] },
{ "MaxECParityCount", [8] },
{ "MaxObjectSize", [9] },
{ "WithdrawFee", [10] },
{ "HomomorphicHashingDisabled", [1] },
{ "MaintenanceModeAllowed", [1] },
};
Mocker.Parameters.Add("AuditFee", [1]);
Mocker.Parameters.Add("BasicIncomeRate", [2]);
Mocker.Parameters.Add("ContainerFee", [3]);
Mocker.Parameters.Add("ContainerAliasFee", [4]);
Mocker.Parameters.Add("EpochDuration", [5]);
Mocker.Parameters.Add("InnerRingCandidateFee", [6]);
Mocker.Parameters.Add("MaxECDataCount", [7]);
Mocker.Parameters.Add("MaxECParityCount", [8]);
Mocker.Parameters.Add("MaxObjectSize", [9]);
Mocker.Parameters.Add("WithdrawFee", [10]);
Mocker.Parameters.Add("HomomorphicHashingDisabled", [1]);
Mocker.Parameters.Add("MaintenanceModeAllowed", [1]);
var param = new PrmNetworkSettings();
if (useContext)
{
param.Context = new Context
var param = useContext ?
new PrmNetworkSettings(new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
OwnerId = OwnerId,
Key = ECDsaKey,
Version = Version
};
}
})
: new PrmNetworkSettings();
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -119,6 +71,7 @@ public class NetworkTest : NetworkTestsBase
}
else
{
Assert.NotNull(Mocker.NetworkInfoRequest);
Assert.Empty(Mocker.NetworkInfoRequest.MetaHeader.XHeaders);
Assert.Null(Mocker.DateTime);
}
@ -127,6 +80,7 @@ public class NetworkTest : NetworkTestsBase
[Theory]
[InlineData(false)]
[InlineData(true)]
public async void NetmapSnapshotTest(bool useContext)
{
var body = new NetmapSnapshotResponse.Types.Body
@ -159,12 +113,11 @@ public class NetworkTest : NetworkTestsBase
Mocker.NetmapSnapshotResponse = new NetmapSnapshotResponse { Body = body };
var param = new PrmNetmapSnapshot();
PrmNetmapSnapshot param;
if (useContext)
{
param.XHeaders.Add("headerKey1", "headerValue1");
param.Context = new Context
var ctx = new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
@ -172,6 +125,14 @@ public class NetworkTest : NetworkTestsBase
Key = ECDsaKey,
Version = Version
};
param = new(ctx);
param.XHeaders.Add("headerKey1", "headerValue1");
}
else
{
param = new();
}
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -210,6 +171,7 @@ public class NetworkTest : NetworkTestsBase
if (useContext)
{
Assert.NotNull(Mocker.NetmapSnapshotRequest);
Assert.Single(Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders);
Assert.Equal(param.XHeaders.Keys[0], Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders.First().Key);
Assert.Equal(param.XHeaders[param.XHeaders.Keys[0]], Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders.First().Value);
@ -222,6 +184,7 @@ public class NetworkTest : NetworkTestsBase
}
else
{
Assert.NotNull(Mocker.NetmapSnapshotRequest);
Assert.Empty(Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders);
Assert.Null(Mocker.DateTime);
}
@ -249,12 +212,11 @@ public class NetworkTest : NetworkTestsBase
Mocker.NodeInfoResponse = new LocalNodeInfoResponse { Body = body };
var param = new PrmNodeInfo();
PrmNodeInfo param;
if (useContext)
{
param.XHeaders.Add("headerKey1", "headerValue1");
param.Context = new Context
var ctx = new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
@ -262,6 +224,14 @@ public class NetworkTest : NetworkTestsBase
Key = ECDsaKey,
Version = Version
};
param = new(ctx);
param.XHeaders.Add("headerKey1", "headerValue1");
}
else
{
param = new();
}
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -282,6 +252,7 @@ public class NetworkTest : NetworkTestsBase
Assert.Equal("value1", result.Attributes["key1"]);
Assert.Equal("value2", result.Attributes["key2"]);
Assert.NotNull(Mocker.LocalNodeInfoRequest);
if (useContext)
{
Assert.Single(Mocker.LocalNodeInfoRequest.MetaHeader.XHeaders);

View file

@ -0,0 +1,48 @@
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public abstract class NetworkTestsBase
{
internal readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsVersion Version { get; set; } = new FrostFsVersion(2, 13);
protected ECDsa ECDsaKey { get; set; }
protected FrostFsOwner OwnerId { get; set; }
protected NetworkMocker Mocker { get; set; }
protected NetworkTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
ECDsaKey = key.LoadWif();
OwnerId = FrostFsOwner.FromKey(ECDsaKey);
Mocker = new NetworkMocker(this.key);
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
Mocker.GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
new ContainerMocker(this.key).GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}

View file

@ -1,71 +1,19 @@
using System.Collections.ObjectModel;
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using System.Text;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Google.Protobuf;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ObjectTestsBase
{
protected static readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsContainerId ContainerId { get; set; }
protected NetworkMocker NetworkMocker { get; set; } = new NetworkMocker(key);
protected SessionMocker SessionMocker { get; set; } = new SessionMocker(key);
protected ContainerMocker ContainerMocker { get; set; } = new ContainerMocker(key);
protected ObjectMocker Mocker { get; set; }
protected ObjectTestsBase()
{
var ecdsaKey = key.LoadWif();
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ObjectMocker(key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
ContainerId = new FrostFsContainerId(Base58.Encode(Mocker.ContainerGuid.ToBytes()));
Mocker.ObjectHeader = new(
ContainerId,
FrostFsObjectType.Regular,
[new FrostFsAttributePair("k", "v")],
null,
FrostFsOwner.FromKey(ecdsaKey),
new FrostFsVersion(2, 13));
}
protected IFrostFSClient GetClient()
{
return FrostFSClient.GetTestInstance(
Settings,
null,
NetworkMocker.GetMock().Object,
SessionMocker.GetMock().Object,
ContainerMocker.GetMock().Object,
Mocker.GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
[SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "No secure purpose")]
public class ObjectTest : ObjectTestsBase
{
[Fact]
@ -75,7 +23,7 @@ public class ObjectTest : ObjectTestsBase
var ecdsaKey = key.LoadWif();
var ctx = new Context
var ctx = new CallContext
{
Key = ecdsaKey,
OwnerId = FrostFsOwner.FromKey(ecdsaKey),
@ -84,22 +32,25 @@ public class ObjectTest : ObjectTestsBase
var objectId = client.CalculateObjectId(Mocker.ObjectHeader!, ctx);
var result = await client.GetObjectAsync(new PrmObjectGet(ContainerId, objectId) { Context = ctx });
var result = await client.GetObjectAsync(new PrmObjectGet(ContainerId, objectId, ctx));
Assert.NotNull(result);
Assert.Equal(Mocker.ObjectHeader!.ContainerId.GetValue(), result.Header.ContainerId.GetValue());
Assert.Equal(Mocker.ObjectHeader!.OwnerId!.Value, result.Header.OwnerId!.Value);
Assert.NotNull(Mocker.ObjectHeader);
Assert.Equal(Mocker.ObjectHeader.ContainerId.GetValue(), result.Header.ContainerId.GetValue());
Assert.Equal(Mocker.ObjectHeader.OwnerId!.Value, result.Header.OwnerId!.Value);
Assert.Equal(Mocker.ObjectHeader.PayloadLength, result.Header.PayloadLength);
Assert.NotNull(result.Header.Attributes);
Assert.Single(result.Header.Attributes);
Assert.Equal(Mocker.ObjectHeader.Attributes[0].Key, result.Header.Attributes[0].Key);
Assert.Equal(Mocker.ObjectHeader.Attributes[0].Value, result.Header.Attributes[0].Value);
Assert.Equal(Mocker.ObjectHeader.Attributes![0].Key, result.Header.Attributes[0].Key);
Assert.Equal(Mocker.ObjectHeader.Attributes![0].Value, result.Header.Attributes[0].Value);
}
[Fact]
public async void PutObjectTest()
{
Mocker.ResultObjectIds = new([SHA256.HashData([])]);
Mocker.ResultObjectIds!.Add(SHA256.HashData([]));
Random rnd = new();
var bytes = new byte[1024];
@ -134,7 +85,7 @@ public class ObjectTest : ObjectTestsBase
[Fact]
public async void ClientCutTest()
{
NetworkMocker.Parameters = new Dictionary<string, byte[]>() { { "MaxObjectSize", [0x0, 0xa] } };
NetworkMocker.Parameters.Add("MaxObjectSize", [0x0, 0xa]);
var blockSize = 2560;
byte[] bytes = File.ReadAllBytes(@".\..\..\..\TestData\cat.jpg");
@ -150,17 +101,19 @@ public class ObjectTest : ObjectTestsBase
Random rnd = new();
List<byte[]> objIds = new([new byte[32], new byte[32], new byte[32]]);
Collection<byte[]> objIds = new([new byte[32], new byte[32], new byte[32]]);
rnd.NextBytes(objIds.ElementAt(0));
rnd.NextBytes(objIds.ElementAt(1));
rnd.NextBytes(objIds.ElementAt(2));
Mocker.ResultObjectIds = objIds;
foreach (var objId in objIds)
Mocker.ResultObjectIds!.Add(objId);
var result = await GetClient().PutObjectAsync(param);
var singleObjects = Mocker.PutSingleRequests.ToArray();
Assert.NotNull(Mocker.ClientStreamWriter?.Messages);
var streamObjects = Mocker.ClientStreamWriter.Messages.ToArray();
Assert.Single(singleObjects);
@ -262,6 +215,7 @@ public class ObjectTest : ObjectTestsBase
Assert.Equal(FrostFsObjectType.Regular, response.ObjectType);
Assert.NotNull(response.Attributes);
Assert.Single(response.Attributes);
Assert.Equal(Mocker.HeadResponse.Attributes[0].Key, response.Attributes.First().Key);

View file

@ -0,0 +1,59 @@
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ObjectTestsBase
{
protected static readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsContainerId ContainerId { get; set; }
protected NetworkMocker NetworkMocker { get; set; } = new NetworkMocker(key);
protected SessionMocker SessionMocker { get; set; } = new SessionMocker(key);
protected ContainerMocker ContainerMocker { get; set; } = new ContainerMocker(key);
protected ObjectMocker Mocker { get; set; }
protected ObjectTestsBase()
{
var ecdsaKey = key.LoadWif();
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ObjectMocker(key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
ContainerId = new FrostFsContainerId(Base58.Encode(Mocker.ContainerGuid.ToBytes()));
Mocker.ObjectHeader = new(
ContainerId,
FrostFsObjectType.Regular,
[new FrostFsAttributePair("k", "v")],
null,
FrostFsOwner.FromKey(ecdsaKey),
new FrostFsVersion(2, 13));
}
protected IFrostFSClient GetClient()
{
return FrostFSClient.GetTestInstance(
Settings,
null,
NetworkMocker.GetMock().Object,
SessionMocker.GetMock().Object,
ContainerMocker.GetMock().Object,
Mocker.GetMock().Object);
}
}

View file

@ -0,0 +1,612 @@
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
using static FrostFS.Session.SessionToken.Types.Body;
namespace FrostFS.SDK.SmokeTests;
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
[SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "No secure purpose")]
public class PoolSmokeTests : SmokeTestsBase
{
private static readonly PrmWait lightWait = new(100, 1);
private InitParameters GetDefaultParams()
{
return new InitParameters
{
Key = keyString.LoadWif(),
NodeParams = [new(1, this.url, 100.0f)],
ClientBuilder = null,
GracefulCloseOnSwitchTimeout = 30_000_000,
Logger = null
};
}
[Fact]
public async void NetworkMapTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext());
Assert.Null(error);
var result = await pool.GetNetmapSnapshotAsync(default);
Assert.True(result.Epoch > 0);
Assert.Single(result.NodeInfoCollection);
var item = result.NodeInfoCollection[0];
Assert.Equal(2, item.Version.Major);
Assert.Equal(13, item.Version.Minor);
Assert.Equal(NodeState.Online, item.State);
Assert.True(item.PublicKey.Length > 0);
Assert.Single(item.Addresses);
Assert.Equal(9, item.Attributes.Count);
}
[Fact]
public async void NodeInfoTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext());
Assert.Null(error);
var result = await pool.GetNodeInfoAsync();
Assert.Equal(2, result.Version.Major);
Assert.Equal(13, result.Version.Minor);
Assert.Equal(NodeState.Online, result.State);
Assert.Equal(33, result.PublicKey.Length);
Assert.Single(result.Addresses);
Assert.Equal(9, result.Attributes.Count);
}
[Fact]
public async void NodeInfoStatisticsTwoNodesTest()
{
var options = new InitParameters
{
Key = keyString.LoadWif(),
NodeParams = [
new(1, this.url, 100.0f),
new(2, this.url.Replace('0', '1'), 100.0f)
],
ClientBuilder = null,
GracefulCloseOnSwitchTimeout = 30_000_000,
Logger = null
};
using var pool = new Pool(options);
var callbackText = string.Empty;
var ctx = new CallContext
{
Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"
};
var error = await pool.Dial(ctx).ConfigureAwait(true);
Assert.Null(error);
using var client = FrostFSClient.GetSingleOwnerInstance(GetSingleOwnerOptions(this.keyString, this.url));
var result = await client.GetNodeInfoAsync();
var statistics = pool.Statistic();
Assert.False(string.IsNullOrEmpty(callbackText));
Assert.Contains(" took ", callbackText, StringComparison.Ordinal);
}
[Fact]
public async void NodeInfoStatisticsTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var callbackText = string.Empty;
var ctx = new CallContext
{
Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"
};
var error = await pool.Dial(ctx).ConfigureAwait(true);
Assert.Null(error);
using var client = FrostFSClient.GetSingleOwnerInstance(GetSingleOwnerOptions(this.keyString, this.url));
var result = await client.GetNodeInfoAsync();
Assert.False(string.IsNullOrEmpty(callbackText));
Assert.Contains(" took ", callbackText, StringComparison.Ordinal);
}
[Fact]
public async void GetSessionTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
var prm = new PrmSessionCreate(100);
var token = await pool.CreateSessionAsync(prm).ConfigureAwait(true);
var session = new Session.SessionToken().Deserialize(token.Token);
var ownerHash = Base58.Decode(OwnerId!.Value);
Assert.NotNull(session);
Assert.Null(session.Body.Container);
Assert.Null(session.Body.Object);
Assert.Equal(16, session.Body.Id.Length);
Assert.Equal(100ul, session.Body.Lifetime.Exp);
Assert.Equal(ownerHash, session.Body.OwnerId.Value);
Assert.Equal(33, session.Body.SessionKey.Length);
Assert.Equal(ContextOneofCase.None, session.Body.ContextCase);
}
[Fact]
public async void CreateObjectWithSessionToken()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
var token = await pool.CreateSessionAsync(new PrmSessionCreate(int.MaxValue));
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))));
createContainerParam.XHeaders.Add("key1", "value1");
var containerId = await pool.CreateContainerAsync(createContainerParam);
var bytes = GetRandomBytes(1024);
var param = new PrmObjectPut
{
Header = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = false,
SessionToken = token
};
var objectId = await pool.PutObjectAsync(param).ConfigureAwait(true);
var @object = await pool.GetObjectAsync(new PrmObjectGet(containerId, objectId));
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await Cleanup(pool);
}
[Fact]
public async void FilterTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))))
{
WaitParams = lightWait
};
var containerId = await pool.CreateContainerAsync(createContainerParam);
var bytes = new byte[] { 1, 2, 3 };
var ParentHeader = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular)
{
PayloadLength = 3
};
var param = new PrmObjectPut
{
Header = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")],
new FrostFsSplit()),
Payload = new MemoryStream(bytes),
ClientCut = false
};
var objectId = await pool.PutObjectAsync(param);
var head = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId));
var ecdsaKey = this.keyString.LoadWif();
var networkInfo = await pool.GetNetmapSnapshotAsync();
await CheckFilter(pool, containerId, new FilterByContainerId(FrostFsMatchType.Equals, containerId));
await CheckFilter(pool, containerId, new FilterByOwnerId(FrostFsMatchType.Equals, FrostFsOwner.FromKey(ecdsaKey)));
await CheckFilter(pool, containerId, new FilterBySplitId(FrostFsMatchType.Equals, param.Header.Split!.SplitId));
await CheckFilter(pool, containerId, new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"));
await CheckFilter(pool, containerId, new FilterByObjectId(FrostFsMatchType.Equals, objectId));
await CheckFilter(pool, containerId, new FilterByVersion(FrostFsMatchType.Equals, networkInfo.NodeInfoCollection[0].Version));
await CheckFilter(pool, containerId, new FilterByEpoch(FrostFsMatchType.Equals, networkInfo.Epoch));
await CheckFilter(pool, containerId, new FilterByPayloadLength(FrostFsMatchType.Equals, 3));
var checkSum = CheckSum.CreateCheckSum(bytes);
await CheckFilter(pool, containerId, new FilterByPayloadHash(FrostFsMatchType.Equals, checkSum));
await CheckFilter(pool, containerId, new FilterByPhysicallyStored());
}
private static async Task CheckFilter(Pool pool, FrostFsContainerId containerId, IObjectFilter filter)
{
var resultObjectsCount = 0;
PrmObjectSearch searchParam = new(containerId) { Filters = [filter] };
await foreach (var objId in pool.SearchObjectsAsync(searchParam))
{
resultObjectsCount++;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objId));
}
Assert.True(0 < resultObjectsCount, $"Filter for {filter.Key} doesn't work");
}
[Theory]
[InlineData(1)]
[InlineData(3 * 1024 * 1024)] // exactly one chunk size - 3MB
[InlineData(6 * 1024 * 1024 + 100)]
public async void SimpleScenarioTest(int objectSize)
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
bool callbackInvoked = false;
var ctx = new CallContext
{
// Timeout = TimeSpan.FromSeconds(20),
Callback = new((CallStatistics cs) =>
{
callbackInvoked = true;
Assert.True(cs.ElapsedMicroSeconds > 0);
})
};
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1)), [new("testKey", "testValue")]), ctx);
var createdContainer = await pool.CreateContainerAsync(createContainerParam);
var container = await pool.GetContainerAsync(new PrmContainerGet(createdContainer));
Assert.NotNull(container);
Assert.True(callbackInvoked);
var bytes = GetRandomBytes(objectSize);
var ctx1 = new CallContext
{
Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0))
};
var param = new PrmObjectPut(ctx1)
{
Header = new FrostFsObjectHeader(
containerId: createdContainer,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = false
};
var objectId = await pool.PutObjectAsync(param);
var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test");
bool hasObject = false;
await foreach (var objId in pool.SearchObjectsAsync(new PrmObjectSearch(createdContainer) { Filters = [filter] }))
{
hasObject = true;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(createdContainer, objectId));
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.NotNull(objHeader.Attributes);
Assert.Single(objHeader.Attributes!);
Assert.Equal("fileName", objHeader.Attributes!.First().Key);
Assert.Equal("test", objHeader.Attributes!.First().Value);
}
Assert.True(hasObject);
var @object = await pool.GetObjectAsync(new PrmObjectGet(createdContainer, objectId));
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await Cleanup(pool);
await foreach (var _ in pool.ListContainersAsync())
{
Assert.Fail("Containers exist");
}
}
[Theory]
[InlineData(1)]
[InlineData(3 * 1024 * 1024)] // exactly one chunk size - 3MB
[InlineData(6 * 1024 * 1024 + 100)]
public async void SimpleScenarioWithSessionTest(int objectSize)
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
var token = await pool.CreateSessionAsync(new PrmSessionCreate(int.MaxValue));
await Cleanup(pool);
var ctx = new CallContext
{
Timeout = TimeSpan.FromSeconds(20),
Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0))
};
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))), ctx);
var container = await pool.CreateContainerAsync(createContainerParam);
var containerInfo = await pool.GetContainerAsync(new PrmContainerGet(container, ctx));
Assert.NotNull(containerInfo);
var bytes = GetRandomBytes(objectSize);
var param = new PrmObjectPut(new CallContext { Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0)) })
{
Header = new FrostFsObjectHeader(
containerId: container,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = false,
SessionToken = token
};
var objectId = await pool.PutObjectAsync(param);
var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test");
bool hasObject = false;
await foreach (var objId in pool.SearchObjectsAsync(new PrmObjectSearch(container) { Filters = [filter], SessionToken = token }))
{
hasObject = true;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(container, objectId) { SessionToken = token });
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.NotNull(objHeader.Attributes);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value);
}
Assert.True(hasObject);
var @object = await pool.GetObjectAsync(new PrmObjectGet(container, objectId) { SessionToken = token });
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await Cleanup(pool);
await foreach (var _ in pool.ListContainersAsync())
{
Assert.Fail("Containers exist");
}
}
[Theory]
[InlineData(1)]
[InlineData(64 * 1024 * 1024)] // exactly 1 block size - 64MB
[InlineData(64 * 1024 * 1024 - 1)]
[InlineData(64 * 1024 * 1024 + 1)]
[InlineData(2 * 64 * 1024 * 1024 + 256)]
[InlineData(200)]
public async void ClientCutScenarioTest(int objectSize)
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
var createContainerParam = new PrmContainerCreate(new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))))
{
WaitParams = lightWait
};
var containerId = await pool.CreateContainerAsync(createContainerParam);
var ctx = new CallContext
{
Timeout = TimeSpan.FromSeconds(10),
};
ctx.Interceptors.Add(new CallbackInterceptor());
var container = await pool.GetContainerAsync(new PrmContainerGet(containerId, ctx));
Assert.NotNull(container);
byte[] bytes = GetRandomBytes(objectSize);
var param = new PrmObjectPut
{
Header = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = true
};
var objectId = await pool.PutObjectAsync(param);
var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test");
bool hasObject = false;
await foreach (var objId in pool.SearchObjectsAsync(new PrmObjectSearch(containerId, null, filter)))
{
hasObject = true;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId));
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.NotNull(objHeader.Attributes);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes[0].Key);
Assert.Equal("test", objHeader.Attributes[0].Value);
}
Assert.True(hasObject);
var @object = await pool.GetObjectAsync(new PrmObjectGet(containerId, objectId));
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await CheckFilter(pool, containerId, new FilterByRootObject());
await Cleanup(pool);
await foreach (var cid in pool.ListContainersAsync())
{
Assert.Fail($"Container {cid.GetValue()} exist");
}
}
private static byte[] GetRandomBytes(int size)
{
Random rnd = new();
var bytes = new byte[size];
rnd.NextBytes(bytes);
return bytes;
}
private static IOptions<SingleOwnerClientSettings> GetSingleOwnerOptions(string key, string url)
{
return Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = url
});
}
private static IOptions<ClientSettings> GetOptions(string url)
{
return Options.Create(new ClientSettings
{
Host = url
});
}
static async Task Cleanup(Pool pool)
{
await foreach (var cid in pool.ListContainersAsync())
{
await pool.DeleteContainerAsync(new PrmContainerDelete(cid) { WaitParams = lightWait }).ConfigureAwait(true);
}
}
}

View file

@ -1,56 +1,11 @@
using System.Security.Cryptography;
using System.Diagnostics.CodeAnalysis;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class SessionTestsBase
{
protected readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected ECDsa ECDsaKey { get; set; }
protected FrostFsOwner OwnerId { get; set; }
protected SessionMocker Mocker { get; set; }
protected SessionTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
ECDsaKey = key.LoadWif();
OwnerId = FrostFsOwner.FromKey(ECDsaKey);
Mocker = new SessionMocker(this.key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13)
};
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
new NetworkMocker(this.key).GetMock().Object,
Mocker.GetMock().Object,
new ContainerMocker(this.key).GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public class SessionTest : SessionTestsBase
{
[Theory]
@ -59,12 +14,11 @@ public class SessionTest : SessionTestsBase
public async void CreateSessionTest(bool useContext)
{
var exp = 100u;
var param = new PrmSessionCreate(exp);
PrmSessionCreate param;
if (useContext)
{
param.XHeaders.Add("headerKey1", "headerValue1");
param.Context = new Context
var ctx = new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
@ -72,6 +26,14 @@ public class SessionTest : SessionTestsBase
Key = ECDsaKey,
Version = Mocker.Version
};
param = new PrmSessionCreate(exp, ctx);
param.XHeaders.Add("headerKey1", "headerValue1");
}
else
{
param = new PrmSessionCreate(exp);
}
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -101,7 +63,6 @@ public class SessionTest : SessionTestsBase
Assert.NotNull(Mocker.CreateSessionRequest.MetaHeader);
Assert.Equal(Mocker.Version.ToMessage(), Mocker.CreateSessionRequest.MetaHeader.Version);
Assert.Null(Mocker.Metadata);
if (useContext)

Some files were not shown because too many files have changed in this diff Show more