diff --git a/src/FrostFS.SDK.Cryptography/UUID.cs b/src/FrostFS.SDK.Client/Extensions/FrostFsExtensions.cs similarity index 88% rename from src/FrostFS.SDK.Cryptography/UUID.cs rename to src/FrostFS.SDK.Client/Extensions/FrostFsExtensions.cs index 94fba3f..a986a87 100644 --- a/src/FrostFS.SDK.Cryptography/UUID.cs +++ b/src/FrostFS.SDK.Client/Extensions/FrostFsExtensions.cs @@ -4,8 +4,13 @@ using Google.Protobuf; namespace FrostFS.SDK.Cryptography; -public static class UUIDExtension +public static class FrostFsExtensions { + public static ByteString Sha256(this IMessage data) + { + return ByteString.CopyFrom(data.ToByteArray().Sha256()); + } + public static Guid ToUuid(this ByteString id) { if (id == null) diff --git a/src/FrostFS.SDK.Client/FrostFS.SDK.Client.csproj b/src/FrostFS.SDK.Client/FrostFS.SDK.Client.csproj index 7f4df50..2518ce6 100644 --- a/src/FrostFS.SDK.Client/FrostFS.SDK.Client.csproj +++ b/src/FrostFS.SDK.Client/FrostFS.SDK.Client.csproj @@ -8,11 +8,15 @@ - true + true - true + true + + + + <_SkipUpgradeNetAnalyzersNuGetWarning>true @@ -20,16 +24,16 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - + + + + + diff --git a/src/FrostFS.SDK.Client/FrostFSClient.cs b/src/FrostFS.SDK.Client/FrostFSClient.cs index 4b42fab..eea24fc 100644 --- a/src/FrostFS.SDK.Client/FrostFSClient.cs +++ b/src/FrostFS.SDK.Client/FrostFSClient.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Net.Http; using System.Threading.Tasks; using Frostfs.V2.Ape; @@ -12,7 +11,6 @@ using FrostFS.Session; using Grpc.Core; using Grpc.Core.Interceptors; -using Grpc.Net.Client; using Microsoft.Extensions.Options; @@ -27,8 +25,6 @@ namespace FrostFS.SDK.Client; public class FrostFSClient : IFrostFSClient { - private bool isDisposed; - internal ContainerServiceClient? ContainerServiceClient { get; set; } internal ContainerServiceProvider? ContainerServiceProvider { get; set; } @@ -49,9 +45,19 @@ public class FrostFSClient : IFrostFSClient internal ClientContext ClientCtx { get; set; } - public static IFrostFSClient GetInstance(IOptions clientOptions, GrpcChannelOptions? channelOptions = null) + public static IFrostFSClient GetInstance(IOptions clientOptions, Func grpcChannelFactory) { - return new FrostFSClient(clientOptions, channelOptions); + if (clientOptions is null) + { + throw new ArgumentNullException(nameof(clientOptions)); + } + + if (grpcChannelFactory is null) + { + throw new ArgumentNullException(nameof(grpcChannelFactory)); + } + + return new FrostFSClient(clientOptions, grpcChannelFactory); } /// @@ -66,7 +72,7 @@ public class FrostFSClient : IFrostFSClient /// public static IFrostFSClient GetTestInstance( IOptions settings, - GrpcChannelOptions? channelOptions, + Func grpcChannelFactory, NetmapServiceClient netmapService, SessionServiceClient sessionService, ContainerServiceClient containerService, @@ -77,12 +83,38 @@ public class FrostFSClient : IFrostFSClient throw new ArgumentNullException(nameof(settings)); } - return new FrostFSClient(settings, channelOptions, containerService, netmapService, sessionService, objectService); + if (grpcChannelFactory is null) + { + throw new ArgumentNullException(nameof(grpcChannelFactory)); + } + + if (netmapService is null) + { + throw new ArgumentNullException(nameof(netmapService)); + } + + if (sessionService is null) + { + throw new ArgumentNullException(nameof(sessionService)); + } + + if (containerService is null) + { + throw new ArgumentNullException(nameof(containerService)); + } + + if (objectService is null) + { + throw new ArgumentNullException(nameof(objectService)); + } + + return new FrostFSClient( + settings, channel: grpcChannelFactory(settings.Value.Host), containerService, netmapService, sessionService, objectService); } private FrostFSClient( IOptions settings, - GrpcChannelOptions? channelOptions, + ChannelBase channel, ContainerServiceClient containerService, NetmapServiceClient netmapService, SessionServiceClient sessionService, @@ -99,7 +131,7 @@ public class FrostFSClient : IFrostFSClient client: this, key: new ClientKey(ecdsaKey), owner: FrostFsOwner.FromKey(ecdsaKey), - channel: InitGrpcChannel(settings.Value.Host, channelOptions), + channel: channel, version: new FrostFsVersion(2, 13)) { SessionCache = new SessionCache(0), @@ -113,7 +145,7 @@ public class FrostFSClient : IFrostFSClient ObjectServiceClient = objectService ?? throw new ArgumentNullException(nameof(objectService)); } - private FrostFSClient(IOptions settings, GrpcChannelOptions? channelOptions) + private FrostFSClient(IOptions settings, Func grpcChannelFactory) { var clientSettings = (settings?.Value) ?? throw new ArgumentNullException(nameof(settings), "Options value must be initialized"); @@ -121,13 +153,11 @@ public class FrostFSClient : IFrostFSClient var ecdsaKey = clientSettings.Key.LoadWif(); - var channel = InitGrpcChannel(clientSettings.Host, channelOptions); - ClientCtx = new ClientContext( this, key: new ClientKey(ecdsaKey), owner: FrostFsOwner.FromKey(ecdsaKey), - channel: channel, + channel: grpcChannelFactory(settings.Value.Host), version: new FrostFsVersion(2, 13)) { SessionCache = new SessionCache(0), @@ -145,7 +175,7 @@ public class FrostFSClient : IFrostFSClient client: this, key: new ClientKey(prm.Key), owner: FrostFsOwner.FromKey(prm.Key!), - channel: InitGrpcChannel(prm.Address, null), //prm.GrpcChannelOptions), + channel: prm.GrpcChannelFactory(prm.Address), version: new FrostFsVersion(2, 13)) { SessionCache = cache, @@ -154,21 +184,6 @@ public class FrostFSClient : IFrostFSClient }; } - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing && !isDisposed) - { - ClientCtx?.Dispose(); - isDisposed = true; - } - } - #region ApeManagerImplementation public Task> AddChainAsync(PrmApeChainAdd args, CallContext ctx) { @@ -246,9 +261,14 @@ public class FrostFSClient : IFrostFSClient return GetObjectService().GetRangeHashAsync(args, ctx); } - public Task PutObjectAsync(PrmObjectPut args, CallContext ctx) + public Task PutObjectAsync(PrmObjectPut args, CallContext ctx) { - return GetObjectService().PutObjectAsync(args, ctx); + return GetObjectService().PutStreamObjectAsync(args, ctx); + } + + public Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) + { + return GetObjectService().PutClientCutObjectAsync(args, ctx); } public Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx) @@ -308,9 +328,6 @@ public class FrostFSClient : IFrostFSClient private CallInvoker? CreateInvoker() { - if (isDisposed) - throw new FrostFsInvalidObjectException("Client is disposed."); - CallInvoker? callInvoker = null; if (ClientCtx.Interceptors != null) @@ -441,27 +458,6 @@ public class FrostFSClient : IFrostFSClient return ObjectServiceProvider; } - - private static GrpcChannel InitGrpcChannel(string host, GrpcChannelOptions? channelOptions) - { - try - { - var uri = new Uri(host); - - if (channelOptions != null) - return GrpcChannel.ForAddress(uri, channelOptions); - - return GrpcChannel.ForAddress(uri, new GrpcChannelOptions - { - HttpHandler = new HttpClientHandler() - }); - } - catch (UriFormatException e) - { - throw new ArgumentException($"Host '{host}' has invalid format. Error: {e.Message}"); - } - } - public async Task Dial(CallContext ctx) { var service = GetAccouningService(); @@ -474,9 +470,4 @@ public class FrostFSClient : IFrostFSClient { throw new NotImplementedException(); } - - public void Close() - { - Dispose(); - } } diff --git a/src/FrostFS.SDK.Client/Interfaces/IFrostFSClient.cs b/src/FrostFS.SDK.Client/Interfaces/IFrostFSClient.cs index 82df872..cce7364 100644 --- a/src/FrostFS.SDK.Client/Interfaces/IFrostFSClient.cs +++ b/src/FrostFS.SDK.Client/Interfaces/IFrostFSClient.cs @@ -6,7 +6,7 @@ using Frostfs.V2.Ape; namespace FrostFS.SDK.Client.Interfaces; -public interface IFrostFSClient : IDisposable +public interface IFrostFSClient { #region Network Task GetNetmapSnapshotAsync(CallContext ctx); @@ -47,7 +47,9 @@ public interface IFrostFSClient : IDisposable Task[]> GetRangeHashAsync(PrmRangeHashGet args, CallContext ctx); - Task PutObjectAsync(PrmObjectPut args, CallContext ctx); + Task PutObjectAsync(PrmObjectPut args, CallContext ctx); + + Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx); Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx); @@ -63,6 +65,4 @@ public interface IFrostFSClient : IDisposable #endregion public Task Dial(CallContext ctx); - - public void Close(); } diff --git a/src/FrostFS.SDK.Client/Interfaces/IObjectWriter.cs b/src/FrostFS.SDK.Client/Interfaces/IObjectWriter.cs new file mode 100644 index 0000000..2d1abda --- /dev/null +++ b/src/FrostFS.SDK.Client/Interfaces/IObjectWriter.cs @@ -0,0 +1,12 @@ +using System; +using System.Threading.Tasks; + +namespace FrostFS.SDK.Client.Interfaces +{ + public interface IObjectWriter : IDisposable + { + Task WriteAsync(ReadOnlyMemory memory); + + Task CompleteAsync(); + } +} diff --git a/src/FrostFS.SDK.Client/ObjectWriter.cs b/src/FrostFS.SDK.Client/ObjectWriter.cs new file mode 100644 index 0000000..4fad9c9 --- /dev/null +++ b/src/FrostFS.SDK.Client/ObjectWriter.cs @@ -0,0 +1,68 @@ +using System; +using System.Threading.Tasks; + +using FrostFS.Object; +using FrostFS.SDK.Client.Interfaces; + +using Google.Protobuf; + +namespace FrostFS.SDK.Client +{ + internal sealed class ObjectWriter : IObjectWriter + { + private readonly ClientContext ctx; + private readonly PrmObjectPutBase args; + private readonly ObjectStreamer streamer; + private bool disposedValue; + + internal ObjectWriter(ClientContext ctx, PrmObjectPutBase args, ObjectStreamer streamer) + { + this.ctx = ctx; + this.args = args; + this.streamer = streamer; + } + + public async Task WriteAsync(ReadOnlyMemory memory) + { + var chunkRequest = new PutRequest + { + Body = new PutRequest.Types.Body + { + Chunk = UnsafeByteOperations.UnsafeWrap(memory) + } + }; + + chunkRequest.Sign(this.ctx.Key.ECDsaKey); + + await streamer.Write(chunkRequest).ConfigureAwait(false); + } + + public async Task CompleteAsync() + { + var response = await streamer.Close().ConfigureAwait(false); + + Verifier.CheckResponse(response); + + return FrostFsObjectId.FromHash(response.Body.ObjectId.Value.Span); + } + + private void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + streamer.Dispose(); + } + + disposedValue = true; + } + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs b/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs new file mode 100644 index 0000000..706e367 --- /dev/null +++ b/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs @@ -0,0 +1,78 @@ +using System.IO; + +namespace FrostFS.SDK.Client; + +public readonly struct PrmObjectClientCutPut( + FrostFsObjectHeader? header, + Stream? payload, + int bufferMaxSize = 0, + FrostFsSessionToken? sessionToken = null, + byte[]? customBuffer = null, + string[]? xheaders = null) : PrmObjectPutBase, System.IEquatable +{ + /// + /// Need to provide values like ContainerId and ObjectType to create and object. + /// Optional parameters ike Attributes can be provided as well. + /// + /// Header with required parameters to create an object + public FrostFsObjectHeader? Header { get; } = header; + + /// + /// A stream with source data + /// + public Stream? Payload { get; } = payload; + + /// + /// Overrides default size of the buffer for stream transferring. + /// + /// Size of the buffer + public int BufferMaxSize { get; } = bufferMaxSize; + + /// + /// Allows to define a buffer for chunks to manage by the memory allocation and releasing. + /// + public byte[]? CustomBuffer { get; } = customBuffer; + + /// + public FrostFsSessionToken? SessionToken { get; } = sessionToken; + + /// + /// FrostFS request X-Headers + /// + public string[] XHeaders { get; } = xheaders ?? []; + + internal PutObjectContext PutObjectContext { get; } = new(); + + public override readonly bool Equals(object obj) + { + if (obj == null || obj is not PrmObjectClientCutPut) + return false; + + return Equals((PrmObjectClientCutPut)obj); + } + + public readonly bool Equals(PrmObjectClientCutPut other) + { + return GetHashCode() == other.GetHashCode(); + } + + public override readonly int GetHashCode() + { + return BufferMaxSize + ^ (Header == null ? 0 : Header.GetHashCode()) + ^ (Payload == null ? 0 : Payload.GetHashCode()) + ^ (CustomBuffer == null ? 0 : CustomBuffer.GetHashCode()) + ^ (SessionToken == null ? 0 : SessionToken.GetHashCode()) + ^ XHeaders.GetHashCode(); + } + + public static bool operator ==(PrmObjectClientCutPut left, PrmObjectClientCutPut right) + { + return left.Equals(right); + } + + public static bool operator !=(PrmObjectClientCutPut left, PrmObjectClientCutPut right) + { + return !(left == right); + } +} diff --git a/src/FrostFS.SDK.Client/Parameters/PrmObjectPut.cs b/src/FrostFS.SDK.Client/Parameters/PrmObjectPut.cs index a2198eb..78eaf1e 100644 --- a/src/FrostFS.SDK.Client/Parameters/PrmObjectPut.cs +++ b/src/FrostFS.SDK.Client/Parameters/PrmObjectPut.cs @@ -1,15 +1,17 @@ -using System.IO; - namespace FrostFS.SDK.Client; +internal interface PrmObjectPutBase : ISessionToken +{ + FrostFsObjectHeader? Header { get; } + + string[] XHeaders { get; } +} + + public readonly struct PrmObjectPut( FrostFsObjectHeader? header, - Stream? payload, - bool clientCut, - int bufferMaxSize = 0, FrostFsSessionToken? sessionToken = null, - byte[]? customBuffer = null, - string[]? xheaders = null) : ISessionToken, System.IEquatable + string[]? xheaders = null) : PrmObjectPutBase, System.IEquatable { /// /// Need to provide values like ContainerId and ObjectType to create and object. @@ -18,30 +20,6 @@ public readonly struct PrmObjectPut( /// Header with required parameters to create an object public FrostFsObjectHeader? Header { get; } = header; - /// - /// A stream with source data - /// - public Stream? Payload { get; } = payload; - - /// - /// Object size is limited. In the data exceeds the limit, the object will be splitted. - /// If the parameter is true, the client side cut is applied. Otherwise, the data is transferred - /// as a stream and will be cut on server side. - /// - /// Is client cut is applied - public bool ClientCut { get; } = clientCut; - - /// - /// Overrides default size of the buffer for stream transferring. - /// - /// Size of the buffer - public int BufferMaxSize { get; } = bufferMaxSize; - - /// - /// Allows to define a buffer for chunks to manage by the memory allocation and releasing. - /// - public byte[]? CustomBuffer { get; } = customBuffer; - /// public FrostFsSessionToken? SessionToken { get; } = sessionToken; @@ -67,11 +45,7 @@ public readonly struct PrmObjectPut( public override readonly int GetHashCode() { - return BufferMaxSize - ^ (Header == null ? 0 : Header.GetHashCode()) - ^ (Payload == null ? 0 : Payload.GetHashCode()) - ^ (ClientCut ? 1 : 0) - ^ (CustomBuffer == null ? 0 : CustomBuffer.GetHashCode()) + return (Header == null ? 0 : Header.GetHashCode()) ^ (SessionToken == null ? 0 : SessionToken.GetHashCode()) ^ XHeaders.GetHashCode(); } @@ -85,4 +59,4 @@ public readonly struct PrmObjectPut( { return !(left == right); } -} +} \ No newline at end of file diff --git a/src/FrostFS.SDK.Client/Pool/ClientWrapper.cs b/src/FrostFS.SDK.Client/Pool/ClientWrapper.cs index 4f8e70c..552ee2a 100644 --- a/src/FrostFS.SDK.Client/Pool/ClientWrapper.cs +++ b/src/FrostFS.SDK.Client/Pool/ClientWrapper.cs @@ -71,8 +71,6 @@ public class ClientWrapper : ClientStatusMonitor return; await Task.Delay((int)WrapperPrm.GracefulCloseOnSwitchTimeout).ConfigureAwait(false); - - Client.Close(); } // restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy. @@ -98,31 +96,18 @@ public class ClientWrapper : ClientStatusMonitor await ScheduleGracefulClose().ConfigureAwait(false); } - FrostFSClient? client = null; + FrostFSClient? client = new(WrapperPrm, sessionCache); - try + var error = await client.Dial(ctx).ConfigureAwait(false); + if (!string.IsNullOrEmpty(error)) { - client = new(WrapperPrm, sessionCache); - - var dialCtx = new CallContext(TimeSpan.FromTicks((long)WrapperPrm.DialTimeout), ctx.CancellationToken); - - var error = await client.Dial(ctx).ConfigureAwait(false); - if (!string.IsNullOrEmpty(error)) - { - SetUnhealthyOnDial(); - return wasHealthy; - } - - lock (_lock) - { - Client = client; - } - - client = null; + SetUnhealthyOnDial(); + return wasHealthy; } - finally + + lock (_lock) { - client?.Dispose(); + Client = client; } try diff --git a/src/FrostFS.SDK.Client/Pool/InitParameters.cs b/src/FrostFS.SDK.Client/Pool/InitParameters.cs index 3f4d14e..66da23f 100644 --- a/src/FrostFS.SDK.Client/Pool/InitParameters.cs +++ b/src/FrostFS.SDK.Client/Pool/InitParameters.cs @@ -2,6 +2,7 @@ using System.Collections.ObjectModel; using System.Security.Cryptography; +using Grpc.Core; using Grpc.Core.Interceptors; using Grpc.Net.Client; @@ -10,7 +11,7 @@ using Microsoft.Extensions.Logging; namespace FrostFS.SDK.Client; // InitParameters contains values used to initialize connection Pool. -public class InitParameters +public class InitParameters(Func grpcChannelFactory) { public ECDsa? Key { get; set; } @@ -39,4 +40,6 @@ public class InitParameters public Action? Callback { get; set; } public Collection Interceptors { get; } = []; + + public Func GrpcChannelFactory { get; set; } = grpcChannelFactory; } diff --git a/src/FrostFS.SDK.Client/Pool/Pool.cs b/src/FrostFS.SDK.Client/Pool/Pool.cs index 7df0d40..b4bb357 100644 --- a/src/FrostFS.SDK.Client/Pool/Pool.cs +++ b/src/FrostFS.SDK.Client/Pool/Pool.cs @@ -134,20 +134,20 @@ public partial class Pool : IFrostFSClient int i = 0; foreach (var nodeParams in RebalanceParams.NodesParams) { - var clients = new ClientWrapper[nodeParams.Weights.Count]; + var wrappers = new ClientWrapper[nodeParams.Weights.Count]; for (int j = 0; j < nodeParams.Addresses.Count; j++) { - ClientWrapper? client = null; + ClientWrapper? wrapper = null; bool dialed = false; try { - client = clients[j] = ClientBuilder(nodeParams.Addresses[j]); + wrapper = wrappers[j] = ClientBuilder(nodeParams.Addresses[j]); - await client.Dial(ctx).ConfigureAwait(false); + await wrapper.Dial(ctx).ConfigureAwait(false); dialed = true; - var token = await InitSessionForDuration(ctx, client, RebalanceParams.SessionExpirationDuration, Key.ECDsaKey, false) + var token = await InitSessionForDuration(ctx, wrapper, RebalanceParams.SessionExpirationDuration, Key.ECDsaKey, false) .ConfigureAwait(false); var key = FormCacheKey(nodeParams.Addresses[j], Key.PublicKey); @@ -158,13 +158,13 @@ public partial class Pool : IFrostFSClient catch (RpcException ex) { if (!dialed) - client!.SetUnhealthyOnDial(); + wrapper!.SetUnhealthyOnDial(); else - client!.SetUnhealthy(); + wrapper!.SetUnhealthy(); if (logger != null) { - FrostFsMessages.SessionCreationError(logger, client!.WrapperPrm.Address, ex.Message); + FrostFsMessages.SessionCreationError(logger, wrapper!.WrapperPrm.Address, ex.Message); } } catch (FrostFsInvalidObjectException) @@ -175,7 +175,7 @@ public partial class Pool : IFrostFSClient var sampler = new Sampler(nodeParams.Weights.ToArray()); - inner[i] = new InnerPool(sampler, clients); + inner[i] = new InnerPool(sampler, wrappers); i++; } @@ -277,7 +277,7 @@ public partial class Pool : IFrostFSClient parameters.ClientBuilder ??= new Func((address) => { - var wrapperPrm = new WrapperPrm + var wrapperPrm = new WrapperPrm() { Address = address, Key = parameters.Key!, @@ -287,7 +287,9 @@ public partial class Pool : IFrostFSClient ErrorThreshold = parameters.ErrorThreshold, GracefulCloseOnSwitchTimeout = parameters.GracefulCloseOnSwitchTimeout, Callback = parameters.Callback, - Interceptors = parameters.Interceptors + Interceptors = parameters.Interceptors, + GrpcChannelFactory = parameters.GrpcChannelFactory + }; return new ClientWrapper(wrapperPrm, pool); @@ -334,14 +336,14 @@ public partial class Pool : IFrostFSClient { CancellationTokenSource.Cancel(); - if (InnerPools != null) - { - // close all clients - foreach (var innerPool in InnerPools) - foreach (var client in innerPool.Clients) - if (client.IsDialed()) - client.Client?.Close(); - } + //if (InnerPools != null) + //{ + // // close all clients + // foreach (var innerPool in InnerPools) + // foreach (var client in innerPool.Clients) + // if (client.IsDialed()) + // client.Client?.Close(); + //} } // startRebalance runs loop to monitor connection healthy status. @@ -586,12 +588,18 @@ public partial class Pool : IFrostFSClient return await client.Client!.GetObjectAsync(args, ctx).ConfigureAwait(false); } - public async Task PutObjectAsync(PrmObjectPut args, CallContext ctx) + public async Task PutObjectAsync(PrmObjectPut args, CallContext ctx) { var client = Connection(); return await client.Client!.PutObjectAsync(args, ctx).ConfigureAwait(false); } + public async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) + { + var client = Connection(); + return await client.Client!.PutClientCutObjectAsync(args, ctx).ConfigureAwait(false); + } + public async Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx) { var client = Connection(); @@ -655,8 +663,6 @@ public partial class Pool : IFrostFSClient public void Dispose() { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method Dispose(disposing: true); - GC.SuppressFinalize(this); } } diff --git a/src/FrostFS.SDK.Client/Pool/WrapperPrm.cs b/src/FrostFS.SDK.Client/Pool/WrapperPrm.cs index b68b6ee..83ac869 100644 --- a/src/FrostFS.SDK.Client/Pool/WrapperPrm.cs +++ b/src/FrostFS.SDK.Client/Pool/WrapperPrm.cs @@ -2,8 +2,8 @@ using System.Collections.ObjectModel; using System.Security.Cryptography; +using Grpc.Core; using Grpc.Core.Interceptors; -using Grpc.Net.Client; using Microsoft.Extensions.Logging; @@ -28,7 +28,7 @@ public struct WrapperPrm internal Action PoolRequestInfoCallback { get; set; } - internal GrpcChannelOptions GrpcChannelOptions { get; set; } + internal Func GrpcChannelFactory { get; set; } internal ulong GracefulCloseOnSwitchTimeout { get; set; } diff --git a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs index 3267ad3..8561b50 100644 --- a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs +++ b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using FrostFS.Object; using FrostFS.Refs; using FrostFS.SDK.Client; +using FrostFS.SDK.Client.Interfaces; using FrostFS.SDK.Client.Mappers.GRPC; using FrostFS.SDK.Cryptography; using FrostFS.Session; @@ -265,33 +266,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl } } - internal async Task PutObjectAsync(PrmObjectPut args, CallContext ctx) - { - if (args.Header == null) - throw new ArgumentNullException(nameof(args), "Header is null"); - - if (args.Payload == null) - throw new ArgumentNullException(nameof(args), "Payload is null"); - - if (args.ClientCut) - { - return await PutClientCutObject(args, ctx).ConfigureAwait(false); - } - else - { - if (args.Header.PayloadLength > 0) - args.PutObjectContext.FullLength = args.Header.PayloadLength; - else if (args.Payload.CanSeek) - args.PutObjectContext.FullLength = (ulong)args.Payload.Length; - else - throw new ArgumentException("The stream does not have a length and payload length is not defined"); - - var response = await PutStreamObject(args, ctx).ConfigureAwait(false); - - return response.ObjectId; - } - } - internal async Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx) { var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ClientContext); @@ -408,7 +382,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl return response.Body.ObjectId.ToModel(); } - private async Task PutClientCutObject(PrmObjectPut args, CallContext ctx) + internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { var payloadStream = args.Payload!; var header = args.Header!; @@ -451,7 +425,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl args.Header!.Split = split; - var result = await PutStreamObject(args, default).ConfigureAwait(false); + var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); sentObjectIds.Add(result.ObjectId); @@ -468,7 +442,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl args.Header.Split!.ParentHeader = largeObjectHeader; - var result = await PutStreamObject(args, default).ConfigureAwait(false); + var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); sentObjectIds.Add(result.ObjectId); @@ -484,7 +458,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl // We are here if the payload is placed to one Object. It means no cut action, just simple PUT. args.Header!.Attributes = attributes; - var singlePartResult = await PutStreamObject(args, default).ConfigureAwait(false); + var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); return singlePartResult.ObjectId; } @@ -495,7 +469,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl public int ObjectSize = objectSize; } - private async Task PutStreamObject(PrmObjectPut args, CallContext ctx) + private async Task PutMultipartStreamObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { var payload = args.Payload!; @@ -511,7 +485,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl try { // 0 means no limit from client, so server side cut is performed - var objectLimitSize = args.ClientCut ? args.PutObjectContext.MaxObjectSizeCache : 0; + var objectLimitSize = args.PutObjectContext.MaxObjectSizeCache; if (args.CustomBuffer != null) { @@ -573,7 +547,14 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl } } - private async Task> GetUploadStream(PrmObjectPut args, CallContext ctx) + internal async Task PutStreamObjectAsync(PrmObjectPutBase args, CallContext ctx) + { + var stream = await GetUploadStream(args, ctx).ConfigureAwait(false); + + return new ObjectWriter(ClientContext, args, stream); + } + + private async Task> GetUploadStream(PrmObjectPutBase args, CallContext ctx) { var header = args.Header!; diff --git a/src/FrostFS.SDK.Client/Tools/ClientContext.cs b/src/FrostFS.SDK.Client/Tools/ClientContext.cs index 133ad3f..e2522c9 100644 --- a/src/FrostFS.SDK.Client/Tools/ClientContext.cs +++ b/src/FrostFS.SDK.Client/Tools/ClientContext.cs @@ -1,12 +1,12 @@ using System; using System.Collections.ObjectModel; +using Grpc.Core; using Grpc.Core.Interceptors; -using Grpc.Net.Client; namespace FrostFS.SDK.Client; -public class ClientContext(FrostFSClient client, ClientKey key, FrostFsOwner owner, GrpcChannel channel, FrostFsVersion version) : IDisposable +public class ClientContext(FrostFSClient client, ClientKey key, FrostFsOwner owner, ChannelBase channel, FrostFsVersion version) { private string? sessionKey; @@ -14,7 +14,7 @@ public class ClientContext(FrostFSClient client, ClientKey key, FrostFsOwner own internal string? Address { get; } = channel.Target; - internal GrpcChannel Channel { get; private set; } = channel; + internal ChannelBase Channel { get; private set; } = channel; internal FrostFsVersion Version { get; } = version; @@ -45,18 +45,4 @@ public class ClientContext(FrostFSClient client, ClientKey key, FrostFsOwner own return sessionKey; } } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - Channel?.Dispose(); - } - } } diff --git a/src/FrostFS.SDK.Cryptography/Extentions.cs b/src/FrostFS.SDK.Cryptography/Extentions.cs index 9d61972..a3f051f 100644 --- a/src/FrostFS.SDK.Cryptography/Extentions.cs +++ b/src/FrostFS.SDK.Cryptography/Extentions.cs @@ -1,8 +1,6 @@ using System.Security.Cryptography; using System.Threading; -using Google.Protobuf; - using Org.BouncyCastle.Crypto.Digests; namespace FrostFS.SDK.Cryptography; @@ -25,11 +23,6 @@ public static class Extentions return hash; } - public static ByteString Sha256(this IMessage data) - { - return ByteString.CopyFrom(data.ToByteArray().Sha256()); - } - public static byte[] Sha256(this byte[] value) { bool lockTaken = false; diff --git a/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj b/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj index 35804f5..53c9003 100644 --- a/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj +++ b/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj @@ -9,6 +9,10 @@ true + + + <_SkipUpgradeNetAnalyzersNuGetWarning>true + true @@ -16,8 +20,7 @@ - - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/FrostFS.SDK.Protos/FrostFS.SDK.Protos.csproj b/src/FrostFS.SDK.Protos/FrostFS.SDK.Protos.csproj index 8f69fe0..8e7250a 100644 --- a/src/FrostFS.SDK.Protos/FrostFS.SDK.Protos.csproj +++ b/src/FrostFS.SDK.Protos/FrostFS.SDK.Protos.csproj @@ -9,15 +9,19 @@ true + + + <_SkipUpgradeNetAnalyzersNuGetWarning>true + true - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/FrostFS.SDK.Tests/Multithread/MultithreadPoolSmokeTests.cs b/src/FrostFS.SDK.Tests/Multithread/MultithreadPoolSmokeTests.cs index b58cbb1..0cd3d6b 100644 --- a/src/FrostFS.SDK.Tests/Multithread/MultithreadPoolSmokeTests.cs +++ b/src/FrostFS.SDK.Tests/Multithread/MultithreadPoolSmokeTests.cs @@ -16,10 +16,9 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase private InitParameters GetDefaultParams() { - return new InitParameters + return new InitParameters((url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url))) { Key = keyString.LoadWif(), - NodeParams = [new(1, url, 100.0f)], ClientBuilder = null, GracefulCloseOnSwitchTimeout = 30_000_000, @@ -32,7 +31,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)); @@ -57,7 +56,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)); @@ -78,7 +77,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var callbackText = string.Empty; - var options = new InitParameters + var options = new InitParameters((url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url))) { Key = keyString.LoadWif(), NodeParams = [ @@ -91,7 +90,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds" }; - using var pool = new Pool(options); + var pool = new Pool(options); var ctx = new CallContext(TimeSpan.Zero); @@ -99,9 +98,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase Assert.Null(error); - using var client = FrostFSClient.GetInstance(GetSingleOwnerOptions(keyString, url)); - - var result = await client.GetNodeInfoAsync(default); + var result = await pool.GetNodeInfoAsync(default); var statistics = pool.Statistic(); @@ -117,7 +114,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase var callbackText = string.Empty; options.Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"; - using var pool = new Pool(options); + var pool = new Pool(options); var ctx = new CallContext(TimeSpan.Zero); @@ -125,9 +122,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase Assert.Null(error); - using var client = FrostFSClient.GetInstance(GetSingleOwnerOptions(keyString, url)); - - var result = await client.GetNodeInfoAsync(default); + var result = await pool.GetNodeInfoAsync(default); Assert.False(string.IsNullOrEmpty(callbackText)); Assert.Contains(" took ", callbackText, StringComparison.Ordinal); @@ -138,7 +133,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -160,7 +155,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -184,19 +179,19 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - bufferMaxSize: 1024, - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var @object = await pool.GetObjectAsync(new PrmObjectGet(containerId, objectId), default); var downloadedBytes = new byte[@object.Header.PayloadLength]; MemoryStream ms = new(downloadedBytes); - ReadOnlyMemory? chunk = null; + ReadOnlyMemory? chunk; while ((chunk = await @object.ObjectReader!.ReadChunk()) != null) { ms.Write(chunk.Value.Span); @@ -212,7 +207,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -240,11 +235,12 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")], - new FrostFsSplit()), - payload: new MemoryStream(bytes), - clientCut: false); + new FrostFsSplit())); - var objectId = await pool.PutObjectAsync(param, default); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var head = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId, false), default); @@ -306,7 +302,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase Assert.True(cs.ElapsedMicroSeconds > 0); }); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -331,11 +327,12 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase new FrostFsObjectHeader( containerId: createdContainer, type: FrostFsObjectType.Regular, - [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false); + [new FrostFsAttributePair("fileName", "test")])); - var objectId = await pool.PutObjectAsync(param, default); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -385,7 +382,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); options.Callback = new((cs) => Assert.True(cs.ElapsedMicroSeconds > 0)); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -414,11 +411,12 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase containerId: container, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await pool.PutObjectAsync(param, default); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -472,7 +470,7 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -496,15 +494,14 @@ public class MultithreadPoolSmokeTests : SmokeTestsBase byte[] bytes = GetRandomBytes(objectSize); - var param = new PrmObjectPut( + var param = new PrmObjectClientCutPut( new FrostFsObjectHeader( containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: true); + payload: new MemoryStream(bytes)); - var objectId = await pool.PutObjectAsync(param, default); + var objectId = await pool.PutClientCutObjectAsync(param, default).ConfigureAwait(true); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); diff --git a/src/FrostFS.SDK.Tests/Multithread/MultithreadSmokeClientTests.cs b/src/FrostFS.SDK.Tests/Multithread/MultithreadSmokeClientTests.cs index 405278c..e72e657 100644 --- a/src/FrostFS.SDK.Tests/Multithread/MultithreadSmokeClientTests.cs +++ b/src/FrostFS.SDK.Tests/Multithread/MultithreadSmokeClientTests.cs @@ -19,7 +19,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void AccountTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var result = await client.GetBalanceAsync(default); @@ -30,7 +30,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void NetworkMapTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var result = await client.GetNetmapSnapshotAsync(default); @@ -50,7 +50,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void NodeInfoTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var result = await client.GetNodeInfoAsync(default); @@ -65,12 +65,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void NodeInfoStatisticsTest() { - var options = GetClientOptions(keyString, url); + var options = ClientOptions; var callbackContent = string.Empty; options.Value.Callback = (cs) => callbackContent = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"; - using var client = FrostFSClient.GetInstance(options); + var client = FrostFSClient.GetInstance(options, GrpcChannel); var result = await client.GetNodeInfoAsync(default); @@ -80,12 +80,10 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void GetSessionTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var token = await client.CreateSessionAsync(new(100), default); - var ownerHash = Base58.Decode(OwnerId!.Value); - Assert.NotNull(token); Assert.NotEqual(Guid.Empty, token.Id); Assert.Equal(33, token.SessionKey.Length); @@ -94,7 +92,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void CreateObjectWithSessionToken() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -114,11 +112,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await client.PutObjectAsync(param, default).ConfigureAwait(true); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var @object = await client.GetObjectAsync(new PrmObjectGet(containerId, objectId), default) .ConfigureAwait(true); @@ -140,7 +139,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void FilterTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -164,11 +163,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")], - new FrostFsSplit()), - payload: new MemoryStream(bytes), - clientCut: false); + new FrostFsSplit())); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var head = await client.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId), default); @@ -222,7 +222,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase { bool callbackInvoked = false; - var options = GetClientOptions(keyString, url); + var options = ClientOptions; options.Value.Callback = new((cs) => { @@ -230,7 +230,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase Assert.True(cs.ElapsedMicroSeconds > 0); }); - using var client = FrostFSClient.GetInstance(options); + var client = FrostFSClient.GetInstance(options, GrpcChannel); await Cleanup(client); @@ -253,11 +253,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase new FrostFsObjectHeader( containerId: createdContainer, type: FrostFsObjectType.Regular, - [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false); + [new FrostFsAttributePair("fileName", "test")])); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -302,7 +303,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void PatchTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -326,11 +327,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase new FrostFsObjectHeader( containerId: createdContainer, type: FrostFsObjectType.Regular, - [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false); + [new FrostFsAttributePair("fileName", "test")])); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var patch = new byte[16]; for (int i = 0; i < 16; i++) @@ -381,7 +383,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void RangeTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -402,11 +404,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase } var param = new PrmObjectPut( - new FrostFsObjectHeader(containerId: createdContainer, type: FrostFsObjectType.Regular), - payload: new MemoryStream(bytes), - clientCut: false); + new FrostFsObjectHeader(containerId: createdContainer, type: FrostFsObjectType.Regular)); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var rangeParam = new PrmRangeGet(createdContainer, objectId, new FrostFsRange(100, 64)); @@ -434,7 +437,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [Fact] public async void RangeHashTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -457,11 +460,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase var param = new PrmObjectPut( new FrostFsObjectHeader( containerId: createdContainer, - type: FrostFsObjectType.Regular), - payload: new MemoryStream(bytes), - clientCut: false); + type: FrostFsObjectType.Regular)); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var rangeParam = new PrmRangeHashGet(createdContainer, objectId, [new FrostFsRange(100, 64)], bytes); @@ -486,7 +490,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [InlineData(6 * 1024 * 1024 + 100)] public async void SimpleScenarioWithSessionTest(int objectSize) { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); //Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0)) var token = await client.CreateSessionAsync(new PrmSessionCreate(int.MaxValue), default); @@ -510,11 +514,12 @@ public class MultithreadSmokeClientTests : SmokeTestsBase containerId: container, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -567,11 +572,10 @@ public class MultithreadSmokeClientTests : SmokeTestsBase [InlineData(200)] public async void ClientCutScenarioTest(int objectSize) { - - var options = GetClientOptions(keyString, url); + var options = ClientOptions; options.Value.Interceptors.Add(new CallbackInterceptor()); - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(options, GrpcChannel); await Cleanup(client); @@ -589,15 +593,14 @@ public class MultithreadSmokeClientTests : SmokeTestsBase byte[] bytes = GetRandomBytes(objectSize); - var param = new PrmObjectPut( + var param = new PrmObjectClientCutPut( new FrostFsObjectHeader( containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: true); + payload: new MemoryStream(bytes)); - var objectId = await client.PutObjectAsync(param, default); + var objectId = await client.PutClientCutObjectAsync(param, default).ConfigureAwait(true); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -659,7 +662,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase bool callbackInvoked = false; bool intercepterInvoked = false; - var options = GetClientOptions(keyString, url); + var options = ClientOptions; options.Value.Callback = (cs) => { callbackInvoked = true; @@ -668,7 +671,7 @@ public class MultithreadSmokeClientTests : SmokeTestsBase options.Value.Interceptors.Add(new CallbackInterceptor(s => intercepterInvoked = true)); - using var client = FrostFSClient.GetInstance(options); + var client = FrostFSClient.GetInstance(options, GrpcChannel); var result = await client.GetNodeInfoAsync(default); diff --git a/src/FrostFS.SDK.Tests/Smoke/PoolSmokeTests.cs b/src/FrostFS.SDK.Tests/Smoke/PoolSmokeTests.cs index feaf838..ce626d5 100644 --- a/src/FrostFS.SDK.Tests/Smoke/PoolSmokeTests.cs +++ b/src/FrostFS.SDK.Tests/Smoke/PoolSmokeTests.cs @@ -4,8 +4,6 @@ using System.Security.Cryptography; using FrostFS.SDK.Client; using FrostFS.SDK.Cryptography; -using Microsoft.Extensions.Options; - namespace FrostFS.SDK.Tests.Smoke; [SuppressMessage("Reliability", "CA2007:Consider calling ConfigureAwait on the awaited task", Justification = "Default Value is correct for tests")] @@ -16,10 +14,9 @@ public class PoolSmokeTests : SmokeTestsBase private InitParameters GetDefaultParams() { - return new InitParameters + return new InitParameters((url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url))) { Key = keyString.LoadWif(), - NodeParams = [new(1, url, 100.0f)], ClientBuilder = null, GracefulCloseOnSwitchTimeout = 30_000_000, @@ -32,7 +29,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)); @@ -57,7 +54,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)); @@ -78,7 +75,7 @@ public class PoolSmokeTests : SmokeTestsBase { var callbackText = string.Empty; - var options = new InitParameters + var options = new InitParameters((url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url))) { Key = keyString.LoadWif(), NodeParams = [ @@ -91,7 +88,7 @@ public class PoolSmokeTests : SmokeTestsBase Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds" }; - using var pool = new Pool(options); + var pool = new Pool(options); var ctx = new CallContext(TimeSpan.Zero); @@ -99,9 +96,7 @@ public class PoolSmokeTests : SmokeTestsBase Assert.Null(error); - using var client = FrostFSClient.GetInstance(GetSingleOwnerOptions(keyString, url)); - - var result = await client.GetNodeInfoAsync(default); + var result = await pool.GetNodeInfoAsync(default); var statistics = pool.Statistic(); @@ -117,7 +112,7 @@ public class PoolSmokeTests : SmokeTestsBase var callbackText = string.Empty; options.Callback = (cs) => callbackText = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"; - using var pool = new Pool(options); + var pool = new Pool(options); var ctx = new CallContext(TimeSpan.Zero); @@ -125,9 +120,7 @@ public class PoolSmokeTests : SmokeTestsBase Assert.Null(error); - using var client = FrostFSClient.GetInstance(GetSingleOwnerOptions(keyString, url)); - - var result = await client.GetNodeInfoAsync(default); + var result = await pool.GetNodeInfoAsync(default); Assert.False(string.IsNullOrEmpty(callbackText)); Assert.Contains(" took ", callbackText, StringComparison.Ordinal); @@ -138,7 +131,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -160,7 +153,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -184,11 +177,12 @@ public class PoolSmokeTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var @object = await pool.GetObjectAsync(new PrmObjectGet(containerId, objectId), default); @@ -211,7 +205,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -239,11 +233,12 @@ public class PoolSmokeTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")], - new FrostFsSplit()), - payload: new MemoryStream(bytes), - clientCut: false); + new FrostFsSplit())); - var objectId = await pool.PutObjectAsync(param, default); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var head = await pool.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId), default); @@ -305,7 +300,7 @@ public class PoolSmokeTests : SmokeTestsBase Assert.True(cs.ElapsedMicroSeconds > 0); }); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -332,11 +327,12 @@ public class PoolSmokeTests : SmokeTestsBase new FrostFsObjectHeader( containerId: createdContainer, type: FrostFsObjectType.Regular, - [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false); + [new FrostFsAttributePair("fileName", "test")])); - var objectId = await pool.PutObjectAsync(param, default); + var stream = await pool.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -386,7 +382,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); options.Callback = new((cs) => Assert.True(cs.ElapsedMicroSeconds > 0)); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -415,11 +411,12 @@ public class PoolSmokeTests : SmokeTestsBase containerId: container, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await pool.PutObjectAsync(param, new CallContext(TimeSpan.Zero)); + var stream = await pool.PutObjectAsync(param, new CallContext(TimeSpan.Zero)).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -474,7 +471,7 @@ public class PoolSmokeTests : SmokeTestsBase { var options = GetDefaultParams(); - using var pool = new Pool(options); + var pool = new Pool(options); var error = await pool.Dial(new CallContext(TimeSpan.Zero)).ConfigureAwait(true); @@ -498,15 +495,14 @@ public class PoolSmokeTests : SmokeTestsBase byte[] bytes = GetRandomBytes(objectSize); - var param = new PrmObjectPut( + var param = new PrmObjectClientCutPut( new FrostFsObjectHeader( containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: true); + payload: new MemoryStream(bytes)); - var objectId = await pool.PutObjectAsync(param, default); + var objectId = await pool.PutClientCutObjectAsync(param, default).ConfigureAwait(true); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -558,23 +554,6 @@ public class PoolSmokeTests : SmokeTestsBase return bytes; } - private static IOptions GetSingleOwnerOptions(string key, string url) - { - return Options.Create(new ClientSettings - { - Key = key, - Host = url - }); - } - - private static IOptions GetOptions(string url) - { - return Options.Create(new ClientSettings - { - Host = url - }); - } - static async Task Cleanup(Pool pool) { await foreach (var cid in pool.ListContainersAsync(default, default)) diff --git a/src/FrostFS.SDK.Tests/Smoke/SmokeClientTests.cs b/src/FrostFS.SDK.Tests/Smoke/SmokeClientTests.cs index 9c3b21f..db43eef 100644 --- a/src/FrostFS.SDK.Tests/Smoke/SmokeClientTests.cs +++ b/src/FrostFS.SDK.Tests/Smoke/SmokeClientTests.cs @@ -19,7 +19,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void AccountTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var result = await client.GetBalanceAsync(default); @@ -30,7 +30,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void NodeInfoTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var result = await client.GetNodeInfoAsync(default); @@ -45,12 +45,12 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void NodeInfoStatisticsTest() { - var options = GetClientOptions(keyString, url); + var options = ClientOptions; var callbackContent = string.Empty; options.Value.Callback = (cs) => callbackContent = $"{cs.MethodName} took {cs.ElapsedMicroSeconds} microseconds"; - using var client = FrostFSClient.GetInstance(options); + var client = FrostFSClient.GetInstance(options, GrpcChannel); var result = await client.GetNodeInfoAsync(default); @@ -60,7 +60,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void GetSessionTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); var token = await client.CreateSessionAsync(new(100), default); @@ -74,7 +74,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void CreateObjectWithSessionToken() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -94,11 +94,12 @@ public class SmokeClientTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await client.PutObjectAsync(param, default).ConfigureAwait(true); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var @object = await client.GetObjectAsync(new PrmObjectGet(containerId, objectId), default) .ConfigureAwait(true); @@ -120,7 +121,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void FilterTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -144,11 +145,12 @@ public class SmokeClientTests : SmokeTestsBase containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")], - new FrostFsSplit()), - payload: new MemoryStream(bytes), - clientCut: false); + new FrostFsSplit())); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var head = await client.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId), default); @@ -202,7 +204,7 @@ public class SmokeClientTests : SmokeTestsBase { bool callbackInvoked = false; - var options = GetClientOptions(keyString, url); + var options = ClientOptions; options.Value.Callback = new((cs) => { @@ -210,7 +212,7 @@ public class SmokeClientTests : SmokeTestsBase Assert.True(cs.ElapsedMicroSeconds > 0); }); - using var client = FrostFSClient.GetInstance(options); + var client = FrostFSClient.GetInstance(options, GrpcChannel); await Cleanup(client); @@ -233,11 +235,12 @@ public class SmokeClientTests : SmokeTestsBase new FrostFsObjectHeader( containerId: createdContainer, type: FrostFsObjectType.Regular, - [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false); + [new FrostFsAttributePair("fileName", "test")])); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter1 = new FilterByOwnerId(FrostFsMatchType.Equals, OwnerId!); await foreach (var objId in client.SearchObjectsAsync(new PrmObjectSearch(createdContainer, null, [], filter1), default)) @@ -292,7 +295,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void PatchTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -316,24 +319,25 @@ public class SmokeClientTests : SmokeTestsBase new FrostFsObjectHeader( containerId: createdContainer, type: FrostFsObjectType.Regular, - [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false); + [new FrostFsAttributePair("fileName", "test")])); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); - var patch = new byte[16]; - for (int i = 0; i < 16; i++) + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); + + var patch = new byte[256]; + for (int i = 0; i < patch.Length; i++) { patch[i] = 32; } - var range = new FrostFsRange(8, (ulong)patch.Length); + var range = new FrostFsRange(64, (ulong)patch.Length); var patchParams = new PrmObjectPatch( new FrostFsAddress(createdContainer, objectId), payload: new MemoryStream(patch), - maxChunkLength: 32, + maxChunkLength: 256, range: range); var newIbjId = await client.PatchObjectAsync(patchParams, default); @@ -371,7 +375,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void RangeTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -394,26 +398,27 @@ public class SmokeClientTests : SmokeTestsBase var param = new PrmObjectPut( new FrostFsObjectHeader( containerId: createdContainer, - type: FrostFsObjectType.Regular), - payload: new MemoryStream(bytes), - clientCut: false); + type: FrostFsObjectType.Regular)); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); - var rangeParam = new PrmRangeGet(createdContainer, objectId, new FrostFsRange(100, 64)); + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); + + var rangeParam = new PrmRangeGet(createdContainer, objectId, new FrostFsRange(50, 100)); var rangeReader = await client.GetRangeAsync(rangeParam, default); var downloadedBytes = new byte[rangeParam.Range.Length]; MemoryStream ms = new(downloadedBytes); - ReadOnlyMemory? chunk = null; + ReadOnlyMemory? chunk; while ((chunk = await rangeReader!.ReadChunk()) != null) { ms.Write(chunk.Value.Span); } - Assert.Equal(SHA256.HashData(bytes.AsSpan().Slice(100, 64)), SHA256.HashData(downloadedBytes)); + Assert.Equal(SHA256.HashData(bytes.AsSpan().Slice(50, 100)), SHA256.HashData(downloadedBytes)); await Cleanup(client); @@ -426,7 +431,7 @@ public class SmokeClientTests : SmokeTestsBase [Fact] public async void RangeHashTest() { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); await Cleanup(client); @@ -449,11 +454,12 @@ public class SmokeClientTests : SmokeTestsBase var param = new PrmObjectPut( new FrostFsObjectHeader( containerId: createdContainer, - type: FrostFsObjectType.Regular), - payload: new MemoryStream(bytes), - clientCut: false); + type: FrostFsObjectType.Regular)); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var rangeParam = new PrmRangeHashGet(createdContainer, objectId, [new FrostFsRange(100, 64)], bytes); @@ -478,7 +484,7 @@ public class SmokeClientTests : SmokeTestsBase [InlineData(6 * 1024 * 1024 + 100)] public async void SimpleScenarioWithSessionTest(int objectSize) { - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(ClientOptions, GrpcChannel); //Callback = new((CallStatistics cs) => Assert.True(cs.ElapsedMicroSeconds > 0)) var token = await client.CreateSessionAsync(new PrmSessionCreate(int.MaxValue), default); @@ -502,11 +508,12 @@ public class SmokeClientTests : SmokeTestsBase containerId: container, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: false, sessionToken: token); - var objectId = await client.PutObjectAsync(param, default); + var stream = await client.PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var objectId = await stream.CompleteAsync(); var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -558,11 +565,10 @@ public class SmokeClientTests : SmokeTestsBase [InlineData(200)] public async void ClientCutScenarioTest(int objectSize) { - - var options = GetClientOptions(keyString, url); + var options = ClientOptions; options.Value.Interceptors.Add(new CallbackInterceptor()); - using var client = FrostFSClient.GetInstance(GetClientOptions(keyString, url)); + var client = FrostFSClient.GetInstance(options, GrpcChannel); await Cleanup(client); @@ -580,15 +586,14 @@ public class SmokeClientTests : SmokeTestsBase byte[] bytes = GetRandomBytes(objectSize); - var param = new PrmObjectPut( + var param = new PrmObjectClientCutPut( new FrostFsObjectHeader( containerId: containerId, type: FrostFsObjectType.Regular, [new FrostFsAttributePair("fileName", "test")]), - payload: new MemoryStream(bytes), - clientCut: true); + payload: new MemoryStream(bytes)); - var objectId = await client.PutObjectAsync(param, default); + var objectId = await client.PutClientCutObjectAsync(param, default).ConfigureAwait(true); // var filter = new FilterByAttributePair(FrostFsMatchType.Equals, "fileName", "test"); @@ -607,14 +612,12 @@ public class SmokeClientTests : SmokeTestsBase //Assert.True(hasObject); - var filter1 = new FilterByOwnerId(FrostFsMatchType.Equals, OwnerId!); await foreach (var objId in client.SearchObjectsAsync(new PrmObjectSearch(containerId, null, [], filter1), default)) { var objHeader = await client.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId, false), default); var objHeader1 = await client.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId, true), default); - } var @object = await client.GetObjectAsync(new PrmObjectGet(containerId, objectId), default); @@ -657,7 +660,7 @@ public class SmokeClientTests : SmokeTestsBase bool callbackInvoked = false; bool intercepterInvoked = false; - var options = GetClientOptions(keyString, url); + var options = ClientOptions; options.Value.Callback = (cs) => { callbackInvoked = true; @@ -666,7 +669,7 @@ public class SmokeClientTests : SmokeTestsBase options.Value.Interceptors.Add(new CallbackInterceptor(s => intercepterInvoked = true)); - using var client = FrostFSClient.GetInstance(options); + var client = FrostFSClient.GetInstance(options, GrpcChannel); var result = await client.GetNodeInfoAsync(default); diff --git a/src/FrostFS.SDK.Tests/Smoke/SmokeTestsBase.cs b/src/FrostFS.SDK.Tests/Smoke/SmokeTestsBase.cs index 0b711df..5a9942d 100644 --- a/src/FrostFS.SDK.Tests/Smoke/SmokeTestsBase.cs +++ b/src/FrostFS.SDK.Tests/Smoke/SmokeTestsBase.cs @@ -3,6 +3,10 @@ using FrostFS.SDK.Client; using FrostFS.SDK.Cryptography; +using Grpc.Core; + +using Microsoft.Extensions.Options; + namespace FrostFS.SDK.Tests.Smoke; public abstract class SmokeTestsBase @@ -25,4 +29,8 @@ public abstract class SmokeTestsBase OwnerId = FrostFsOwner.FromKey(Key); Version = new FrostFsVersion(2, 13); } + + protected static Func GrpcChannel => (url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url)); + + protected IOptions ClientOptions => Options.Create(new ClientSettings { Key = keyString, Host = url }); } diff --git a/src/FrostFS.SDK.Tests/Unit/ContainerTestsBase.cs b/src/FrostFS.SDK.Tests/Unit/ContainerTestsBase.cs index 80a70fe..b6cd61c 100644 --- a/src/FrostFS.SDK.Tests/Unit/ContainerTestsBase.cs +++ b/src/FrostFS.SDK.Tests/Unit/ContainerTestsBase.cs @@ -31,7 +31,7 @@ public abstract class ContainerTestsBase { return Client.FrostFSClient.GetTestInstance( Settings, - null, + (url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url)), new NetworkMocker(key).GetMock().Object, new SessionMocker(key).GetMock().Object, Mocker.GetMock().Object, diff --git a/src/FrostFS.SDK.Tests/Unit/NetworkTestsBase.cs b/src/FrostFS.SDK.Tests/Unit/NetworkTestsBase.cs index 2bbfb9f..9452220 100644 --- a/src/FrostFS.SDK.Tests/Unit/NetworkTestsBase.cs +++ b/src/FrostFS.SDK.Tests/Unit/NetworkTestsBase.cs @@ -32,7 +32,7 @@ public abstract class NetworkTestsBase { return Client.FrostFSClient.GetTestInstance( Options.Create(settings), - null, + (url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url)), Mocker.GetMock().Object, new SessionMocker(key).GetMock().Object, new ContainerMocker(key).GetMock().Object, diff --git a/src/FrostFS.SDK.Tests/Unit/ObjectTest.cs b/src/FrostFS.SDK.Tests/Unit/ObjectTest.cs index 8d837c5..445dd80 100644 --- a/src/FrostFS.SDK.Tests/Unit/ObjectTest.cs +++ b/src/FrostFS.SDK.Tests/Unit/ObjectTest.cs @@ -24,12 +24,12 @@ public class ObjectTest : ObjectTestsBase var bytes = new byte[1024]; rnd.NextBytes(bytes); - var param = new PrmObjectPut( - Mocker.ObjectHeader, - payload: new MemoryStream(bytes), - clientCut: false); + var param = new PrmObjectPut(Mocker.ObjectHeader); - var result = await GetClient().PutObjectAsync(param, default); + var stream = await GetClient().PutObjectAsync(param, default).ConfigureAwait(true); + + await stream.WriteAsync(bytes.AsMemory()); + var result = await stream.CompleteAsync(); var sentMessages = Mocker.ClientStreamWriter!.Messages; @@ -57,11 +57,10 @@ public class ObjectTest : ObjectTestsBase byte[] bytes = File.ReadAllBytes(@".\..\..\..\cat.jpg"); var fileLength = bytes.Length; - var param = new PrmObjectPut( + var param = new PrmObjectClientCutPut( Mocker.ObjectHeader, payload: new MemoryStream(bytes), - bufferMaxSize: 1024, - clientCut: true); + bufferMaxSize: 1024); Random rnd = new(); @@ -73,7 +72,7 @@ public class ObjectTest : ObjectTestsBase foreach (var objId in objIds) Mocker.ResultObjectIds!.Add(objId); - var result = await GetClient().PutObjectAsync(param, default); + var result = await GetClient().PutClientCutObjectAsync(param, default); var singleObjects = Mocker.PutSingleRequests.ToArray(); diff --git a/src/FrostFS.SDK.Tests/Unit/ObjectTestsBase.cs b/src/FrostFS.SDK.Tests/Unit/ObjectTestsBase.cs index ba8bb21..cff9064 100644 --- a/src/FrostFS.SDK.Tests/Unit/ObjectTestsBase.cs +++ b/src/FrostFS.SDK.Tests/Unit/ObjectTestsBase.cs @@ -50,7 +50,7 @@ public abstract class ObjectTestsBase { return FrostFSClient.GetTestInstance( Settings, - null, + (url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url)), NetworkMocker.GetMock().Object, SessionMocker.GetMock().Object, ContainerMocker.GetMock().Object, diff --git a/src/FrostFS.SDK.Tests/Unit/SessionTestsBase.cs b/src/FrostFS.SDK.Tests/Unit/SessionTestsBase.cs index 4482c2e..3706a91 100644 --- a/src/FrostFS.SDK.Tests/Unit/SessionTestsBase.cs +++ b/src/FrostFS.SDK.Tests/Unit/SessionTestsBase.cs @@ -39,7 +39,7 @@ public abstract class SessionTestsBase { return Client.FrostFSClient.GetTestInstance( Settings, - null, + (url) => Grpc.Net.Client.GrpcChannel.ForAddress(new Uri(url)), new NetworkMocker(key).GetMock().Object, Mocker.GetMock().Object, new ContainerMocker(key).GetMock().Object,