[#20] Client: Optimize memory usage

Avoid memory allocation, use cache and static

Signed-off-by: Pavel Gross <p.gross@yando.com>
This commit is contained in:
Pavel Gross 2024-08-01 16:17:36 +03:00
parent 35fe791406
commit 0ddde467cd
46 changed files with 596 additions and 372 deletions

View file

@ -13,13 +13,13 @@ using FrostFS.Session;
using FrostFS.SDK.ModelsV2;
using FrostFS.SDK.ClientV2.Extensions;
using FrostFS.SDK.ClientV2.Parameters;
using System.Buffers;
namespace FrostFS.SDK.ClientV2;
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment ctx) : ContextAccessor(ctx), ISessionProvider
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment env) : ContextAccessor(env), ISessionProvider
{
readonly ObjectTools tools = new(ctx);
readonly SessionProvider sessions = new (ctx);
readonly SessionProvider sessions = new (env);
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
{
@ -35,8 +35,8 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
{
Address = new Address
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ObjectId = args.ObjectId.ToGrpcMessage()
ContainerId = args.ContainerId.ToMessage(),
ObjectId = args.ObjectId.ToMessage()
}
}
};
@ -46,11 +46,11 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Head,
Context.Key);
Context.Key.ECDsaKey);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
request.Sign(Context.Key.ECDsaKey);
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);
@ -69,8 +69,8 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
{
Address = new Address
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ObjectId = args.ObjectId.ToGrpcMessage()
ContainerId = args.ContainerId.ToMessage(),
ObjectId = args.ObjectId.ToMessage()
}
}
};
@ -80,11 +80,11 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Get,
Context.Key);
Context.Key.ECDsaKey);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
request.Sign(Context.Key.ECDsaKey);
return await GetObject(request, ctx);
}
@ -98,8 +98,8 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
{
Address = new Address
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ObjectId = args.ObjectId.ToGrpcMessage()
ContainerId = args.ContainerId.ToMessage(),
ObjectId = args.ObjectId.ToMessage()
}
}
};
@ -109,10 +109,10 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Delete,
Context.Key);
Context.Key.ECDsaKey);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
request.Sign(Context.Key.ECDsaKey);
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
@ -126,24 +126,24 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
{
Body = new SearchRequest.Types.Body
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ContainerId = args.ContainerId.ToMessage(),
Filters = { },
Version = 1 // TODO: clarify this param
}
};
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToGrpcMessage()));
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = request.Body.ContainerId },
ObjectSessionContext.Types.Verb.Search,
Context.Key);
Context.Key.ECDsaKey);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
request.Sign(Context.Key.ECDsaKey);
var objectsIds = SearchObjects(request, ctx);
@ -153,24 +153,31 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
}
internal Task<ObjectId> PutObjectAsync(PrmObjectPut args)
internal async Task<ObjectId> PutObjectAsync(PrmObjectPut args)
{
if (args.Header == null)
throw new ArgumentException("Value cannot be null", nameof(args.Header));
if (args.Payload == null)
throw new ArgumentException("Value cannot be null", nameof(args.Payload));
if (args.ClientCut)
return PutClientCutObject(args);
else
return PutStreamObject(args);
return await PutClientCutObject(args);
else
{
if (args.Header.PayloadLength > 0)
args.FullLength = args.Header.PayloadLength;
else if (args.Payload.CanSeek)
args.FullLength = (ulong)args.Payload.Length;
return (await PutStreamObject(args)).ObjectId;
}
}
internal async Task<ObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
var ctx = args.Context!;
var grpcObject = tools.CreateObject(args.FrostFsObject);
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, env);
var request = new PutSingleRequest
{
@ -185,11 +192,11 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId},
ObjectSessionContext.Types.Verb.Put,
Context.Key);
Context.Key.ECDsaKey);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
request.Sign(Context.Key.ECDsaKey);
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken);
@ -201,156 +208,136 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
private async Task<ObjectId> PutClientCutObject(PrmObjectPut args)
{
var ctx = args.Context!;
var tokenRaw = await GetOrCreateSession(args, ctx);
var token = new ModelsV2.SessionToken(tokenRaw.Serialize());
args.SessionToken = token;
var payloadStream = args.Payload!;
var header = args.Header!;
ObjectId? objectId;
List<ObjectId> sentObjectIds = [];
FrostFsObject? currentObject;
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx));
var objectSize = (int)networkSettings.MaxObjectSize;
var fullLength = header.PayloadLength;
if (payloadStream.CanSeek)
{
objectSize = (int)Math.Min(objectSize, payloadStream.Length);
if (fullLength == 0)
fullLength = (ulong)payloadStream.Length;
}
if (fullLength == 0)
throw new ArgumentException("Payload stream must be able to seek or PayloadLength must be specified");
var buffer = new byte[objectSize];
if (payloadStream.CanSeek && fullLength == 0)
fullLength = (ulong)payloadStream.Length;
var largeObject = new LargeObject(header.ContainerId);
args.FullLength = fullLength;
var split = new Split();
while (true)
if (args.MaxObjectSizeCache == 0)
{
var bytesCount = await payloadStream.ReadAsync(buffer, 0, objectSize);
split.Previous = sentObjectIds.LastOrDefault();
largeObject.Header.PayloadLength += (ulong)bytesCount;
currentObject = new FrostFsObject(header.ContainerId)
.SetPayload(bytesCount < objectSize ? buffer[..bytesCount] : buffer)
.SetSplit(split);
if (largeObject.PayloadLength == fullLength)
break;
objectId = await PutSingleObjectAsync(new PrmSingleObjectPut(currentObject, ctx) { SessionToken = token });
sentObjectIds.Add(objectId!);
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx));
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
}
if (sentObjectIds.Count != 0)
var restBytes = fullLength - args.CurrentStreamPosition;
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;
//define collection capacity
var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
var objectsCount = fullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
List<ObjectId> sentObjectIds = new(objectsCount);
Split? split = null;
// keep attributes for the large object
var attributes = args.Header!.Attributes;
// send all parts except the last one as separate Objects
while (restBytes > (ulong)args.MaxObjectSizeCache)
{
largeObject.AddAttributes(args.Header!.Attributes);
currentObject.SetParent(largeObject);
if (split == null)
{
split = new Split();
args.Header!.Attributes = [];
}
var putSingleObjectParams = new PrmSingleObjectPut(currentObject, ctx) { SessionToken = token };
split!.Previous = sentObjectIds.LastOrDefault();
args.Header!.Split = split;
objectId = await PutSingleObjectAsync(putSingleObjectParams);
var result = await PutStreamObject(args);
sentObjectIds.Add(objectId);
sentObjectIds.Add(result.ObjectId);
var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
restBytes -= (ulong)result.ObjectSize;
}
// send the last part and create linkObject
if (sentObjectIds.Count > 0)
{
var largeObjectHeader = new ObjectHeader(header.ContainerId) { PayloadLength = fullLength };
largeObjectHeader.Attributes.AddRange(attributes);
args.Header.Split!.ParentHeader = largeObjectHeader;
var result = await PutStreamObject(args);
sentObjectIds.Add(result.ObjectId);
var linkObject = new LinkObject(header.ContainerId, split!.SplitId, largeObjectHeader)
.AddChildren(sentObjectIds);
linkObject.Header.Attributes.Clear();
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject) { Context = args.Context});
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject, ctx){ SessionToken = token });
return tools.CalculateObjectId(largeObject.Header);
return split.Parent!;
}
currentObject
.SetSplit(null)
.AddAttributes(args.Header!.Attributes);
return await PutSingleObjectAsync(new PrmSingleObjectPut(currentObject, ctx));
// We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
var singlePartResult = await PutStreamObject(args);
return singlePartResult.ObjectId;
}
private async Task<ObjectId> PutStreamObject(PrmObjectPut args)
struct PutObjectResult(ObjectId objectId, int objectSize)
{
public ObjectId ObjectId = objectId;
public int ObjectSize = objectSize;
}
private async Task<PutObjectResult> PutStreamObject(PrmObjectPut args)
{
var ctx = args.Context!;
var payload = args.Payload!;
var header = args.Header!;
var hdr = header.ToGrpcMessage();
hdr.OwnerId = Context.Owner.ToGrpcMessage();
hdr.Version = Context.Version.ToGrpcMessage();
var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
var oid = new ObjectID { Value = hdr.Sha256() };
var restBytes = args.FullLength - args.CurrentStreamPosition;
var initRequest = new PutRequest
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
var chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
var sentBytes = 0;
// 0 means no limit from client, so server side cut is performed
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
var stream = await GetUploadStream(args, ctx);
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
{
Body = new PutRequest.Types.Body
{
Init = new PutRequest.Types.Body.Types.Init
{
Header = hdr
}
}
};
// send chanks limited to default or user's settings
var bufferSize = objectLimitSize > 0 ?
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
: chunkSize;
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = hdr.ContainerId, ObjectId = oid },
ObjectSessionContext.Types.Verb.Put,
Context.Key
);
initRequest.AddMetaHeader(args.XHeaders, sessionToken);
initRequest.Sign(Context.Key);
using var stream = await PutObjectInit(initRequest, ctx);
var bufferSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
if (payload.CanSeek)
{
bufferSize = (int)Math.Min(payload.Length, bufferSize);
}
else if (header.PayloadLength > 0)
{
bufferSize = (int)Math.Min((long)header.PayloadLength, bufferSize);
}
var buffer = new byte[bufferSize];
while (true)
{
var bytesCount = await payload.ReadAsync(buffer, 0, bufferSize, ctx.CancellationToken);
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
if (bytesCount == 0)
break;
var chunkRequest = new PutRequest(initRequest)
sentBytes += bytesCount;
var chunkRequest = new PutRequest
{
Body = new PutRequest.Types.Body
{
Chunk = ByteString.CopyFrom(buffer.AsSpan()[..bytesCount]),
},
VerifyHeader = null
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
}
};
chunkRequest.Sign(Context.Key);
chunkRequest.Sign(Context.Key.ECDsaKey);
await stream.Write(chunkRequest);
}
@ -358,7 +345,49 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
var response = await stream.Close();
Verifier.CheckResponse(response);
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
}
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
{
var header = args.Header!;
header.OwnerId = Context.Owner;
header.Version = Context.Version;
var grpcHeader = header.ToMessage();
if (header.Split != null)
{
ObjectTools.SetSplitValues(grpcHeader, header.Split, env);
}
var oid = new ObjectID { Value = grpcHeader.Sha256() };
var initRequest = new PutRequest
{
Body = new PutRequest.Types.Body
{
Init = new PutRequest.Types.Body.Types.Init
{
Header = grpcHeader
}
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcHeader.ContainerId, ObjectId = oid },
ObjectSessionContext.Types.Verb.Put,
Context.Key.ECDsaKey
);
initRequest.AddMetaHeader(args.XHeaders, sessionToken);
initRequest.Sign(Context.Key.ECDsaKey);
return await PutObjectInit(initRequest, ctx);
}
private async Task<FrostFsObject> GetObject(GetRequest request, Context ctx)