[#35] Client: rollback to PutSingleObject for client cut upload
Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
parent
8835b23ed3
commit
6988fcedae
3 changed files with 187 additions and 88 deletions
|
@ -385,89 +385,132 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
return response.Body.ObjectId.ToModel();
|
||||
}
|
||||
|
||||
|
||||
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
{
|
||||
var payloadStream = args.Payload!;
|
||||
var stream = args.Payload!;
|
||||
var header = args.Header!;
|
||||
|
||||
if (header.PayloadLength > 0)
|
||||
args.PutObjectContext.FullLength = header.PayloadLength;
|
||||
else if (payloadStream.CanSeek)
|
||||
args.PutObjectContext.FullLength = (ulong)payloadStream.Length;
|
||||
else if (stream.CanSeek)
|
||||
args.PutObjectContext.FullLength = (ulong)stream.Length;
|
||||
else
|
||||
throw new ArgumentException("The stream does not have a length and payload length is not defined");
|
||||
|
||||
if (args.PutObjectContext.MaxObjectSizeCache == 0)
|
||||
{
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx)
|
||||
.ConfigureAwait(false);
|
||||
if (args.PutObjectContext.FullLength == 0)
|
||||
throw new ArgumentException("The stream has zero length");
|
||||
|
||||
args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
|
||||
args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
|
||||
|
||||
var restBytes = args.PutObjectContext.FullLength;
|
||||
|
||||
var objectSize = (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes);
|
||||
|
||||
// define collection capacity
|
||||
var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0);
|
||||
|
||||
// if the object fits one part, it can be loaded as non-complex object
|
||||
if (objectsCount == 1)
|
||||
{
|
||||
var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
|
||||
return singlePartResult.ObjectId;
|
||||
}
|
||||
|
||||
var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;
|
||||
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes) : args.PutObjectContext.MaxObjectSizeCache;
|
||||
List<FrostFsObjectId> parts = new(objectsCount);
|
||||
|
||||
//define collection capacity
|
||||
var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
|
||||
var objectsCount = args.PutObjectContext.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
|
||||
|
||||
List<FrostFsObjectId> sentObjectIds = new(objectsCount);
|
||||
|
||||
FrostFsSplit? split = null;
|
||||
SplitId splitId = new();
|
||||
|
||||
var partSize = args.PutObjectContext.MaxObjectSizeCache;
|
||||
|
||||
// keep attributes for the large object
|
||||
var attributes = args.Header!.Attributes;
|
||||
args.Header!.Attributes = null;
|
||||
var attributes = args.Header!.Attributes.ToArray();
|
||||
header.Attributes = null;
|
||||
|
||||
// send all parts except the last one as separate Objects
|
||||
while (restBytes > (ulong)args.PutObjectContext.MaxObjectSizeCache)
|
||||
var remain = args.PutObjectContext.FullLength;
|
||||
|
||||
FrostFsObjectHeader? parentHeader = null;
|
||||
|
||||
var lastIndex = objectsCount - 1;
|
||||
|
||||
bool rentBuffer = false;
|
||||
byte[]? buffer = null;
|
||||
|
||||
try
|
||||
{
|
||||
split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault());
|
||||
|
||||
args.Header!.Split = split;
|
||||
|
||||
var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
|
||||
|
||||
sentObjectIds.Add(result.ObjectId);
|
||||
|
||||
restBytes -= (ulong)result.ObjectSize;
|
||||
}
|
||||
|
||||
// send the last part and create linkObject
|
||||
if (sentObjectIds.Count > 0)
|
||||
{
|
||||
var largeObjectHeader = new FrostFsObjectHeader(
|
||||
header.ContainerId,
|
||||
FrostFsObjectType.Regular,
|
||||
attributes != null ? [.. attributes] : [])
|
||||
for (int i = 0; i < objectsCount; i++)
|
||||
{
|
||||
PayloadLength = args.PutObjectContext.FullLength,
|
||||
};
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
if (args.CustomBuffer.Length < partSize)
|
||||
{
|
||||
throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required");
|
||||
}
|
||||
|
||||
args.Header.Split!.ParentHeader = largeObjectHeader;
|
||||
buffer = args.CustomBuffer;
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer = ArrayPool<byte>.Shared.Rent(partSize);
|
||||
rentBuffer = true;
|
||||
}
|
||||
|
||||
var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
|
||||
var bytesToWrite = Math.Min((ulong)partSize, remain);
|
||||
|
||||
sentObjectIds.Add(result.ObjectId);
|
||||
var size = await stream.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false);
|
||||
|
||||
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
|
||||
if (i == lastIndex)
|
||||
{
|
||||
parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes)
|
||||
{
|
||||
PayloadLength = args.PutObjectContext.FullLength
|
||||
};
|
||||
}
|
||||
|
||||
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
|
||||
// Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
|
||||
var partHeader = new FrostFsObjectHeader(
|
||||
header.ContainerId,
|
||||
FrostFsObjectType.Regular,
|
||||
[],
|
||||
new FrostFsSplit(splitId, parts.LastOrDefault(),
|
||||
parentHeader: parentHeader))
|
||||
{
|
||||
PayloadLength = (ulong)size
|
||||
};
|
||||
|
||||
var parentHeader = args.Header.GetHeader();
|
||||
var obj = new FrostFsObject(partHeader)
|
||||
{
|
||||
SingleObjectPayload = buffer.Length == size ? buffer : buffer[..size]
|
||||
};
|
||||
|
||||
return parentHeader.Split!.Parent.ToModel();
|
||||
var prm = new PrmSingleObjectPut(obj);
|
||||
|
||||
var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);
|
||||
|
||||
parts.Add(objId);
|
||||
|
||||
if (i < lastIndex)
|
||||
continue;
|
||||
|
||||
// Once all parts of the object are uploaded, they must be linked into a single entity
|
||||
var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts);
|
||||
|
||||
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
|
||||
|
||||
// Retrieve the ID of the linked object
|
||||
return partHeader.GetHeader().Split!.Parent.ToModel();
|
||||
}
|
||||
|
||||
throw new FrostFsException("Unexpected error");
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (rentBuffer && buffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
// We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
|
||||
args.Header!.Attributes = attributes;
|
||||
|
||||
var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
|
||||
|
||||
return singlePartResult.ObjectId;
|
||||
}
|
||||
|
||||
struct PutObjectResult(FrostFsObjectId objectId, int objectSize)
|
||||
|
@ -481,7 +524,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
var payload = args.Payload!;
|
||||
|
||||
var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
|
||||
var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;
|
||||
|
||||
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
||||
|
@ -526,22 +568,23 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory()[..bytesCount])
|
||||
}
|
||||
};
|
||||
|
||||
chunkRequest.AddMetaHeader(args.XHeaders);
|
||||
|
||||
chunkRequest.Sign(ClientContext.Key.ECDsaKey);
|
||||
|
||||
await stream.Write(chunkRequest).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
args.PutObjectContext.CurrentStreamPosition += (ulong)sentBytes;
|
||||
|
||||
var response = await stream.Close().ConfigureAwait(false);
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
|
@ -580,10 +623,10 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
var initRequest = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
{
|
||||
Init = new PutRequest.Types.Body.Types.Init
|
||||
{
|
||||
Header = grpcHeader
|
||||
Header = grpcHeader,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue