[#25] Client: Implement Patch and Range methods

Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
Pavel Gross 2024-11-08 10:38:50 +03:00
parent bff8d67867
commit 003b7fdfdd
51 changed files with 1338 additions and 137 deletions

View file

@ -1,6 +1,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
@ -107,6 +108,101 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return await GetObject(request, ctx).ConfigureAwait(false);
}
internal async Task<RangeReader> GetRangeAsync(PrmRangeGet args)
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRangeRequest
{
Body = new GetRangeRequest.Types.Body
{
Address = new Address
{
ContainerId = args.ContainerId.ToMessage(),
ObjectId = args.ObjectId.ToMessage()
},
Range = new Object.Range
{
Offset = args.Range.Offset,
Length = args.Range.Length
},
Raw = args.Raw
}
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Range,
ctx.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(ctx.Key);
var call = client.GetRange(request, null, ctx.Deadline, ctx.CancellationToken);
return new RangeReader(call);
}
internal async Task<IEnumerable<ReadOnlyMemory<byte>>> GetRangeHashAsync(PrmRangeHashGet args)
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRangeHashRequest
{
Body = new GetRangeHashRequest.Types.Body
{
Address = new Address
{
ContainerId = args.ContainerId.ToMessage(),
ObjectId = args.ObjectId.ToMessage()
},
Type = ChecksumType.Sha256,
Salt = ByteString.CopyFrom(args.Salt) // TODO: create a type with calculated cashed ByteString inside
}
};
foreach (var range in args.Ranges)
{
request.Body.Ranges.Add(new Object.Range
{
Length = range.Length,
Offset = range.Offset
});
}
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Rangehash,
ctx.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(ctx.Key);
var response = await client.GetRangeHashAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
var hashCollection = response.Body.HashList.ToArray().Select(h => h.Memory);
return hashCollection;
}
internal async Task DeleteObjectAsync(PrmObjectDelete args)
{
var ctx = args.Context!;
@ -191,7 +287,9 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
throw new ArgumentNullException(nameof(args), "Payload is null");
if (args.ClientCut)
{
return await PutClientCutObject(args).ConfigureAwait(false);
}
else
{
if (args.Header.PayloadLength > 0)
@ -199,7 +297,9 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
else if (args.Payload.CanSeek)
args.FullLength = (ulong)args.Payload.Length;
return (await PutStreamObject(args).ConfigureAwait(false)).ObjectId;
var response = await PutStreamObject(args).ConfigureAwait(false);
return response.ObjectId;
}
}
@ -235,6 +335,100 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
}
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args)
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var chunkSize = args.MaxPayloadPatchChunkLength;
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
var call = client.Patch(null, ctx.Deadline, ctx.CancellationToken);
byte[]? chunkBuffer = null;
try
{
// common
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
var address = new Address
{
ObjectId = args.Address.ObjectId,
ContainerId = args.Address.ContainerId
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
address,
ObjectSessionContext.Types.Verb.Patch,
ctx.Key
);
var request = new PatchRequest()
{
Body = new()
{
Address = address,
ReplaceAttributes = args.ReplaceAttributes,
}
};
bool isFirstChunk = true;
ulong currentPos = args.Range.Offset;
while (true)
{
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
if (bytesCount == 0)
{
break;
}
if (isFirstChunk && args.NewAttributes != null && args.NewAttributes.Length > 0)
{
foreach (var attr in args.NewAttributes)
{
request.Body.NewAttributes.Add(attr.ToMessage());
}
}
request.Body.Patch = new PatchRequest.Types.Body.Types.Patch
{
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount),
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
};
currentPos += (ulong)bytesCount;
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(ctx.Key);
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
isFirstChunk = false;
}
}
finally
{
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
if (chunkBuffer != null)
{
ArrayPool<byte>.Shared.Return(chunkBuffer);
}
}
var response = await call.ResponseAsync.ConfigureAwait(false);
Verifier.CheckResponse(response);
return response.Body.ObjectId.ToModel();
}
private async Task<FrostFsObjectId> PutClientCutObject(PrmObjectPut args)
{
var ctx = args.Context!;
@ -406,7 +600,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
}
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, CallContext ctx)
private async Task<ObjectStreamer<PutRequest, PutResponse>> GetUploadStream(PrmObjectPut args, CallContext ctx)
{
var header = args.Header!;
@ -451,6 +645,20 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
}
private async Task<ObjectStreamer<PutRequest, PutResponse>> PutObjectInit(PutRequest initRequest, CallContext ctx)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);
return new ObjectStreamer<PutRequest, PutResponse>(call);
}
private async Task<FrostFsObject> GetObject(GetRequest request, CallContext ctx)
{
var reader = GetObjectInit(request, ctx);
@ -473,20 +681,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return new ObjectReader(call);
}
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, CallContext ctx)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);
return new ObjectStreamer(call);
}
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, CallContext ctx)
{
using var stream = GetSearchReader(request, ctx);