diff --git a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs index 1011b69..b5b5bde 100644 --- a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs +++ b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs @@ -385,89 +385,132 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl return response.Body.ObjectId.ToModel(); } - + internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { - var payloadStream = args.Payload!; + var stream = args.Payload!; var header = args.Header!; if (header.PayloadLength > 0) args.PutObjectContext.FullLength = header.PayloadLength; - else if (payloadStream.CanSeek) - args.PutObjectContext.FullLength = (ulong)payloadStream.Length; + else if (stream.CanSeek) + args.PutObjectContext.FullLength = (ulong)stream.Length; else throw new ArgumentException("The stream does not have a length and payload length is not defined"); - if (args.PutObjectContext.MaxObjectSizeCache == 0) - { - var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx) - .ConfigureAwait(false); + if (args.PutObjectContext.FullLength == 0) + throw new ArgumentException("The stream has zero length"); - args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize; + var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false); + args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize; + + var restBytes = args.PutObjectContext.FullLength; + + var objectSize = (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes); + + // define collection capacity + var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0); + + // if the object fits one part, it can be loaded as non-complex object + if (objectsCount == 1) + { + var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); + return singlePartResult.ObjectId; } - var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition; - var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes) : args.PutObjectContext.MaxObjectSizeCache; + List parts = new(objectsCount); - //define collection capacity - var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0; - var objectsCount = args.PutObjectContext.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0; - - List sentObjectIds = new(objectsCount); - - FrostFsSplit? split = null; SplitId splitId = new(); + var partSize = args.PutObjectContext.MaxObjectSizeCache; + // keep attributes for the large object - var attributes = args.Header!.Attributes; - args.Header!.Attributes = null; + var attributes = args.Header!.Attributes.ToArray(); + header.Attributes = null; - // send all parts except the last one as separate Objects - while (restBytes > (ulong)args.PutObjectContext.MaxObjectSizeCache) + var remain = args.PutObjectContext.FullLength; + + FrostFsObjectHeader? parentHeader = null; + + var lastIndex = objectsCount - 1; + + bool rentBuffer = false; + byte[]? buffer = null; + + try { - split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault()); - - args.Header!.Split = split; - - var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); - - sentObjectIds.Add(result.ObjectId); - - restBytes -= (ulong)result.ObjectSize; - } - - // send the last part and create linkObject - if (sentObjectIds.Count > 0) - { - var largeObjectHeader = new FrostFsObjectHeader( - header.ContainerId, - FrostFsObjectType.Regular, - attributes != null ? [.. attributes] : []) + for (int i = 0; i < objectsCount; i++) { - PayloadLength = args.PutObjectContext.FullLength, - }; + if (args.CustomBuffer != null) + { + if (args.CustomBuffer.Length < partSize) + { + throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required"); + } - args.Header.Split!.ParentHeader = largeObjectHeader; + buffer = args.CustomBuffer; + } + else + { + buffer = ArrayPool.Shared.Rent(partSize); + rentBuffer = true; + } - var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); + var bytesToWrite = Math.Min((ulong)partSize, remain); - sentObjectIds.Add(result.ObjectId); + var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false); - var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds); + if (i == lastIndex) + { + parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes) + { + PayloadLength = args.PutObjectContext.FullLength + }; + } - _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false); + // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter + var partHeader = new FrostFsObjectHeader( + header.ContainerId, + FrostFsObjectType.Regular, + [], + new FrostFsSplit(splitId, parts.LastOrDefault(), + parentHeader: parentHeader)) + { + PayloadLength = (ulong)size + }; - var parentHeader = args.Header.GetHeader(); + var obj = new FrostFsObject(partHeader) + { + SingleObjectPayload = buffer.Length == size ? buffer : buffer[..size] + }; - return parentHeader.Split!.Parent.ToModel(); + var prm = new PrmSingleObjectPut(obj); + + var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false); + + parts.Add(objId); + + if (i < lastIndex) + continue; + + // Once all parts of the object are uploaded, they must be linked into a single entity + var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts); + + _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false); + + // Retrieve the ID of the linked object + return partHeader.GetHeader().Split!.Parent.ToModel(); + } + + throw new FrostFsException("Unexpected error"); + } + finally + { + if (rentBuffer && buffer != null) + { + ArrayPool.Shared.Return(buffer); + } } - - // We are here if the payload is placed to one Object. It means no cut action, just simple PUT. - args.Header!.Attributes = attributes; - - var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); - - return singlePartResult.ObjectId; } struct PutObjectResult(FrostFsObjectId objectId, int objectSize) @@ -481,7 +524,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl var payload = args.Payload!; var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize; - var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition; chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize); @@ -526,22 +568,23 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl break; sentBytes += bytesCount; - + var chunkRequest = new PutRequest { Body = new PutRequest.Types.Body { - Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount) + Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory()[..bytesCount]) } }; chunkRequest.AddMetaHeader(args.XHeaders); - chunkRequest.Sign(ClientContext.Key.ECDsaKey); await stream.Write(chunkRequest).ConfigureAwait(false); } + args.PutObjectContext.CurrentStreamPosition += (ulong)sentBytes; + var response = await stream.Close().ConfigureAwait(false); Verifier.CheckResponse(response); @@ -580,10 +623,10 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl var initRequest = new PutRequest { Body = new PutRequest.Types.Body - { + { Init = new PutRequest.Types.Body.Types.Init { - Header = grpcHeader + Header = grpcHeader, } } }; diff --git a/src/FrostFS.SDK.Client/Tools/ObjectTools.cs b/src/FrostFS.SDK.Client/Tools/ObjectTools.cs index 40cf1ad..ffc33a3 100644 --- a/src/FrostFS.SDK.Client/Tools/ObjectTools.cs +++ b/src/FrostFS.SDK.Client/Tools/ObjectTools.cs @@ -68,7 +68,7 @@ public static class ObjectTools { Header = grpcHeader, ObjectId = new ObjectID { Value = grpcHeader.Sha256() }, - Payload = ByteString.CopyFrom(@object.SingleObjectPayload) + Payload = UnsafeByteOperations.UnsafeWrap(@object.SingleObjectPayload) }; obj.Signature = new Signature diff --git a/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs b/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs index b6ae62b..81f0cd3 100644 --- a/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs +++ b/src/FrostFS.SDK.Tests/Smoke/Client/ObjectTests/ObjectTests.cs @@ -41,7 +41,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase backupFactor: backupFactor, selectors: [], filter: [], - containerAttributes: [], + containerAttributes: [new FrostFsAttributePair("contAttrKey", "contAttrValue")], new FrostFsReplica(replicas)); Assert.NotNull(containerId); @@ -49,7 +49,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase await AddObjectRules(client, containerId); _testOutputHelper.WriteLine("rules added"); - + await RunSuite(client, containerId); } @@ -73,15 +73,18 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase { case serverCut: objectId = await CreateObjectServerCut(client, containerId, bytes); + _testOutputHelper.WriteLine($"\tserver side cut"); break; case clientCut: objectId = await CreateObjectClientCut(client, containerId, bytes); + _testOutputHelper.WriteLine($"\tclient side cut"); break; - case singleObject: + case singleObject: if (objectSize > 1 * 1024 * 1024) continue; objectId = await PutSingleObject(client, containerId, bytes); - + _testOutputHelper.WriteLine($"\tput single object"); + break; default: throw new ArgumentException("unexpected object type"); @@ -91,17 +94,38 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase _testOutputHelper.WriteLine($"\tobject created"); + var ecdsaKey = ClientOptions.Value.Key.LoadWif(); + var owner = FrostFsOwner.FromKey(ecdsaKey); + + FrostFsHeaderResult expected = new() + { + HeaderInfo = new FrostFsObjectHeader( + containerId: containerId, + type: FrostFsObjectType.Regular, + attributes: [new FrostFsAttributePair("fileName", "test")], + split: null, + owner: owner, + version: new FrostFsVersion(2, 13)) + { + PayloadLength = (ulong)objectSize, + PayloadCheckSum = hash + } + }; + + await ValidateHeader(client, containerId, objectId, expected); + _testOutputHelper.WriteLine($"\theader validated"); + await ValidateContent(client, containerId, hash, objectId); _testOutputHelper.WriteLine($"\tcontent validated"); await ValidateFilters(client, containerId, objectId, null, (ulong)bytes.Length); _testOutputHelper.WriteLine($"\tfilters validated"); - // if (type != clientCut) - // { - // await ValidatePatch(client, containerId, bytes, objectId); - // _testOutputHelper.WriteLine($"\tpatch validated"); - // } + if (type != clientCut) + { + await ValidatePatch(client, containerId, bytes, objectId); + _testOutputHelper.WriteLine($"\tpatch validated"); + } await ValidateRange(client, containerId, bytes, objectId); _testOutputHelper.WriteLine($"\trange validated"); @@ -126,27 +150,27 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase var objectRange = bytes.AsMemory().Slice(100, 64).ToArray(); var expectedHash = SHA256.HashData(objectRange); - + foreach (var h in hashes) { var x = h[..32].ToArray(); Assert.NotNull(x); Assert.True(x.Length > 0); - // Assert.True(expectedHash.SequenceEqual(h.ToArray())); + // Assert.True(expectedHash.SequenceEqual(h.ToArray())); } } private async Task ValidateRange(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes, FrostFsObjectId objectId) { - if (bytes.Length < 200) + if (bytes.Length < 100) return; await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(0, 50)); - await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(50, 100)); + await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(50, 50)); + + await CheckRange(client, containerId, bytes, objectId, new FrostFsRange((ulong)bytes.Length - 100, 100)); - await CheckRange(client, containerId, bytes, objectId, new FrostFsRange((ulong)bytes.Length-100, 100)); - if (bytes.Length >= 6200) await CheckRange(client, containerId, bytes, objectId, new FrostFsRange(6000, 100)); } @@ -161,11 +185,15 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase MemoryStream ms = new(rangeBytes); ReadOnlyMemory? chunk; + int readBytes = 0; while ((chunk = await rangeReader!.ReadChunk()) != null) { + readBytes += chunk.Value.Length; ms.Write(chunk.Value.Span); } + Assert.Equal(range.Length, (ulong)readBytes); + Assert.Equal(SHA256.HashData(bytes.AsSpan().Slice((int)range.Offset, (int)range.Length)), SHA256.HashData(rangeBytes)); _testOutputHelper.WriteLine($"\t\trange {range.Offset};{range.Length} validated"); @@ -173,7 +201,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase private static async Task ValidatePatch(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes, FrostFsObjectId objectId) { - if (bytes.Length < 1024 + 64) + if (bytes.Length < 1024 + 64 || bytes.Length > 5900) return; var patch = new byte[1024]; @@ -214,7 +242,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase for (int i = (int)rangeEnd; i < bytes.Length; i++) Assert.Equal(downloadedBytes[i], bytes[i]); } - + private async Task ValidateFilters(IFrostFSClient client, FrostFsContainerId containerId, FrostFsObjectId objectId, SplitId? splitId, ulong length) { @@ -237,13 +265,8 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase await CheckFilter(client, containerId, new FilterByVersion(FrostFsMatchType.Equals, networkInfo.NodeInfoCollection[0].Version)); - await CheckFilter(client, containerId, new FilterByEpoch(FrostFsMatchType.Equals, networkInfo.Epoch)); - await CheckFilter(client, containerId, new FilterByPayloadLength(FrostFsMatchType.Equals, length)); - // var checkSum = CheckSum.CreateCheckSum(hash); - // await CheckFilter(client, containerId, new FilterByPayloadHash(FrostFsMatchType.Equals, checkSum)); - await CheckFilter(client, containerId, new FilterByPhysicallyStored()); } @@ -265,7 +288,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase } } - private static async Task ValidateContent(IFrostFSClient client, FrostFsContainerId containerId, byte[] hash, FrostFsObjectId objectId) + private static async Task ValidateContent(IFrostFSClient client, FrostFsContainerId containerId, byte[] hash, FrostFsObjectId objectId) { var @object = await client.GetObjectAsync( new PrmObjectGet(containerId, objectId), @@ -283,6 +306,39 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase Assert.Equal(hash, SHA256.HashData(downloadedBytes)); } + private static async Task ValidateHeader( + IFrostFSClient client, + FrostFsContainerId containerId, + FrostFsObjectId objectId, + FrostFsHeaderResult expected) + { + var res = await client.GetObjectHeadAsync(new PrmObjectHeadGet(containerId, objectId, default), default); + + var objHeader = res.HeaderInfo; + Assert.NotNull(objHeader); + + Assert.Equal(containerId.GetValue(), objHeader.ContainerId.GetValue()); + + Assert.Equal(expected.HeaderInfo!.OwnerId!.Value, objHeader.OwnerId!.Value); + Assert.Equal(expected.HeaderInfo.Version!.Major, objHeader.Version!.Major); + Assert.Equal(expected.HeaderInfo.Version!.Minor, objHeader.Version!.Minor); + + Assert.Equal(expected.HeaderInfo.PayloadLength, objHeader.PayloadLength); + + Assert.Equal(expected.HeaderInfo.ObjectType, objHeader.ObjectType); + + if (expected.HeaderInfo.Attributes != null) + { + Assert.NotNull(objHeader.Attributes); + Assert.Equal(expected.HeaderInfo.Attributes.Count, objHeader.Attributes.Count); + + Assert.True(expected.HeaderInfo.Attributes.SequenceEqual(objHeader.Attributes)); + } + + Assert.Null(objHeader.Split); + } + + private static async Task CreateObjectServerCut(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes) { var header = new FrostFsObjectHeader( @@ -310,7 +366,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase return await client.PutClientCutObjectAsync(param, default).ConfigureAwait(true); } - private static async Task PutSingleObject(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes) + private static async Task PutSingleObject(IFrostFSClient client, FrostFsContainerId containerId, byte[] bytes) { var header = new FrostFsObjectHeader( containerId: containerId, @@ -324,7 +380,7 @@ public class ObjectTests(ITestOutputHelper testOutputHelper) : SmokeTestsBase return await client.PutSingleObjectAsync(param, default).ConfigureAwait(true); } - + private static async Task CheckFilter(IFrostFSClient client, FrostFsContainerId containerId, IObjectFilter filter) { var resultObjectsCount = 0;