From b69d22966f0335d64946cdb3cfba94b042d5b9a8 Mon Sep 17 00:00:00 2001
From: Pavel Gross
Date: Fri, 14 Jun 2024 11:58:29 +0300
Subject: [PATCH] [#7] Client cut internal
---
FrostFS.SDK.sln | 3 +
README.md | 22 +++--
src/FrostFS.SDK.ClientV2/Client.cs | 40 +++++++-
src/FrostFS.SDK.ClientV2/Extensions/Object.cs | 2 +-
.../Interfaces/IFrostFSClient.cs | 8 +-
.../Mappers/GRPC/Object.cs | 25 ++++-
.../Mappers/GRPC/OwnerId.cs | 5 +
src/FrostFS.SDK.ClientV2/Services/Object.cs | 99 +++++++++++++++++--
.../FrostFS.SDK.Cryptography.csproj | 2 +-
src/FrostFS.SDK.Cryptography/Helper.cs | 1 +
src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs | 2 +-
src/FrostFS.SDK.ModelsV2/Object/Object.cs | 5 +-
.../Object/ObjectHeader.cs | 4 +-
.../object/Extension.Message.cs | 6 ++
.../FrostFS.SDK.Tests.csproj | 28 ++++++
15 files changed, 216 insertions(+), 36 deletions(-)
create mode 100644 src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj
diff --git a/FrostFS.SDK.sln b/FrostFS.SDK.sln
index 67e5eb3..fcfe732 100644
--- a/FrostFS.SDK.sln
+++ b/FrostFS.SDK.sln
@@ -1,4 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
+#
+VisualStudioVersion = 17.5.002.0
+MinimumVisualStudioVersion =
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FrostFS.SDK.ClientV2", "src\FrostFS.SDK.ClientV2\FrostFS.SDK.ClientV2.csproj", "{50D8F61F-C302-4AC9-8D8A-AB0B8C0988C3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FrostFS.SDK.Cryptography", "src\FrostFS.SDK.Cryptography\FrostFS.SDK.Cryptography.csproj", "{3D804F4A-B0B2-47A5-B006-BE447BE64B50}"
diff --git a/README.md b/README.md
index 542a544..89ab89a 100644
--- a/README.md
+++ b/README.md
@@ -111,7 +111,7 @@ static async Task PutObjectClientCut(IFrostFSClient fsClient, Contain
var fileInfo = new FileInfo(fileName);
var fullLength = (ulong)fileInfo.Length;
var fileNameAttribute = new ObjectAttribute("fileName", fileInfo.Name);
-
+
using var stream = File.OpenRead(fileName);
while (true)
{
@@ -121,7 +121,7 @@ static async Task PutObjectClientCut(IFrostFSClient fsClient, Contain
largeObject.AppendBlock(buffer, bytesCount);
- currentObject = new FrostFS.SDK.ModelsV2.Object(containerId, buffer)
+ currentObject = new FrostFS.SDK.ModelsV2.Object(containerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer)
.AddAttribute(fileNameAttribute)
.SetSplit(split);
@@ -134,15 +134,19 @@ static async Task PutObjectClientCut(IFrostFSClient fsClient, Contain
if (sentObjectIds.Any())
{
- largeObject.CalculateHash();
-
- var linkObject = new LinkObject(containerId, split.SplitId, largeObject)
- .AddChildren(sentObjectIds);
-
- _ = await fsClient.PutSingleObjectAsync(linkObject);
+ largeObject.CalculateHash()
+ .AddAttribute(fileNameAttribute);
currentObject.SetParent(largeObject);
- _ = await fsClient.PutSingleObjectAsync(currentObject);
+
+ var objectId = await fsClient.PutSingleObjectAsync(currentObject);
+ sentObjectIds.Add(objectId);
+
+ var linkObject = new LinkObject(containerId, split.SplitId, largeObject)
+ .AddChildren(sentObjectIds)
+ .AddAttribute(fileNameAttribute);
+
+ _ = await fsClient.PutSingleObjectAsync(linkObject);
return currentObject.GetParentId();
}
diff --git a/src/FrostFS.SDK.ClientV2/Client.cs b/src/FrostFS.SDK.ClientV2/Client.cs
index 7e9e29c..96fb227 100644
--- a/src/FrostFS.SDK.ClientV2/Client.cs
+++ b/src/FrostFS.SDK.ClientV2/Client.cs
@@ -1,5 +1,7 @@
using System;
+using System.Collections.Generic;
using System.Security.Cryptography;
+using System.Text;
using System.Threading.Tasks;
using FrostFS.Container;
using FrostFS.Netmap;
@@ -10,6 +12,7 @@ using FrostFS.SDK.ModelsV2;
using FrostFS.Session;
using Grpc.Core;
using Grpc.Net.Client;
+using static FrostFS.Netmap.NetworkConfig.Types;
using Version = FrostFS.SDK.ModelsV2.Version;
namespace FrostFS.SDK.ClientV2;
@@ -21,6 +24,8 @@ public partial class Client: IFrostFSClient
public readonly OwnerId OwnerId;
public readonly Version Version = new(2, 13);
+ private readonly Dictionary NetworkSettings = [];
+
private ContainerService.ContainerServiceClient? _containerServiceClient;
private NetmapService.NetmapServiceClient? _netmapServiceClient;
private ObjectService.ObjectServiceClient? _objectServiceClient;
@@ -42,6 +47,33 @@ public partial class Client: IFrostFSClient
InitObjectClient();
InitSessionClient();
CheckFrostFsVersionSupport();
+
+ InitNetworkInfoAsync();
+ }
+
+ private async void InitNetworkInfoAsync()
+ {
+ var info = await GetNetworkInfoAsync();
+
+ foreach (var param in info.Body.NetworkInfo.NetworkConfig.Parameters)
+ {
+ SetNetworksParam(param);
+ }
+ }
+
+ private void SetNetworksParam(Parameter param)
+ {
+ var key = Encoding.UTF8.GetString(param.Key.ToByteArray());
+
+ var encodedValue = param.Value.ToByteArray();
+
+ ulong val = 0;
+ for (var i = encodedValue.Length - 1; i >= 0; i--)
+ {
+ val = (val << 8) + encodedValue[i];
+ }
+
+ NetworkSettings.Add(key, val);
}
private async void CheckFrostFsVersionSupport()
@@ -84,7 +116,11 @@ public partial class Client: IFrostFSClient
throw new ArgumentException(msg);
}
- _channel = GrpcChannel.ForAddress(uri, new GrpcChannelOptions { Credentials = grpcCredentials });
+ _channel = GrpcChannel.ForAddress(uri, new GrpcChannelOptions
+ {
+ Credentials = grpcCredentials,
+ HttpHandler = new System.Net.Http.HttpClientHandler()
+ });
}
private void InitContainerClient()
@@ -106,4 +142,4 @@ public partial class Client: IFrostFSClient
{
_sessionServiceClient = new SessionService.SessionServiceClient(_channel);
}
-}
\ No newline at end of file
+}
diff --git a/src/FrostFS.SDK.ClientV2/Extensions/Object.cs b/src/FrostFS.SDK.ClientV2/Extensions/Object.cs
index 93d8d5f..3765e71 100644
--- a/src/FrostFS.SDK.ClientV2/Extensions/Object.cs
+++ b/src/FrostFS.SDK.ClientV2/Extensions/Object.cs
@@ -38,7 +38,7 @@ public static class Extensions
public static LinkObject AddChildren(this LinkObject linkObject, IEnumerable objectIds)
{
- linkObject.Header.Split.Children.AddRange(objectIds);
+ linkObject.Header.Split!.Children.AddRange(objectIds);
return linkObject;
}
}
\ No newline at end of file
diff --git a/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs b/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs
index 5b82d1b..9ae6833 100644
--- a/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs
+++ b/src/FrostFS.SDK.ClientV2/Interfaces/IFrostFSClient.cs
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.IO;
+using System.Threading;
using System.Threading.Tasks;
using FrostFS.SDK.ModelsV2;
@@ -18,13 +19,14 @@ public interface IFrostFSClient
Task GetObjectHeadAsync(ContainerId containerId, ObjectId objectId);
+
Task GetObjectAsync(ContainerId containerId, ObjectId objectId);
- Task PutObjectAsync(ObjectHeader header, Stream payload);
+ Task PutObjectAsync(ObjectHeader header, Stream payload, CancellationToken cancellationToken = default);
- Task PutObjectAsync(ObjectHeader header, byte[] payload);
+ Task PutObjectAsync(ObjectHeader header, byte[] payload, CancellationToken cancellationToken = default);
- Task PutSingleObjectAsync(ModelsV2.Object obj);
+ Task PutSingleObjectAsync(ModelsV2.Object obj, CancellationToken cancellationToken = default);
Task DeleteObjectAsync(ContainerId containerId, ObjectId objectId);
diff --git a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs
index ef21202..15a7b1c 100644
--- a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs
+++ b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/Object.cs
@@ -88,8 +88,8 @@ public static class ObjectHeaderMapper
if (split.Children != null && split.Children.Any())
head.Split.Children.AddRange(split.Children.Select(id => id.ToGrpcMessage()));
- }
-
+ }
+
return head;
}
@@ -103,15 +103,31 @@ public static class ObjectHeaderMapper
_ => throw new ArgumentException($"Unknown ObjectType. Value: '{header.ObjectType}'.")
};
- return new ObjectHeader(
+ var model = new ObjectHeader(
new ContainerId(Base58.Encode(header.ContainerId.Value.ToByteArray())),
objTypeName,
header.Attributes.Select(attribute => attribute.ToModel()).ToArray()
)
{
PayloadLength = header.PayloadLength,
- Version = header.Version.ToModel()
+ Version = header.Version.ToModel(),
+ OwnerId = header.OwnerId.ToModel()
};
+
+ if (header.Split != null)
+ {
+ model.Split = new Split(SplitId.CrateFromBinary(header.Split.SplitId.ToByteArray()))
+ {
+ Parent = header.Split.Parent?.ToModel(),
+ ParentHeader = header.Split.ParentHeader?.ToModel(),
+ Previous = header.Split.Previous?.ToModel()
+ };
+
+ if (header.Split.Children.Any())
+ model.Split.Children.AddRange(header.Split.Children.Select(x => x.ToModel()));
+ }
+
+ return model;
}
}
@@ -148,4 +164,3 @@ public static class SignatureMapper
};
}
}
-
diff --git a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs
index 4bea66d..b4f332e 100644
--- a/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs
+++ b/src/FrostFS.SDK.ClientV2/Mappers/GRPC/OwnerId.cs
@@ -13,4 +13,9 @@ public static class OwnerIdMapper
Value = ByteString.CopyFrom(ownerId.ToHash())
};
}
+
+ public static OwnerId ToModel(this OwnerID ownerId)
+ {
+ return new OwnerId(ownerId.ToString());
+ }
}
\ No newline at end of file
diff --git a/src/FrostFS.SDK.ClientV2/Services/Object.cs b/src/FrostFS.SDK.ClientV2/Services/Object.cs
index bc98dc8..004df1d 100644
--- a/src/FrostFS.SDK.ClientV2/Services/Object.cs
+++ b/src/FrostFS.SDK.ClientV2/Services/Object.cs
@@ -13,6 +13,8 @@ using FrostFS.SDK.Cryptography;
using FrostFS.Session;
using FrostFS.SDK.ModelsV2;
+using FrostFS.SDK.ClientV2.Extensions;
+using System.Threading;
namespace FrostFS.SDK.ClientV2;
@@ -32,6 +34,7 @@ public partial class Client
}
};
+
request.AddMetaHeader();
request.Sign(_key);
var response = await _objectServiceClient!.HeadAsync(request);
@@ -70,18 +73,83 @@ public partial class Client
return obj.ToModel();
}
- public async Task PutObjectAsync(ObjectHeader header, Stream payload)
+ public async Task PutObjectAsync(ObjectHeader header, Stream payload, CancellationToken cancellationToken = default)
{
- return await PutObject(header, payload);
+ return await PutObject(header, payload, cancellationToken);
}
- public async Task PutObjectAsync(ObjectHeader header, byte[] payload)
+ public async Task PutObjectAsync(ObjectHeader header, byte[] payload, CancellationToken cancellationToken = default)
{
using var stream = new MemoryStream(payload);
- return await PutObject(header, stream);
+ return await PutObject(header, stream, cancellationToken);
}
- private async Task PutObject(ObjectHeader header, Stream payload)
+ private Task PutObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken)
+ {
+ if (header.ClientCut)
+ return PutClientCutObject(header, payload, cancellationToken);
+ else
+ return PutStreamObject(header, payload, cancellationToken);
+ }
+
+ private async Task PutClientCutObject(ObjectHeader header, Stream payloadStream, CancellationToken cancellationToken)
+ {
+ ObjectId? objectId = null;
+ List sentObjectIds = [];
+ ModelsV2.Object? currentObject;
+
+ var partSize = (int)NetworkSettings["MaxObjectSize"];
+ var buffer = new byte[partSize];
+
+ var largeObject = new LargeObject(header.ContainerId);
+
+ var split = new Split();
+
+ var fullLength = (ulong)payloadStream.Length;
+
+ while (true)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var bytesCount = await payloadStream.ReadAsync(buffer, 0, partSize);
+
+ split.Previous = sentObjectIds.LastOrDefault();
+
+ largeObject.AppendBlock(buffer, bytesCount);
+
+ currentObject = new ModelsV2.Object(header.ContainerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer)
+ .AddAttributes(header.Attributes)
+ .SetSplit(split);
+
+ if (largeObject.PayloadLength == fullLength)
+ break;
+
+ objectId = await PutSingleObjectAsync(currentObject, cancellationToken);
+
+ sentObjectIds.Add(objectId!);
+ }
+
+ if (sentObjectIds.Any())
+ {
+ largeObject.CalculateHash();
+
+ currentObject.SetParent(largeObject);
+
+ objectId = await PutSingleObjectAsync(currentObject, cancellationToken);
+ sentObjectIds.Add(objectId);
+
+ var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
+ .AddChildren(sentObjectIds);
+
+ _ = await PutSingleObjectAsync(linkObject, cancellationToken);
+
+ return currentObject.GetParentId();
+ }
+
+ return await PutSingleObjectAsync(currentObject, cancellationToken);
+ }
+
+ private async Task PutStreamObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
var hdr = header.ToGrpcMessage();
@@ -120,6 +188,8 @@ public partial class Client
while (true)
{
+ cancellationToken.ThrowIfCancellationRequested();
+
var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize);
if (bufferLength == 0)
@@ -141,7 +211,7 @@ public partial class Client
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
- public async Task PutSingleObjectAsync(ModelsV2.Object @object)
+ public async Task PutSingleObjectAsync(ModelsV2.Object @object, CancellationToken cancellationToken = default)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
@@ -163,7 +233,7 @@ public partial class Client
request.Sign(_key);
- var response = await _objectServiceClient!.PutSingleAsync(request);
+ var response = await _objectServiceClient!.PutSingleAsync(request, null, null, cancellationToken);
Verifier.CheckResponse(response);
return ObjectId.FromHash(obj.ObjectId.Value.ToByteArray());
@@ -207,6 +277,8 @@ public partial class Client
split.Parent = grpcHeader.Split.Parent.ToModel();
}
+
+ grpcHeader.Split.Previous = split.Previous?.ToGrpcMessage();
}
var obj = new Object.Object
@@ -286,10 +358,12 @@ public partial class Client
request.Body.Filters.Add(filter.ToGrpcMessage());
}
+
request.AddMetaHeader();
request.Sign(_key);
var objectsIds = SearchObjects(request);
+
await foreach (var oid in objectsIds)
{
yield return ObjectId.FromHash(oid.Value.ToByteArray());
@@ -301,12 +375,14 @@ public partial class Client
using var stream = GetObjectInit(request);
var obj = await stream.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
- var offset = 0;
+ var offset = 0L;
var chunk = await stream.ReadChunk();
- while (chunk is not null)
+ while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
{
- chunk.CopyTo(payload, offset);
+ var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
+
+ Array.Copy(chunk, 0, payload, offset, length);
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
@@ -321,6 +397,7 @@ public partial class Client
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
+
return new ObjectReader
{
Call = _objectServiceClient!.Get(initRequest)
@@ -376,3 +453,5 @@ public partial class Client
}
+
+
diff --git a/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj b/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj
index 0534720..aa2afc9 100644
--- a/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj
+++ b/src/FrostFS.SDK.Cryptography/FrostFS.SDK.Cryptography.csproj
@@ -7,8 +7,8 @@
+
-
diff --git a/src/FrostFS.SDK.Cryptography/Helper.cs b/src/FrostFS.SDK.Cryptography/Helper.cs
index 266a193..198dcb5 100644
--- a/src/FrostFS.SDK.Cryptography/Helper.cs
+++ b/src/FrostFS.SDK.Cryptography/Helper.cs
@@ -11,6 +11,7 @@ public static class Helper
internal static byte[] RIPEMD160(this byte[] value)
{
var hash = new byte[20];
+
var digest = new RipeMD160Digest();
digest.BlockUpdate(value, 0, value.Length);
digest.DoFinal(hash, 0);
diff --git a/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs b/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs
index 7e86912..d67d987 100644
--- a/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs
+++ b/src/FrostFS.SDK.ModelsV2/Netmap/NodeInfo.cs
@@ -5,5 +5,5 @@ namespace FrostFS.SDK.ModelsV2.Netmap;
public class NodeInfo
{
public NodeState State { get; set; }
- public Version Version { get; set; }
+ public Version? Version { get; set; }
}
\ No newline at end of file
diff --git a/src/FrostFS.SDK.ModelsV2/Object/Object.cs b/src/FrostFS.SDK.ModelsV2/Object/Object.cs
index 5810699..b4236b1 100644
--- a/src/FrostFS.SDK.ModelsV2/Object/Object.cs
+++ b/src/FrostFS.SDK.ModelsV2/Object/Object.cs
@@ -22,7 +22,7 @@ public class Object
public void SetParent(LargeObject largeObject)
{
- Header.Split!.ParentHeader = largeObject.Header;
+ Header.Split.ParentHeader = largeObject.Header;
}
public ObjectId? GetParentId()
@@ -41,10 +41,11 @@ public class LargeObject(ContainerId container) : Object(container, [])
this.payloadHash.TransformBlock(bytes, 0, count, bytes, 0);
}
- public void CalculateHash()
+ public LargeObject CalculateHash()
{
this.payloadHash.TransformFinalBlock([], 0, 0);
Header.PayloadCheckSum = this.payloadHash.Hash;
+ return this;
}
public ulong PayloadLength
diff --git a/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs b/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs
index 8f51160..db9b867 100644
--- a/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs
+++ b/src/FrostFS.SDK.ModelsV2/Object/ObjectHeader.cs
@@ -6,7 +6,7 @@ namespace FrostFS.SDK.ModelsV2;
public class ObjectHeader
{
- public OwnerId OwnerId { get; set; }
+ public OwnerId? OwnerId { get; set; }
public List Attributes { get; set; }
@@ -18,7 +18,7 @@ public class ObjectHeader
public ObjectType ObjectType { get; set; }
- public Version Version { get; set; }
+ public Version? Version { get; set; }
public Split? Split { get; set; }
diff --git a/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs b/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs
index 6334c71..a7cd446 100644
--- a/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs
+++ b/src/FrostFS.SDK.ProtosV2/object/Extension.Message.cs
@@ -1,3 +1,4 @@
+using System.Diagnostics;
using FrostFS.Session;
using Google.Protobuf;
@@ -201,26 +202,31 @@ namespace FrostFS.Object
public partial class DeleteResponse : IResponse
{
+ [DebuggerStepThrough]
IMetaHeader IVerificableMessage.GetMetaHeader()
{
return MetaHeader;
}
+ [DebuggerStepThrough]
IVerificationHeader IVerificableMessage.GetVerificationHeader()
{
return VerifyHeader;
}
+ [DebuggerStepThrough]
void IVerificableMessage.SetMetaHeader(IMetaHeader metaHeader)
{
MetaHeader = (ResponseMetaHeader)metaHeader;
}
+ [DebuggerStepThrough]
void IVerificableMessage.SetVerificationHeader(IVerificationHeader verificationHeader)
{
VerifyHeader = (ResponseVerificationHeader)verificationHeader;
}
+ [DebuggerStepThrough]
public IMessage GetBody()
{
return Body;
diff --git a/src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj b/src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj
new file mode 100644
index 0000000..931a072
--- /dev/null
+++ b/src/FrostFS.SDK.Tests/FrostFS.SDK.Tests.csproj
@@ -0,0 +1,28 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+ false
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+