From 0a5c9f47ee3a60d0eef769ac7fba123d82c2898e Mon Sep 17 00:00:00 2001
From: Pavel Gross
Date: Fri, 11 Apr 2025 15:18:18 +0300
Subject: [PATCH] [#58] Observable client cut
Signed-off-by: Pavel Gross
---
src/FrostFS.SDK.Client/FrostFSClient.cs | 3 +-
.../Mappers/Object/ObjectHeaderMapper.cs | 5 +
.../Models/Object/PartUploadedEventArgs.cs | 8 +
.../Models/Object/UploadInfo.cs | 38 +++
.../Models/Object/UploadProgressInfo.cs | 48 +++
.../Parameters/PrmObjectClientCutPut.cs | 5 +-
.../Services/ObjectServiceProvider.cs | 277 ++++++++++++++----
7 files changed, 328 insertions(+), 56 deletions(-)
create mode 100644 src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs
create mode 100644 src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs
create mode 100644 src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs
diff --git a/src/FrostFS.SDK.Client/FrostFSClient.cs b/src/FrostFS.SDK.Client/FrostFSClient.cs
index 860d346..bce9104 100644
--- a/src/FrostFS.SDK.Client/FrostFSClient.cs
+++ b/src/FrostFS.SDK.Client/FrostFSClient.cs
@@ -254,7 +254,8 @@ public class FrostFSClient : IFrostFSClient
public Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
- return GetObjectService().PutClientCutObjectAsync(args, ctx);
+ return GetObjectService().PutClientCutSingleObjectAsync(args, ctx);
+ // return GetObjectService().PutClientCutObjectAsync(args, ctx);
}
public Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)
diff --git a/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs b/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs
index 115f9e2..fde13ff 100644
--- a/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs
+++ b/src/FrostFS.SDK.Client/Mappers/Object/ObjectHeaderMapper.cs
@@ -44,6 +44,11 @@ public static class ObjectHeaderMapper
public static FrostFsSplit ToModel(this Header.Types.Split split)
{
+ if (split is null)
+ {
+ throw new ArgumentNullException(nameof(split));
+ }
+
var children = split!.Children.Count != 0
? new ReadOnlyCollection([.. split.Children.Select(x => x.ToModel())])
: null;
diff --git a/src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs b/src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs
new file mode 100644
index 0000000..16ad326
--- /dev/null
+++ b/src/FrostFS.SDK.Client/Models/Object/PartUploadedEventArgs.cs
@@ -0,0 +1,8 @@
+using System;
+
+namespace FrostFS.SDK.Client;
+
+public class PartUploadedEventArgs(ObjectPartInfo part) : EventArgs
+{
+ public ObjectPartInfo Part { get; } = part;
+}
diff --git a/src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs b/src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs
new file mode 100644
index 0000000..748fdfe
--- /dev/null
+++ b/src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs
@@ -0,0 +1,38 @@
+using FrostFS.SDK;
+
+namespace FrostFS.SDK.Client;
+
+public readonly struct ObjectPartInfo(long offset, int length, FrostFsObjectId objectId) : System.IEquatable
+{
+ public long Offset { get; } = offset;
+ public int Length { get; } = length;
+ public FrostFsObjectId ObjectId { get; } = objectId;
+
+ public override bool Equals(object obj)
+ {
+ if (obj == null || obj is not ObjectPartInfo)
+ return false;
+
+ return Equals((ObjectPartInfo)obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return ((int)(Offset >> 32)) ^ (int)Offset ^ Length ^ ObjectId.Value.GetHashCode();
+ }
+
+ public static bool operator ==(ObjectPartInfo left, ObjectPartInfo right)
+ {
+ return left.Equals(right);
+ }
+
+ public static bool operator !=(ObjectPartInfo left, ObjectPartInfo right)
+ {
+ return !(left == right);
+ }
+
+ public bool Equals(ObjectPartInfo other)
+ {
+ return GetHashCode() == other.GetHashCode();
+ }
+}
diff --git a/src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs b/src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs
new file mode 100644
index 0000000..a6fa905
--- /dev/null
+++ b/src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs
@@ -0,0 +1,48 @@
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+
+namespace FrostFS.SDK.Client;
+
+public class UploadProgressInfo
+{
+ private List _parts;
+
+ public UploadProgressInfo(Guid splitId, int capacity = 8)
+ {
+ _parts = new List(capacity);
+ SplitId = new SplitId(splitId);
+ }
+
+ public UploadProgressInfo(Guid splitId, Collection parts)
+ {
+ _parts = [.. parts];
+ SplitId = new SplitId(splitId);
+ }
+
+ public event EventHandler? Notify;
+
+ public SplitId SplitId { get; }
+
+ internal void AddPart(ObjectPartInfo part)
+ {
+ _parts.Add(part);
+ Notify?.Invoke(this, new PartUploadedEventArgs(part));
+ }
+
+ public ObjectPartInfo GetPart(int index)
+ {
+ return _parts[index];
+ }
+
+ public ObjectPartInfo GetLast()
+ {
+ return _parts.LastOrDefault();
+ }
+
+ public ReadOnlyCollection GetParts()
+ {
+ return new ReadOnlyCollection(_parts);
+ }
+}
\ No newline at end of file
diff --git a/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs b/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs
index 706e367..46f25e0 100644
--- a/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs
+++ b/src/FrostFS.SDK.Client/Parameters/PrmObjectClientCutPut.cs
@@ -8,7 +8,8 @@ public readonly struct PrmObjectClientCutPut(
int bufferMaxSize = 0,
FrostFsSessionToken? sessionToken = null,
byte[]? customBuffer = null,
- string[]? xheaders = null) : PrmObjectPutBase, System.IEquatable
+ string[]? xheaders = null,
+ UploadProgressInfo? progress = null) : PrmObjectPutBase, System.IEquatable
{
///
/// Need to provide values like ContainerId and ObjectType to create and object.
@@ -41,6 +42,8 @@ public readonly struct PrmObjectClientCutPut(
///
public string[] XHeaders { get; } = xheaders ?? [];
+ public UploadProgressInfo? Progress { get; } = progress;
+
internal PutObjectContext PutObjectContext { get; } = new();
public override readonly bool Equals(object obj)
diff --git a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
index ea3b727..ed9e9bc 100644
--- a/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
+++ b/src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
@@ -385,30 +385,202 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return response.Body.ObjectId.ToModel();
}
- internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
+ internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
- var stream = args.Payload!;
- var header = args.Header!;
+ if (args.Payload == null)
+ throw new ArgumentException(nameof(args.Payload));
- if (header.PayloadLength > 0)
- args.PutObjectContext.FullLength = header.PayloadLength;
- else if (stream.CanSeek)
- args.PutObjectContext.FullLength = (ulong)stream.Length;
- else
- throw new ArgumentException("The stream does not have a length and payload length is not defined");
-
- if (args.PutObjectContext.FullLength == 0)
- throw new ArgumentException("The stream has zero length");
+ if (args.Header == null)
+ throw new ArgumentException(nameof(args.Header));
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
- var partSize = (int)networkSettings.MaxObjectSize;
+ int partSize = (int)networkSettings.MaxObjectSize;
- var restBytes = args.PutObjectContext.FullLength;
+ int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
- var objectSize = (int)Math.Min((ulong)partSize, restBytes);
+ ulong fullLength;
- // define collection capacity
- var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0);
+ // Information about the uploaded parts.
+ var progressInfo = args.Progress; //
+ var offset = 0L;
+
+ if (progressInfo != null && progressInfo.GetParts().Count > 0)
+ {
+ if (!args.Payload.CanSeek)
+ {
+ throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported.");
+ }
+
+ var lastPart = progressInfo.GetLast();
+ args.Payload.Position = lastPart.Offset + lastPart.Length;
+ fullLength = (ulong)(args.Payload.Length - args.Payload.Position);
+ offset = args.Payload.Position;
+ }
+ else
+ {
+ if (args.Header.PayloadLength > 0)
+ fullLength = args.Header.PayloadLength;
+ else if (args.Payload.CanSeek)
+ fullLength = (ulong)args.Payload.Length;
+ else
+ throw new ArgumentException("The stream does not have a length and payload length is not defined");
+ }
+
+ //define collection capacity
+ var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
+ var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
+
+ progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
+
+ var remain = fullLength;
+
+ byte[]? buffer = null;
+ bool isRentBuffer = false;
+
+ try
+ {
+ if (args.CustomBuffer != null)
+ {
+ if (args.CustomBuffer.Length < chunkSize)
+ throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
+
+ buffer = args.CustomBuffer;
+ }
+ else
+ {
+ buffer = ArrayPool.Shared.Rent(chunkSize);
+ isRentBuffer = true;
+ }
+
+ FrostFsObjectId? resultObjectId = null;
+ FrostFsObjectHeader? parentHeader = null;
+
+ while (remain > 0)
+ {
+ var bytesToWrite = Math.Min((ulong)partSize, remain);
+ var isLastPart = remain <= (ulong)partSize;
+
+ // When the last part of the object is uploaded, all metadata for the object must be added
+ if (isLastPart && objectsCount > 1)
+ {
+ parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular)
+ {
+ Attributes = args.Header.Attributes,
+ PayloadLength = fullLength
+ };
+ }
+
+ // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
+ var header = objectsCount == 1 ? args.Header : new FrostFsObjectHeader(
+ args.Header.ContainerId,
+ FrostFsObjectType.Regular,
+ [],
+ new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader));
+
+ var prm = new PrmObjectPut(header);
+ using var stream = await PutStreamObjectAsync(prm, ctx).ConfigureAwait(false);
+ var uploaded = 0;
+
+ // If an error occurs while uploading a part of the object, there is no need to re-upload the parts
+ // that were successfully uploaded before. It is sufficient to re-upload only the failed part
+
+ var thisPartRest = (int)Math.Min((ulong)partSize, remain);
+ while (thisPartRest > 0)
+ {
+ var nextChunkSize = Math.Min(thisPartRest, chunkSize);
+ var size = await args.Payload.ReadAsync(buffer, 0, nextChunkSize).ConfigureAwait(false);
+
+ if (size == 0)
+ break;
+
+ await stream.WriteAsync(buffer.AsMemory(0, size)).ConfigureAwait(false);
+ uploaded += size;
+ thisPartRest -= size;
+ }
+
+ var objectId = await stream.CompleteAsync().ConfigureAwait(false);
+ var part = new ObjectPartInfo(offset, uploaded, objectId);
+ offset += uploaded;
+ progressInfo.AddPart(part);
+
+ remain -= bytesToWrite;
+
+ if (isLastPart)
+ {
+ if (objectsCount == 1)
+ {
+ return progressInfo.GetPart(0).ObjectId;
+ }
+
+ if (parentHeader == null) continue;
+
+ // Once all parts of the object are uploaded, they must be linked into a single entity
+ var linkObject = new FrostFsLinkObject(header.ContainerId, progressInfo.SplitId, parentHeader,
+ [.. progressInfo.GetParts().Select(p => p.ObjectId)]);
+
+ await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
+
+ // Retrieve the ID of the linked object
+ resultObjectId = FrostFsObjectId.FromHash(prm.Header!.GetHeader().Split!.Parent.Value.Span);
+ return resultObjectId;
+ }
+ }
+
+ throw new FrostFsException("Unexpected error: cannot send object");
+ }
+ finally
+ {
+ if (isRentBuffer && buffer != null)
+ {
+ ArrayPool.Shared.Return(buffer);
+ }
+ }
+ }
+
+ internal async Task PutClientCutSingleObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
+ {
+ if (args.Payload == null)
+ throw new ArgumentException(nameof(args.Payload));
+
+ if (args.Header == null)
+ throw new ArgumentException(nameof(args.Header));
+
+ var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
+ int partSize = (int)networkSettings.MaxObjectSize;
+
+ int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
+
+ ulong fullLength;
+
+ // Information about the uploaded parts.
+ var progressInfo = args.Progress; //
+ var offset = 0L;
+
+ if (progressInfo != null && progressInfo.GetParts().Count > 0)
+ {
+ if (!args.Payload.CanSeek)
+ {
+ throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported.");
+ }
+
+ var lastPart = progressInfo.GetLast();
+ args.Payload.Position = lastPart.Offset + lastPart.Length;
+ fullLength = (ulong)(args.Payload.Length - args.Payload.Position);
+ offset = args.Payload.Position;
+ }
+ else
+ {
+ if (args.Header.PayloadLength > 0)
+ fullLength = args.Header.PayloadLength;
+ else if (args.Payload.CanSeek)
+ fullLength = (ulong)args.Payload.Length;
+ else
+ throw new ArgumentException("The stream does not have a length and payload length is not defined");
+ }
+
+ //define collection capacity
+ var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
+ var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
// if the object fits one part, it can be loaded as non-complex object
if (objectsCount == 1)
@@ -418,60 +590,54 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return singlePartResult.ObjectId;
}
- List parts = new(objectsCount);
+ progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
- SplitId splitId = new();
+ var remain = fullLength;
- // keep attributes for the large object
- var attributes = args.Header!.Attributes.ToArray();
- header.Attributes = null;
-
- var remain = args.PutObjectContext.FullLength;
-
- FrostFsObjectHeader? parentHeader = null;
-
- var lastIndex = objectsCount - 1;
-
- bool rentBuffer = false;
byte[]? buffer = null;
+ bool isRentBuffer = false;
try
{
- for (int i = 0; i < objectsCount; i++)
+ if (args.CustomBuffer != null)
{
- if (args.CustomBuffer != null)
+ if (args.CustomBuffer.Length < partSize)
{
- if (args.CustomBuffer.Length < partSize)
- {
- throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required");
- }
-
- buffer = args.CustomBuffer;
- }
- else
- {
- buffer = ArrayPool.Shared.Rent(partSize);
- rentBuffer = true;
+ throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required");
}
+ buffer = args.CustomBuffer;
+ }
+ else
+ {
+ buffer = ArrayPool.Shared.Rent(partSize);
+ isRentBuffer = true;
+ }
+
+ FrostFsObjectHeader? parentHeader = null;
+
+ for (int i = 0; i < objectsCount;)
+ {
+ i++;
var bytesToWrite = Math.Min((ulong)partSize, remain);
- var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
+ var size = await args.Payload.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
- if (i == lastIndex)
+ if (i == objectsCount)
{
- parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes)
+ parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular)
{
- PayloadLength = args.PutObjectContext.FullLength
+ PayloadLength = args.PutObjectContext.FullLength,
+ Attributes = args.Header.Attributes
};
}
// Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
var partHeader = new FrostFsObjectHeader(
- header.ContainerId,
+ args.Header.ContainerId,
FrostFsObjectType.Regular,
[],
- new FrostFsSplit(splitId, parts.LastOrDefault(),
+ new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId,
parentHeader: parentHeader))
{
PayloadLength = (ulong)size
@@ -484,15 +650,18 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
var prm = new PrmSingleObjectPut(obj);
- var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
+ var objectId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
- parts.Add(objId);
+ var part = new ObjectPartInfo(offset, size, objectId);
+ progressInfo.AddPart(part);
- if (i < lastIndex)
+ offset += size;
+
+ if (i < objectsCount)
continue;
// Once all parts of the object are uploaded, they must be linked into a single entity
- var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts);
+ var linkObject = new FrostFsLinkObject(args.Header.ContainerId, progressInfo.SplitId, parentHeader!, [.. progressInfo.GetParts().Select(p => p.ObjectId)]);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
@@ -504,7 +673,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
finally
{
- if (rentBuffer && buffer != null)
+ if (isRentBuffer && buffer != null)
{
ArrayPool.Shared.Return(buffer);
}
--
2.45.3