From 18126ea763e3e1a7745e2fb429bc15be243df05e Mon Sep 17 00:00:00 2001 From: Pavel Gross Date: Mon, 5 Aug 2024 11:21:05 +0300 Subject: [PATCH] [#20] Optimize memory usage Provide custom buffer and use ArrayPool Signed-off-by: Pavel Gross --- .../Parameters/PrmObjectPut.cs | 10 ++- .../Services/ObjectServiceProvider.cs | 88 ++++++++++++------- .../Tools/ClientEnvironment.cs | 13 +++ 3 files changed, 76 insertions(+), 35 deletions(-) diff --git a/src/FrostFS.SDK.ClientV2/Parameters/PrmObjectPut.cs b/src/FrostFS.SDK.ClientV2/Parameters/PrmObjectPut.cs index cd859d7..fea3bd5 100644 --- a/src/FrostFS.SDK.ClientV2/Parameters/PrmObjectPut.cs +++ b/src/FrostFS.SDK.ClientV2/Parameters/PrmObjectPut.cs @@ -1,6 +1,7 @@ -using System.Collections.Specialized; +using FrostFS.SDK.ModelsV2; +using System.Collections.Specialized; using System.IO; -using FrostFS.SDK.ModelsV2; + namespace FrostFS.SDK.ClientV2.Parameters; public sealed class PrmObjectPut : IContext, ISessionToken @@ -31,6 +32,11 @@ public sealed class PrmObjectPut : IContext, ISessionToken /// Size of the buffer public int BufferMaxSize { get; set; } + /// + /// Allows to define a buffer for chunks to manage by the memory allocation and releasing. + /// + public byte[]? CustomBuffer { get; set; } + /// /// FrostFS request X-Headers /// diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs index 6696aab..26ba3e8 100644 --- a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs +++ b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs @@ -306,46 +306,68 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C var restBytes = args.FullLength - args.CurrentStreamPosition; chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize); + + bool isRentBuffer = false; + byte[]? chunkBuffer = null; - var chunkBuffer = ArrayPool.Shared.Rent(chunkSize); - var sentBytes = 0; - - // 0 means no limit from client, so server side cut is performed - var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0; - - var stream = await GetUploadStream(args, ctx); - - while (objectLimitSize == 0 || sentBytes < objectLimitSize) + try { - // send chanks limited to default or user's settings - var bufferSize = objectLimitSize > 0 ? - (int)Math.Min(objectLimitSize - sentBytes, chunkSize) - : chunkSize; - - var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken); - - if (bytesCount == 0) - break; - - sentBytes += bytesCount; - - var chunkRequest = new PutRequest + if (args.CustomBuffer != null) { - Body = new PutRequest.Types.Body + chunkBuffer = args.CustomBuffer; + } + else + { + chunkBuffer = env.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize); + isRentBuffer = true; + } + + var sentBytes = 0; + + // 0 means no limit from client, so server side cut is performed + var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0; + + var stream = await GetUploadStream(args, ctx); + + while (objectLimitSize == 0 || sentBytes < objectLimitSize) + { + // send chanks limited to default or user's settings + var bufferSize = objectLimitSize > 0 ? + (int)Math.Min(objectLimitSize - sentBytes, chunkSize) + : chunkSize; + + var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken); + + if (bytesCount == 0) + break; + + sentBytes += bytesCount; + + var chunkRequest = new PutRequest { - Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount) - } - }; + Body = new PutRequest.Types.Body + { + Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount) + } + }; + + chunkRequest.Sign(Context.Key.ECDsaKey); - chunkRequest.Sign(Context.Key.ECDsaKey); + await stream.Write(chunkRequest); + } - await stream.Write(chunkRequest); + var response = await stream.Close(); + Verifier.CheckResponse(response); + + return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes); + } + finally + { + if (isRentBuffer && chunkBuffer != null) + { + ArrayPool.Shared.Return(chunkBuffer); + } } - - var response = await stream.Close(); - Verifier.CheckResponse(response); - - return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes); } private async Task GetUploadStream(PrmObjectPut args, Context ctx) diff --git a/src/FrostFS.SDK.ClientV2/Tools/ClientEnvironment.cs b/src/FrostFS.SDK.ClientV2/Tools/ClientEnvironment.cs index c1f4d84..4f4fa1b 100644 --- a/src/FrostFS.SDK.ClientV2/Tools/ClientEnvironment.cs +++ b/src/FrostFS.SDK.ClientV2/Tools/ClientEnvironment.cs @@ -4,11 +4,14 @@ using Grpc.Net.Client; using System; using System.Security.Cryptography; using FrostFS.SDK.Cryptography; +using System.Buffers; namespace FrostFS.SDK.ClientV2; public class ClientEnvironment(Client client, ECDsa key, OwnerId owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable { + private ArrayPool _arrayPool; + internal OwnerId Owner { get; } = owner; internal GrpcChannel Channel { get; private set; } = channel; internal ModelsV2.Version Version { get; } = version; @@ -18,6 +21,16 @@ public class ClientEnvironment(Client client, ECDsa key, OwnerId owner, GrpcChan internal ClientKey Key { get; } = new ClientKey(key); + /// + /// Custom pool is used for predefined sizes of buffers like grpc chunk + /// + internal ArrayPool GetArrayPool(int size) + { + _arrayPool ??= ArrayPool.Create(size, 256); + + return _arrayPool; + } + public void Dispose() { Dispose(true);