diff --git a/src/FrostFS.SDK.Client/FrostFSClient.cs b/src/FrostFS.SDK.Client/FrostFSClient.cs index 860d346..bce9104 100644 --- a/src/FrostFS.SDK.Client/FrostFSClient.cs +++ b/src/FrostFS.SDK.Client/FrostFSClient.cs @@ -254,7 +254,8 @@ public class FrostFSClient : IFrostFSClient public Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { - return GetObjectService().PutClientCutObjectAsync(args, ctx); + return GetObjectService().PutClientCutSingleObjectAsync(args, ctx); + // return GetObjectService().PutClientCutObjectAsync(args, ctx); } public Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx) diff --git a/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs b/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs index 115f9e2..fde13ff 100644 --- a/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs +++ b/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs @@ -44,6 +44,11 @@ public static class ObjectHeaderMapper public static FrostFsSplit ToModel(this Header.Types.Split split) { + if (split is null) + { + throw new ArgumentNullException(nameof(split)); + } + var children = split!.Children.Count != 0 ? new ReadOnlyCollection([.. split.Children.Select(x => x.ToModel())]) : null; diff --git a/src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs b/src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs new file mode 100644 index 0000000..16ad326 --- /dev/null +++ b/src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs @@ -0,0 +1,8 @@ +using System; + +namespace FrostFS.SDK.Client; + +public class PartUploadedEventArgs(ObjectPartInfo part) : EventArgs +{ + public ObjectPartInfo Part { get; } = part; +} diff --git a/src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs b/src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs new file mode 100644 index 0000000..748fdfe --- /dev/null +++ b/src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs @@ -0,0 +1,38 @@ +using FrostFS.SDK; + +namespace FrostFS.SDK.Client; + +public readonly struct ObjectPartInfo(long offset, int length, FrostFsObjectId objectId) : System.IEquatable +{ + public long Offset { get; } = offset; + public int Length { get; } = length; + public FrostFsObjectId ObjectId { get; } = objectId; + + public override bool Equals(object obj) + { + if (obj == null || obj is not ObjectPartInfo) + return false; + + return Equals((ObjectPartInfo)obj); + } + + public override int GetHashCode() + { + return ((int)(Offset >> 32)) ^ (int)Offset ^ Length ^ ObjectId.Value.GetHashCode(); + } + + public static bool operator ==(ObjectPartInfo left, ObjectPartInfo right) + { + return left.Equals(right); + } + + public static bool operator !=(ObjectPartInfo left, ObjectPartInfo right) + { + return !(left == right); + } + + public bool Equals(ObjectPartInfo other) + { + return GetHashCode() == other.GetHashCode(); + } +} diff --git a/src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs b/src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs new file mode 100644 index 0000000..a6fa905 --- /dev/null +++ b/src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; + +namespace FrostFS.SDK.Client; + +public class UploadProgressInfo +{ + private List _parts; + + public UploadProgressInfo(Guid splitId, int capacity = 8) + { + _parts = new List(capacity); + SplitId = new SplitId(splitId); + } + + public UploadProgressInfo(Guid splitId, Collection parts) + { + _parts = [.. parts]; + SplitId = new SplitId(splitId); + } + + public event EventHandler? Notify; + + public SplitId SplitId { get; } + + internal void AddPart(ObjectPartInfo part) + { + _parts.Add(part); + Notify?.Invoke(this, new PartUploadedEventArgs(part)); + } + + public ObjectPartInfo GetPart(int index) + { + return _parts[index]; + } + + public ObjectPartInfo GetLast() + { + return _parts.LastOrDefault(); + } + + public ReadOnlyCollection GetParts() + { + return new ReadOnlyCollection(_parts); + } +} \ No newline at end of file diff --git a/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs b/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs index 706e367..46f25e0 100644 --- a/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs +++ b/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs @@ -8,7 +8,8 @@ public readonly struct PrmObjectClientCutPut( int bufferMaxSize = 0, FrostFsSessionToken? sessionToken = null, byte[]? customBuffer = null, - string[]? xheaders = null) : PrmObjectPutBase, System.IEquatable + string[]? xheaders = null, + UploadProgressInfo? progress = null) : PrmObjectPutBase, System.IEquatable { /// /// Need to provide values like ContainerId and ObjectType to create and object. @@ -41,6 +42,8 @@ public readonly struct PrmObjectClientCutPut( /// public string[] XHeaders { get; } = xheaders ?? []; + public UploadProgressInfo? Progress { get; } = progress; + internal PutObjectContext PutObjectContext { get; } = new(); public override readonly bool Equals(object obj) diff --git a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs index ea3b727..ed9e9bc 100644 --- a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs +++ b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs @@ -385,30 +385,202 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl return response.Body.ObjectId.ToModel(); } - internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) + internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { - var stream = args.Payload!; - var header = args.Header!; + if (args.Payload == null) + throw new ArgumentException(nameof(args.Payload)); - if (header.PayloadLength > 0) - args.PutObjectContext.FullLength = header.PayloadLength; - else if (stream.CanSeek) - args.PutObjectContext.FullLength = (ulong)stream.Length; - else - throw new ArgumentException("The stream does not have a length and payload length is not defined"); - - if (args.PutObjectContext.FullLength == 0) - throw new ArgumentException("The stream has zero length"); + if (args.Header == null) + throw new ArgumentException(nameof(args.Header)); var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false); - var partSize = (int)networkSettings.MaxObjectSize; + int partSize = (int)networkSettings.MaxObjectSize; - var restBytes = args.PutObjectContext.FullLength; + int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize; - var objectSize = (int)Math.Min((ulong)partSize, restBytes); + ulong fullLength; - // define collection capacity - var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0); + // Information about the uploaded parts. + var progressInfo = args.Progress; // + var offset = 0L; + + if (progressInfo != null && progressInfo.GetParts().Count > 0) + { + if (!args.Payload.CanSeek) + { + throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported."); + } + + var lastPart = progressInfo.GetLast(); + args.Payload.Position = lastPart.Offset + lastPart.Length; + fullLength = (ulong)(args.Payload.Length - args.Payload.Position); + offset = args.Payload.Position; + } + else + { + if (args.Header.PayloadLength > 0) + fullLength = args.Header.PayloadLength; + else if (args.Payload.CanSeek) + fullLength = (ulong)args.Payload.Length; + else + throw new ArgumentException("The stream does not have a length and payload length is not defined"); + } + + //define collection capacity + var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0; + var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0; + + progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount); + + var remain = fullLength; + + byte[]? buffer = null; + bool isRentBuffer = false; + + try + { + if (args.CustomBuffer != null) + { + if (args.CustomBuffer.Length < chunkSize) + throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required"); + + buffer = args.CustomBuffer; + } + else + { + buffer = ArrayPool.Shared.Rent(chunkSize); + isRentBuffer = true; + } + + FrostFsObjectId? resultObjectId = null; + FrostFsObjectHeader? parentHeader = null; + + while (remain > 0) + { + var bytesToWrite = Math.Min((ulong)partSize, remain); + var isLastPart = remain <= (ulong)partSize; + + // When the last part of the object is uploaded, all metadata for the object must be added + if (isLastPart && objectsCount > 1) + { + parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular) + { + Attributes = args.Header.Attributes, + PayloadLength = fullLength + }; + } + + // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter + var header = objectsCount == 1 ? args.Header : new FrostFsObjectHeader( + args.Header.ContainerId, + FrostFsObjectType.Regular, + [], + new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader)); + + var prm = new PrmObjectPut(header); + using var stream = await PutStreamObjectAsync(prm, ctx).ConfigureAwait(false); + var uploaded = 0; + + // If an error occurs while uploading a part of the object, there is no need to re-upload the parts + // that were successfully uploaded before. It is sufficient to re-upload only the failed part + + var thisPartRest = (int)Math.Min((ulong)partSize, remain); + while (thisPartRest > 0) + { + var nextChunkSize = Math.Min(thisPartRest, chunkSize); + var size = await args.Payload.ReadAsync(buffer, 0, nextChunkSize).ConfigureAwait(false); + + if (size == 0) + break; + + await stream.WriteAsync(buffer.AsMemory(0, size)).ConfigureAwait(false); + uploaded += size; + thisPartRest -= size; + } + + var objectId = await stream.CompleteAsync().ConfigureAwait(false); + var part = new ObjectPartInfo(offset, uploaded, objectId); + offset += uploaded; + progressInfo.AddPart(part); + + remain -= bytesToWrite; + + if (isLastPart) + { + if (objectsCount == 1) + { + return progressInfo.GetPart(0).ObjectId; + } + + if (parentHeader == null) continue; + + // Once all parts of the object are uploaded, they must be linked into a single entity + var linkObject = new FrostFsLinkObject(header.ContainerId, progressInfo.SplitId, parentHeader, + [.. progressInfo.GetParts().Select(p => p.ObjectId)]); + + await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false); + + // Retrieve the ID of the linked object + resultObjectId = FrostFsObjectId.FromHash(prm.Header!.GetHeader().Split!.Parent.Value.Span); + return resultObjectId; + } + } + + throw new FrostFsException("Unexpected error: cannot send object"); + } + finally + { + if (isRentBuffer && buffer != null) + { + ArrayPool.Shared.Return(buffer); + } + } + } + + internal async Task PutClientCutSingleObjectAsync(PrmObjectClientCutPut args, CallContext ctx) + { + if (args.Payload == null) + throw new ArgumentException(nameof(args.Payload)); + + if (args.Header == null) + throw new ArgumentException(nameof(args.Header)); + + var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false); + int partSize = (int)networkSettings.MaxObjectSize; + + int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize; + + ulong fullLength; + + // Information about the uploaded parts. + var progressInfo = args.Progress; // + var offset = 0L; + + if (progressInfo != null && progressInfo.GetParts().Count > 0) + { + if (!args.Payload.CanSeek) + { + throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported."); + } + + var lastPart = progressInfo.GetLast(); + args.Payload.Position = lastPart.Offset + lastPart.Length; + fullLength = (ulong)(args.Payload.Length - args.Payload.Position); + offset = args.Payload.Position; + } + else + { + if (args.Header.PayloadLength > 0) + fullLength = args.Header.PayloadLength; + else if (args.Payload.CanSeek) + fullLength = (ulong)args.Payload.Length; + else + throw new ArgumentException("The stream does not have a length and payload length is not defined"); + } + + //define collection capacity + var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0; + var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0; // if the object fits one part, it can be loaded as non-complex object if (objectsCount == 1) @@ -418,60 +590,54 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl return singlePartResult.ObjectId; } - List parts = new(objectsCount); + progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount); - SplitId splitId = new(); + var remain = fullLength; - // keep attributes for the large object - var attributes = args.Header!.Attributes.ToArray(); - header.Attributes = null; - - var remain = args.PutObjectContext.FullLength; - - FrostFsObjectHeader? parentHeader = null; - - var lastIndex = objectsCount - 1; - - bool rentBuffer = false; byte[]? buffer = null; + bool isRentBuffer = false; try { - for (int i = 0; i < objectsCount; i++) + if (args.CustomBuffer != null) { - if (args.CustomBuffer != null) + if (args.CustomBuffer.Length < partSize) { - if (args.CustomBuffer.Length < partSize) - { - throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required"); - } - - buffer = args.CustomBuffer; - } - else - { - buffer = ArrayPool.Shared.Rent(partSize); - rentBuffer = true; + throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required"); } + buffer = args.CustomBuffer; + } + else + { + buffer = ArrayPool.Shared.Rent(partSize); + isRentBuffer = true; + } + + FrostFsObjectHeader? parentHeader = null; + + for (int i = 0; i < objectsCount;) + { + i++; var bytesToWrite = Math.Min((ulong)partSize, remain); - var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false); + var size = await args.Payload.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false); - if (i == lastIndex) + if (i == objectsCount) { - parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes) + parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular) { - PayloadLength = args.PutObjectContext.FullLength + PayloadLength = args.PutObjectContext.FullLength, + Attributes = args.Header.Attributes }; } // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter var partHeader = new FrostFsObjectHeader( - header.ContainerId, + args.Header.ContainerId, FrostFsObjectType.Regular, [], - new FrostFsSplit(splitId, parts.LastOrDefault(), + new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader)) { PayloadLength = (ulong)size @@ -484,15 +650,18 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl var prm = new PrmSingleObjectPut(obj); - var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false); + var objectId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false); - parts.Add(objId); + var part = new ObjectPartInfo(offset, size, objectId); + progressInfo.AddPart(part); - if (i < lastIndex) + offset += size; + + if (i < objectsCount) continue; // Once all parts of the object are uploaded, they must be linked into a single entity - var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts); + var linkObject = new FrostFsLinkObject(args.Header.ContainerId, progressInfo.SplitId, parentHeader!, [.. progressInfo.GetParts().Select(p => p.ObjectId)]); _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false); @@ -504,7 +673,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl } finally { - if (rentBuffer && buffer != null) + if (isRentBuffer && buffer != null) { ArrayPool.Shared.Return(buffer); }