[#58] Observable client cut #59

Merged
PavelGrossSpb merged 1 commit from PavelGrossSpb/frostfs-sdk-csharp:ClientCutObservable into master 2025-04-11 13:12:31 +00:00
7 changed files with 328 additions and 56 deletions

View file

@@ -254,7 +254,8 @@ public class FrostFSClient : IFrostFSClient
public Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
return GetObjectService().PutClientCutObjectAsync(args, ctx);
return GetObjectService().PutClientCutSingleObjectAsync(args, ctx);
// return GetObjectService().PutClientCutObjectAsync(args, ctx);
}
public Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)

View file

@@ -44,6 +44,11 @@ public static class ObjectHeaderMapper
public static FrostFsSplit ToModel(this Header.Types.Split split)
{
if (split is null)
{
throw new ArgumentNullException(nameof(split));
}
var children = split!.Children.Count != 0
? new ReadOnlyCollection<FrostFsObjectId>([.. split.Children.Select(x => x.ToModel())])
: null;

View file

@@ -0,0 +1,8 @@
using System;
namespace FrostFS.SDK.Client;
public class PartUploadedEventArgs(ObjectPartInfo part) : EventArgs
{
public ObjectPartInfo Part { get; } = part;
}

View file

@@ -0,0 +1,38 @@
using FrostFS.SDK;
namespace FrostFS.SDK.Client;
public readonly struct ObjectPartInfo(long offset, int length, FrostFsObjectId objectId) : System.IEquatable<ObjectPartInfo>
{
public long Offset { get; } = offset;
public int Length { get; } = length;
public FrostFsObjectId ObjectId { get; } = objectId;
public override bool Equals(object obj)
{
if (obj == null || obj is not ObjectPartInfo)
return false;
return Equals((ObjectPartInfo)obj);
}
public override int GetHashCode()
{
return ((int)(Offset >> 32)) ^ (int)Offset ^ Length ^ ObjectId.Value.GetHashCode();
}
public static bool operator ==(ObjectPartInfo left, ObjectPartInfo right)
{
return left.Equals(right);
}
public static bool operator !=(ObjectPartInfo left, ObjectPartInfo right)
{
return !(left == right);
}
public bool Equals(ObjectPartInfo other)
{
return GetHashCode() == other.GetHashCode();
}
}
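
Note that Equals(ObjectPartInfo other) above delegates to GetHashCode, so two parts with different fields could still compare equal on a hash collision. A field-by-field comparison is the more robust alternative; a minimal sketch, not part of this change:

public bool Equals(ObjectPartInfo other)
{
    // Compare the actual members rather than their combined hash.
    return Offset == other.Offset
        && Length == other.Length
        && Equals(ObjectId, other.ObjectId);
}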

View file

@@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
namespace FrostFS.SDK.Client;
public class UploadProgressInfo
{
private List<ObjectPartInfo> _parts;
public UploadProgressInfo(Guid splitId, int capacity = 8)
{
_parts = new List<ObjectPartInfo>(capacity);
SplitId = new SplitId(splitId);
}
public UploadProgressInfo(Guid splitId, Collection<ObjectPartInfo> parts)
{
_parts = [.. parts];
SplitId = new SplitId(splitId);
}
public event EventHandler<PartUploadedEventArgs>? Notify;
public SplitId SplitId { get; }
internal void AddPart(ObjectPartInfo part)
{
_parts.Add(part);
Notify?.Invoke(this, new PartUploadedEventArgs(part));
}
public ObjectPartInfo GetPart(int index)
{
return _parts[index];
}
public ObjectPartInfo GetLast()
{
return _parts.LastOrDefault();
}
public ReadOnlyCollection<ObjectPartInfo> GetParts()
{
return new ReadOnlyCollection<ObjectPartInfo>(_parts);
}
}
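
A minimal sketch (names and setup assumed, not taken from this diff) of observing upload progress through the Notify event:

var progress = new UploadProgressInfo(Guid.NewGuid());
progress.Notify += (sender, e) =>
{
    // Persist or log each uploaded part so an interrupted upload can be resumed later.
    Console.WriteLine($"uploaded part {e.Part.ObjectId}: offset={e.Part.Offset}, length={e.Part.Length}");
};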

View file

@@ -8,7 +8,8 @@ public readonly struct PrmObjectClientCutPut(
int bufferMaxSize = 0,
FrostFsSessionToken? sessionToken = null,
byte[]? customBuffer = null,
string[]? xheaders = null) : PrmObjectPutBase, System.IEquatable<PrmObjectClientCutPut>
string[]? xheaders = null,
UploadProgressInfo? progress = null) : PrmObjectPutBase, System.IEquatable<PrmObjectClientCutPut>
{
/// <summary>
/// Need to provide values like <c>ContainerId</c> and <c>ObjectType</c> to create an object.
@@ -41,6 +42,8 @@ public readonly struct PrmObjectClientCutPut(
/// </summary>
public string[] XHeaders { get; } = xheaders ?? [];
public UploadProgressInfo? Progress { get; } = progress;
internal PutObjectContext PutObjectContext { get; } = new();
public override readonly bool Equals(object obj)
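
A hedged sketch of passing a pre-populated progress object through the new parameter to resume an interrupted client-cut upload (the header/payload parameter names and the client, header, payloadStream and ctx setup are assumptions, not taken from this diff):

// progress may already contain parts recorded by a previous attempt;
// in that case the payload stream must be seekable so the upload can resume.
var args = new PrmObjectClientCutPut(
    header: header,
    payload: payloadStream,
    progress: progress);

var objectId = await client.PutClientCutObjectAsync(args, ctx);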

View file

@@ -387,28 +387,200 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
var stream = args.Payload!;
var header = args.Header!;
if (args.Payload == null)
throw new ArgumentException(nameof(args.Payload));
if (header.PayloadLength > 0)
args.PutObjectContext.FullLength = header.PayloadLength;
else if (stream.CanSeek)
args.PutObjectContext.FullLength = (ulong)stream.Length;
else
throw new ArgumentException("The stream does not have a length and payload length is not defined");
if (args.PutObjectContext.FullLength == 0)
throw new ArgumentException("The stream has zero length");
if (args.Header == null)
throw new ArgumentException(nameof(args.Header));
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
var partSize = (int)networkSettings.MaxObjectSize;
int partSize = (int)networkSettings.MaxObjectSize;
var restBytes = args.PutObjectContext.FullLength;
int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
var objectSize = (int)Math.Min((ulong)partSize, restBytes);
ulong fullLength;
// define collection capacity
var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0);
// Information about the uploaded parts.
var progressInfo = args.Progress;
var offset = 0L;
if (progressInfo != null && progressInfo.GetParts().Count > 0)
{
if (!args.Payload.CanSeek)
{
throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported.");
}
var lastPart = progressInfo.GetLast();
args.Payload.Position = lastPart.Offset + lastPart.Length;
fullLength = (ulong)(args.Payload.Length - args.Payload.Position);
offset = args.Payload.Position;
}
else
{
if (args.Header.PayloadLength > 0)
fullLength = args.Header.PayloadLength;
else if (args.Payload.CanSeek)
fullLength = (ulong)args.Payload.Length;
else
throw new ArgumentException("The stream does not have a length and payload length is not defined");
}
//define collection capacity
var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
var remain = fullLength;
byte[]? buffer = null;
bool isRentBuffer = false;
try
{
if (args.CustomBuffer != null)
{
if (args.CustomBuffer.Length < chunkSize)
throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
buffer = args.CustomBuffer;
}
else
{
buffer = ArrayPool<byte>.Shared.Rent(chunkSize);
isRentBuffer = true;
}
FrostFsObjectId? resultObjectId = null;
FrostFsObjectHeader? parentHeader = null;
while (remain > 0)
{
var bytesToWrite = Math.Min((ulong)partSize, remain);
var isLastPart = remain <= (ulong)partSize;
// When the last part of the object is uploaded, all metadata for the object must be added
if (isLastPart && objectsCount > 1)
{
parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular)
{
Attributes = args.Header.Attributes,
PayloadLength = fullLength
};
}
// Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
var header = objectsCount == 1 ? args.Header : new FrostFsObjectHeader(
args.Header.ContainerId,
FrostFsObjectType.Regular,
[],
new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader));
var prm = new PrmObjectPut(header);
using var stream = await PutStreamObjectAsync(prm, ctx).ConfigureAwait(false);
var uploaded = 0;
// If an error occurs while uploading a part of the object, there is no need to re-upload the parts
// that were successfully uploaded before. It is sufficient to re-upload only the failed part
var thisPartRest = (int)Math.Min((ulong)partSize, remain);
while (thisPartRest > 0)
{
var nextChunkSize = Math.Min(thisPartRest, chunkSize);
var size = await args.Payload.ReadAsync(buffer, 0, nextChunkSize).ConfigureAwait(false);
if (size == 0)
break;
await stream.WriteAsync(buffer.AsMemory(0, size)).ConfigureAwait(false);
uploaded += size;
thisPartRest -= size;
}
var objectId = await stream.CompleteAsync().ConfigureAwait(false);
var part = new ObjectPartInfo(offset, uploaded, objectId);
offset += uploaded;
progressInfo.AddPart(part);
remain -= bytesToWrite;
if (isLastPart)
{
if (objectsCount == 1)
{
return progressInfo.GetPart(0).ObjectId;
}
if (parentHeader == null) continue;
// Once all parts of the object are uploaded, they must be linked into a single entity
var linkObject = new FrostFsLinkObject(header.ContainerId, progressInfo.SplitId, parentHeader,
[.. progressInfo.GetParts().Select(p => p.ObjectId)]);
await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
// Retrieve the ID of the linked object
resultObjectId = FrostFsObjectId.FromHash(prm.Header!.GetHeader().Split!.Parent.Value.Span);
return resultObjectId;
}
}
throw new FrostFsException("Unexpected error: cannot send object");
}
finally
{
if (isRentBuffer && buffer != null)
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
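// Illustrative walk-through of the part arithmetic above (numbers are assumed, not from this diff):
// with fullLength = 100 MiB and partSize (MaxObjectSize) = 64 MiB,
//   restPart     = (100 MiB % 64 MiB) > 0 ? 1 : 0    -> 1
//   objectsCount = (int)(100 MiB / 64 MiB) + restPart -> 2
// The first iteration uploads a 64 MiB part; the last part carries the parent header
// with the full 100 MiB payload length, and a link object then ties both part IDs
// together under the shared SplitId.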
internal async Task<FrostFsObjectId> PutClientCutSingleObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
if (args.Payload == null)
throw new ArgumentException(nameof(args.Payload));
if (args.Header == null)
throw new ArgumentException(nameof(args.Header));
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
int partSize = (int)networkSettings.MaxObjectSize;
int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
ulong fullLength;
// Information about the uploaded parts.
var progressInfo = args.Progress;
var offset = 0L;
if (progressInfo != null && progressInfo.GetParts().Count > 0)
{
if (!args.Payload.CanSeek)
{
throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported.");
}
var lastPart = progressInfo.GetLast();
args.Payload.Position = lastPart.Offset + lastPart.Length;
fullLength = (ulong)(args.Payload.Length - args.Payload.Position);
offset = args.Payload.Position;
}
else
{
if (args.Header.PayloadLength > 0)
fullLength = args.Header.PayloadLength;
else if (args.Payload.CanSeek)
fullLength = (ulong)args.Payload.Length;
else
throw new ArgumentException("The stream does not have a length and payload length is not defined");
}
//define collection capacity
var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
// if the object fits one part, it can be loaded as non-complex object
if (objectsCount == 1)
@@ -418,26 +590,14 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return singlePartResult.ObjectId;
}
List<FrostFsObjectId> parts = new(objectsCount);
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
SplitId splitId = new();
var remain = fullLength;
// keep attributes for the large object
var attributes = args.Header!.Attributes.ToArray();
header.Attributes = null;
var remain = args.PutObjectContext.FullLength;
FrostFsObjectHeader? parentHeader = null;
var lastIndex = objectsCount - 1;
bool rentBuffer = false;
byte[]? buffer = null;
bool isRentBuffer = false;
try
{
for (int i = 0; i < objectsCount; i++)
{
if (args.CustomBuffer != null)
{
@@ -451,27 +611,33 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
else
{
buffer = ArrayPool<byte>.Shared.Rent(partSize);
rentBuffer = true;
isRentBuffer = true;
}
FrostFsObjectHeader? parentHeader = null;
for (int i = 0; i < objectsCount;)
{
i++;
var bytesToWrite = Math.Min((ulong)partSize, remain);
var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
var size = await args.Payload.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
if (i == lastIndex)
if (i == objectsCount)
{
parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes)
parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular)
{
PayloadLength = args.PutObjectContext.FullLength
PayloadLength = args.PutObjectContext.FullLength,
Attributes = args.Header.Attributes
};
}
// Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
var partHeader = new FrostFsObjectHeader(
header.ContainerId,
args.Header.ContainerId,
FrostFsObjectType.Regular,
[],
new FrostFsSplit(splitId, parts.LastOrDefault(),
new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId,
parentHeader: parentHeader))
{
PayloadLength = (ulong)size
@@ -484,15 +650,18 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
var prm = new PrmSingleObjectPut(obj);
var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
var objectId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
parts.Add(objId);
var part = new ObjectPartInfo(offset, size, objectId);
progressInfo.AddPart(part);
if (i < lastIndex)
offset += size;
if (i < objectsCount)
continue;
// Once all parts of the object are uploaded, they must be linked into a single entity
var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts);
var linkObject = new FrostFsLinkObject(args.Header.ContainerId, progressInfo.SplitId, parentHeader!, [.. progressInfo.GetParts().Select(p => p.ObjectId)]);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
@@ -504,7 +673,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
finally
{
if (rentBuffer && buffer != null)
if (isRentBuffer && buffer != null)
{
ArrayPool<byte>.Shared.Return(buffer);
}
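
The in-code comments above note that parts already uploaded do not need to be re-sent after a failure. A hedged retry sketch under that assumption, reusing the client, args and ctx from the earlier example:

FrostFsObjectId? objectId = null;
while (objectId is null)
{
    try
    {
        // args carries the same UploadProgressInfo, so the payload is repositioned
        // after the last recorded part and only the remaining parts are uploaded.
        objectId = await client.PutClientCutObjectAsync(args, ctx);
    }
    catch (FrostFsException)
    {
        // Transient failure: back off briefly, then resume from the recorded progress.
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}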