From 6988fcedae2570fc567fc5a0db4e7f75730afcf2 Mon Sep 17 00:00:00 2001
From: Pavel Gross
Date: Mon, 3 Mar 2025 16:18:44 +0300
Subject: [PATCH] [#35] Client: rollback to PutSingleObject for client cut
upload
Signed-off-by: Pavel Gross
---
.../Services/ObjectServiceProvider.cs | 167 +++++++++++-------
src/FrostFS.SDK.Client/Tools/ObjectTools.cs | 2 +-
.../Smoke/Client/ObjectTests/ObjectTests.cs | 106 ++++++++---
3 files changed, 187 insertions(+), 88 deletions(-)
diff --git a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
index 1011b69..b5b5bde 100644
--- a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
+++ b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
@@ -385,89 +385,132 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return response.Body.ObjectId.ToModel();
}
-
+
internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
- var payloadStream = args.Payload!;
+ var stream = args.Payload!;
var header = args.Header!;
if (header.PayloadLength > 0)
args.PutObjectContext.FullLength = header.PayloadLength;
- else if (payloadStream.CanSeek)
- args.PutObjectContext.FullLength = (ulong)payloadStream.Length;
+ else if (stream.CanSeek)
+ args.PutObjectContext.FullLength = (ulong)stream.Length;
else
throw new ArgumentException("The stream does not have a length and payload length is not defined");
- if (args.PutObjectContext.MaxObjectSizeCache == 0)
- {
- var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx)
- .ConfigureAwait(false);
+ if (args.PutObjectContext.FullLength == 0)
+ throw new ArgumentException("The stream has zero length");
- args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
+ var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
+ args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
+
+ var restBytes = args.PutObjectContext.FullLength;
+
+ var objectSize = (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes);
+
+ // define collection capacity
+ var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0);
+
+ // if the object fits one part, it can be loaded as non-complex object
+ if (objectsCount == 1)
+ {
+ var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
+ return singlePartResult.ObjectId;
}
- var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;
- var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes) : args.PutObjectContext.MaxObjectSizeCache;
+ List parts = new(objectsCount);
- //define collection capacity
- var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
- var objectsCount = args.PutObjectContext.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
-
- List sentObjectIds = new(objectsCount);
-
- FrostFsSplit? split = null;
SplitId splitId = new();
+ var partSize = args.PutObjectContext.MaxObjectSizeCache;
+
// keep attributes for the large object
- var attributes = args.Header!.Attributes;
- args.Header!.Attributes = null;
+ var attributes = args.Header!.Attributes.ToArray();
+ header.Attributes = null;
- // send all parts except the last one as separate Objects
- while (restBytes > (ulong)args.PutObjectContext.MaxObjectSizeCache)
+ var remain = args.PutObjectContext.FullLength;
+
+ FrostFsObjectHeader? parentHeader = null;
+
+ var lastIndex = objectsCount - 1;
+
+ bool rentBuffer = false;
+ byte[]? buffer = null;
+
+ try
{
- split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault());
-
- args.Header!.Split = split;
-
- var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
-
- sentObjectIds.Add(result.ObjectId);
-
- restBytes -= (ulong)result.ObjectSize;
- }
-
- // send the last part and create linkObject
- if (sentObjectIds.Count > 0)
- {
- var largeObjectHeader = new FrostFsObjectHeader(
- header.ContainerId,
- FrostFsObjectType.Regular,
- attributes != null ? [.. attributes] : [])
+ for (int i = 0; i < objectsCount; i++)
{
- PayloadLength = args.PutObjectContext.FullLength,
- };
+ if (args.CustomBuffer != null)
+ {
+ if (args.CustomBuffer.Length < partSize)
+ {
+ throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required");
+ }
- args.Header.Split!.ParentHeader = largeObjectHeader;
+ buffer = args.CustomBuffer;
+ }
+ else
+ {
+ buffer = ArrayPool.Shared.Rent(partSize);
+ rentBuffer = true;
+ }
- var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
+ var bytesToWrite = Math.Min((ulong)partSize, remain);
- sentObjectIds.Add(result.ObjectId);
+ var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
- var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
+ if (i == lastIndex)
+ {
+ parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes)
+ {
+ PayloadLength = args.PutObjectContext.FullLength
+ };
+ }
- _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
+ // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
+ var partHeader = new FrostFsObjectHeader(
+ header.ContainerId,
+ FrostFsObjectType.Regular,
+ [],
+ new FrostFsSplit(splitId, parts.LastOrDefault(),
+ parentHeader: parentHeader))
+ {
+ PayloadLength = (ulong)size
+ };
- var parentHeader = args.Header.GetHeader();
+ var obj = new FrostFsObject(partHeader)
+ {
+ SingleObjectPayload = buffer.Length == size ? buffer : buffer[..size]
+ };
- return parentHeader.Split!.Parent.ToModel();
+ var prm = new PrmSingleObjectPut(obj);
+
+ var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
+
+ parts.Add(objId);
+
+ if (i < lastIndex)
+ continue;
+
+ // Once all parts of the object are uploaded, they must be linked into a single entity
+ var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts);
+
+ _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
+
+ // Retrieve the ID of the linked object
+ return partHeader.GetHeader().Split!.Parent.ToModel();
+ }
+
+ throw new FrostFsException("Unexpected error");
+ }
+ finally
+ {
+ if (rentBuffer && buffer != null)
+ {
+ ArrayPool.Shared.Return(buffer);
+ }
}
-
- // We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
- args.Header!.Attributes = attributes;
-
- var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
-
- return singlePartResult.ObjectId;
}
struct PutObjectResult(FrostFsObjectId objectId, int objectSize)
@@ -481,7 +524,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
var payload = args.Payload!;
var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
-
var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
@@ -526,22 +568,23 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
break;
sentBytes += bytesCount;
-
+
var chunkRequest = new PutRequest
{
Body = new PutRequest.Types.Body
{
- Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
+ Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory()[..bytesCount])
}
};
chunkRequest.AddMetaHeader(args.XHeaders);
-
chunkRequest.Sign(ClientContext.Key.ECDsaKey);
await stream.Write(chunkRequest).ConfigureAwait(false);
}
+ args.PutObjectContext.CurrentStreamPosition += (ulong)sentBytes;
+
var response = await stream.Close().ConfigureAwait(false);
Verifier.CheckResponse(response);
@@ -580,10 +623,10 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
var initRequest = new PutRequest
{
Body = new PutRequest.Types.Body
- {
+ {
Init = new PutRequest.Types.Body.Types.Init
{
- Header = grpcHeader
+ Header = grpcHeader,
}
}
};
diff --git a/src/FrostFS.SDK.Client/Tools/ObjectTools.cs b/src/FrostFS.SDK.Client/Tools/ObjectTools.cs
index 40cf1ad..ffc33a3 100644
--- a/src/FrostFS.SDK.Client/Tools/ObjectTools.cs
+++ b/src/FrostFS.SDK.Client/Tools/ObjectTools.cs
@@ -68,7 +68,7 @@ public static class ObjectTools
{
Header = grpcHeader,
ObjectId = new ObjectID { Value = grpcHeader.Sha256() },
- Payload = ByteString.CopyFrom(@object.SingleObjectPayload)
+ Payload = UnsafeByteOperations.UnsafeWrap(@object.SingleObjectPayload)
};
obj.Signature = new Signature
diff --git a/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs b/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs
index b6ae62b..81f0cd3 100644
--- a/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs
+++ b/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs
@@ -41,7 +41,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
backupFactor: backupFactor,
selectors: [],
filter: [],
- containerAttributes: [],
+ containerAttributes: [new FrostFsAttributePair("contAttrKey", "contAttrValue")],
new FrostFsReplica(replicas));
Assert.NotNull(containerId);
@@ -49,7 +49,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
await AddObjectRules(client, containerId);
_testOutputHelper.WriteLine("rules added");
-
+
await RunSuite(client, containerId);
}
@@ -73,15 +73,18 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
{
case serverCut:
objectId = await CreateObjectServerCut(client, containerId, bytes);
+ _testOutputHelper.WriteLine($"\tserver side cut");
break;
case clientCut:
objectId = await CreateObjectClientCut(client, containerId, bytes);
+ _testOutputHelper.WriteLine($"\tclient side cut");
break;
- case singleObject:
+ case singleObject:
if (objectSize > 1 * 1024 * 1024)
continue;
objectId = await PutSingleObject(client, containerId, bytes);
-
+ _testOutputHelper.WriteLine($"\tput single object");
+
break;
default:
throw new ArgumentException("unexpected object type");
@@ -91,17 +94,38 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
_testOutputHelper.WriteLine($"\tobject created");
+ var ecdsaKey = ClientOptions.Value.Key.LoadWif();
+ var owner = FrostFsOwner.FromKey(ecdsaKey);
+
+ FrostFsHeaderResult expected = new()
+ {
+ HeaderInfo = new FrostFsObjectHeader(
+ containerId: containerId,
+ type: FrostFsObjectType.Regular,
+ attributes: [new FrostFsAttributePair("fileName", "test")],
+ split: null,
+ owner: owner,
+ version: new FrostFsVersion(2, 13))
+ {
+ PayloadLength = (ulong)objectSize,
+ PayloadCheckSum = hash
+ }
+ };
+
+ await ValidateHeader(client, containerId, objectId, expected);
+ _testOutputHelper.WriteLine($"\theader validated");
+
await ValidateContent(client, containerId, hash, objectId);
_testOutputHelper.WriteLine($"\tcontent validated");
await ValidateFilters(client, containerId, objectId, null, (ulong)bytes.Length);
_testOutputHelper.WriteLine($"\tfilters validated");
- // if (type != clientCut)
- // {
- // await ValidatePatch(client, containerId, bytes, objectId);
- // _testOutputHelper.WriteLine($"\tpatch validated");
- // }
+ if (type != clientCut)
+ {
+ await ValidatePatch(client, containerId, bytes, objectId);
+ _testOutputHelper.WriteLine($"\tpatch validated");
+ }
await ValidateRange(client, containerId, bytes, objectId);
_testOutputHelper.WriteLine($"\trange validated");
@@ -126,27 +150,27 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
var objectRange = bytes.AsMemory().Slice(100, 64).ToArray();
var expectedHash = SHA256.HashData(objectRange);
-
+
foreach (var h in hashes)
{
var x = h[..32].ToArray();
Assert.NotNull(x);
Assert.True(x.Length > 0);
- // Assert.True(expectedHash.SequenceEqual(h.ToArray()));
+ // Assert.True(expectedHash.SequenceEqual(h.ToArray()));
}
}
private async Task ValidateRange(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes, FrostFsObjectId objectId)
{
- if (bytes.Length < 200)
+ if (bytes.Length < 100)
return;
await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(0, 50));
- await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(50, 100));
+ await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(50, 50));
+
+ await CheckRange(client, containerId, bytes, objectId, new FrostFsRange((ulong)bytes.Length - 100, 100));
- await CheckRange(client, containerId, bytes, objectId, new FrostFsRange((ulong)bytes.Length-100, 100));
-
if (bytes.Length >= 6200)
await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(6000, 100));
}
@@ -161,11 +185,15 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
MemoryStream ms = new(rangeBytes);
ReadOnlyMemory? chunk;
+ int readBytes = 0;
while ((chunk = await rangeReader!.ReadChunk()) != null)
{
+ readBytes += chunk.Value.Length;
ms.Write(chunk.Value.Span);
}
+ Assert.Equal(range.Length, (ulong)readBytes);
+
Assert.Equal(SHA256.HashData(bytes.AsSpan().Slice((int)range.Offset, (int)range.Length)), SHA256.HashData(rangeBytes));
_testOutputHelper.WriteLine($"\t\trange {range.Offset};{range.Length} validated");
@@ -173,7 +201,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
private static async Task ValidatePatch(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes, FrostFsObjectId objectId)
{
- if (bytes.Length < 1024 + 64)
+ if (bytes.Length < 1024 + 64 || bytes.Length > 5900)
return;
var patch = new byte[1024];
@@ -214,7 +242,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
for (int i = (int)rangeEnd; i < bytes.Length; i++)
Assert.Equal(downloadedBytes[i], bytes[i]);
}
-
+
private async Task ValidateFilters(IFrostFSClient client, FrostFsContainerId containerId, FrostFsObjectId objectId, SplitId? splitId, ulong length)
{
@@ -237,13 +265,8 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
await CheckFilter(client, containerId, new FilterByVersion(FrostFsMatchType.Equals, networkInfo.NodeInfoCollection[0].Version));
- await CheckFilter(client, containerId, new FilterByEpoch(FrostFsMatchType.Equals, networkInfo.Epoch));
-
await CheckFilter(client, containerId, new FilterByPayloadLength(FrostFsMatchType.Equals, length));
- // var checkSum = CheckSum.CreateCheckSum(hash);
- // await CheckFilter(client, containerId, new FilterByPayloadHash(FrostFsMatchType.Equals, checkSum));
-
await CheckFilter(client, containerId, new FilterByPhysicallyStored());
}
@@ -265,7 +288,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
}
}
- private static async Task ValidateContent(IFrostFSClient client, FrostFsContainerId containerId, byte[] hash, FrostFsObjectId objectId)
+ private static async Task ValidateContent(IFrostFSClient client, FrostFsContainerId containerId, byte[] hash, FrostFsObjectId objectId)
{
var @object = await client.GetObjectAsync(
new PrmObjectGet(containerId, objectId),
@@ -283,6 +306,39 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
Assert.Equal(hash, SHA256.HashData(downloadedBytes));
}
+ private static async Task ValidateHeader(
+ IFrostFSClient client,
+ FrostFsContainerId containerId,
+ FrostFsObjectId objectId,
+ FrostFsHeaderResult expected)
+ {
+ var res = await client.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId, default), default);
+
+ var objHeader = res.HeaderInfo;
+ Assert.NotNull(objHeader);
+
+ Assert.Equal(containerId.GetValue(), objHeader.ContainerId.GetValue());
+
+ Assert.Equal(expected.HeaderInfo!.OwnerId!.Value, objHeader.OwnerId!.Value);
+ Assert.Equal(expected.HeaderInfo.Version!.Major, objHeader.Version!.Major);
+ Assert.Equal(expected.HeaderInfo.Version!.Minor, objHeader.Version!.Minor);
+
+ Assert.Equal(expected.HeaderInfo.PayloadLength, objHeader.PayloadLength);
+
+ Assert.Equal(expected.HeaderInfo.ObjectType, objHeader.ObjectType);
+
+ if (expected.HeaderInfo.Attributes != null)
+ {
+ Assert.NotNull(objHeader.Attributes);
+ Assert.Equal(expected.HeaderInfo.Attributes.Count, objHeader.Attributes.Count);
+
+ Assert.True(expected.HeaderInfo.Attributes.SequenceEqual(objHeader.Attributes));
+ }
+
+ Assert.Null(objHeader.Split);
+ }
+
+
private static async Task CreateObjectServerCut(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes)
{
var header = new FrostFsObjectHeader(
@@ -310,7 +366,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
return await client.PutClientCutObjectAsync(param, default).ConfigureAwait(true);
}
- private static async Task PutSingleObject(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes)
+ private static async Task PutSingleObject(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes)
{
var header = new FrostFsObjectHeader(
containerId: containerId,
@@ -324,7 +380,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase
return await client.PutSingleObjectAsync(param, default).ConfigureAwait(true);
}
-
+
private static async Task CheckFilter(IFrostFSClient client, FrostFsContainerId containerId, IObjectFilter filter)
{
var resultObjectsCount = 0;
--
2.45.3