[#58] Observable client cut #59
7 changed files with 328 additions and 56 deletions
|
@ -254,7 +254,8 @@ public class FrostFSClient : IFrostFSClient
|
|||
|
||||
public Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
{
|
||||
return GetObjectService().PutClientCutObjectAsync(args, ctx);
|
||||
return GetObjectService().PutClientCutSingleObjectAsync(args, ctx);
|
||||
// return GetObjectService().PutClientCutObjectAsync(args, ctx);
|
||||
}
|
||||
|
||||
public Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)
|
||||
|
|
|
@ -44,6 +44,11 @@ public static class ObjectHeaderMapper
|
|||
|
||||
public static FrostFsSplit ToModel(this Header.Types.Split split)
|
||||
{
|
||||
if (split is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(split));
|
||||
}
|
||||
|
||||
var children = split!.Children.Count != 0
|
||||
? new ReadOnlyCollection<FrostFsObjectId>([.. split.Children.Select(x => x.ToModel())])
|
||||
: null;
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
using System;
|
||||
|
||||
namespace FrostFS.SDK.Client;
|
||||
|
||||
public class PartUploadedEventArgs(ObjectPartInfo part) : EventArgs
|
||||
{
|
||||
public ObjectPartInfo Part { get; } = part;
|
||||
}
|
38
src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs
Normal file
38
src/FrostFS.SDK.Client/Models/Object/UploadInfo.cs
Normal file
|
@ -0,0 +1,38 @@
|
|||
using FrostFS.SDK;
|
||||
|
||||
namespace FrostFS.SDK.Client;
|
||||
|
||||
public readonly struct ObjectPartInfo(long offset, int length, FrostFsObjectId objectId) : System.IEquatable<ObjectPartInfo>
|
||||
{
|
||||
public long Offset { get; } = offset;
|
||||
public int Length { get; } = length;
|
||||
public FrostFsObjectId ObjectId { get; } = objectId;
|
||||
|
||||
public override bool Equals(object obj)
|
||||
{
|
||||
if (obj == null || obj is not ObjectPartInfo)
|
||||
return false;
|
||||
|
||||
return Equals((ObjectPartInfo)obj);
|
||||
}
|
||||
|
||||
public override int GetHashCode()
|
||||
{
|
||||
return ((int)(Offset >> 32)) ^ (int)Offset ^ Length ^ ObjectId.Value.GetHashCode();
|
||||
}
|
||||
|
||||
public static bool operator ==(ObjectPartInfo left, ObjectPartInfo right)
|
||||
{
|
||||
return left.Equals(right);
|
||||
}
|
||||
|
||||
public static bool operator !=(ObjectPartInfo left, ObjectPartInfo right)
|
||||
{
|
||||
return !(left == right);
|
||||
}
|
||||
|
||||
public bool Equals(ObjectPartInfo other)
|
||||
{
|
||||
return GetHashCode() == other.GetHashCode();
|
||||
}
|
||||
}
|
48
src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs
Normal file
48
src/FrostFS.SDK.Client/Models/Object/UploadProgressInfo.cs
Normal file
|
@ -0,0 +1,48 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Linq;
|
||||
|
||||
namespace FrostFS.SDK.Client;
|
||||
|
||||
public class UploadProgressInfo
|
||||
{
|
||||
private List<ObjectPartInfo> _parts;
|
||||
|
||||
public UploadProgressInfo(Guid splitId, int capacity = 8)
|
||||
{
|
||||
_parts = new List<ObjectPartInfo>(capacity);
|
||||
SplitId = new SplitId(splitId);
|
||||
}
|
||||
|
||||
public UploadProgressInfo(Guid splitId, Collection<ObjectPartInfo> parts)
|
||||
{
|
||||
_parts = [.. parts];
|
||||
SplitId = new SplitId(splitId);
|
||||
}
|
||||
|
||||
public event EventHandler<PartUploadedEventArgs>? Notify;
|
||||
|
||||
public SplitId SplitId { get; }
|
||||
|
||||
internal void AddPart(ObjectPartInfo part)
|
||||
{
|
||||
_parts.Add(part);
|
||||
Notify?.Invoke(this, new PartUploadedEventArgs(part));
|
||||
}
|
||||
|
||||
public ObjectPartInfo GetPart(int index)
|
||||
{
|
||||
return _parts[index];
|
||||
}
|
||||
|
||||
public ObjectPartInfo GetLast()
|
||||
{
|
||||
return _parts.LastOrDefault();
|
||||
}
|
||||
|
||||
public ReadOnlyCollection<ObjectPartInfo> GetParts()
|
||||
{
|
||||
return new ReadOnlyCollection<ObjectPartInfo>(_parts);
|
||||
}
|
||||
}
|
|
@ -8,7 +8,8 @@ public readonly struct PrmObjectClientCutPut(
|
|||
int bufferMaxSize = 0,
|
||||
FrostFsSessionToken? sessionToken = null,
|
||||
byte[]? customBuffer = null,
|
||||
string[]? xheaders = null) : PrmObjectPutBase, System.IEquatable<PrmObjectClientCutPut>
|
||||
string[]? xheaders = null,
|
||||
UploadProgressInfo? progress = null) : PrmObjectPutBase, System.IEquatable<PrmObjectClientCutPut>
|
||||
{
|
||||
/// <summary>
|
||||
/// Need to provide values like <c>ContainerId</c> and <c>ObjectType</c> to create and object.
|
||||
|
@ -41,6 +42,8 @@ public readonly struct PrmObjectClientCutPut(
|
|||
/// </summary>
|
||||
public string[] XHeaders { get; } = xheaders ?? [];
|
||||
|
||||
public UploadProgressInfo? Progress { get; } = progress;
|
||||
|
||||
internal PutObjectContext PutObjectContext { get; } = new();
|
||||
|
||||
public override readonly bool Equals(object obj)
|
||||
|
|
|
@ -387,28 +387,200 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
{
|
||||
var stream = args.Payload!;
|
||||
var header = args.Header!;
|
||||
if (args.Payload == null)
|
||||
throw new ArgumentException(nameof(args.Payload));
|
||||
|
||||
if (header.PayloadLength > 0)
|
||||
args.PutObjectContext.FullLength = header.PayloadLength;
|
||||
else if (stream.CanSeek)
|
||||
args.PutObjectContext.FullLength = (ulong)stream.Length;
|
||||
else
|
||||
throw new ArgumentException("The stream does not have a length and payload length is not defined");
|
||||
|
||||
if (args.PutObjectContext.FullLength == 0)
|
||||
throw new ArgumentException("The stream has zero length");
|
||||
if (args.Header == null)
|
||||
throw new ArgumentException(nameof(args.Header));
|
||||
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
|
||||
var partSize = (int)networkSettings.MaxObjectSize;
|
||||
int partSize = (int)networkSettings.MaxObjectSize;
|
||||
|
||||
var restBytes = args.PutObjectContext.FullLength;
|
||||
int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
|
||||
var objectSize = (int)Math.Min((ulong)partSize, restBytes);
|
||||
ulong fullLength;
|
||||
|
||||
// define collection capacity
|
||||
var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0);
|
||||
// Information about the uploaded parts.
|
||||
var progressInfo = args.Progress; //
|
||||
var offset = 0L;
|
||||
|
||||
if (progressInfo != null && progressInfo.GetParts().Count > 0)
|
||||
{
|
||||
if (!args.Payload.CanSeek)
|
||||
{
|
||||
throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported.");
|
||||
}
|
||||
|
||||
var lastPart = progressInfo.GetLast();
|
||||
args.Payload.Position = lastPart.Offset + lastPart.Length;
|
||||
fullLength = (ulong)(args.Payload.Length - args.Payload.Position);
|
||||
offset = args.Payload.Position;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (args.Header.PayloadLength > 0)
|
||||
fullLength = args.Header.PayloadLength;
|
||||
else if (args.Payload.CanSeek)
|
||||
fullLength = (ulong)args.Payload.Length;
|
||||
else
|
||||
throw new ArgumentException("The stream does not have a length and payload length is not defined");
|
||||
}
|
||||
|
||||
//define collection capacity
|
||||
var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
|
||||
var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
|
||||
|
||||
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
|
||||
|
||||
var remain = fullLength;
|
||||
|
||||
byte[]? buffer = null;
|
||||
bool isRentBuffer = false;
|
||||
|
||||
try
|
||||
{
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
if (args.CustomBuffer.Length < chunkSize)
|
||||
throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
|
||||
|
||||
buffer = args.CustomBuffer;
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
isRentBuffer = true;
|
||||
}
|
||||
|
||||
FrostFsObjectId? resultObjectId = null;
|
||||
FrostFsObjectHeader? parentHeader = null;
|
||||
|
||||
while (remain > 0)
|
||||
{
|
||||
var bytesToWrite = Math.Min((ulong)partSize, remain);
|
||||
var isLastPart = remain <= (ulong)partSize;
|
||||
|
||||
// When the last part of the object is uploaded, all metadata for the object must be added
|
||||
if (isLastPart && objectsCount > 1)
|
||||
{
|
||||
parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular)
|
||||
{
|
||||
Attributes = args.Header.Attributes,
|
||||
PayloadLength = fullLength
|
||||
};
|
||||
}
|
||||
|
||||
// Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
|
||||
var header = objectsCount == 1 ? args.Header : new FrostFsObjectHeader(
|
||||
args.Header.ContainerId,
|
||||
FrostFsObjectType.Regular,
|
||||
[],
|
||||
new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader));
|
||||
|
||||
var prm = new PrmObjectPut(header);
|
||||
using var stream = await PutStreamObjectAsync(prm, ctx).ConfigureAwait(false);
|
||||
var uploaded = 0;
|
||||
|
||||
// If an error occurs while uploading a part of the object, there is no need to re-upload the parts
|
||||
// that were successfully uploaded before. It is sufficient to re-upload only the failed part
|
||||
|
||||
var thisPartRest = (int)Math.Min((ulong)partSize, remain);
|
||||
while (thisPartRest > 0)
|
||||
{
|
||||
var nextChunkSize = Math.Min(thisPartRest, chunkSize);
|
||||
var size = await args.Payload.ReadAsync(buffer, 0, nextChunkSize).ConfigureAwait(false);
|
||||
|
||||
if (size == 0)
|
||||
break;
|
||||
|
||||
await stream.WriteAsync(buffer.AsMemory(0, size)).ConfigureAwait(false);
|
||||
uploaded += size;
|
||||
thisPartRest -= size;
|
||||
}
|
||||
|
||||
var objectId = await stream.CompleteAsync().ConfigureAwait(false);
|
||||
var part = new ObjectPartInfo(offset, uploaded, objectId);
|
||||
offset += uploaded;
|
||||
progressInfo.AddPart(part);
|
||||
|
||||
remain -= bytesToWrite;
|
||||
|
||||
if (isLastPart)
|
||||
{
|
||||
if (objectsCount == 1)
|
||||
{
|
||||
return progressInfo.GetPart(0).ObjectId;
|
||||
}
|
||||
|
||||
if (parentHeader == null) continue;
|
||||
|
||||
// Once all parts of the object are uploaded, they must be linked into a single entity
|
||||
var linkObject = new FrostFsLinkObject(header.ContainerId, progressInfo.SplitId, parentHeader,
|
||||
[.. progressInfo.GetParts().Select(p => p.ObjectId)]);
|
||||
|
||||
await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
|
||||
|
||||
// Retrieve the ID of the linked object
|
||||
resultObjectId = FrostFsObjectId.FromHash(prm.Header!.GetHeader().Split!.Parent.Value.Span);
|
||||
return resultObjectId;
|
||||
}
|
||||
}
|
||||
|
||||
throw new FrostFsException("Unexpected error: cannot send object");
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (isRentBuffer && buffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PutClientCutSingleObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
{
|
||||
if (args.Payload == null)
|
||||
throw new ArgumentException(nameof(args.Payload));
|
||||
|
||||
if (args.Header == null)
|
||||
throw new ArgumentException(nameof(args.Header));
|
||||
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
|
||||
int partSize = (int)networkSettings.MaxObjectSize;
|
||||
|
||||
int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
|
||||
ulong fullLength;
|
||||
|
||||
// Information about the uploaded parts.
|
||||
var progressInfo = args.Progress; //
|
||||
var offset = 0L;
|
||||
|
||||
if (progressInfo != null && progressInfo.GetParts().Count > 0)
|
||||
{
|
||||
if (!args.Payload.CanSeek)
|
||||
{
|
||||
throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported.");
|
||||
}
|
||||
|
||||
var lastPart = progressInfo.GetLast();
|
||||
args.Payload.Position = lastPart.Offset + lastPart.Length;
|
||||
fullLength = (ulong)(args.Payload.Length - args.Payload.Position);
|
||||
offset = args.Payload.Position;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (args.Header.PayloadLength > 0)
|
||||
fullLength = args.Header.PayloadLength;
|
||||
else if (args.Payload.CanSeek)
|
||||
fullLength = (ulong)args.Payload.Length;
|
||||
else
|
||||
throw new ArgumentException("The stream does not have a length and payload length is not defined");
|
||||
}
|
||||
|
||||
//define collection capacity
|
||||
var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
|
||||
var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
|
||||
|
||||
// if the object fits one part, it can be loaded as non-complex object
|
||||
if (objectsCount == 1)
|
||||
|
@ -418,26 +590,14 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
return singlePartResult.ObjectId;
|
||||
}
|
||||
|
||||
List<FrostFsObjectId> parts = new(objectsCount);
|
||||
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
|
||||
|
||||
SplitId splitId = new();
|
||||
var remain = fullLength;
|
||||
|
||||
// keep attributes for the large object
|
||||
var attributes = args.Header!.Attributes.ToArray();
|
||||
header.Attributes = null;
|
||||
|
||||
var remain = args.PutObjectContext.FullLength;
|
||||
|
||||
FrostFsObjectHeader? parentHeader = null;
|
||||
|
||||
var lastIndex = objectsCount - 1;
|
||||
|
||||
bool rentBuffer = false;
|
||||
byte[]? buffer = null;
|
||||
bool isRentBuffer = false;
|
||||
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < objectsCount; i++)
|
||||
{
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
|
@ -451,27 +611,33 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
else
|
||||
{
|
||||
buffer = ArrayPool<byte>.Shared.Rent(partSize);
|
||||
rentBuffer = true;
|
||||
isRentBuffer = true;
|
||||
}
|
||||
|
||||
FrostFsObjectHeader? parentHeader = null;
|
||||
|
||||
for (int i = 0; i < objectsCount;)
|
||||
{
|
||||
i++;
|
||||
var bytesToWrite = Math.Min((ulong)partSize, remain);
|
||||
|
||||
var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
|
||||
var size = await args.Payload.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
|
||||
|
||||
if (i == lastIndex)
|
||||
if (i == objectsCount)
|
||||
{
|
||||
parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes)
|
||||
parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular)
|
||||
{
|
||||
PayloadLength = args.PutObjectContext.FullLength
|
||||
PayloadLength = args.PutObjectContext.FullLength,
|
||||
Attributes = args.Header.Attributes
|
||||
};
|
||||
}
|
||||
|
||||
// Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
|
||||
var partHeader = new FrostFsObjectHeader(
|
||||
header.ContainerId,
|
||||
args.Header.ContainerId,
|
||||
FrostFsObjectType.Regular,
|
||||
[],
|
||||
new FrostFsSplit(splitId, parts.LastOrDefault(),
|
||||
new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId,
|
||||
parentHeader: parentHeader))
|
||||
{
|
||||
PayloadLength = (ulong)size
|
||||
|
@ -484,15 +650,18 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
var prm = new PrmSingleObjectPut(obj);
|
||||
|
||||
var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
|
||||
var objectId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
|
||||
|
||||
parts.Add(objId);
|
||||
var part = new ObjectPartInfo(offset, size, objectId);
|
||||
progressInfo.AddPart(part);
|
||||
|
||||
if (i < lastIndex)
|
||||
offset += size;
|
||||
|
||||
if (i < objectsCount)
|
||||
continue;
|
||||
|
||||
// Once all parts of the object are uploaded, they must be linked into a single entity
|
||||
var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts);
|
||||
var linkObject = new FrostFsLinkObject(args.Header.ContainerId, progressInfo.SplitId, parentHeader!, [.. progressInfo.GetParts().Select(p => p.ObjectId)]);
|
||||
|
||||
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
|
||||
|
||||
|
@ -504,7 +673,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
}
|
||||
finally
|
||||
{
|
||||
if (rentBuffer && buffer != null)
|
||||
if (isRentBuffer && buffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue