Compare commits

...

3 commits

Author SHA1 Message Date
bff8d67867 [#24] Client: Implement pool part2
Unicode fix

Signed-off-by: Pavel Gross <p.gross@yadro.com>
2024-11-01 10:41:17 +03:00
ee20798379 [#24] Client: Implement pool part2
Signed-off-by: Pavel Gross <p.gross@yadro.com>
2024-11-01 10:30:28 +03:00
c9a75ea025 [#24] Client: Implement pool part1
first iteration - base classes and methods

Signed-off-by: Pavel Gross <p.gross@yadro.com>
2024-10-21 10:48:00 +03:00
103 changed files with 3263 additions and 669 deletions

View file

@ -1,23 +0,0 @@
using Microsoft.Extensions.Caching.Memory;
namespace FrostFS.SDK.ClientV2
{
internal static class Cache
{
private static readonly IMemoryCache _ownersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 256
});
private static readonly IMemoryCache _containersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 1024
});
internal static IMemoryCache Owners => _ownersCache;
internal static IMemoryCache Containers => _containersCache;
}
}

View file

@ -0,0 +1,22 @@
using Microsoft.Extensions.Caching.Memory;
namespace FrostFS.SDK.ClientV2;
internal static class Caches
{
private static readonly IMemoryCache _ownersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 256
});
private static readonly IMemoryCache _containersCache = new MemoryCache(new MemoryCacheOptions
{
// TODO: get from options?
SizeLimit = 1024
});
internal static IMemoryCache Owners => _ownersCache;
internal static IMemoryCache Containers => _containersCache;
}

View file

@ -4,12 +4,11 @@ using FrostFS.SDK.Cryptography;
using Google.Protobuf;
namespace FrostFS.SDK.ClientV2
{
public class ClientKey(ECDsa key)
{
internal ECDsa ECDsaKey { get; } = key;
namespace FrostFS.SDK.ClientV2;
internal ByteString PublicKeyProto { get; } = ByteString.CopyFrom(key.PublicKey());
}
public class ClientKey(ECDsa key)
{
internal ECDsa ECDsaKey { get; } = key;
internal ByteString PublicKeyProto { get; } = ByteString.CopyFrom(key.PublicKey());
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class FrostFsInvalidObjectException : FrostFsException
{
public FrostFsInvalidObjectException()
{
}
public FrostFsInvalidObjectException(string message) : base(message)
{
}
public FrostFsInvalidObjectException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,25 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class FrostFsResponseException : FrostFsException
{
public FrostFsResponseStatus? Status { get; private set; }
public FrostFsResponseException()
{
}
public FrostFsResponseException(FrostFsResponseStatus status)
{
Status = status;
}
public FrostFsResponseException(string message) : base(message)
{
}
public FrostFsResponseException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class FrostFsStreamException : FrostFsException
{
public FrostFsStreamException()
{
}
public FrostFsStreamException(string message) : base(message)
{
}
public FrostFsStreamException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -1,18 +0,0 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class InvalidObjectException : Exception
{
public InvalidObjectException()
{
}
public InvalidObjectException(string message) : base(message)
{
}
public InvalidObjectException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -1,25 +0,0 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class ResponseException : Exception
{
public FrostFsResponseStatus? Status { get; private set; }
public ResponseException()
{
}
public ResponseException(FrostFsResponseStatus status)
{
Status = status;
}
public ResponseException(string message) : base(message)
{
}
public ResponseException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class SessionExpiredException : FrostFsException
{
public SessionExpiredException()
{
}
public SessionExpiredException(string message) : base(message)
{
}
public SessionExpiredException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -0,0 +1,18 @@
using System;
namespace FrostFS.SDK.ClientV2;
public class SessionNotFoundException : FrostFsException
{
public SessionNotFoundException()
{
}
public SessionNotFoundException(string message) : base(message)
{
}
public SessionNotFoundException(string message, Exception innerException) : base(message, innerException)
{
}
}

View file

@ -22,10 +22,10 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageReference Include="System.Runtime.Caching" Version="8.0.0" />
<PackageReference Include="System.Runtime.Caching" Version="8.0.1" />
</ItemGroup>
<ItemGroup>

View file

@ -6,10 +6,12 @@ using System.Threading.Tasks;
using Frostfs.V2.Ape;
using Frostfs.V2.Apemanager;
using FrostFS.Accounting;
using FrostFS.Container;
using FrostFS.Netmap;
using FrostFS.Object;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Services;
using FrostFS.SDK.Cryptography;
using FrostFS.Session;
@ -35,7 +37,9 @@ public class FrostFSClient : IFrostFSClient
internal ObjectService.ObjectServiceClient? ObjectServiceClient { get; set; }
internal ClientEnvironment ClientCtx { get; set; }
internal AccountingService.AccountingServiceClient? AccountingServiceClient { get; set; }
internal ClientContext ClientCtx { get; set; }
public static IFrostFSClient GetInstance(IOptions<ClientSettings> clientOptions, GrpcChannelOptions? channelOptions = null)
{
@ -89,7 +93,7 @@ public class FrostFSClient : IFrostFSClient
var ecdsaKey = settings.Value.Key.LoadWif();
FrostFsOwner.FromKey(ecdsaKey);
ClientCtx = new ClientEnvironment(
ClientCtx = new ClientContext(
client: this,
key: ecdsaKey,
owner: FrostFsOwner.FromKey(ecdsaKey),
@ -104,13 +108,13 @@ public class FrostFSClient : IFrostFSClient
private FrostFSClient(IOptions<ClientSettings> options, GrpcChannelOptions? channelOptions)
{
var clientSettings = (options?.Value) ?? throw new ArgumentException("Options must be initialized");
var clientSettings = (options?.Value) ?? throw new ArgumentNullException(nameof(options), "Options value must be initialized");
clientSettings.Validate();
var channel = InitGrpcChannel(clientSettings.Host, channelOptions);
ClientCtx = new ClientEnvironment(
ClientCtx = new ClientContext(
this,
key: null,
owner: null,
@ -123,7 +127,7 @@ public class FrostFSClient : IFrostFSClient
private FrostFSClient(IOptions<SingleOwnerClientSettings> options, GrpcChannelOptions? channelOptions)
{
var clientSettings = (options?.Value) ?? throw new ArgumentException("Options must be initialized");
var clientSettings = (options?.Value) ?? throw new ArgumentNullException(nameof(options), "Options value must be initialized");
clientSettings.Validate();
@ -131,7 +135,7 @@ public class FrostFSClient : IFrostFSClient
var channel = InitGrpcChannel(clientSettings.Host, channelOptions);
ClientCtx = new ClientEnvironment(
ClientCtx = new ClientContext(
this,
key: ecdsaKey,
owner: FrostFsOwner.FromKey(ecdsaKey),
@ -142,6 +146,19 @@ public class FrostFSClient : IFrostFSClient
// CheckFrostFsVersionSupport(new Context { Timeout = TimeSpan.FromSeconds(20) });
}
internal FrostFSClient(WrapperPrm prm, SessionCache cache)
{
ClientCtx = new ClientContext(
client: this,
key: prm.Key,
owner: FrostFsOwner.FromKey(prm.Key!),
channel: InitGrpcChannel(prm.Address, null), //prm.GrpcChannelOptions),
version: new FrostFsVersion(2, 13))
{
SessionCache = cache
};
}
public void Dispose()
{
Dispose(true);
@ -305,16 +322,16 @@ public class FrostFSClient : IFrostFSClient
}
#endregion
#region SessionImplementation
#region Session Implementation
public async Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args)
{
if (args is null)
throw new ArgumentNullException(nameof(args));
var session = await CreateSessionInternalAsync(args).ConfigureAwait(false);
var token = session.Serialize();
return new FrostFsSessionToken(token);
var token = session.Serialize();
return new FrostFsSessionToken(token, session.Body.Id.ToUuid());
}
internal Task<SessionToken> CreateSessionInternalAsync(PrmSessionCreate args)
@ -327,8 +344,18 @@ public class FrostFSClient : IFrostFSClient
}
#endregion
#region Accounting Implementation
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args)
{
args ??= new PrmBalance();
var service = GetAccouningService(args);
return await service.GetBallance(args).ConfigureAwait(false);
}
#endregion
#region ToolsImplementation
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, Context ctx)
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
if (header == null)
throw new ArgumentNullException(nameof(header));
@ -337,12 +364,12 @@ public class FrostFSClient : IFrostFSClient
}
#endregion
private async void CheckFrostFsVersionSupport(Context? ctx = default)
private async void CheckFrostFsVersionSupport(CallContext? ctx = default)
{
var args = new PrmNodeInfo { Context = ctx };
var args = new PrmNodeInfo(ctx);
if (ctx?.Version == null)
throw new InvalidObjectException(nameof(ctx.Version));
throw new ArgumentNullException(nameof(ctx), "Version must be initialized");
var service = GetNetmapService(args);
var localNodeInfo = await service.GetLocalNodeInfoAsync(args).ConfigureAwait(false);
@ -354,18 +381,16 @@ public class FrostFSClient : IFrostFSClient
}
}
private CallInvoker? SetupEnvironment(IContext ctx)
private CallInvoker? SetupClientContext(IContext ctx)
{
if (isDisposed)
throw new InvalidObjectException("Client is disposed.");
throw new FrostFsInvalidObjectException("Client is disposed.");
ctx.Context ??= new Context();
if (ctx.Context.Key == null)
if (ctx.Context!.Key == null)
{
if (ClientCtx.Key == null)
{
throw new InvalidObjectException("Key is not initialized.");
throw new ArgumentNullException(nameof(ctx), "Key is not initialized.");
}
ctx.Context.Key = ClientCtx.Key.ECDsaKey;
@ -380,24 +405,23 @@ public class FrostFSClient : IFrostFSClient
{
if (ClientCtx.Version == null)
{
throw new InvalidObjectException("Version is not initialized.");
throw new ArgumentNullException(nameof(ctx), "Version is not initialized.");
}
ctx.Context.Version = ClientCtx.Version;
}
CallInvoker? callInvoker = null;
if (ctx.Context.Interceptors != null && ctx.Context.Interceptors.Count > 0)
{
foreach (var interceptor in ctx.Context.Interceptors)
{
callInvoker = AddInvoker(callInvoker, interceptor);
}
}
foreach (var interceptor in ctx.Context.Interceptors)
callInvoker = AddInvoker(callInvoker, interceptor);
if (ctx.Context.Callback != null)
callInvoker = AddInvoker(callInvoker, new MetricsInterceptor(ctx.Context.Callback));
if (ctx.Context.PoolErrorHandler != null)
callInvoker = AddInvoker(callInvoker, new ErrorInterceptor(ctx.Context.PoolErrorHandler));
return callInvoker;
CallInvoker AddInvoker(CallInvoker? callInvoker, Interceptor interceptor)
@ -405,7 +429,7 @@ public class FrostFSClient : IFrostFSClient
if (callInvoker == null)
callInvoker = ClientCtx.Channel.Intercept(interceptor);
else
callInvoker.Intercept(interceptor);
callInvoker = callInvoker.Intercept(interceptor);
return callInvoker;
}
@ -413,7 +437,7 @@ public class FrostFSClient : IFrostFSClient
private NetmapServiceProvider GetNetmapService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = NetmapServiceClient ?? (callInvoker != null
? new NetmapService.NetmapServiceClient(callInvoker)
: new NetmapService.NetmapServiceClient(ClientCtx.Channel));
@ -423,7 +447,7 @@ public class FrostFSClient : IFrostFSClient
private SessionServiceProvider GetSessionService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = SessionServiceClient ?? (callInvoker != null
? new SessionService.SessionServiceClient(callInvoker)
: new SessionService.SessionServiceClient(ClientCtx.Channel));
@ -433,7 +457,7 @@ public class FrostFSClient : IFrostFSClient
private ApeManagerServiceProvider GetApeManagerService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = ApeManagerServiceClient ?? (callInvoker != null
? new APEManagerService.APEManagerServiceClient(callInvoker)
: new APEManagerService.APEManagerServiceClient(ClientCtx.Channel));
@ -441,9 +465,19 @@ public class FrostFSClient : IFrostFSClient
return new ApeManagerServiceProvider(client, ClientCtx);
}
private AccountingServiceProvider GetAccouningService(IContext ctx)
{
var callInvoker = SetupClientContext(ctx);
var client = AccountingServiceClient ?? (callInvoker != null
? new AccountingService.AccountingServiceClient(callInvoker)
: new AccountingService.AccountingServiceClient(ClientCtx.Channel));
return new AccountingServiceProvider(client, ClientCtx);
}
private ContainerServiceProvider GetContainerService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = ContainerServiceClient ?? (callInvoker != null
? new ContainerService.ContainerServiceClient(callInvoker)
: new ContainerService.ContainerServiceClient(ClientCtx.Channel));
@ -453,7 +487,7 @@ public class FrostFSClient : IFrostFSClient
private ObjectServiceProvider GetObjectService(IContext ctx)
{
var callInvoker = SetupEnvironment(ctx);
var callInvoker = SetupClientContext(ctx);
var client = ObjectServiceClient ?? (callInvoker != null
? new ObjectService.ObjectServiceClient(callInvoker)
: new ObjectService.ObjectServiceClient(ClientCtx.Channel));
@ -461,6 +495,16 @@ public class FrostFSClient : IFrostFSClient
return new ObjectServiceProvider(client, ClientCtx);
}
private AccountingServiceProvider GetAccountService(IContext ctx)
{
var callInvoker = SetupClientContext(ctx);
var client = AccountingServiceClient ?? (callInvoker != null
? new AccountingService.AccountingServiceClient(callInvoker)
: new AccountingService.AccountingServiceClient(ClientCtx.Channel));
return new AccountingServiceProvider(client, ClientCtx);
}
private static GrpcChannel InitGrpcChannel(string host, GrpcChannelOptions? channelOptions)
{
try
@ -480,4 +524,24 @@ public class FrostFSClient : IFrostFSClient
throw new ArgumentException($"Host '{host}' has invalid format. Error: {e.Message}");
}
}
public async Task<string?> Dial(CallContext ctx)
{
var prm = new PrmBalance(ctx);
var service = GetAccouningService(prm);
_ = await service.GetBallance(prm).ConfigureAwait(false);
return null;
}
public bool RestartIfUnhealthy(CallContext ctx)
{
throw new NotImplementedException();
}
public void Close()
{
Dispose();
}
}

View file

@ -0,0 +1,8 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.
using System.Diagnostics.CodeAnalysis;
[assembly: SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "<Pending>", Scope = "member", Target = "~M:FrostFS.SDK.ClientV2.Sampler.Next~System.Int32")]

View file

@ -0,0 +1,68 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Interceptors;
namespace FrostFS.SDK.ClientV2;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1062:Validate arguments of public methods",
Justification = "parameters are provided by GRPC infrastructure")]
public class ErrorInterceptor(Action<Exception> handler) : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
var call = continuation(request, context);
return new AsyncUnaryCall<TResponse>(
HandleUnaryResponse(call),
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose);
}
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
{
var call = continuation(context);
return new AsyncClientStreamingCall<TRequest, TResponse>(
call.RequestStream,
HandleStreamResponse(call),
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose);
}
private async Task<TResponse> HandleUnaryResponse<TResponse>(AsyncUnaryCall<TResponse> call)
{
try
{
return await call;
}
catch (Exception ex)
{
handler(ex);
throw;
}
}
private async Task<TResponse> HandleStreamResponse<TRequest, TResponse>(AsyncClientStreamingCall<TRequest, TResponse> call)
{
try
{
return await call;
}
catch (Exception ex)
{
handler(ex);
throw;
}
}
}

View file

@ -7,6 +7,8 @@ using Grpc.Core.Interceptors;
namespace FrostFS.SDK.ClientV2;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1062:Validate arguments of public methods",
Justification = "parameters are provided by GRPC infrastructure")]
public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
@ -14,11 +16,6 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
if (continuation is null)
{
throw new ArgumentNullException(nameof(continuation));
}
var call = continuation(request, context);
return new AsyncUnaryCall<TResponse>(
@ -33,9 +30,6 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
ClientInterceptorContext<TRequest, TResponse> context,
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
{
if (continuation is null)
throw new ArgumentNullException(nameof(continuation));
var call = continuation(context);
return new AsyncClientStreamingCall<TRequest, TResponse>(
@ -52,7 +46,7 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
var watch = new Stopwatch();
watch.Start();
var response = await call.ResponseAsync.ConfigureAwait(false);
var response = await call;
watch.Stop();
@ -68,7 +62,7 @@ public class MetricsInterceptor(Action<CallStatistics> callback) : Interceptor
var watch = new Stopwatch();
watch.Start();
var response = await call.ResponseAsync.ConfigureAwait(false);
var response = await call;
watch.Stop();

View file

@ -19,7 +19,7 @@ public interface IFrostFSClient : IDisposable
Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args);
#endregion
#region ApeMAnager
#region ApeManager
Task<byte[]> AddChainAsync(PrmApeChainAdd args);
Task RemoveChainAsync(PrmApeChainRemove args);
@ -51,7 +51,15 @@ public interface IFrostFSClient : IDisposable
IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args);
#endregion
#region Tools
FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, Context ctx);
#region Account
Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args = null);
#endregion
#region Tools
FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx);
#endregion
public Task<string?> Dial(CallContext ctx);
public void Close();
}

View file

@ -0,0 +1,24 @@
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
internal static partial class FrostFsMessages
{
[LoggerMessage(100,
LogLevel.Warning,
"Failed to create frostfs session token for client. Address {address}, {error}",
EventName = nameof(SessionCreationError))]
internal static partial void SessionCreationError(ILogger logger, string address, string error);
[LoggerMessage(101,
LogLevel.Warning,
"Error threshold reached. Address {address}, threshold {threshold}",
EventName = nameof(ErrorЕhresholdReached))]
internal static partial void ErrorЕhresholdReached(ILogger logger, string address, uint threshold);
[LoggerMessage(102,
LogLevel.Warning,
"Health has changed: {address} healthy {healthy}, reason {error}",
EventName = nameof(HealthChanged))]
internal static partial void HealthChanged(ILogger logger, string address, bool healthy, string error);
}

View file

@ -24,14 +24,14 @@ public static class ContainerIdMapper
var containerId = model.GetValue() ?? throw new ArgumentNullException(nameof(model));
if (!Cache.Containers.TryGetValue(containerId, out ContainerID? message))
if (!Caches.Containers.TryGetValue(containerId, out ContainerID? message))
{
message = new ContainerID
{
Value = ByteString.CopyFrom(Base58.Decode(containerId))
};
Cache.Containers.Set(containerId, message, _oneHourExpiration);
Caches.Containers.Set(containerId, message, _oneHourExpiration);
}
return message!;

View file

@ -22,14 +22,14 @@ public static class OwnerIdMapper
throw new ArgumentNullException(nameof(model));
}
if (!Cache.Owners.TryGetValue(model, out OwnerID? message))
if (!Caches.Owners.TryGetValue(model, out OwnerID? message))
{
message = new OwnerID
{
Value = ByteString.CopyFrom(model.ToHash())
};
Cache.Owners.Set(model, message, _oneHourExpiration);
Caches.Owners.Set(model, message, _oneHourExpiration);
}
return message!;
@ -42,11 +42,11 @@ public static class OwnerIdMapper
throw new ArgumentNullException(nameof(message));
}
if (!Cache.Owners.TryGetValue(message, out FrostFsOwner? model))
if (!Caches.Owners.TryGetValue(message, out FrostFsOwner? model))
{
model = new FrostFsOwner(Base58.Encode(message.Value.ToByteArray()));
Cache.Owners.Set(message, model, _oneHourExpiration);
Caches.Owners.Set(message, model, _oneHourExpiration);
}
return model!;

View file

@ -2,62 +2,61 @@
using Frostfs.V2.Ape;
namespace FrostFS.SDK.ClientV2
namespace FrostFS.SDK.ClientV2;
public struct FrostFsChainTarget(FrostFsTargetType type, string name) : IEquatable<FrostFsChainTarget>
{
public struct FrostFsChainTarget(FrostFsTargetType type, string name) : IEquatable<FrostFsChainTarget>
private ChainTarget? chainTarget;
public FrostFsTargetType Type { get; } = type;
public string Name { get; } = name;
internal ChainTarget GetChainTarget()
{
private ChainTarget? chainTarget;
public FrostFsTargetType Type { get; } = type;
public string Name { get; } = name;
internal ChainTarget GetChainTarget()
return chainTarget ??= new ChainTarget
{
return chainTarget ??= new ChainTarget
{
Type = GetTargetType(Type),
Name = Name
};
}
Type = GetTargetType(Type),
Name = Name
};
}
private static TargetType GetTargetType(FrostFsTargetType type)
private static TargetType GetTargetType(FrostFsTargetType type)
{
return type switch
{
return type switch
{
FrostFsTargetType.Undefined => TargetType.Undefined,
FrostFsTargetType.Namespace => TargetType.Namespace,
FrostFsTargetType.Container => TargetType.Container,
FrostFsTargetType.User => TargetType.User,
FrostFsTargetType.Group => TargetType.Group,
_ => throw new ArgumentException("Unexpected value for TargetType", nameof(type)),
};
}
FrostFsTargetType.Undefined => TargetType.Undefined,
FrostFsTargetType.Namespace => TargetType.Namespace,
FrostFsTargetType.Container => TargetType.Container,
FrostFsTargetType.User => TargetType.User,
FrostFsTargetType.Group => TargetType.Group,
_ => throw new ArgumentException("Unexpected value for TargetType", nameof(type)),
};
}
public override readonly bool Equals(object obj)
{
var target = (FrostFsChainTarget)obj;
return Equals(target);
}
public override readonly bool Equals(object obj)
{
var target = (FrostFsChainTarget)obj;
return Equals(target);
}
public override readonly int GetHashCode()
{
return $"{Name}{Type}".GetHashCode();
}
public override readonly int GetHashCode()
{
return $"{Name}{Type}".GetHashCode();
}
public static bool operator ==(FrostFsChainTarget left, FrostFsChainTarget right)
{
return left.Equals(right);
}
public static bool operator ==(FrostFsChainTarget left, FrostFsChainTarget right)
{
return left.Equals(right);
}
public static bool operator !=(FrostFsChainTarget left, FrostFsChainTarget right)
{
return !(left == right);
}
public static bool operator !=(FrostFsChainTarget left, FrostFsChainTarget right)
{
return !(left == right);
}
public readonly bool Equals(FrostFsChainTarget other)
{
return Type == other.Type && Name.Equals(other.Name, StringComparison.Ordinal);
}
public readonly bool Equals(FrostFsChainTarget other)
{
return Type == other.Type && Name.Equals(other.Name, StringComparison.Ordinal);
}
}

View file

@ -1,42 +1,41 @@
using Google.Protobuf;
namespace FrostFS.SDK.ClientV2
namespace FrostFS.SDK.ClientV2;
public struct FrostFsChain(byte[] raw) : System.IEquatable<FrostFsChain>
{
public struct FrostFsChain(byte[] raw) : System.IEquatable<FrostFsChain>
private ByteString? grpcRaw;
public byte[] Raw { get; } = raw;
internal ByteString GetRaw()
{
private ByteString? grpcRaw;
return grpcRaw ??= ByteString.CopyFrom(Raw);
}
public byte[] Raw { get; } = raw;
public override readonly bool Equals(object obj)
{
var chain = (FrostFsChain)obj;
return Equals(chain);
}
internal ByteString GetRaw()
{
return grpcRaw ??= ByteString.CopyFrom(Raw);
}
public override readonly int GetHashCode()
{
return Raw.GetHashCode();
}
public override readonly bool Equals(object obj)
{
var chain = (FrostFsChain)obj;
return Equals(chain);
}
public static bool operator ==(FrostFsChain left, FrostFsChain right)
{
return left.Equals(right);
}
public override readonly int GetHashCode()
{
return Raw.GetHashCode();
}
public static bool operator !=(FrostFsChain left, FrostFsChain right)
{
return !(left == right);
}
public static bool operator ==(FrostFsChain left, FrostFsChain right)
{
return left.Equals(right);
}
public static bool operator !=(FrostFsChain left, FrostFsChain right)
{
return !(left == right);
}
public readonly bool Equals(FrostFsChain other)
{
return Raw == other.Raw;
}
public readonly bool Equals(FrostFsChain other)
{
return Raw == other.Raw;
}
}

View file

@ -1,11 +1,10 @@
namespace FrostFS.SDK.ClientV2
namespace FrostFS.SDK.ClientV2;
public enum FrostFsTargetType
{
public enum FrostFsTargetType
{
Undefined = 0,
Namespace,
Container,
User,
Group
}
Undefined = 0,
Namespace,
Container,
User,
Group
}

View file

@ -15,7 +15,7 @@ public class ClientSettings
{
var errors = CheckFields();
if (errors != null)
ThrowException(errors);
ThrowSettingsException(errors);
}
protected Collection<string>? CheckFields()
@ -29,7 +29,7 @@ public class ClientSettings
return null;
}
protected static void ThrowException(Collection<string> errors)
protected static void ThrowSettingsException(Collection<string> errors)
{
if (errors is null)
{
@ -55,7 +55,7 @@ public class SingleOwnerClientSettings : ClientSettings
{
var errors = CheckFields();
if (errors != null)
ThrowException(errors);
ThrowSettingsException(errors);
}
protected new Collection<string>? CheckFields()

View file

@ -31,7 +31,7 @@ public class FrostFsContainerId
return this.modelId;
}
throw new InvalidObjectException();
throw new FrostFsInvalidObjectException();
}
internal ContainerID ContainerID
@ -47,7 +47,7 @@ public class FrostFsContainerId
return this.containerID;
}
throw new InvalidObjectException();
throw new FrostFsInvalidObjectException();
}
}

View file

@ -88,7 +88,7 @@ public class FrostFsContainerInfo
{
if (PlacementPolicy == null)
{
throw new InvalidObjectException("PlacementPolicy is null");
throw new ArgumentNullException("PlacementPolicy is null");
}
this.container = new Container.Container()

View file

@ -1,4 +1,4 @@
using FrostFS.SDK.ClientV2;
using System;
namespace FrostFS.SDK;
@ -67,7 +67,7 @@ public class FrostFsObject
public void SetParent(FrostFsObjectHeader largeObjectHeader)
{
if (Header?.Split == null)
throw new InvalidObjectException("The object is not initialized properly");
throw new ArgumentNullException(nameof(largeObjectHeader), "Split value must not be null");
Header.Split.ParentHeader = largeObjectHeader;
}

View file

@ -1,6 +1,9 @@
using System;
namespace FrostFS.SDK;
public class FrostFsSessionToken(byte[] token)
public class FrostFsSessionToken(byte[] token, Guid id)
{
public Guid Id { get; private set; } = id;
public byte[] Token { get; private set; } = token;
}

View file

@ -11,12 +11,12 @@ using Grpc.Core.Interceptors;
namespace FrostFS.SDK.ClientV2;
public class Context()
public class CallContext()
{
private ReadOnlyCollection<Interceptor>? interceptors;
private ByteString? publicKeyCache;
internal Action<Exception>? PoolErrorHandler { get; set; }
public ECDsa? Key { get; set; }
public FrostFsOwner? OwnerId { get; set; }
@ -31,11 +31,7 @@ public class Context()
public Action<CallStatistics>? Callback { get; set; }
public ReadOnlyCollection<Interceptor>? Interceptors
{
get { return this.interceptors; }
set { this.interceptors = value; }
}
public Collection<Interceptor> Interceptors { get; } = [];
public ByteString? GetPublicKeyCache()
{

View file

@ -7,5 +7,5 @@ public interface IContext
/// callbacks, interceptors.
/// </summary>
/// <value>Additional parameters for calling the method</value>
Context? Context { get; set; }
CallContext? Context { get; }
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmApeChainList(FrostFsChainTarget target) : PrmBase
public sealed class PrmApeChainList(FrostFsChainTarget target, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsChainTarget Target { get; } = target;
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmApeChainRemove(FrostFsChainTarget target, FrostFsChain chain) : PrmBase
public sealed class PrmApeChainRemove(FrostFsChainTarget target, FrostFsChain chain, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsChainTarget Target { get; } = target;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmApeChainAdd(FrostFsChainTarget target, FrostFsChain chain) : PrmBase
public sealed class PrmApeChainAdd(FrostFsChainTarget target, FrostFsChain chain, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsChainTarget Target { get; } = target;

View file

@ -0,0 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmBalance(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -2,7 +2,7 @@
namespace FrostFS.SDK.ClientV2;
public class PrmBase(NameValueCollection? xheaders = null) : IContext
public class PrmBase(CallContext? ctx, NameValueCollection? xheaders = null) : IContext
{
/// <summary>
/// FrostFS request X-Headers
@ -10,5 +10,5 @@ public class PrmBase(NameValueCollection? xheaders = null) : IContext
public NameValueCollection XHeaders { get; } = xheaders ?? [];
/// <inheritdoc />
public Context? Context { get; set; }
public CallContext Context { get; } = ctx ?? new CallContext();
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerCreate(FrostFsContainerInfo container) : PrmBase, ISessionToken
public sealed class PrmContainerCreate(FrostFsContainerInfo container, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerInfo Container { get; set; } = container;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerDelete(FrostFsContainerId containerId) : PrmBase, ISessionToken
public sealed class PrmContainerDelete(FrostFsContainerId containerId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerGet(FrostFsContainerId container) : PrmBase
public sealed class PrmContainerGet(FrostFsContainerId container, CallContext? ctx = null) : PrmBase(ctx)
{
public FrostFsContainerId Container { get; set; } = container;
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmContainerGetAll() : PrmBase()
public sealed class PrmContainerGetAll(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmNetmapSnapshot() : PrmBase
public sealed class PrmNetmapSnapshot(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmNetworkSettings() : PrmBase
public sealed class PrmNetworkSettings(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,5 +1,5 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmNodeInfo() : PrmBase
public sealed class PrmNodeInfo(CallContext? ctx = null) : PrmBase(ctx)
{
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectDelete(FrostFsContainerId containerId, FrostFsObjectId objectId) : PrmBase, ISessionToken
public sealed class PrmObjectDelete(FrostFsContainerId containerId, FrostFsObjectId objectId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectGet(FrostFsContainerId containerId, FrostFsObjectId objectId) : PrmBase, ISessionToken
public sealed class PrmObjectGet(FrostFsContainerId containerId, FrostFsObjectId objectId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectHeadGet(FrostFsContainerId containerId, FrostFsObjectId objectId) : PrmBase, ISessionToken
public sealed class PrmObjectHeadGet(FrostFsContainerId containerId, FrostFsObjectId objectId, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsContainerId ContainerId { get; set; } = containerId;

View file

@ -2,7 +2,7 @@ using System.IO;
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectPut : PrmBase, ISessionToken
public sealed class PrmObjectPut(CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
/// <summary>
/// Need to provide values like <c>ContainerId</c> and <c>ObjectType</c> to create and object.

View file

@ -2,7 +2,7 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmObjectSearch(FrostFsContainerId containerId, params IObjectFilter[] filters) : PrmBase, ISessionToken
public sealed class PrmObjectSearch(FrostFsContainerId containerId, CallContext? ctx = null, params IObjectFilter[] filters) : PrmBase(ctx), ISessionToken
{
/// <summary>
/// Defines container for the search

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmSessionCreate(ulong expiration) : PrmBase
public sealed class PrmSessionCreate(ulong expiration, CallContext? ctx = null) : PrmBase(ctx)
{
public ulong Expiration { get; set; } = expiration;
}

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
public sealed class PrmSingleObjectPut(FrostFsObject frostFsObject) : PrmBase, ISessionToken
public sealed class PrmSingleObjectPut(FrostFsObject frostFsObject, CallContext? ctx = null) : PrmBase(ctx), ISessionToken
{
public FrostFsObject FrostFsObject { get; set; } = frostFsObject;

View file

@ -0,0 +1,163 @@
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
// clientStatusMonitor count error rate and other statistics for connection.
public class ClientStatusMonitor : IClientStatus
{
private static readonly MethodIndex[] MethodIndexes =
[
MethodIndex.methodBalanceGet,
MethodIndex.methodContainerPut,
MethodIndex.methodContainerGet,
MethodIndex.methodContainerList,
MethodIndex.methodContainerDelete,
MethodIndex.methodEndpointInfo,
MethodIndex.methodNetworkInfo,
MethodIndex.methodNetMapSnapshot,
MethodIndex.methodObjectPut,
MethodIndex.methodObjectDelete,
MethodIndex.methodObjectGet,
MethodIndex.methodObjectHead,
MethodIndex.methodObjectRange,
MethodIndex.methodObjectPatch,
MethodIndex.methodSessionCreate,
MethodIndex.methodAPEManagerAddChain,
MethodIndex.methodAPEManagerRemoveChain,
MethodIndex.methodAPEManagerListChains
];
public static string GetMethodName(MethodIndex index)
{
return index switch
{
MethodIndex.methodBalanceGet => "BalanceGet",
MethodIndex.methodContainerPut => "ContainerPut",
MethodIndex.methodContainerGet => "ContainerGet",
MethodIndex.methodContainerList => "ContainerList",
MethodIndex.methodContainerDelete => "ContainerDelete",
MethodIndex.methodEndpointInfo => "EndpointInfo",
MethodIndex.methodNetworkInfo => "NetworkInfo",
MethodIndex.methodNetMapSnapshot => "NetMapSnapshot",
MethodIndex.methodObjectPut => "ObjectPut",
MethodIndex.methodObjectDelete => "ObjectDelete",
MethodIndex.methodObjectGet => "ObjectGet",
MethodIndex.methodObjectHead => "ObjectHead",
MethodIndex.methodObjectRange => "ObjectRange",
MethodIndex.methodObjectPatch => "ObjectPatch",
MethodIndex.methodSessionCreate => "SessionCreate",
MethodIndex.methodAPEManagerAddChain => "APEManagerAddChain",
MethodIndex.methodAPEManagerRemoveChain => "APEManagerRemoveChain",
MethodIndex.methodAPEManagerListChains => "APEManagerListChains",
_ => throw new ArgumentException("Unknown method", nameof(index)),
};
}
private readonly object _lock = new();
private readonly ILogger? logger;
private int healthy;
public ClientStatusMonitor(ILogger? logger, string address)
{
this.logger = logger;
healthy = (int)HealthyStatus.Healthy;
Address = address;
Methods = new MethodStatus[MethodIndexes.Length];
for (int i = 0; i < MethodIndexes.Length; i++)
{
Methods[i] = new MethodStatus(GetMethodName(MethodIndexes[i]));
}
}
public string Address { get; }
internal uint ErrorThreshold { get; set; }
public uint CurrentErrorCount { get; set; }
public ulong OverallErrorCount { get; set; }
public MethodStatus[] Methods { get; private set; }
public bool IsHealthy()
{
var res = Interlocked.CompareExchange(ref healthy, -1, -1) == (int)HealthyStatus.Healthy;
return res;
}
public bool IsDialed()
{
return Interlocked.CompareExchange(ref healthy, -1, -1) != (int)HealthyStatus.UnhealthyOnDial;
}
public void SetHealthy()
{
Interlocked.Exchange(ref healthy, (int)HealthyStatus.Healthy);
}
public void SetUnhealthy()
{
Interlocked.Exchange(ref healthy, (int)HealthyStatus.UnhealthyOnRequest);
}
public void SetUnhealthyOnDial()
{
Interlocked.Exchange(ref healthy, (int)HealthyStatus.UnhealthyOnDial);
}
public void IncErrorRate()
{
bool thresholdReached;
lock (_lock)
{
CurrentErrorCount++;
OverallErrorCount++;
thresholdReached = CurrentErrorCount >= ErrorThreshold;
if (thresholdReached)
{
SetUnhealthy();
CurrentErrorCount = 0;
}
}
if (thresholdReached && logger != null)
{
FrostFsMessages.ErrorЕhresholdReached(logger, Address, ErrorThreshold);
}
}
public uint GetCurrentErrorRate()
{
lock (_lock)
{
return CurrentErrorCount;
}
}
public ulong GetOverallErrorRate()
{
lock (_lock)
{
return OverallErrorCount;
}
}
public StatusSnapshot[] MethodsStatus()
{
var result = new StatusSnapshot[Methods.Length];
for (int i = 0; i < result.Length; i++)
{
result[i] = Methods[i].Snapshot!;
}
return result;
}
}

View file

@ -0,0 +1,143 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
namespace FrostFS.SDK.ClientV2;
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
public class ClientWrapper : ClientStatusMonitor
{
private readonly object _lock = new();
private SessionCache sessionCache;
internal ClientWrapper(WrapperPrm wrapperPrm, Pool pool) : base(wrapperPrm.Logger, wrapperPrm.Address)
{
WrapperPrm = wrapperPrm;
ErrorThreshold = wrapperPrm.ErrorThreshold;
sessionCache = pool.SessionCache;
Client = new FrostFSClient(WrapperPrm, sessionCache);
}
internal FrostFSClient? Client { get; private set; }
internal WrapperPrm WrapperPrm { get; }
internal FrostFSClient? GetClient()
{
lock (_lock)
{
if (IsHealthy())
{
return Client;
}
return null;
}
}
// dial establishes a connection to the server from the FrostFS network.
// Returns an error describing failure reason. If failed, the client
// SHOULD NOT be used.
internal async Task Dial(CallContext ctx)
{
var client = GetClient() ?? throw new FrostFsInvalidObjectException("pool client unhealthy");
await client.Dial(ctx).ConfigureAwait(false);
}
internal void HandleError(Exception ex)
{
if (ex is FrostFsResponseException responseException && responseException.Status != null)
{
switch (responseException.Status.Code)
{
case FrostFsStatusCode.Internal:
case FrostFsStatusCode.WrongMagicNumber:
case FrostFsStatusCode.SignatureVerificationFailure:
case FrostFsStatusCode.NodeUnderMaintenance:
IncErrorRate();
return;
}
}
IncErrorRate();
}
private async Task ScheduleGracefulClose()
{
if (Client == null)
return;
await Task.Delay((int)WrapperPrm.GracefulCloseOnSwitchTimeout).ConfigureAwait(false);
Client.Close();
}
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
internal async Task<bool> RestartIfUnhealthy(CallContext ctx)
{
bool wasHealthy;
try
{
var prmNodeInfo = new PrmNodeInfo(ctx);
var response = await Client!.GetNodeInfoAsync(prmNodeInfo).ConfigureAwait(false);
return false;
}
catch (RpcException)
{
wasHealthy = true;
}
// if connection is dialed before, to avoid routine/connection leak,
// pool has to close it and then initialize once again.
if (IsDialed())
{
await ScheduleGracefulClose().ConfigureAwait(false);
}
#pragma warning disable CA2000 // Dispose objects before losing scope: will be disposed manually
FrostFSClient client = new(WrapperPrm, sessionCache);
#pragma warning restore CA2000
//TODO: set additioanl params
var error = await client.Dial(ctx).ConfigureAwait(false);
if (!string.IsNullOrEmpty(error))
{
SetUnhealthyOnDial();
return wasHealthy;
}
lock (_lock)
{
Client = client;
}
try
{
var prmNodeInfo = new PrmNodeInfo(ctx);
var res = await client.GetNodeInfoAsync(prmNodeInfo).ConfigureAwait(false);
}
catch (FrostFsException)
{
SetUnhealthy();
return wasHealthy;
}
SetHealthy();
return !wasHealthy;
}
internal void IncRequests(ulong elapsed, MethodIndex method)
{
var methodStat = Methods[(int)method];
methodStat.IncRequests(elapsed);
}
}

View file

@ -0,0 +1,18 @@
namespace FrostFS.SDK.ClientV2;
// values for healthy status of clientStatusMonitor.
public enum HealthyStatus
{
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
UnhealthyOnDial,
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again.
UnhealthyOnRequest,
// statusHealthy is set when connection is ready to be used by the pool.
Healthy
}

View file

@ -0,0 +1,28 @@
namespace FrostFS.SDK.ClientV2;
public interface IClientStatus
{
// isHealthy checks if the connection can handle requests.
bool IsHealthy();
// isDialed checks if the connection was created.
bool IsDialed();
// setUnhealthy marks client as unhealthy.
void SetUnhealthy();
// address return address of endpoint.
string Address { get; }
// currentErrorRate returns current errors rate.
// After specific threshold connection is considered as unhealthy.
// Pool.startRebalance routine can make this connection healthy again.
uint GetCurrentErrorRate();
// overallErrorRate returns the number of all happened errors.
ulong GetOverallErrorRate();
// methodsStatus returns statistic for all used methods.
StatusSnapshot[] MethodsStatus();
}

View file

@ -0,0 +1,36 @@
using System;
using System.Security.Cryptography;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
// InitParameters contains values used to initialize connection Pool.
public class InitParameters
{
public ECDsa? Key { get; set; }
public ulong NodeDialTimeout { get; set; }
public ulong NodeStreamTimeout { get; set; }
public ulong HealthcheckTimeout { get; set; }
public ulong ClientRebalanceInterval { get; set; }
public ulong SessionExpirationDuration { get; set; }
public uint ErrorThreshold { get; set; }
public NodeParam[]? NodeParams { get; set; }
public GrpcChannelOptions[]? DialOptions { get; set; }
public Func<string, ClientWrapper>? ClientBuilder { get; set; }
public ulong GracefulCloseOnSwitchTimeout { get; set; }
public ILogger? Logger { get; set; }
}

View file

@ -0,0 +1,47 @@
using FrostFS.SDK.ClientV2;
internal sealed class InnerPool
{
private readonly object _lock = new();
internal InnerPool(Sampler sampler, ClientWrapper[] clients)
{
Clients = clients;
Sampler = sampler;
}
internal Sampler Sampler { get; set; }
internal ClientWrapper[] Clients { get; }
internal ClientWrapper? Connection()
{
lock (_lock)
{
if (Clients.Length == 1)
{
var client = Clients[0];
if (client.IsHealthy())
{
return client;
}
}
else
{
var attempts = 3 * Clients.Length;
for (int i = 0; i < attempts; i++)
{
int index = Sampler.Next();
if (Clients[index].IsHealthy())
{
return Clients[index];
}
}
}
return null;
}
}
}

View file

@ -0,0 +1,24 @@
namespace FrostFS.SDK.ClientV2;
public enum MethodIndex
{
methodBalanceGet,
methodContainerPut,
methodContainerGet,
methodContainerList,
methodContainerDelete,
methodEndpointInfo,
methodNetworkInfo,
methodNetMapSnapshot,
methodObjectPut,
methodObjectDelete,
methodObjectGet,
methodObjectHead,
methodObjectRange,
methodObjectPatch,
methodSessionCreate,
methodAPEManagerAddChain,
methodAPEManagerRemoveChain,
methodAPEManagerListChains,
methodLast
}

View file

@ -0,0 +1,19 @@
namespace FrostFS.SDK.ClientV2;
public class MethodStatus(string name)
{
private readonly object _lock = new();
public string Name { get; } = name;
public StatusSnapshot Snapshot { get; set; } = new StatusSnapshot();
internal void IncRequests(ulong elapsed)
{
lock (_lock)
{
Snapshot.AllTime += elapsed;
Snapshot.AllRequests++;
}
}
}

View file

@ -0,0 +1,12 @@
namespace FrostFS.SDK.ClientV2;
// NodeParam groups parameters of remote node.
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1815:Override equals and operator equals on value types", Justification = "<Pending>")]
public readonly struct NodeParam(int priority, string address, float weight)
{
public int Priority { get; } = priority;
public string Address { get; } = address;
public float Weight { get; } = weight;
}

View file

@ -0,0 +1,12 @@
namespace FrostFS.SDK.ClientV2;
public class NodeStatistic
{
public string? Address { get; internal set; }
public StatusSnapshot[]? Methods { get; internal set; }
public ulong OverallErrors { get; internal set; }
public uint CurrentErrors { get; internal set; }
}

View file

@ -0,0 +1,12 @@
using System.Collections.ObjectModel;
namespace FrostFS.SDK.ClientV2;
public class NodesParam(int priority)
{
public int Priority { get; } = priority;
public Collection<string> Addresses { get; } = [];
public Collection<double> Weights { get; } = [];
}

View file

@ -0,0 +1,776 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Frostfs.V2.Ape;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using Grpc.Core;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
public partial class Pool : IFrostFSClient
{
const int defaultSessionTokenExpirationDuration = 100; // in epochs
const int defaultErrorThreshold = 100;
const int defaultGracefulCloseOnSwitchTimeout = 10; //Seconds;
const int defaultRebalanceInterval = 15; //Seconds;
const int defaultHealthcheckTimeout = 4; //Seconds;
const int defaultDialTimeout = 5; //Seconds;
const int defaultStreamTimeout = 10; //Seconds;
private readonly object _lock = new();
private InnerPool[]? InnerPools { get; set; }
private ECDsa Key { get; set; }
private string PublicKey { get; }
private OwnerID? _ownerId;
private FrostFsOwner? _owner;
private FrostFsOwner Owner
{
get
{
_owner ??= new FrostFsOwner(Key.PublicKey().PublicKeyToAddress());
return _owner;
}
}
private OwnerID OwnerId
{
get
{
if (_ownerId == null)
{
_owner = new FrostFsOwner(Key.PublicKey().PublicKeyToAddress());
_ownerId = _owner.ToMessage();
}
return _ownerId;
}
}
internal CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
internal SessionCache SessionCache { get; set; }
private ulong SessionTokenDuration { get; set; }
private RebalanceParameters RebalanceParams { get; set; }
private Func<string, ClientWrapper> ClientBuilder;
private bool disposedValue;
private ILogger? logger { get; set; }
private ulong MaxObjectSize { get; set; }
public IClientStatus? ClientStatus { get; }
// NewPool creates connection pool using parameters.
public Pool(InitParameters options)
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
if (options.Key == null)
{
throw new ArgumentException($"Missed required parameter {nameof(options.Key)}");
}
var nodesParams = AdjustNodeParams(options.NodeParams);
var cache = new SessionCache(options.SessionExpirationDuration);
FillDefaultInitParams(options, this);
Key = options.Key;
PublicKey = $"{Key.PublicKey()}";
SessionCache = cache;
logger = options.Logger;
SessionTokenDuration = options.SessionExpirationDuration;
RebalanceParams = new RebalanceParameters(
nodesParams.ToArray(),
options.HealthcheckTimeout,
options.ClientRebalanceInterval,
options.SessionExpirationDuration);
ClientBuilder = options.ClientBuilder!;
}
private void SetupContext(CallContext ctx)
{
if (ctx == null)
{
throw new ArgumentNullException(nameof(ctx));
}
ctx.Key ??= Key;
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
public async Task<string?> Dial(CallContext ctx)
{
SetupContext(ctx);
var inner = new InnerPool[RebalanceParams.NodesParams.Length];
bool atLeastOneHealthy = false;
int i = 0;
foreach (var nodeParams in RebalanceParams.NodesParams)
{
var clients = new ClientWrapper[nodeParams.Weights.Count];
for (int j = 0; j < nodeParams.Addresses.Count; j++)
{
ClientWrapper? client = null;
bool dialed = false;
try
{
client = clients[j] = ClientBuilder(nodeParams.Addresses[j]);
await client.Dial(ctx).ConfigureAwait(false);
dialed = true;
var token = await InitSessionForDuration(ctx, client, RebalanceParams.SessionExpirationDuration, Key, false)
.ConfigureAwait(false);
var key = FormCacheKey(nodeParams.Addresses[j], Key.PrivateKey().ToString());
_ = SessionCache.Cache[key] = token;
atLeastOneHealthy = true;
}
catch (RpcException ex)
{
if (!dialed)
client!.SetUnhealthyOnDial();
else
client!.SetUnhealthy();
if (logger != null)
{
FrostFsMessages.SessionCreationError(logger, client!.WrapperPrm.Address, ex.Message);
}
}
catch (FrostFsInvalidObjectException)
{
break;
}
}
var sampler = new Sampler(nodeParams.Weights.ToArray());
inner[i] = new InnerPool(sampler, clients);
i++;
}
if (!atLeastOneHealthy)
return "At least one node must be healthy";
InnerPools = inner;
var res = await GetNetworkSettingsAsync(new PrmNetworkSettings(ctx)).ConfigureAwait(false);
MaxObjectSize = res.MaxObjectSize;
StartRebalance(ctx);
return null;
}
private static IEnumerable<NodesParam> AdjustNodeParams(NodeParam[]? nodeParams)
{
if (nodeParams == null || nodeParams.Length == 0)
{
throw new ArgumentException("No FrostFS peers configured");
}
Dictionary<int, NodesParam> nodesParamsDict = new(nodeParams.Length);
foreach (var nodeParam in nodeParams)
{
if (!nodesParamsDict.TryGetValue(nodeParam.Priority, out var nodes))
{
nodes = new NodesParam(nodeParam.Priority);
nodesParamsDict[nodeParam.Priority] = nodes;
}
nodes.Addresses.Add(nodeParam.Address);
nodes.Weights.Add(nodeParam.Weight);
}
var nodesParams = new List<NodesParam>(nodesParamsDict.Count);
foreach (var key in nodesParamsDict.Keys)
{
var nodes = nodesParamsDict[key];
var newWeights = AdjustWeights([.. nodes.Weights]);
nodes.Weights.Clear();
foreach (var weight in newWeights)
{
nodes.Weights.Add(weight);
}
nodesParams.Add(nodes);
}
return nodesParams.OrderBy(n => n.Priority);
}
private static double[] AdjustWeights(double[] weights)
{
var adjusted = new double[weights.Length];
var sum = weights.Sum();
if (sum > 0)
{
for (int i = 0; i < weights.Length; i++)
{
adjusted[i] = weights[i] / sum;
}
}
return adjusted;
}
private static void FillDefaultInitParams(InitParameters parameters, Pool pool)
{
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
if (parameters.ErrorThreshold == 0)
parameters.ErrorThreshold = defaultErrorThreshold;
if (parameters.ClientRebalanceInterval <= 0)
parameters.ClientRebalanceInterval = defaultRebalanceInterval;
if (parameters.GracefulCloseOnSwitchTimeout <= 0)
parameters.GracefulCloseOnSwitchTimeout = defaultGracefulCloseOnSwitchTimeout;
if (parameters.HealthcheckTimeout <= 0)
parameters.HealthcheckTimeout = defaultHealthcheckTimeout;
if (parameters.NodeDialTimeout <= 0)
parameters.NodeDialTimeout = defaultDialTimeout;
if (parameters.NodeStreamTimeout <= 0)
parameters.NodeStreamTimeout = defaultStreamTimeout;
if (parameters.SessionExpirationDuration == 0)
parameters.SessionExpirationDuration = defaultSessionTokenExpirationDuration;
parameters.ClientBuilder ??= new Func<string, ClientWrapper>((address) =>
{
var wrapperPrm = new WrapperPrm
{
Address = address,
Key = parameters.Key,
Logger = parameters.Logger,
DialTimeout = parameters.NodeDialTimeout,
StreamTimeout = parameters.NodeStreamTimeout,
ErrorThreshold = parameters.ErrorThreshold,
GracefulCloseOnSwitchTimeout = parameters.GracefulCloseOnSwitchTimeout
};
return new ClientWrapper(wrapperPrm, pool);
}
);
}
private ClientWrapper Connection()
{
foreach (var pool in InnerPools!)
{
var client = pool.Connection();
if (client != null)
{
return client;
}
}
throw new FrostFsException("Cannot find alive client");
}
private static async Task<FrostFsSessionToken?> InitSessionForDuration(CallContext ctx, ClientWrapper cw, ulong duration, ECDsa key, bool clientCut)
{
var client = cw.Client;
var networkInfo = await client!.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx)).ConfigureAwait(false);
var epoch = networkInfo.Epoch;
ulong exp = ulong.MaxValue - epoch < duration
? ulong.MaxValue
: epoch + duration;
var prmSessionCreate = new PrmSessionCreate(exp, ctx);
return await client.CreateSessionAsync(prmSessionCreate).ConfigureAwait(false);
}
internal static string FormCacheKey(string address, string key)
{
return $"{address}{key}";
}
public void Close()
{
CancellationTokenSource.Cancel();
if (InnerPools != null)
{
// close all clients
foreach (var innerPool in InnerPools)
foreach (var client in innerPool.Clients)
if (client.IsDialed())
client.Client?.Close();
}
}
// startRebalance runs loop to monitor connection healthy status.
internal void StartRebalance(CallContext ctx)
{
var buffers = new double[RebalanceParams.NodesParams.Length][];
for (int i = 0; i < RebalanceParams.NodesParams.Length; i++)
{
var parameters = RebalanceParams.NodesParams[i];
buffers[i] = new double[parameters.Weights.Count];
Task.Run(async () =>
{
await Task.Delay((int)RebalanceParams.ClientRebalanceInterval).ConfigureAwait(false);
UpdateNodesHealth(ctx, buffers);
});
}
}
private void UpdateNodesHealth(CallContext ctx, double[][] buffers)
{
var tasks = new Task[InnerPools!.Length];
for (int i = 0; i < InnerPools.Length; i++)
{
var bufferWeights = buffers[i];
tasks[i] = Task.Run(() => UpdateInnerNodesHealth(ctx, i, bufferWeights));
}
Task.WaitAll(tasks);
}
private async ValueTask UpdateInnerNodesHealth(CallContext ctx, int poolIndex, double[] bufferWeights)
{
if (poolIndex > InnerPools!.Length - 1)
{
return;
}
var pool = InnerPools[poolIndex];
var options = RebalanceParams;
int healthyChanged = 0;
var tasks = new Task[pool.Clients.Length];
for (int j = 0; j < pool.Clients.Length; j++)
{
var client = pool.Clients[j];
var healthy = false;
string? error = null;
var changed = false;
try
{
// check timeout settings
changed = await client!.RestartIfUnhealthy(ctx).ConfigureAwait(false);
healthy = true;
bufferWeights[j] = options.NodesParams[poolIndex].Weights[j];
}
// TODO: specify
catch (FrostFsException e)
{
error = e.Message;
bufferWeights[j] = 0;
SessionCache.DeleteByPrefix(client.Address);
}
if (changed)
{
if (!string.IsNullOrEmpty(error))
{
if (logger != null)
{
FrostFsMessages.HealthChanged(logger, client.Address, healthy, error!);
}
Interlocked.Exchange(ref healthyChanged, 1);
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
if (Interlocked.CompareExchange(ref healthyChanged, -1, -1) == 1)
{
var probabilities = AdjustWeights(bufferWeights);
lock (_lock)
{
pool.Sampler = new Sampler(probabilities);
}
}
}
}
// TODO: remove
private bool CheckSessionTokenErr(Exception error, string address)
{
if (error == null)
{
return false;
}
if (error is SessionNotFoundException || error is SessionExpiredException)
{
this.SessionCache.DeleteByPrefix(address);
return true;
}
return false;
}
public Statistic Statistic()
{
if (InnerPools == null)
{
throw new FrostFsInvalidObjectException(nameof(Pool));
}
var statistics = new Statistic();
foreach (var inner in InnerPools)
{
int valueIndex = 0;
var nodes = new string[inner.Clients.Length];
lock (_lock)
{
foreach (var client in inner.Clients)
{
if (client.IsHealthy())
{
nodes[valueIndex] = client.Address;
}
var node = new NodeStatistic
{
Address = client.Address,
Methods = client.MethodsStatus(),
OverallErrors = client.GetOverallErrorRate(),
CurrentErrors = client.GetCurrentErrorRate()
};
statistics.Nodes.Add(node);
valueIndex++;
statistics.OverallErrors += node.OverallErrors;
}
if (statistics.CurrentNodes == null || statistics.CurrentNodes.Length == 0)
{
statistics.CurrentNodes = nodes;
}
}
}
return statistics;
}
public async Task<FrostFsNetmapSnapshot> GetNetmapSnapshotAsync(PrmNetmapSnapshot? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNetmapSnapshotAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsNodeInfo> GetNodeInfoAsync(PrmNodeInfo? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNodeInfoAsync(args).ConfigureAwait(false);
}
public async Task<NetworkSettings> GetNetworkSettingsAsync(PrmNetworkSettings? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetNetworkSettingsAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsSessionToken> CreateSessionAsync(PrmSessionCreate args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.CreateSessionAsync(args).ConfigureAwait(false);
}
public async Task<byte[]> AddChainAsync(PrmApeChainAdd args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.AddChainAsync(args).ConfigureAwait(false);
}
public async Task RemoveChainAsync(PrmApeChainRemove args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.RemoveChainAsync(args).ConfigureAwait(false);
}
public async Task<Chain[]> ListChainAsync(PrmApeChainList args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.ListChainAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsContainerInfo> GetContainerAsync(PrmContainerGet args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetContainerAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsContainerId> ListContainersAsync(PrmContainerGetAll? args = null)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return client.Client!.ListContainersAsync(args);
}
public async Task<FrostFsContainerId> CreateContainerAsync(PrmContainerCreate args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.CreateContainerAsync(args).ConfigureAwait(false);
}
public async Task DeleteContainerAsync(PrmContainerDelete args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.DeleteContainerAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetObjectHeadAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.PutObjectAsync(args).ConfigureAwait(false);
}
public async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.PutSingleObjectAsync(args).ConfigureAwait(false);
}
public async Task DeleteObjectAsync(PrmObjectDelete args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
await client.Client!.DeleteObjectAsync(args).ConfigureAwait(false);
}
public IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
{
if (args is null)
{
throw new ArgumentNullException(nameof(args));
}
var client = Connection();
args.Context.PoolErrorHandler = client.HandleError;
return client.Client!.SearchObjectsAsync(args);
}
public async Task<Accounting.Decimal> GetBalanceAsync(PrmBalance? args)
{
var client = Connection();
args ??= new();
args.Context.PoolErrorHandler = client.HandleError;
return await client.Client!.GetBalanceAsync(args).ConfigureAwait(false);
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
Close();
}
disposedValue = true;
}
}
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
public FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
throw new NotImplementedException();
}
}

View file

@ -0,0 +1,16 @@
namespace FrostFS.SDK.ClientV2;
public class RebalanceParameters(
NodesParam[] nodesParams,
ulong nodeRequestTimeout,
ulong clientRebalanceInterval,
ulong sessionExpirationDuration)
{
public NodesParam[] NodesParams { get; set; } = nodesParams;
public ulong NodeRequestTimeout { get; set; } = nodeRequestTimeout;
public ulong ClientRebalanceInterval { get; set; } = clientRebalanceInterval;
public ulong SessionExpirationDuration { get; set; } = sessionExpirationDuration;
}

View file

@ -0,0 +1,14 @@
using System;
namespace FrostFS.SDK.ClientV2;
// RequestInfo groups info about pool request.
struct RequestInfo
{
public string Address { get; set; }
public MethodIndex MethodIndex { get; set; }
public TimeSpan Elapsed { get; set; }
}

View file

@ -0,0 +1,85 @@
using System;
namespace FrostFS.SDK.ClientV2;
internal sealed class Sampler
{
private readonly object _lock = new();
private Random random = new();
internal double[] Probabilities { get; set; }
internal int[] Alias { get; set; }
internal Sampler(double[] probabilities)
{
var small = new WorkList();
var large = new WorkList();
var n = probabilities.Length;
// sampler.randomGenerator = rand.New(source)
Probabilities = new double[n];
Alias = new int[n];
// Compute scaled probabilities.
var p = new double[n];
for (int i = 0; i < n; i++)
{
p[i] = probabilities[i] * n;
if (p[i] < 1)
small.Add(i);
else
large.Add(i);
}
while (small.Length > 0 && large.Length > 0)
{
var l = small.Remove();
var g = large.Remove();
Probabilities[l] = p[l];
Alias[l] = g;
p[g] = p[g] + p[l] - 1;
if (p[g] < 1)
small.Add(g);
else
large.Add(g);
}
while (large.Length > 0)
{
var g = large.Remove();
Probabilities[g] = 1;
}
while (small.Length > 0)
{
var l = small.Remove();
probabilities[l] = 1;
}
}
internal int Next()
{
var n = Alias.Length;
int i;
double f;
lock (_lock)
{
i = random.Next(0, n - 1);
f = random.NextDouble();
}
if (f < Probabilities[i])
{
return i;
}
return Alias[i];
}
}

View file

@ -0,0 +1,24 @@
using System;
using System.Collections;
namespace FrostFS.SDK.ClientV2;
internal struct SessionCache(ulong sessionExpirationDuration)
{
internal Hashtable Cache { get; } = [];
internal ulong CurrentEpoch { get; set; }
internal ulong TokenDuration { get; set; } = sessionExpirationDuration;
internal void DeleteByPrefix(string prefix)
{
foreach (var key in Cache.Keys)
{
if (((string)key).StartsWith(prefix, StringComparison.Ordinal))
{
Cache.Remove(key);
}
}
}
}

View file

@ -0,0 +1,12 @@
using System.Collections.ObjectModel;
namespace FrostFS.SDK.ClientV2;
public sealed class Statistic
{
public ulong OverallErrors { get; internal set; }
public Collection<NodeStatistic> Nodes { get; } = [];
public string[]? CurrentNodes { get; internal set; }
}

View file

@ -0,0 +1,8 @@
namespace FrostFS.SDK.ClientV2;
public class StatusSnapshot()
{
public ulong AllTime { get; internal set; }
public ulong AllRequests { get; internal set; }
}

View file

@ -0,0 +1,26 @@
using System.Collections.Generic;
using System.Linq;
namespace FrostFS.SDK.ClientV2;
internal sealed class WorkList
{
private readonly List<int> elements = [];
internal int Length
{
get { return elements.Count; }
}
internal void Add(int element)
{
elements.Add(element);
}
internal int Remove()
{
int last = elements.LastOrDefault();
elements.RemoveAt(elements.Count - 1);
return last;
}
}

View file

@ -0,0 +1,34 @@
using System;
using System.Security.Cryptography;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
namespace FrostFS.SDK.ClientV2;
// wrapperPrm is params to create clientWrapper.
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1815:Override equals and operator equals on value types", Justification = "<Pending>")]
public struct WrapperPrm
{
internal ILogger? Logger { get; set; }
internal string Address { get; set; }
internal ECDsa? Key { get; set; }
internal ulong DialTimeout { get; set; }
internal ulong StreamTimeout { get; set; }
internal uint ErrorThreshold { get; set; }
internal Action ResponseInfoCallback { get; set; }
internal Action PoolRequestInfoCallback { get; set; }
internal GrpcChannelOptions GrpcChannelOptions { get; set; }
internal ulong GracefulCloseOnSwitchTimeout { get; set; }
}

View file

@ -0,0 +1,40 @@
using System.Threading.Tasks;
using FrostFS.Accounting;
namespace FrostFS.SDK.ClientV2;
internal sealed class AccountingServiceProvider : ContextAccessor
{
private readonly AccountingService.AccountingServiceClient? _accountingServiceClient;
internal AccountingServiceProvider(
AccountingService.AccountingServiceClient? accountingServiceClient,
ClientContext context)
: base(context)
{
_accountingServiceClient = accountingServiceClient;
}
internal async Task<Decimal> GetBallance(PrmBalance args)
{
var ctx = args.Context!;
BalanceRequest request = new()
{
Body = new()
{
OwnerId = ctx.OwnerId!.OwnerID
}
};
request.AddMetaHeader(args.XHeaders);
request.Sign(ctx.Key!);
var response = await _accountingServiceClient!.BalanceAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
return response.Body.Balance;
}
}

View file

@ -1,15 +1,16 @@
using System;
using System.Threading.Tasks;
using Frostfs.V2.Ape;
using Frostfs.V2.Apemanager;
namespace FrostFS.SDK.ClientV2;
namespace FrostFS.SDK.ClientV2.Services;
internal sealed class ApeManagerServiceProvider : ContextAccessor
{
private readonly APEManagerService.APEManagerServiceClient? _apeManagerServiceClient;
internal ApeManagerServiceProvider(APEManagerService.APEManagerServiceClient? apeManagerServiceClient, ClientEnvironment context)
internal ApeManagerServiceProvider(APEManagerService.APEManagerServiceClient? apeManagerServiceClient, ClientContext context)
: base(context)
{
_apeManagerServiceClient = apeManagerServiceClient;
@ -18,10 +19,10 @@ internal sealed class ApeManagerServiceProvider : ContextAccessor
internal async Task<byte[]> AddChainAsync(PrmApeChainAdd args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
AddChainRequest request = new()
{
@ -45,10 +46,10 @@ internal sealed class ApeManagerServiceProvider : ContextAccessor
internal async Task RemoveChainAsync(PrmApeChainRemove args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
RemoveChainRequest request = new()
{
@ -70,10 +71,10 @@ internal sealed class ApeManagerServiceProvider : ContextAccessor
internal async Task<Chain[]> ListChainAsync(PrmApeChainList args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
ListChainsRequest request = new()
{

View file

@ -12,20 +12,28 @@ using FrostFS.Session;
namespace FrostFS.SDK.ClientV2;
internal sealed class ContainerServiceProvider(ContainerService.ContainerServiceClient service, ClientEnvironment context) : ContextAccessor(context), ISessionProvider
internal sealed class ContainerServiceProvider(ContainerService.ContainerServiceClient service, ClientContext clientCtx) : ContextAccessor(clientCtx), ISessionProvider
{
readonly SessionProvider sessions = new(context);
private SessionProvider? sessions;
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
{
sessions ??= new(ClientContext);
if (ClientContext.SessionCache.Cache != null &&
ClientContext.SessionCache.Cache.ContainsKey(ClientContext.SessionCacheKey))
{
return (SessionToken)ClientContext.SessionCache.Cache[ClientContext.SessionCacheKey];
}
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
}
internal async Task<FrostFsContainerInfo> GetContainerAsync(PrmContainerGet args)
{
GetRequest request = GetContainerRequest(args.Container.ContainerID, args.XHeaders, args.Context!);
GetRequest request = GetContainerRequest(args.Container.ContainerID, args.XHeaders, args.Context);
var response = await service.GetAsync(request, null, args.Context!.Deadline, args.Context.CancellationToken);
var response = await service.GetAsync(request, null, args.Context.Deadline, args.Context.CancellationToken);
Verifier.CheckResponse(response);
@ -35,13 +43,13 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
internal async IAsyncEnumerable<FrostFsContainerId> ListContainersAsync(PrmContainerGetAll args)
{
var ctx = args.Context!;
ctx.OwnerId ??= Context.Owner;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.OwnerId ??= ClientContext.Owner;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
if (ctx.OwnerId == null)
throw new InvalidObjectException(nameof(ctx.OwnerId));
throw new ArgumentException(nameof(ctx.OwnerId));
var request = new ListRequest
{
@ -74,11 +82,11 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
grpcContainer.Version ??= ctx.Version?.ToMessage();
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
if (grpcContainer.OwnerId == null)
throw new InvalidObjectException(nameof(grpcContainer.OwnerId));
throw new ArgumentException(nameof(grpcContainer.OwnerId));
if (grpcContainer.Version == null)
throw new InvalidObjectException(nameof(grpcContainer.Version));
throw new ArgumentException(nameof(grpcContainer.Version));
var request = new PutRequest
{
@ -114,7 +122,7 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new DeleteRequest
{
@ -147,10 +155,10 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
Verifier.CheckResponse(response);
}
private static GetRequest GetContainerRequest(ContainerID id, NameValueCollection? xHeaders, Context ctx)
private static GetRequest GetContainerRequest(ContainerID id, NameValueCollection? xHeaders, CallContext ctx)
{
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(ctx), "Key is null");
var request = new GetRequest
{
@ -172,7 +180,7 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
Removed
}
private async Task WaitForContainer(WaitExpects expect, ContainerID id, PrmWait? waitParams, Context ctx)
private async Task WaitForContainer(WaitExpects expect, ContainerID id, PrmWait? waitParams, CallContext ctx)
{
var request = GetContainerRequest(id, null, ctx);
@ -207,7 +215,7 @@ internal sealed class ContainerServiceProvider(ContainerService.ContainerService
await Task.Delay(waitParams.PollInterval).ConfigureAwait(false);
}
catch (ResponseException ex)
catch (FrostFsResponseException ex)
{
if (DateTime.UtcNow >= deadLine)
throw new TimeoutException();

View file

@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -12,27 +13,33 @@ internal sealed class NetmapServiceProvider : ContextAccessor
{
private readonly NetmapService.NetmapServiceClient netmapServiceClient;
internal NetmapServiceProvider(NetmapService.NetmapServiceClient netmapServiceClient, ClientEnvironment context)
internal NetmapServiceProvider(NetmapService.NetmapServiceClient netmapServiceClient, ClientContext context)
: base(context)
{
this.netmapServiceClient = netmapServiceClient;
}
internal async Task<NetworkSettings> GetNetworkSettingsAsync(Context ctx)
internal async Task<NetworkSettings> GetNetworkSettingsAsync(CallContext ctx)
{
if (Context.NetworkSettings != null)
return Context.NetworkSettings;
if (ClientContext.NetworkSettings != null)
return ClientContext.NetworkSettings;
var info = await GetNetworkInfoAsync(ctx).ConfigureAwait(false);
var response = await GetNetworkInfoAsync(ctx).ConfigureAwait(false);
var settings = new NetworkSettings();
foreach (var param in info.Body.NetworkInfo.NetworkConfig.Parameters)
var info = response.Body.NetworkInfo;
settings.Epoch = info.CurrentEpoch;
settings.MagicNumber = info.MagicNumber;
settings.MsPerBlock = info.MsPerBlock;
foreach (var param in info.NetworkConfig.Parameters)
{
SetNetworksParam(param, settings);
}
Context.NetworkSettings = settings;
ClientContext.NetworkSettings = settings;
return settings;
}
@ -40,10 +47,10 @@ internal sealed class NetmapServiceProvider : ContextAccessor
internal async Task<FrostFsNodeInfo> GetLocalNodeInfoAsync(PrmNodeInfo args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new LocalNodeInfoRequest
{
@ -60,12 +67,12 @@ internal sealed class NetmapServiceProvider : ContextAccessor
return response.Body.ToModel();
}
internal async Task<NetworkInfoResponse> GetNetworkInfoAsync(Context ctx)
internal async Task<NetworkInfoResponse> GetNetworkInfoAsync(CallContext ctx)
{
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(ctx), "Key is null");
var request = new NetworkInfoRequest();
@ -83,10 +90,10 @@ internal sealed class NetmapServiceProvider : ContextAccessor
internal async Task<FrostFsNetmapSnapshot> GetNetmapSnapshotAsync(PrmNetmapSnapshot args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new NetmapSnapshotRequest();

View file

@ -15,30 +15,32 @@ using Google.Protobuf;
namespace FrostFS.SDK.ClientV2;
internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx)
: ContextAccessor(clientCtx), ISessionProvider
{
private readonly SessionProvider sessions;
private ObjectService.ObjectServiceClient client;
private SessionProvider? sessions;
private readonly ObjectService.ObjectServiceClient client = client;
internal ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment env)
: base(env)
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
{
this.sessions = new(Context);
this.client = client;
}
sessions ??= new(ClientContext);
if (ClientContext.SessionCache.Cache != null &&
ClientContext.SessionCache.Cache.ContainsKey(ClientContext.SessionCacheKey))
{
return (SessionToken)ClientContext.SessionCache.Cache[ClientContext.SessionCacheKey];
}
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
{
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
}
internal async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new HeadRequest
{
@ -74,10 +76,10 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRequest
{
@ -108,10 +110,10 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
internal async Task DeleteObjectAsync(PrmObjectDelete args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new DeleteRequest
{
@ -145,7 +147,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new SearchRequest
{
@ -183,10 +185,10 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
throw new ArgumentNullException(nameof(args));
if (args.Header == null)
throw new ArgumentException(nameof(args.Header));
throw new ArgumentNullException(nameof(args), "Header is null");
if (args.Payload == null)
throw new ArgumentException(nameof(args.Payload));
throw new ArgumentNullException(nameof(args), "Payload is null");
if (args.ClientCut)
return await PutClientCutObject(args).ConfigureAwait(false);
@ -206,7 +208,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ctx);
@ -238,7 +240,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var ctx = args.Context!;
var tokenRaw = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var token = new FrostFsSessionToken(tokenRaw.Serialize());
var token = new FrostFsSessionToken(tokenRaw.Serialize(), tokenRaw.Body.Id.ToUuid());
args.SessionToken = token;
@ -254,7 +256,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
if (args.MaxObjectSizeCache == 0)
{
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings() { Context = ctx })
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx))
.ConfigureAwait(false);
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
@ -306,7 +308,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject) { Context = args.Context }).ConfigureAwait(false);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject, args.Context)).ConfigureAwait(false);
var parentHeader = args.Header.GetHeader();
@ -331,7 +333,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
var payload = args.Payload!;
@ -352,7 +354,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
}
else
{
chunkBuffer = Context.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
isRentBuffer = true;
}
@ -404,12 +406,12 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
}
}
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, CallContext ctx)
{
var header = args.Header!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new ArgumentNullException(nameof(args), "Key is null");
header.OwnerId ??= ctx.OwnerId;
header.Version ??= ctx.Version;
@ -449,7 +451,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
}
private async Task<FrostFsObject> GetObject(GetRequest request, Context ctx)
private async Task<FrostFsObject> GetObject(GetRequest request, CallContext ctx)
{
var reader = GetObjectInit(request, ctx);
@ -461,7 +463,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return modelObject;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
private ObjectReader GetObjectInit(GetRequest initRequest, CallContext ctx)
{
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
@ -471,7 +473,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return new ObjectReader(call);
}
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, Context ctx)
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, CallContext ctx)
{
if (initRequest is null)
{
@ -485,7 +487,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
return new ObjectStreamer(call);
}
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, Context ctx)
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, CallContext ctx)
{
using var stream = GetSearchReader(request, ctx);
@ -503,7 +505,7 @@ internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
}
}
private SearchReader GetSearchReader(SearchRequest initRequest, Context ctx)
private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx)
{
if (initRequest is null)
{

View file

@ -10,7 +10,7 @@ internal sealed class SessionServiceProvider : ContextAccessor
{
private readonly SessionService.SessionServiceClient? _sessionServiceClient;
internal SessionServiceProvider(SessionService.SessionServiceClient? sessionServiceClient, ClientEnvironment context)
internal SessionServiceProvider(SessionService.SessionServiceClient? sessionServiceClient, ClientContext context)
: base(context)
{
_sessionServiceClient = sessionServiceClient;
@ -20,7 +20,7 @@ internal sealed class SessionServiceProvider : ContextAccessor
{
var ctx = args.Context!;
ctx.OwnerId ??= Context.Owner;
ctx.OwnerId ??= ClientContext.Owner;
var request = new CreateRequest
{
@ -37,7 +37,7 @@ internal sealed class SessionServiceProvider : ContextAccessor
return await CreateSession(request, args.Context!).ConfigureAwait(false);
}
internal async Task<SessionToken> CreateSession(CreateRequest request, Context ctx)
internal async Task<SessionToken> CreateSession(CreateRequest request, CallContext ctx)
{
var response = await _sessionServiceClient!.CreateAsync(request, null, ctx.Deadline, ctx.CancellationToken);

View file

@ -1,6 +1,6 @@
namespace FrostFS.SDK.ClientV2;
internal class ContextAccessor(ClientEnvironment context)
internal class ContextAccessor(ClientContext context)
{
protected ClientEnvironment Context { get; set; } = context;
protected ClientContext ClientContext { get; set; } = context;
}

View file

@ -4,16 +4,16 @@ namespace FrostFS.SDK.ClientV2;
internal interface ISessionProvider
{
ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx);
ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx);
}
internal sealed class SessionProvider(ClientEnvironment env)
internal sealed class SessionProvider(ClientContext envCtx)
{
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
{
if (args.SessionToken is null)
{
return await env.Client.CreateSessionInternalAsync(new PrmSessionCreate(uint.MaxValue) { Context = ctx })
return await envCtx.Client.CreateSessionInternalAsync(new PrmSessionCreate(uint.MaxValue, ctx))
.ConfigureAwait(false);
}

View file

@ -2,16 +2,21 @@ using System;
using System.Buffers;
using System.Security.Cryptography;
using FrostFS.SDK.Cryptography;
using Grpc.Net.Client;
namespace FrostFS.SDK.ClientV2;
public class ClientEnvironment(FrostFSClient client, ECDsa? key, FrostFsOwner? owner, GrpcChannel channel, FrostFsVersion version) : IDisposable
public class ClientContext(FrostFSClient client, ECDsa? key, FrostFsOwner? owner, GrpcChannel channel, FrostFsVersion version) : IDisposable
{
private ArrayPool<byte>? _arrayPool;
private string? sessionKey;
internal FrostFsOwner? Owner { get; } = owner;
internal string? Address { get; } = channel.Target;
internal GrpcChannel Channel { get; private set; } = channel;
internal FrostFsVersion Version { get; } = version;
@ -22,6 +27,21 @@ public class ClientEnvironment(FrostFSClient client, ECDsa? key, FrostFsOwner? o
internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null;
internal SessionCache SessionCache { get; set; }
internal string? SessionCacheKey
{
get
{
if (sessionKey == null && Key != null && Address != null)
{
sessionKey = Pool.FormCacheKey(Address, Key.ECDsaKey.PrivateKey().ToString());
}
return sessionKey;
}
}
/// <summary>
/// Custom pool is used for predefined sizes of buffers like grpc chunk
/// </summary>

View file

@ -4,6 +4,10 @@ namespace FrostFS.SDK.ClientV2;
public class NetworkSettings
{
public ulong Epoch { get; internal set; }
public ulong MagicNumber { get; internal set; }
public long MsPerBlock { get; internal set; }
public ulong AuditFee { get; internal set; }
public ulong BasicIncomeRate { get; internal set; }
public ulong ContainerFee { get; internal set; }

View file

@ -17,13 +17,13 @@ public sealed class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : I
internal async Task<Object.Object> ReadHeader()
{
if (!await Call.ResponseStream.MoveNext().ConfigureAwait(false))
throw new InvalidOperationException("unexpected end of stream");
throw new FrostFsStreamException("unexpected end of stream");
var response = Call.ResponseStream.Current;
Verifier.CheckResponse(response);
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Init)
throw new InvalidOperationException("unexpected message type");
throw new FrostFsStreamException("unexpected message type");
return new Object.Object
{
@ -41,7 +41,7 @@ public sealed class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : I
Verifier.CheckResponse(response);
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Chunk)
throw new InvalidOperationException("unexpected message type");
throw new FrostFsStreamException("unexpected message type");
return response.Body.Chunk.Memory;
}

View file

@ -11,7 +11,7 @@ namespace FrostFS.SDK.ClientV2;
internal static class ObjectTools
{
internal static FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, Context ctx)
internal static FrostFsObjectId CalculateObjectId(FrostFsObjectHeader header, CallContext ctx)
{
var grpcHeader = CreateHeader(header, [], ctx);
@ -21,7 +21,7 @@ internal static class ObjectTools
return new ObjectID { Value = grpcHeader.Sha256() }.ToModel();
}
internal static Object.Object CreateObject(FrostFsObject @object, Context ctx)
internal static Object.Object CreateObject(FrostFsObject @object, CallContext ctx)
{
@object.Header.OwnerId ??= ctx.OwnerId;
@object.Header.Version ??= ctx.Version;
@ -53,13 +53,13 @@ internal static class ObjectTools
return obj;
}
internal static void SetSplitValues(Header grpcHeader, FrostFsSplit split, Context ctx)
internal static void SetSplitValues(Header grpcHeader, FrostFsSplit split, CallContext ctx)
{
if (split == null)
return;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
throw new FrostFsInvalidObjectException(nameof(ctx.Key));
grpcHeader.Split = new Header.Types.Split
{
@ -85,7 +85,7 @@ internal static class ObjectTools
grpcHeader.Split.Previous = split.Previous?.ToMessage();
}
internal static Header CreateHeader(FrostFsObjectHeader header, byte[]? payload, Context ctx)
internal static Header CreateHeader(FrostFsObjectHeader header, byte[]? payload, CallContext ctx)
{
header.OwnerId ??= ctx.OwnerId;
header.Version ??= ctx.Version;

View file

@ -76,6 +76,11 @@ public static class RequestSigner
public static byte[] SignData(this ECDsa key, byte[] data)
{
if (key is null)
{
throw new ArgumentNullException(nameof(key));
}
var hash = new byte[65];
hash[0] = 0x04;

View file

@ -122,7 +122,7 @@ public static class Verifier
var status = resp.MetaHeader.Status.ToModel();
if (status != null && !status.IsSuccess)
throw new ResponseException(status);
throw new FrostFsResponseException(status);
}
/// <summary>
@ -137,6 +137,6 @@ public static class Verifier
}
if (!request.Verify())
throw new FormatException($"invalid response, type={request.GetType()}");
throw new FrostFsResponseException($"invalid response, type={request.GetType()}");
}
}

View file

@ -0,0 +1,62 @@
using FrostFS.SDK.ProtosV2.Interfaces;
using FrostFS.Session;
using Google.Protobuf;
namespace FrostFS.Accounting;
public partial class BalanceRequest : IRequest
{
IMetaHeader IVerifiableMessage.GetMetaHeader()
{
return MetaHeader;
}
IVerificationHeader IVerifiableMessage.GetVerificationHeader()
{
return VerifyHeader;
}
void IVerifiableMessage.SetMetaHeader(IMetaHeader metaHeader)
{
MetaHeader = (RequestMetaHeader)metaHeader;
}
void IVerifiableMessage.SetVerificationHeader(IVerificationHeader verificationHeader)
{
VerifyHeader = (RequestVerificationHeader)verificationHeader;
}
public IMessage GetBody()
{
return Body;
}
}
public partial class BalanceResponse : IResponse
{
IMetaHeader IVerifiableMessage.GetMetaHeader()
{
return MetaHeader;
}
IVerificationHeader IVerifiableMessage.GetVerificationHeader()
{
return VerifyHeader;
}
void IVerifiableMessage.SetMetaHeader(IMetaHeader metaHeader)
{
MetaHeader = (ResponseMetaHeader)metaHeader;
}
void IVerifiableMessage.SetVerificationHeader(IVerificationHeader verificationHeader)
{
VerifyHeader = (ResponseVerificationHeader)verificationHeader;
}
public IMessage GetBody()
{
return Body;
}
}

View file

@ -1,11 +1,11 @@
using System.Diagnostics;
using Grpc.Core;
using Grpc.Core;
using Grpc.Core.Interceptors;
namespace FrostFS.SDK.SmokeTests;
public class MetricsInterceptor() : Interceptor
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1062:Validate arguments of public methods",
Justification = "parameters are provided by GRPC infrastructure")]
public class CallbackInterceptor(Action<string>? callback = null) : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
@ -22,17 +22,11 @@ public class MetricsInterceptor() : Interceptor
call.Dispose);
}
private static async Task<TResponse> HandleUnaryResponse<TResponse>(AsyncUnaryCall<TResponse> call)
private async Task<TResponse> HandleUnaryResponse<TResponse>(AsyncUnaryCall<TResponse> call)
{
var watch = new Stopwatch();
watch.Start();
var response = await call;
var response = await call.ResponseAsync;
watch.Stop();
// Do something with call info
// var elapsed = watch.ElapsedTicks * 1_000_000/Stopwatch.Frequency;
callback?.Invoke($"elapsed");
return response;
}

View file

@ -1,49 +1,14 @@
using System.Diagnostics.CodeAnalysis;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using Google.Protobuf;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ContainerTestsBase
{
protected readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected ContainerMocker Mocker { get; set; }
protected ContainerTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ContainerMocker(this.key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
new NetworkMocker(this.key).GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
Mocker.GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public class ContainerTest : ContainerTestsBase
{
[Fact]

View file

@ -0,0 +1,40 @@
using FrostFS.SDK.ClientV2.Interfaces;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ContainerTestsBase
{
internal readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected ContainerMocker Mocker { get; set; }
protected ContainerTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ContainerMocker(this.key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
new NetworkMocker(this.key).GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
Mocker.GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}

View file

@ -0,0 +1,6 @@
// This file is used by Code Analysis to maintain SuppressMessage
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

View file

@ -26,8 +26,11 @@ public class AsyncStreamReaderMock(string key, FrostFsObjectHeader objectHeader)
OwnerId = objectHeader.OwnerId!.ToMessage()
};
foreach (var attr in objectHeader.Attributes)
header.Attributes.Add(attr.ToMessage());
if (objectHeader.Attributes != null)
{
foreach (var attr in objectHeader.Attributes)
header.Attributes.Add(attr.ToMessage());
}
var response = new GetResponse
{

View file

@ -1,3 +1,5 @@
using System.Collections.ObjectModel;
using FrostFS.SDK.ProtosV2.Interfaces;
using Grpc.Core;
@ -6,13 +8,15 @@ namespace FrostFS.SDK.Tests;
public class ClientStreamWriter : IClientStreamWriter<IRequest>
{
public List<IRequest> Messages { get; set; } = [];
private WriteOptions? _options;
public Collection<IRequest> Messages { get; } = [];
public bool CompletedTask { get; private set; }
public WriteOptions? WriteOptions
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
get => _options;
set => _options = value;
}
public Task CompleteAsync()

View file

@ -25,7 +25,10 @@ public abstract class ServiceBase(string key)
public static FrostFsVersion DefaultVersion { get; } = new(2, 13);
public static FrostFsPlacementPolicy DefaultPlacementPolicy { get; } = new FrostFsPlacementPolicy(true, new FrostFsReplica(1));
public Metadata Metadata { get; protected set; }
#pragma warning disable CA2227 // this is specific object, should be treated as is
public Metadata? Metadata { get; set; }
#pragma warning restore CA2227
public DateTime? DateTime { get; protected set; }
public CancellationToken CancellationToken { get; protected set; }
@ -35,6 +38,8 @@ public abstract class ServiceBase(string key)
protected ResponseVerificationHeader GetResponseVerificationHeader(IResponse response)
{
ArgumentNullException.ThrowIfNull(response);
var verifyHeader = new ResponseVerificationHeader
{
MetaSignature = new Refs.Signature

View file

@ -1,3 +1,5 @@
using System.Collections.ObjectModel;
using FrostFS.Container;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2;
@ -41,7 +43,7 @@ public class ContainerMocker(string key) : ContainerServiceBase(key)
putResponse.VerifyHeader = GetResponseVerificationHeader(putResponse);
var metadata = new Metadata();
var putContainerResponse = new AsyncUnaryCall<PutResponse>(
using var putContainerResponse = new AsyncUnaryCall<PutResponse>(
Task.FromResult(putResponse),
Task.FromResult(metadata),
() => new Grpc.Core.Status(StatusCode.OK, string.Empty),
@ -180,7 +182,7 @@ public class ContainerMocker(string key) : ContainerServiceBase(key)
public bool ReturnContainerRemoved { get; set; }
public List<byte[]> ContainerIds { get; set; } = [];
public Collection<byte[]> ContainerIds { get; } = [];
public List<RequestData<DeleteRequest>> Requests { get; set; } = [];
public Collection<RequestData<DeleteRequest>> Requests { get; } = [];
}

View file

@ -25,18 +25,17 @@ public class NetworkMocker(string key) : ServiceBase(key)
"MaintenanceModeAllowed"
];
public Dictionary<string, byte[]>? Parameters { get; set; }
public Dictionary<string, byte[]> Parameters { get; } = [];
public LocalNodeInfoResponse NodeInfoResponse { get; set; }
public LocalNodeInfoResponse? NodeInfoResponse { get; set; }
public LocalNodeInfoRequest LocalNodeInfoRequest { get; set; }
public LocalNodeInfoRequest? LocalNodeInfoRequest { get; set; }
public NetworkInfoRequest NetworkInfoRequest { get; set; }
public NetworkInfoRequest? NetworkInfoRequest { get; set; }
public NetmapSnapshotResponse NetmapSnapshotResponse { get; set; }
public NetmapSnapshotRequest NetmapSnapshotRequest { get; set; }
public NetmapSnapshotResponse? NetmapSnapshotResponse { get; set; }
public NetmapSnapshotRequest? NetmapSnapshotRequest { get; set; }
public Mock<NetmapService.NetmapServiceClient> GetMock()
{

View file

@ -1,3 +1,4 @@
using System.Collections.ObjectModel;
using System.Security.Cryptography;
using FrostFS.Object;
@ -92,7 +93,7 @@ public class ObjectMocker(string key) : ObjectServiceBase(key)
}
if (ResultObjectIds != null)
if (ResultObjectIds != null && ResultObjectIds.Count > 0)
{
PutResponse putResponse = new()
{
@ -197,14 +198,14 @@ public class ObjectMocker(string key) : ObjectServiceBase(key)
public Header? HeadResponse { get; set; }
public List<byte[]>? ResultObjectIds { get; set; }
public Collection<byte[]>? ResultObjectIds { get; } = [];
public ClientStreamWriter? ClientStreamWriter { get; private set; } = new();
public List<PutSingleRequest> PutSingleRequests { get; private set; } = [];
public Collection<PutSingleRequest> PutSingleRequests { get; private set; } = [];
public List<DeleteRequest> DeleteRequests { get; private set; } = [];
public Collection<DeleteRequest> DeleteRequests { get; private set; } = [];
public List<HeadRequest> HeadRequests { get; private set; } = [];
public Collection<HeadRequest> HeadRequests { get; private set; } = [];
}

View file

@ -8,13 +8,14 @@ using Moq;
namespace FrostFS.SDK.Tests;
[System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "No secure purpose")]
public class SessionMocker(string key) : ServiceBase(key)
{
public byte[]? SessionId { get; set; }
public byte[]? SessionKey { get; set; }
public CreateRequest CreateSessionRequest { get; private set; }
public CreateRequest? CreateSessionRequest { get; private set; }
public Mock<SessionService.SessionServiceClient> GetMock()
{
@ -24,7 +25,7 @@ public class SessionMocker(string key) : ServiceBase(key)
if (SessionId == null)
{
SessionId = new byte[32];
SessionId = new byte[16];
rand.NextBytes(SessionId);
}

View file

@ -1,55 +1,13 @@
using System.Security.Cryptography;
using System.Diagnostics.CodeAnalysis;
using FrostFS.Netmap;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Google.Protobuf;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class NetworkTestsBase
{
protected readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsVersion Version { get; set; } = new FrostFsVersion(2, 13);
protected ECDsa ECDsaKey { get; set; }
protected FrostFsOwner OwnerId { get; set; }
protected NetworkMocker Mocker { get; set; }
protected NetworkTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
ECDsaKey = key.LoadWif();
OwnerId = FrostFsOwner.FromKey(ECDsaKey);
Mocker = new NetworkMocker(this.key);
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
Mocker.GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
new ContainerMocker(this.key).GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public class NetworkTest : NetworkTestsBase
{
[Theory]
@ -57,35 +15,29 @@ public class NetworkTest : NetworkTestsBase
[InlineData(true)]
public async void NetworkSettingsTest(bool useContext)
{
Mocker.Parameters = new Dictionary<string, byte[]>
{
{ "AuditFee", [1] },
{ "BasicIncomeRate", [2] },
{ "ContainerFee", [3] },
{ "ContainerAliasFee", [4] },
{ "EpochDuration", [5] },
{ "InnerRingCandidateFee", [6] },
{ "MaxECDataCount", [7] },
{ "MaxECParityCount", [8] },
{ "MaxObjectSize", [9] },
{ "WithdrawFee", [10] },
{ "HomomorphicHashingDisabled", [1] },
{ "MaintenanceModeAllowed", [1] },
};
Mocker.Parameters.Add("AuditFee", [1]);
Mocker.Parameters.Add("BasicIncomeRate", [2]);
Mocker.Parameters.Add("ContainerFee", [3]);
Mocker.Parameters.Add("ContainerAliasFee", [4]);
Mocker.Parameters.Add("EpochDuration", [5]);
Mocker.Parameters.Add("InnerRingCandidateFee", [6]);
Mocker.Parameters.Add("MaxECDataCount", [7]);
Mocker.Parameters.Add("MaxECParityCount", [8]);
Mocker.Parameters.Add("MaxObjectSize", [9]);
Mocker.Parameters.Add("WithdrawFee", [10]);
Mocker.Parameters.Add("HomomorphicHashingDisabled", [1]);
Mocker.Parameters.Add("MaintenanceModeAllowed", [1]);
var param = new PrmNetworkSettings();
if (useContext)
{
param.Context = new Context
var param = useContext ?
new PrmNetworkSettings(new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
OwnerId = OwnerId,
Key = ECDsaKey,
Version = Version
};
}
})
: new PrmNetworkSettings();
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -119,6 +71,7 @@ public class NetworkTest : NetworkTestsBase
}
else
{
Assert.NotNull(Mocker.NetworkInfoRequest);
Assert.Empty(Mocker.NetworkInfoRequest.MetaHeader.XHeaders);
Assert.Null(Mocker.DateTime);
}
@ -127,6 +80,7 @@ public class NetworkTest : NetworkTestsBase
[Theory]
[InlineData(false)]
[InlineData(true)]
public async void NetmapSnapshotTest(bool useContext)
{
var body = new NetmapSnapshotResponse.Types.Body
@ -159,12 +113,11 @@ public class NetworkTest : NetworkTestsBase
Mocker.NetmapSnapshotResponse = new NetmapSnapshotResponse { Body = body };
var param = new PrmNetmapSnapshot();
PrmNetmapSnapshot param;
if (useContext)
{
param.XHeaders.Add("headerKey1", "headerValue1");
param.Context = new Context
var ctx = new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
@ -172,6 +125,14 @@ public class NetworkTest : NetworkTestsBase
Key = ECDsaKey,
Version = Version
};
param = new(ctx);
param.XHeaders.Add("headerKey1", "headerValue1");
}
else
{
param = new();
}
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -210,6 +171,7 @@ public class NetworkTest : NetworkTestsBase
if (useContext)
{
Assert.NotNull(Mocker.NetmapSnapshotRequest);
Assert.Single(Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders);
Assert.Equal(param.XHeaders.Keys[0], Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders.First().Key);
Assert.Equal(param.XHeaders[param.XHeaders.Keys[0]], Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders.First().Value);
@ -222,6 +184,7 @@ public class NetworkTest : NetworkTestsBase
}
else
{
Assert.NotNull(Mocker.NetmapSnapshotRequest);
Assert.Empty(Mocker.NetmapSnapshotRequest.MetaHeader.XHeaders);
Assert.Null(Mocker.DateTime);
}
@ -249,12 +212,11 @@ public class NetworkTest : NetworkTestsBase
Mocker.NodeInfoResponse = new LocalNodeInfoResponse { Body = body };
var param = new PrmNodeInfo();
PrmNodeInfo param;
if (useContext)
{
param.XHeaders.Add("headerKey1", "headerValue1");
param.Context = new Context
var ctx = new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
@ -262,6 +224,14 @@ public class NetworkTest : NetworkTestsBase
Key = ECDsaKey,
Version = Version
};
param = new(ctx);
param.XHeaders.Add("headerKey1", "headerValue1");
}
else
{
param = new();
}
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -282,6 +252,7 @@ public class NetworkTest : NetworkTestsBase
Assert.Equal("value1", result.Attributes["key1"]);
Assert.Equal("value2", result.Attributes["key2"]);
Assert.NotNull(Mocker.LocalNodeInfoRequest);
if (useContext)
{
Assert.Single(Mocker.LocalNodeInfoRequest.MetaHeader.XHeaders);

View file

@ -0,0 +1,48 @@
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public abstract class NetworkTestsBase
{
internal readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsVersion Version { get; set; } = new FrostFsVersion(2, 13);
protected ECDsa ECDsaKey { get; set; }
protected FrostFsOwner OwnerId { get; set; }
protected NetworkMocker Mocker { get; set; }
protected NetworkTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
ECDsaKey = key.LoadWif();
OwnerId = FrostFsOwner.FromKey(ECDsaKey);
Mocker = new NetworkMocker(this.key);
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
Mocker.GetMock().Object,
new SessionMocker(this.key).GetMock().Object,
new ContainerMocker(this.key).GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}

View file

@ -1,71 +1,19 @@
using System.Collections.ObjectModel;
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using System.Text;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Google.Protobuf;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ObjectTestsBase
{
protected static readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsContainerId ContainerId { get; set; }
protected NetworkMocker NetworkMocker { get; set; } = new NetworkMocker(key);
protected SessionMocker SessionMocker { get; set; } = new SessionMocker(key);
protected ContainerMocker ContainerMocker { get; set; } = new ContainerMocker(key);
protected ObjectMocker Mocker { get; set; }
protected ObjectTestsBase()
{
var ecdsaKey = key.LoadWif();
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ObjectMocker(key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
ContainerId = new FrostFsContainerId(Base58.Encode(Mocker.ContainerGuid.ToBytes()));
Mocker.ObjectHeader = new(
ContainerId,
FrostFsObjectType.Regular,
[new FrostFsAttributePair("k", "v")],
null,
FrostFsOwner.FromKey(ecdsaKey),
new FrostFsVersion(2, 13));
}
protected IFrostFSClient GetClient()
{
return FrostFSClient.GetTestInstance(
Settings,
null,
NetworkMocker.GetMock().Object,
SessionMocker.GetMock().Object,
ContainerMocker.GetMock().Object,
Mocker.GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
[SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "No secure purpose")]
public class ObjectTest : ObjectTestsBase
{
[Fact]
@ -75,7 +23,7 @@ public class ObjectTest : ObjectTestsBase
var ecdsaKey = key.LoadWif();
var ctx = new Context
var ctx = new CallContext
{
Key = ecdsaKey,
OwnerId = FrostFsOwner.FromKey(ecdsaKey),
@ -84,22 +32,25 @@ public class ObjectTest : ObjectTestsBase
var objectId = client.CalculateObjectId(Mocker.ObjectHeader!, ctx);
var result = await client.GetObjectAsync(new PrmObjectGet(ContainerId, objectId) { Context = ctx });
var result = await client.GetObjectAsync(new PrmObjectGet(ContainerId, objectId, ctx));
Assert.NotNull(result);
Assert.Equal(Mocker.ObjectHeader!.ContainerId.GetValue(), result.Header.ContainerId.GetValue());
Assert.Equal(Mocker.ObjectHeader!.OwnerId!.Value, result.Header.OwnerId!.Value);
Assert.NotNull(Mocker.ObjectHeader);
Assert.Equal(Mocker.ObjectHeader.ContainerId.GetValue(), result.Header.ContainerId.GetValue());
Assert.Equal(Mocker.ObjectHeader.OwnerId!.Value, result.Header.OwnerId!.Value);
Assert.Equal(Mocker.ObjectHeader.PayloadLength, result.Header.PayloadLength);
Assert.NotNull(result.Header.Attributes);
Assert.Single(result.Header.Attributes);
Assert.Equal(Mocker.ObjectHeader.Attributes[0].Key, result.Header.Attributes[0].Key);
Assert.Equal(Mocker.ObjectHeader.Attributes[0].Value, result.Header.Attributes[0].Value);
Assert.Equal(Mocker.ObjectHeader.Attributes![0].Key, result.Header.Attributes[0].Key);
Assert.Equal(Mocker.ObjectHeader.Attributes![0].Value, result.Header.Attributes[0].Value);
}
[Fact]
public async void PutObjectTest()
{
Mocker.ResultObjectIds = new([SHA256.HashData([])]);
Mocker.ResultObjectIds!.Add(SHA256.HashData([]));
Random rnd = new();
var bytes = new byte[1024];
@ -134,7 +85,7 @@ public class ObjectTest : ObjectTestsBase
[Fact]
public async void ClientCutTest()
{
NetworkMocker.Parameters = new Dictionary<string, byte[]>() { { "MaxObjectSize", [0x0, 0xa] } };
NetworkMocker.Parameters.Add("MaxObjectSize", [0x0, 0xa]);
var blockSize = 2560;
byte[] bytes = File.ReadAllBytes(@".\..\..\..\TestData\cat.jpg");
@ -150,17 +101,19 @@ public class ObjectTest : ObjectTestsBase
Random rnd = new();
List<byte[]> objIds = new([new byte[32], new byte[32], new byte[32]]);
Collection<byte[]> objIds = new([new byte[32], new byte[32], new byte[32]]);
rnd.NextBytes(objIds.ElementAt(0));
rnd.NextBytes(objIds.ElementAt(1));
rnd.NextBytes(objIds.ElementAt(2));
Mocker.ResultObjectIds = objIds;
foreach (var objId in objIds)
Mocker.ResultObjectIds!.Add(objId);
var result = await GetClient().PutObjectAsync(param);
var singleObjects = Mocker.PutSingleRequests.ToArray();
Assert.NotNull(Mocker.ClientStreamWriter?.Messages);
var streamObjects = Mocker.ClientStreamWriter.Messages.ToArray();
Assert.Single(singleObjects);
@ -262,6 +215,7 @@ public class ObjectTest : ObjectTestsBase
Assert.Equal(FrostFsObjectType.Regular, response.ObjectType);
Assert.NotNull(response.Attributes);
Assert.Single(response.Attributes);
Assert.Equal(Mocker.HeadResponse.Attributes[0].Key, response.Attributes.First().Key);

View file

@ -0,0 +1,59 @@
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class ObjectTestsBase
{
protected static readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected FrostFsContainerId ContainerId { get; set; }
protected NetworkMocker NetworkMocker { get; set; } = new NetworkMocker(key);
protected SessionMocker SessionMocker { get; set; } = new SessionMocker(key);
protected ContainerMocker ContainerMocker { get; set; } = new ContainerMocker(key);
protected ObjectMocker Mocker { get; set; }
protected ObjectTestsBase()
{
var ecdsaKey = key.LoadWif();
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
Mocker = new ObjectMocker(key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13),
ContainerGuid = Guid.NewGuid()
};
ContainerId = new FrostFsContainerId(Base58.Encode(Mocker.ContainerGuid.ToBytes()));
Mocker.ObjectHeader = new(
ContainerId,
FrostFsObjectType.Regular,
[new FrostFsAttributePair("k", "v")],
null,
FrostFsOwner.FromKey(ecdsaKey),
new FrostFsVersion(2, 13));
}
protected IFrostFSClient GetClient()
{
return FrostFSClient.GetTestInstance(
Settings,
null,
NetworkMocker.GetMock().Object,
SessionMocker.GetMock().Object,
ContainerMocker.GetMock().Object,
Mocker.GetMock().Object);
}
}

View file

@ -0,0 +1,612 @@
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
using static FrostFS.Session.SessionToken.Types.Body;
namespace FrostFS.SDK.SmokeTests;
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
[SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "No secure purpose")]
public class PoolSmokeTests : SmokeTestsBase
{
private static readonly PrmWait lightWait = new(100, 1);
private InitParameters GetDefaultParams()
{
return new InitParameters
{
Key = keyString.LoadWif(),
NodeParams = [new(1, this.url, 100.0f)],
ClientBuilder = null,
GracefulCloseOnSwitchTimeout = 30_000_000,
Logger = null
};
}
[Fact]
public async void NetworkMapTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext());
Assert.Null(error);
var result = await pool.GetNetmapSnapshotAsync(default);
Assert.True(result.Epoch > 0);
Assert.Single(result.NodeInfoCollection);
var item = result.NodeInfoCollection[0];
Assert.Equal(2, item.Version.Major);
Assert.Equal(13, item.Version.Minor);
Assert.Equal(NodeState.Online, item.State);
Assert.True(item.PublicKey.Length > 0);
Assert.Single(item.Addresses);
Assert.Equal(9, item.Attributes.Count);
}
[Fact]
public async void NodeInfoTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext());
Assert.Null(error);
var result = await pool.GetNodeInfoAsync();
Assert.Equal(2, result.Version.Major);
Assert.Equal(13, result.Version.Minor);
Assert.Equal(NodeState.Online, result.State);
Assert.Equal(33, result.PublicKey.Length);
Assert.Single(result.Addresses);
Assert.Equal(9, result.Attributes.Count);
}
[Fact]
public async void NodeInfoStatisticsTwoNodesTest()
{
var options = new InitParameters
{
Key = keyString.LoadWif(),
NodeParams = [
new(1, this.url, 100.0f),
new(2, this.url.Replace('0', '1'), 100.0f)
],
ClientBuilder = null,
GracefulCloseOnSwitchTimeout = 30_000_000,
Logger = null
};
using var pool = new Pool(options);
var callbackText = string.Empty;
var ctx = new CallContext
{
Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"
};
var error = await pool.Dial(ctx).ConfigureAwait(true);
Assert.Null(error);
using var client = FrostFSClient.GetSingleOwnerInstance(GetSingleOwnerOptions(this.keyString, this.url));
var result = await client.GetNodeInfoAsync();
var statistics = pool.Statistic();
Assert.False(string.IsNullOrEmpty(callbackText));
Assert.Contains(" took ", callbackText, StringComparison.Ordinal);
}
[Fact]
public async void NodeInfoStatisticsTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var callbackText = string.Empty;
var ctx = new CallContext
{
Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"
};
var error = await pool.Dial(ctx).ConfigureAwait(true);
Assert.Null(error);
using var client = FrostFSClient.GetSingleOwnerInstance(GetSingleOwnerOptions(this.keyString, this.url));
var result = await client.GetNodeInfoAsync();
Assert.False(string.IsNullOrEmpty(callbackText));
Assert.Contains(" took ", callbackText, StringComparison.Ordinal);
}
[Fact]
public async void GetSessionTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
var prm = new PrmSessionCreate(100);
var token = await pool.CreateSessionAsync(prm).ConfigureAwait(true);
var session = new Session.SessionToken().Deserialize(token.Token);
var ownerHash = Base58.Decode(OwnerId!.Value);
Assert.NotNull(session);
Assert.Null(session.Body.Container);
Assert.Null(session.Body.Object);
Assert.Equal(16, session.Body.Id.Length);
Assert.Equal(100ul, session.Body.Lifetime.Exp);
Assert.Equal(ownerHash, session.Body.OwnerId.Value);
Assert.Equal(33, session.Body.SessionKey.Length);
Assert.Equal(ContextOneofCase.None, session.Body.ContextCase);
}
[Fact]
public async void CreateObjectWithSessionToken()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
var token = await pool.CreateSessionAsync(new PrmSessionCreate(int.MaxValue));
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))));
createContainerParam.XHeaders.Add("key1", "value1");
var containerId = await pool.CreateContainerAsync(createContainerParam);
var bytes = GetRandomBytes(1024);
var param = new PrmObjectPut
{
Header = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = false,
SessionToken = token
};
var objectId = await pool.PutObjectAsync(param).ConfigureAwait(true);
var @object = await pool.GetObjectAsync(new PrmObjectGet(containerId, objectId));
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await Cleanup(pool);
}
[Fact]
public async void FilterTest()
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))))
{
WaitParams = lightWait
};
var containerId = await pool.CreateContainerAsync(createContainerParam);
var bytes = new byte[] { 1, 2, 3 };
var ParentHeader = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular)
{
PayloadLength = 3
};
var param = new PrmObjectPut
{
Header = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")],
new FrostFsSplit()),
Payload = new MemoryStream(bytes),
ClientCut = false
};
var objectId = await pool.PutObjectAsync(param);
var head = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId));
var ecdsaKey = this.keyString.LoadWif();
var networkInfo = await pool.GetNetmapSnapshotAsync();
await CheckFilter(pool, containerId, new FilterByContainerId(FrostFsMatchType.Equals, containerId));
await CheckFilter(pool, containerId, new FilterByOwnerId(FrostFsMatchType.Equals, FrostFsOwner.FromKey(ecdsaKey)));
await CheckFilter(pool, containerId, new FilterBySplitId(FrostFsMatchType.Equals, param.Header.Split!.SplitId));
await CheckFilter(pool, containerId, new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"));
await CheckFilter(pool, containerId, new FilterByObjectId(FrostFsMatchType.Equals, objectId));
await CheckFilter(pool, containerId, new FilterByVersion(FrostFsMatchType.Equals, networkInfo.NodeInfoCollection[0].Version));
await CheckFilter(pool, containerId, new FilterByEpoch(FrostFsMatchType.Equals, networkInfo.Epoch));
await CheckFilter(pool, containerId, new FilterByPayloadLength(FrostFsMatchType.Equals, 3));
var checkSum = CheckSum.CreateCheckSum(bytes);
await CheckFilter(pool, containerId, new FilterByPayloadHash(FrostFsMatchType.Equals, checkSum));
await CheckFilter(pool, containerId, new FilterByPhysicallyStored());
}
private static async Task CheckFilter(Pool pool, FrostFsContainerId containerId, IObjectFilter filter)
{
var resultObjectsCount = 0;
PrmObjectSearch searchParam = new(containerId) { Filters = [filter] };
await foreach (var objId in pool.SearchObjectsAsync(searchParam))
{
resultObjectsCount++;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objId));
}
Assert.True(0 < resultObjectsCount, $"Filter for {filter.Key} doesn't work");
}
[Theory]
[InlineData(1)]
[InlineData(3 * 1024 * 1024)] // exactly one chunk size - 3MB
[InlineData(6 * 1024 * 1024 + 100)]
public async void SimpleScenarioTest(int objectSize)
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
bool callbackInvoked = false;
var ctx = new CallContext
{
// Timeout = TimeSpan.FromSeconds(20),
Callback = new((CallStatistics cs) =>
{
callbackInvoked = true;
Assert.True(cs.ElapsedMicroSeconds > 0);
})
};
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1)), [new("testKey", "testValue")]), ctx);
var createdContainer = await pool.CreateContainerAsync(createContainerParam);
var container = await pool.GetContainerAsync(new PrmContainerGet(createdContainer));
Assert.NotNull(container);
Assert.True(callbackInvoked);
var bytes = GetRandomBytes(objectSize);
var ctx1 = new CallContext
{
Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0))
};
var param = new PrmObjectPut(ctx1)
{
Header = new FrostFsObjectHeader(
containerId: createdContainer,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = false
};
var objectId = await pool.PutObjectAsync(param);
var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test");
bool hasObject = false;
await foreach (var objId in pool.SearchObjectsAsync(new PrmObjectSearch(createdContainer) { Filters = [filter] }))
{
hasObject = true;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(createdContainer, objectId));
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.NotNull(objHeader.Attributes);
Assert.Single(objHeader.Attributes!);
Assert.Equal("fileName", objHeader.Attributes!.First().Key);
Assert.Equal("test", objHeader.Attributes!.First().Value);
}
Assert.True(hasObject);
var @object = await pool.GetObjectAsync(new PrmObjectGet(createdContainer, objectId));
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await Cleanup(pool);
await foreach (var _ in pool.ListContainersAsync())
{
Assert.Fail("Containers exist");
}
}
[Theory]
[InlineData(1)]
[InlineData(3 * 1024 * 1024)] // exactly one chunk size - 3MB
[InlineData(6 * 1024 * 1024 + 100)]
public async void SimpleScenarioWithSessionTest(int objectSize)
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
var token = await pool.CreateSessionAsync(new PrmSessionCreate(int.MaxValue));
await Cleanup(pool);
var ctx = new CallContext
{
Timeout = TimeSpan.FromSeconds(20),
Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0))
};
var createContainerParam = new PrmContainerCreate(
new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))), ctx);
var container = await pool.CreateContainerAsync(createContainerParam);
var containerInfo = await pool.GetContainerAsync(new PrmContainerGet(container, ctx));
Assert.NotNull(containerInfo);
var bytes = GetRandomBytes(objectSize);
var param = new PrmObjectPut(new CallContext { Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0)) })
{
Header = new FrostFsObjectHeader(
containerId: container,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = false,
SessionToken = token
};
var objectId = await pool.PutObjectAsync(param);
var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test");
bool hasObject = false;
await foreach (var objId in pool.SearchObjectsAsync(new PrmObjectSearch(container) { Filters = [filter], SessionToken = token }))
{
hasObject = true;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(container, objectId) { SessionToken = token });
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.NotNull(objHeader.Attributes);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value);
}
Assert.True(hasObject);
var @object = await pool.GetObjectAsync(new PrmObjectGet(container, objectId) { SessionToken = token });
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await Cleanup(pool);
await foreach (var _ in pool.ListContainersAsync())
{
Assert.Fail("Containers exist");
}
}
[Theory]
[InlineData(1)]
[InlineData(64 * 1024 * 1024)] // exactly 1 block size - 64MB
[InlineData(64 * 1024 * 1024 - 1)]
[InlineData(64 * 1024 * 1024 + 1)]
[InlineData(2 * 64 * 1024 * 1024 + 256)]
[InlineData(200)]
public async void ClientCutScenarioTest(int objectSize)
{
var options = GetDefaultParams();
using var pool = new Pool(options);
var error = await pool.Dial(new CallContext()).ConfigureAwait(true);
Assert.Null(error);
await Cleanup(pool);
var createContainerParam = new PrmContainerCreate(new FrostFsContainerInfo(new FrostFsPlacementPolicy(true, new FrostFsReplica(1))))
{
WaitParams = lightWait
};
var containerId = await pool.CreateContainerAsync(createContainerParam);
var ctx = new CallContext
{
Timeout = TimeSpan.FromSeconds(10),
};
ctx.Interceptors.Add(new CallbackInterceptor());
var container = await pool.GetContainerAsync(new PrmContainerGet(containerId, ctx));
Assert.NotNull(container);
byte[] bytes = GetRandomBytes(objectSize);
var param = new PrmObjectPut
{
Header = new FrostFsObjectHeader(
containerId: containerId,
type: FrostFsObjectType.Regular,
[new FrostFsAttributePair("fileName", "test")]),
Payload = new MemoryStream(bytes),
ClientCut = true
};
var objectId = await pool.PutObjectAsync(param);
var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test");
bool hasObject = false;
await foreach (var objId in pool.SearchObjectsAsync(new PrmObjectSearch(containerId, null, filter)))
{
hasObject = true;
var objHeader = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId));
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.NotNull(objHeader.Attributes);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes[0].Key);
Assert.Equal("test", objHeader.Attributes[0].Value);
}
Assert.True(hasObject);
var @object = await pool.GetObjectAsync(new PrmObjectGet(containerId, objectId));
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
ReadOnlyMemory<byte>? chunk = null;
while ((chunk = await @object.ObjectReader!.ReadChunk()) != null)
{
ms.Write(chunk.Value.Span);
}
Assert.Equal(SHA256.HashData(bytes), SHA256.HashData(downloadedBytes));
await CheckFilter(pool, containerId, new FilterByRootObject());
await Cleanup(pool);
await foreach (var cid in pool.ListContainersAsync())
{
Assert.Fail($"Container {cid.GetValue()} exist");
}
}
private static byte[] GetRandomBytes(int size)
{
Random rnd = new();
var bytes = new byte[size];
rnd.NextBytes(bytes);
return bytes;
}
private static IOptions<SingleOwnerClientSettings> GetSingleOwnerOptions(string key, string url)
{
return Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = url
});
}
private static IOptions<ClientSettings> GetOptions(string url)
{
return Options.Create(new ClientSettings
{
Host = url
});
}
static async Task Cleanup(Pool pool)
{
await foreach (var cid in pool.ListContainersAsync())
{
await pool.DeleteContainerAsync(new PrmContainerDelete(cid) { WaitParams = lightWait }).ConfigureAwait(true);
}
}
}

View file

@ -1,56 +1,11 @@
using System.Security.Cryptography;
using System.Diagnostics.CodeAnalysis;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.Cryptography;
using Microsoft.Extensions.Options;
namespace FrostFS.SDK.Tests;
public abstract class SessionTestsBase
{
protected readonly string key = "KwHDAJ66o8FoLBjVbjP2sWBmgBMGjt7Vv4boA7xQrBoAYBE397Aq";
protected IOptions<SingleOwnerClientSettings> Settings { get; set; }
protected ECDsa ECDsaKey { get; set; }
protected FrostFsOwner OwnerId { get; set; }
protected SessionMocker Mocker { get; set; }
protected SessionTestsBase()
{
Settings = Options.Create(new SingleOwnerClientSettings
{
Key = key,
Host = "http://localhost:8080"
});
ECDsaKey = key.LoadWif();
OwnerId = FrostFsOwner.FromKey(ECDsaKey);
Mocker = new SessionMocker(this.key)
{
PlacementPolicy = new FrostFsPlacementPolicy(true, new FrostFsReplica(1)),
Version = new FrostFsVersion(2, 13)
};
}
protected IFrostFSClient GetClient()
{
return ClientV2.FrostFSClient.GetTestInstance(
Settings,
null,
new NetworkMocker(this.key).GetMock().Object,
Mocker.GetMock().Object,
new ContainerMocker(this.key).GetMock().Object,
new ObjectMocker(this.key).GetMock().Object);
}
}
[SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")]
public class SessionTest : SessionTestsBase
{
[Theory]
@ -59,12 +14,11 @@ public class SessionTest : SessionTestsBase
public async void CreateSessionTest(bool useContext)
{
var exp = 100u;
var param = new PrmSessionCreate(exp);
PrmSessionCreate param;
if (useContext)
{
param.XHeaders.Add("headerKey1", "headerValue1");
param.Context = new Context
var ctx = new CallContext
{
CancellationToken = Mocker.CancellationTokenSource.Token,
Timeout = TimeSpan.FromSeconds(20),
@ -72,6 +26,14 @@ public class SessionTest : SessionTestsBase
Key = ECDsaKey,
Version = Mocker.Version
};
param = new PrmSessionCreate(exp, ctx);
param.XHeaders.Add("headerKey1", "headerValue1");
}
else
{
param = new PrmSessionCreate(exp);
}
var validTimeoutFrom = DateTime.UtcNow.AddSeconds(20);
@ -101,7 +63,6 @@ public class SessionTest : SessionTestsBase
Assert.NotNull(Mocker.CreateSessionRequest.MetaHeader);
Assert.Equal(Mocker.Version.ToMessage(), Mocker.CreateSessionRequest.MetaHeader.Version);
Assert.Null(Mocker.Metadata);
if (useContext)

Some files were not shown because too many files have changed in this diff Show more