[#7] Client cut internal

This commit is contained in:
Pavel Gross 2024-06-14 11:58:29 +03:00 committed by PavelGrossSpb
parent 545e647d7b
commit b69d22966f
15 changed files with 216 additions and 36 deletions

View file

@ -13,6 +13,8 @@ using FrostFS.SDK.Cryptography;
using FrostFS.Session;
using FrostFS.SDK.ModelsV2;
using FrostFS.SDK.ClientV2.Extensions;
using System.Threading;
namespace FrostFS.SDK.ClientV2;
@ -32,6 +34,7 @@ public partial class Client
}
};
request.AddMetaHeader();
request.Sign(_key);
var response = await _objectServiceClient!.HeadAsync(request);
@ -70,18 +73,83 @@ public partial class Client
return obj.ToModel();
}
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, Stream payload)
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, Stream payload, CancellationToken cancellationToken = default)
{
return await PutObject(header, payload);
return await PutObject(header, payload, cancellationToken);
}
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, byte[] payload)
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, byte[] payload, CancellationToken cancellationToken = default)
{
using var stream = new MemoryStream(payload);
return await PutObject(header, stream);
return await PutObject(header, stream, cancellationToken);
}
private async Task<ObjectId> PutObject(ObjectHeader header, Stream payload)
private Task<ObjectId> PutObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken)
{
if (header.ClientCut)
return PutClientCutObject(header, payload, cancellationToken);
else
return PutStreamObject(header, payload, cancellationToken);
}
private async Task<ObjectId> PutClientCutObject(ObjectHeader header, Stream payloadStream, CancellationToken cancellationToken)
{
ObjectId? objectId = null;
List<ObjectId> sentObjectIds = [];
ModelsV2.Object? currentObject;
var partSize = (int)NetworkSettings["MaxObjectSize"];
var buffer = new byte[partSize];
var largeObject = new LargeObject(header.ContainerId);
var split = new Split();
var fullLength = (ulong)payloadStream.Length;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
var bytesCount = await payloadStream.ReadAsync(buffer, 0, partSize);
split.Previous = sentObjectIds.LastOrDefault();
largeObject.AppendBlock(buffer, bytesCount);
currentObject = new ModelsV2.Object(header.ContainerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer)
.AddAttributes(header.Attributes)
.SetSplit(split);
if (largeObject.PayloadLength == fullLength)
break;
objectId = await PutSingleObjectAsync(currentObject, cancellationToken);
sentObjectIds.Add(objectId!);
}
if (sentObjectIds.Any())
{
largeObject.CalculateHash();
currentObject.SetParent(largeObject);
objectId = await PutSingleObjectAsync(currentObject, cancellationToken);
sentObjectIds.Add(objectId);
var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
.AddChildren(sentObjectIds);
_ = await PutSingleObjectAsync(linkObject, cancellationToken);
return currentObject.GetParentId();
}
return await PutSingleObjectAsync(currentObject, cancellationToken);
}
private async Task<ObjectId> PutStreamObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
var hdr = header.ToGrpcMessage();
@ -120,6 +188,8 @@ public partial class Client
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize);
if (bufferLength == 0)
@ -141,7 +211,7 @@ public partial class Client
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
public async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object @object)
public async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object @object, CancellationToken cancellationToken = default)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
@ -163,7 +233,7 @@ public partial class Client
request.Sign(_key);
var response = await _objectServiceClient!.PutSingleAsync(request);
var response = await _objectServiceClient!.PutSingleAsync(request, null, null, cancellationToken);
Verifier.CheckResponse(response);
return ObjectId.FromHash(obj.ObjectId.Value.ToByteArray());
@ -207,6 +277,8 @@ public partial class Client
split.Parent = grpcHeader.Split.Parent.ToModel();
}
grpcHeader.Split.Previous = split.Previous?.ToGrpcMessage();
}
var obj = new Object.Object
@ -286,10 +358,12 @@ public partial class Client
request.Body.Filters.Add(filter.ToGrpcMessage());
}
request.AddMetaHeader();
request.Sign(_key);
var objectsIds = SearchObjects(request);
await foreach (var oid in objectsIds)
{
yield return ObjectId.FromHash(oid.Value.ToByteArray());
@ -301,12 +375,14 @@ public partial class Client
using var stream = GetObjectInit(request);
var obj = await stream.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
var offset = 0;
var offset = 0L;
var chunk = await stream.ReadChunk();
while (chunk is not null)
while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
{
chunk.CopyTo(payload, offset);
var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
Array.Copy(chunk, 0, payload, offset, length);
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
@ -321,6 +397,7 @@ public partial class Client
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
return new ObjectReader
{
Call = _objectServiceClient!.Get(initRequest)
@ -376,3 +453,5 @@ public partial class Client
}