From 8de6730282672cb1ed824b1991b78a38d9263908 Mon Sep 17 00:00:00 2001 From: Pavel Gross Date: Fri, 14 Jun 2024 11:58:29 +0300 Subject: [PATCH] [#7] Client cut internal --- FrostFS.SDK.sln | 3 + README.md | 22 +++-- src/FrostFS.SDK.ClientV2/Client.cs | 40 +++++++- src/FrostFS.SDK.ClientV2/Extensions/Object.cs | 2 +- .../Interfaces/IFrostFSClient.cs | 8 +- .../Mappers/GRPC/Object.cs | 25 ++++- .../Mappers/GRPC/OwnerId.cs | 5 + src/FrostFS.SDK.ClientV2/Services/Object.cs | 99 +++++++++++++++++-- .../FrostFS.SDK.Cryptography.csproj | 2 +- src/FrostFS.SDK.Cryptography/Helper.cs | 1 + src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs | 2 +- src/FrostFS.SDK.ModelsV2/Object/Object.cs | 5 +- .../Object/ObjectHeader.cs | 4 +- .../object/Extension.Message.cs | 6 ++ .../FrostFS.SDK.Tests.csproj | 28 ++++++ 15 files changed, 216 insertions(+), 36 deletions(-) create mode 100644 src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj diff --git a/FrostFS.SDK.sln b/FrostFS.SDK.sln index 67e5eb3..fcfe732 100644 --- a/FrostFS.SDK.sln +++ b/FrostFS.SDK.sln @@ -1,4 +1,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00 +# +VisualStudioVersion = 17.5.002.0 +MinimumVisualStudioVersion = Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FrostFS.SDK.ClientV2", "src\FrostFS.SDK.ClientV2\FrostFS.SDK.ClientV2.csproj", "{50D8F61F-C302-4AC9-8D8A-AB0B8C0988C3}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FrostFS.SDK.Cryptography", "src\FrostFS.SDK.Cryptography\FrostFS.SDK.Cryptography.csproj", "{3D804F4A-B0B2-47A5-B006-BE447BE64B50}" diff --git a/README.md b/README.md index 542a544..89ab89a 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,7 @@ static async Task PutObjectClientCut(IFrostFSClient fsClient, Contain var fileInfo = new FileInfo(fileName); var fullLength = (ulong)fileInfo.Length; var fileNameAttribute = new ObjectAttribute("fileName", fileInfo.Name); - + using var stream = File.OpenRead(fileName); while (true) { @@ -121,7 +121,7 @@ static async Task PutObjectClientCut(IFrostFSClient fsClient, Contain largeObject.AppendBlock(buffer, bytesCount); - currentObject = new FrostFS.SDK.ModelsV2.Object(containerId, buffer) + currentObject = new FrostFS.SDK.ModelsV2.Object(containerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer) .AddAttribute(fileNameAttribute) .SetSplit(split); @@ -134,15 +134,19 @@ static async Task PutObjectClientCut(IFrostFSClient fsClient, Contain if (sentObjectIds.Any()) { - largeObject.CalculateHash(); - - var linkObject = new LinkObject(containerId, split.SplitId, largeObject) - .AddChildren(sentObjectIds); - - _ = await fsClient.PutSingleObjectAsync(linkObject); + largeObject.CalculateHash() + .AddAttribute(fileNameAttribute); currentObject.SetParent(largeObject); - _ = await fsClient.PutSingleObjectAsync(currentObject); + + var objectId = await fsClient.PutSingleObjectAsync(currentObject); + sentObjectIds.Add(objectId); + + var linkObject = new LinkObject(containerId, split.SplitId, largeObject) + .AddChildren(sentObjectIds) + .AddAttribute(fileNameAttribute); + + _ = await fsClient.PutSingleObjectAsync(linkObject); return currentObject.GetParentId(); } diff --git a/src/FrostFS.SDK.ClientV2/Client.cs b/src/FrostFS.SDK.ClientV2/Client.cs index 7e9e29c..96fb227 100644 --- a/src/FrostFS.SDK.ClientV2/Client.cs +++ b/src/FrostFS.SDK.ClientV2/Client.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Generic; using System.Security.Cryptography; +using System.Text; using System.Threading.Tasks; using FrostFS.Container; using FrostFS.Netmap; @@ -10,6 +12,7 @@ using FrostFS.SDK.ModelsV2; using FrostFS.Session; using Grpc.Core; using Grpc.Net.Client; +using static FrostFS.Netmap.NetworkConfig.Types; using Version = FrostFS.SDK.ModelsV2.Version; namespace FrostFS.SDK.ClientV2; @@ -21,6 +24,8 @@ public partial class Client: IFrostFSClient public readonly OwnerId OwnerId; public readonly Version Version = new(2, 13); + private readonly Dictionary NetworkSettings = []; + private ContainerService.ContainerServiceClient? _containerServiceClient; private NetmapService.NetmapServiceClient? _netmapServiceClient; private ObjectService.ObjectServiceClient? _objectServiceClient; @@ -42,6 +47,33 @@ public partial class Client: IFrostFSClient InitObjectClient(); InitSessionClient(); CheckFrostFsVersionSupport(); + + InitNetworkInfoAsync(); + } + + private async void InitNetworkInfoAsync() + { + var info = await GetNetworkInfoAsync(); + + foreach (var param in info.Body.NetworkInfo.NetworkConfig.Parameters) + { + SetNetworksParam(param); + } + } + + private void SetNetworksParam(Parameter param) + { + var key = Encoding.UTF8.GetString(param.Key.ToByteArray()); + + var encodedValue = param.Value.ToByteArray(); + + ulong val = 0; + for (var i = encodedValue.Length - 1; i >= 0; i--) + { + val = (val << 8) + encodedValue[i]; + } + + NetworkSettings.Add(key, val); } private async void CheckFrostFsVersionSupport() @@ -84,7 +116,11 @@ public partial class Client: IFrostFSClient throw new ArgumentException(msg); } - _channel = GrpcChannel.ForAddress(uri, new GrpcChannelOptions { Credentials = grpcCredentials }); + _channel = GrpcChannel.ForAddress(uri, new GrpcChannelOptions + { + Credentials = grpcCredentials, + HttpHandler = new System.Net.Http.HttpClientHandler() + }); } private void InitContainerClient() @@ -106,4 +142,4 @@ public partial class Client: IFrostFSClient { _sessionServiceClient = new SessionService.SessionServiceClient(_channel); } -} \ No newline at end of file +} diff --git a/src/FrostFS.SDK.ClientV2/Extensions/Object.cs b/src/FrostFS.SDK.ClientV2/Extensions/Object.cs index 93d8d5f..3765e71 100644 --- a/src/FrostFS.SDK.ClientV2/Extensions/Object.cs +++ b/src/FrostFS.SDK.ClientV2/Extensions/Object.cs @@ -38,7 +38,7 @@ public static class Extensions public static LinkObject AddChildren(this LinkObject linkObject, IEnumerable objectIds) { - linkObject.Header.Split.Children.AddRange(objectIds); + linkObject.Header.Split!.Children.AddRange(objectIds); return linkObject; } } \ No newline at end of file diff --git a/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs b/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs index 5b82d1b..9ae6833 100644 --- a/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs +++ b/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.IO; +using System.Threading; using System.Threading.Tasks; using FrostFS.SDK.ModelsV2; @@ -18,13 +19,14 @@ public interface IFrostFSClient Task GetObjectHeadAsync(ContainerId containerId, ObjectId objectId); + Task GetObjectAsync(ContainerId containerId, ObjectId objectId); - Task PutObjectAsync(ObjectHeader header, Stream payload); + Task PutObjectAsync(ObjectHeader header, Stream payload, CancellationToken cancellationToken = default); - Task PutObjectAsync(ObjectHeader header, byte[] payload); + Task PutObjectAsync(ObjectHeader header, byte[] payload, CancellationToken cancellationToken = default); - Task PutSingleObjectAsync(ModelsV2.Object obj); + Task PutSingleObjectAsync(ModelsV2.Object obj, CancellationToken cancellationToken = default); Task DeleteObjectAsync(ContainerId containerId, ObjectId objectId); diff --git a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs index ef21202..15a7b1c 100644 --- a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs +++ b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs @@ -88,8 +88,8 @@ public static class ObjectHeaderMapper if (split.Children != null && split.Children.Any()) head.Split.Children.AddRange(split.Children.Select(id => id.ToGrpcMessage())); - } - + } + return head; } @@ -103,15 +103,31 @@ public static class ObjectHeaderMapper _ => throw new ArgumentException($"Unknown ObjectType. Value: '{header.ObjectType}'.") }; - return new ObjectHeader( + var model = new ObjectHeader( new ContainerId(Base58.Encode(header.ContainerId.Value.ToByteArray())), objTypeName, header.Attributes.Select(attribute => attribute.ToModel()).ToArray() ) { PayloadLength = header.PayloadLength, - Version = header.Version.ToModel() + Version = header.Version.ToModel(), + OwnerId = header.OwnerId.ToModel() }; + + if (header.Split != null) + { + model.Split = new Split(SplitId.CrateFromBinary(header.Split.SplitId.ToByteArray())) + { + Parent = header.Split.Parent?.ToModel(), + ParentHeader = header.Split.ParentHeader?.ToModel(), + Previous = header.Split.Previous?.ToModel() + }; + + if (header.Split.Children.Any()) + model.Split.Children.AddRange(header.Split.Children.Select(x => x.ToModel())); + } + + return model; } } @@ -148,4 +164,3 @@ public static class SignatureMapper }; } } - diff --git a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs index 4bea66d..b4f332e 100644 --- a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs +++ b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs @@ -13,4 +13,9 @@ public static class OwnerIdMapper Value = ByteString.CopyFrom(ownerId.ToHash()) }; } + + public static OwnerId ToModel(this OwnerID ownerId) + { + return new OwnerId(ownerId.ToString()); + } } \ No newline at end of file diff --git a/src/FrostFS.SDK.ClientV2/Services/Object.cs b/src/FrostFS.SDK.ClientV2/Services/Object.cs index bc98dc8..004df1d 100644 --- a/src/FrostFS.SDK.ClientV2/Services/Object.cs +++ b/src/FrostFS.SDK.ClientV2/Services/Object.cs @@ -13,6 +13,8 @@ using FrostFS.SDK.Cryptography; using FrostFS.Session; using FrostFS.SDK.ModelsV2; +using FrostFS.SDK.ClientV2.Extensions; +using System.Threading; namespace FrostFS.SDK.ClientV2; @@ -32,6 +34,7 @@ public partial class Client } }; + request.AddMetaHeader(); request.Sign(_key); var response = await _objectServiceClient!.HeadAsync(request); @@ -70,18 +73,83 @@ public partial class Client return obj.ToModel(); } - public async Task PutObjectAsync(ObjectHeader header, Stream payload) + public async Task PutObjectAsync(ObjectHeader header, Stream payload, CancellationToken cancellationToken = default) { - return await PutObject(header, payload); + return await PutObject(header, payload, cancellationToken); } - public async Task PutObjectAsync(ObjectHeader header, byte[] payload) + public async Task PutObjectAsync(ObjectHeader header, byte[] payload, CancellationToken cancellationToken = default) { using var stream = new MemoryStream(payload); - return await PutObject(header, stream); + return await PutObject(header, stream, cancellationToken); } - private async Task PutObject(ObjectHeader header, Stream payload) + private Task PutObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken) + { + if (header.ClientCut) + return PutClientCutObject(header, payload, cancellationToken); + else + return PutStreamObject(header, payload, cancellationToken); + } + + private async Task PutClientCutObject(ObjectHeader header, Stream payloadStream, CancellationToken cancellationToken) + { + ObjectId? objectId = null; + List sentObjectIds = []; + ModelsV2.Object? currentObject; + + var partSize = (int)NetworkSettings["MaxObjectSize"]; + var buffer = new byte[partSize]; + + var largeObject = new LargeObject(header.ContainerId); + + var split = new Split(); + + var fullLength = (ulong)payloadStream.Length; + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + var bytesCount = await payloadStream.ReadAsync(buffer, 0, partSize); + + split.Previous = sentObjectIds.LastOrDefault(); + + largeObject.AppendBlock(buffer, bytesCount); + + currentObject = new ModelsV2.Object(header.ContainerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer) + .AddAttributes(header.Attributes) + .SetSplit(split); + + if (largeObject.PayloadLength == fullLength) + break; + + objectId = await PutSingleObjectAsync(currentObject, cancellationToken); + + sentObjectIds.Add(objectId!); + } + + if (sentObjectIds.Any()) + { + largeObject.CalculateHash(); + + currentObject.SetParent(largeObject); + + objectId = await PutSingleObjectAsync(currentObject, cancellationToken); + sentObjectIds.Add(objectId); + + var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject) + .AddChildren(sentObjectIds); + + _ = await PutSingleObjectAsync(linkObject, cancellationToken); + + return currentObject.GetParentId(); + } + + return await PutSingleObjectAsync(currentObject, cancellationToken); + } + + private async Task PutStreamObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken) { var sessionToken = await CreateSessionAsync(uint.MaxValue); var hdr = header.ToGrpcMessage(); @@ -120,6 +188,8 @@ public partial class Client while (true) { + cancellationToken.ThrowIfCancellationRequested(); + var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize); if (bufferLength == 0) @@ -141,7 +211,7 @@ public partial class Client return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()); } - public async Task PutSingleObjectAsync(ModelsV2.Object @object) + public async Task PutSingleObjectAsync(ModelsV2.Object @object, CancellationToken cancellationToken = default) { var sessionToken = await CreateSessionAsync(uint.MaxValue); @@ -163,7 +233,7 @@ public partial class Client request.Sign(_key); - var response = await _objectServiceClient!.PutSingleAsync(request); + var response = await _objectServiceClient!.PutSingleAsync(request, null, null, cancellationToken); Verifier.CheckResponse(response); return ObjectId.FromHash(obj.ObjectId.Value.ToByteArray()); @@ -207,6 +277,8 @@ public partial class Client split.Parent = grpcHeader.Split.Parent.ToModel(); } + + grpcHeader.Split.Previous = split.Previous?.ToGrpcMessage(); } var obj = new Object.Object @@ -286,10 +358,12 @@ public partial class Client request.Body.Filters.Add(filter.ToGrpcMessage()); } + request.AddMetaHeader(); request.Sign(_key); var objectsIds = SearchObjects(request); + await foreach (var oid in objectsIds) { yield return ObjectId.FromHash(oid.Value.ToByteArray()); @@ -301,12 +375,14 @@ public partial class Client using var stream = GetObjectInit(request); var obj = await stream.ReadHeader(); var payload = new byte[obj.Header.PayloadLength]; - var offset = 0; + var offset = 0L; var chunk = await stream.ReadChunk(); - while (chunk is not null) + while (chunk is not null && (ulong)offset < obj.Header.PayloadLength) { - chunk.CopyTo(payload, offset); + var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length); + + Array.Copy(chunk, 0, payload, offset, length); offset += chunk.Length; chunk = await stream.ReadChunk(); } @@ -321,6 +397,7 @@ public partial class Client if (initRequest is null) throw new ArgumentNullException(nameof(initRequest)); + return new ObjectReader { Call = _objectServiceClient!.Get(initRequest) @@ -376,3 +453,5 @@ public partial class Client } + + diff --git a/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj b/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj index 0534720..aa2afc9 100644 --- a/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj +++ b/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj @@ -7,8 +7,8 @@ + - diff --git a/src/FrostFS.SDK.Cryptography/Helper.cs b/src/FrostFS.SDK.Cryptography/Helper.cs index 266a193..198dcb5 100644 --- a/src/FrostFS.SDK.Cryptography/Helper.cs +++ b/src/FrostFS.SDK.Cryptography/Helper.cs @@ -11,6 +11,7 @@ public static class Helper internal static byte[] RIPEMD160(this byte[] value) { var hash = new byte[20]; + var digest = new RipeMD160Digest(); digest.BlockUpdate(value, 0, value.Length); digest.DoFinal(hash, 0); diff --git a/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs b/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs index 7e86912..d67d987 100644 --- a/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs +++ b/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs @@ -5,5 +5,5 @@ namespace FrostFS.SDK.ModelsV2.Netmap; public class NodeInfo { public NodeState State { get; set; } - public Version Version { get; set; } + public Version? Version { get; set; } } \ No newline at end of file diff --git a/src/FrostFS.SDK.ModelsV2/Object/Object.cs b/src/FrostFS.SDK.ModelsV2/Object/Object.cs index 5810699..b4236b1 100644 --- a/src/FrostFS.SDK.ModelsV2/Object/Object.cs +++ b/src/FrostFS.SDK.ModelsV2/Object/Object.cs @@ -22,7 +22,7 @@ public class Object public void SetParent(LargeObject largeObject) { - Header.Split!.ParentHeader = largeObject.Header; + Header.Split.ParentHeader = largeObject.Header; } public ObjectId? GetParentId() @@ -41,10 +41,11 @@ public class LargeObject(ContainerId container) : Object(container, []) this.payloadHash.TransformBlock(bytes, 0, count, bytes, 0); } - public void CalculateHash() + public LargeObject CalculateHash() { this.payloadHash.TransformFinalBlock([], 0, 0); Header.PayloadCheckSum = this.payloadHash.Hash; + return this; } public ulong PayloadLength diff --git a/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs b/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs index 8f51160..db9b867 100644 --- a/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs +++ b/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs @@ -6,7 +6,7 @@ namespace FrostFS.SDK.ModelsV2; public class ObjectHeader { - public OwnerId OwnerId { get; set; } + public OwnerId? OwnerId { get; set; } public List Attributes { get; set; } @@ -18,7 +18,7 @@ public class ObjectHeader public ObjectType ObjectType { get; set; } - public Version Version { get; set; } + public Version? Version { get; set; } public Split? Split { get; set; } diff --git a/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs b/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs index 6334c71..a7cd446 100644 --- a/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs +++ b/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using FrostFS.Session; using Google.Protobuf; @@ -201,26 +202,31 @@ namespace FrostFS.Object public partial class DeleteResponse : IResponse { + [DebuggerStepThrough] IMetaHeader IVerificableMessage.GetMetaHeader() { return MetaHeader; } + [DebuggerStepThrough] IVerificationHeader IVerificableMessage.GetVerificationHeader() { return VerifyHeader; } + [DebuggerStepThrough] void IVerificableMessage.SetMetaHeader(IMetaHeader metaHeader) { MetaHeader = (ResponseMetaHeader)metaHeader; } + [DebuggerStepThrough] void IVerificableMessage.SetVerificationHeader(IVerificationHeader verificationHeader) { VerifyHeader = (ResponseVerificationHeader)verificationHeader; } + [DebuggerStepThrough] public IMessage GetBody() { return Body; diff --git a/src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj b/src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj new file mode 100644 index 0000000..931a072 --- /dev/null +++ b/src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj @@ -0,0 +1,28 @@ + + + + net8.0 + enable + enable + + false + true + + + + + + + + + + + + + + + + + + +