[#28] Client: Apply code optimizations

Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
Pavel Gross 2024-11-18 16:57:20 +03:00
parent 766f61a5f7
commit 749000a090
57 changed files with 845 additions and 1116 deletions

View file

@ -17,31 +17,35 @@ using Google.Protobuf;
namespace FrostFS.SDK.Client;
internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx)
: ContextAccessor(clientCtx), ISessionProvider
: ContextAccessor(clientCtx)
{
private SessionProvider? sessions;
private readonly ObjectService.ObjectServiceClient client = client;
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
public async ValueTask<FrostFsSessionToken> GetDefaultSession(ISessionToken args, CallContext ctx)
{
sessions ??= new(ClientContext);
if (ClientContext.SessionCache.Cache != null &&
ClientContext.SessionCache.Cache.ContainsKey(ClientContext.SessionCacheKey))
if (!ClientContext.SessionCache!.TryGetValue(ClientContext.SessionCacheKey, out var token))
{
return (SessionToken)ClientContext.SessionCache.Cache[ClientContext.SessionCacheKey];
var protoToken = await sessions.GetDefaultSession(args, ctx).ConfigureAwait(false);
token = new FrostFsSessionToken(protoToken);
ClientContext.SessionCache.SetValue(ClientContext.SessionCacheKey, token);
}
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
if (token == null)
{
throw new FrostFsException("Cannot create session");
}
return token;
}
internal async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new HeadRequest
{
@ -55,16 +59,16 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Head,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
@ -77,11 +81,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRequest
{
Body = new GetRequest.Types.Body
@ -94,16 +93,16 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Get,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
return await GetObject(request, ctx).ConfigureAwait(false);
}
@ -112,11 +111,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRangeRequest
{
Body = new GetRangeRequest.Types.Body
@ -135,16 +129,16 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Range,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
var call = client.GetRange(request, null, ctx.Deadline, ctx.CancellationToken);
return new RangeReader(call);
@ -154,11 +148,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new GetRangeHashRequest
{
Body = new GetRangeHashRequest.Types.Body
@ -182,16 +171,16 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
});
}
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Rangehash,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
var response = await client.GetRangeHashAsync(request, null, ctx.Deadline, ctx.CancellationToken);
@ -206,10 +195,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
internal async Task DeleteObjectAsync(PrmObjectDelete args)
{
var ctx = args.Context!;
ctx.Key ??= ClientContext.Key?.ECDsaKey;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new DeleteRequest
{
@ -223,15 +208,15 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Delete,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(ctx.Key);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ClientContext.Key.ECDsaKey);
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
@ -242,9 +227,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var request = new SearchRequest
{
Body = new SearchRequest.Types.Body
@ -256,22 +238,30 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
new Address { ContainerId = request.Body.ContainerId },
ObjectSessionContext.Types.Verb.Search,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
var objectsIds = SearchObjects(request, ctx);
using var stream = GetSearchReader(request, ctx);
await foreach (var oid in objectsIds)
while (true)
{
yield return FrostFsObjectId.FromHash(oid.Value.ToByteArray());
var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false);
if (ids == null)
yield break;
foreach (var oid in ids)
{
yield return FrostFsObjectId.FromHash(oid.Value.Span);
}
}
}
@ -296,6 +286,8 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
args.FullLength = args.Header.PayloadLength;
else if (args.Payload.CanSeek)
args.FullLength = (ulong)args.Payload.Length;
else
throw new ArgumentException("The stream does not have a length and payload length is not defined");
var response = await PutStreamObject(args).ConfigureAwait(false);
@ -307,39 +299,34 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ctx);
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ClientContext);
var request = new PutSingleRequest
{
Body = new() { Object = grpcObject }
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId },
ObjectSessionContext.Types.Verb.Put,
ctx.Key);
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
Verifier.CheckResponse(response);
return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.Span);
}
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args)
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var chunkSize = args.MaxPayloadPatchChunkLength;
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
@ -350,7 +337,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
try
{
// common
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
var address = new Address
{
@ -358,13 +345,12 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
ContainerId = args.Address.ContainerId
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
address,
ObjectSessionContext.Types.Verb.Patch,
ctx.Key
);
ClientContext.Key);
var request = new PatchRequest()
{
@ -403,9 +389,9 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
currentPos += (ulong)bytesCount;
request.AddMetaHeader(args.XHeaders, sessionToken);
request.AddMetaHeader(args.XHeaders, protoToken);
request.Sign(ctx.Key);
request.Sign(ClientContext.Key.ECDsaKey);
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
@ -433,20 +419,17 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var ctx = args.Context!;
var tokenRaw = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var token = new FrostFsSessionToken(tokenRaw.Serialize(), tokenRaw.Body.Id.ToUuid());
args.SessionToken = token;
args.SessionToken ??= await GetDefaultSession(args, ctx).ConfigureAwait(false);
var payloadStream = args.Payload!;
var header = args.Header!;
var fullLength = header.PayloadLength;
if (payloadStream.CanSeek && fullLength == 0)
fullLength = (ulong)payloadStream.Length;
args.FullLength = fullLength;
if (header.PayloadLength > 0)
args.FullLength = header.PayloadLength;
else if (payloadStream.CanSeek)
args.FullLength = (ulong)payloadStream.Length;
else
throw new ArgumentException("The stream does not have a length and payload length is not defined");
if (args.MaxObjectSizeCache == 0)
{
@ -456,12 +439,12 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
}
var restBytes = fullLength - args.CurrentStreamPosition;
var restBytes = args.FullLength - args.CurrentStreamPosition;
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;
//define collection capacity
var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
var objectsCount = fullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
var objectsCount = args.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
List<FrostFsObjectId> sentObjectIds = new(objectsCount);
@ -491,7 +474,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var largeObjectHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, [.. attributes])
{
PayloadLength = fullLength,
PayloadLength = args.FullLength,
};
args.Header.Split!.ParentHeader = largeObjectHeader;
@ -526,8 +509,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
private async Task<PutObjectResult> PutStreamObject(PrmObjectPut args)
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
var payload = args.Payload!;
@ -542,21 +523,26 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
try
{
// 0 means no limit from client, so server side cut is performed
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
if (args.CustomBuffer != null)
{
if (args.CustomBuffer.Length < chunkSize)
{
throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
}
chunkBuffer = args.CustomBuffer;
}
else
{
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
isRentBuffer = true;
}
var sentBytes = 0;
// 0 means no limit from client, so server side cut is performed
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
using var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
@ -581,7 +567,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
};
chunkRequest.Sign(ctx.Key);
chunkRequest.Sign(ClientContext.Key.ECDsaKey);
await stream.Write(chunkRequest).ConfigureAwait(false);
}
@ -589,7 +575,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
var response = await stream.Close().ConfigureAwait(false);
Verifier.CheckResponse(response);
return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.Span), sentBytes);
}
finally
{
@ -604,17 +590,14 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
{
var header = args.Header!;
if (ctx.Key == null)
throw new ArgumentNullException(nameof(args), "Key is null");
header.OwnerId ??= ctx.OwnerId;
header.Version ??= ctx.Version;
header.OwnerId ??= ClientContext.Owner;
header.Version ??= ClientContext.Version;
var grpcHeader = header.GetHeader();
if (header.Split != null)
{
ObjectTools.SetSplitValues(grpcHeader, header.Split, ctx);
ObjectTools.SetSplitValues(grpcHeader, header.Split, ClientContext);
}
var oid = new ObjectID { Value = grpcHeader.Sha256() };
@ -630,17 +613,16 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
}
};
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var sessionToken = (args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false));
sessionToken.CreateObjectTokenContext(
var protoToken = sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcHeader.ContainerId, ObjectId = oid },
ObjectSessionContext.Types.Verb.Put,
ctx.Key
);
ClientContext.Key);
initRequest.AddMetaHeader(args.XHeaders, sessionToken);
initRequest.AddMetaHeader(args.XHeaders, protoToken);
initRequest.Sign(ctx.Key);
initRequest.Sign(ClientContext.Key.ECDsaKey);
return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
}
@ -681,24 +663,6 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return new ObjectReader(call);
}
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, CallContext ctx)
{
using var stream = GetSearchReader(request, ctx);
while (true)
{
var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false);
if (ids == null)
break;
foreach (var oid in ids)
{
yield return oid;
}
}
}
private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx)
{
if (initRequest is null)