[#20] Optimize memory usage
Provide custom buffer and use ArrayPool Signed-off-by: Pavel Gross <p.gross@yando.com>
This commit is contained in:
parent
6083834582
commit
18126ea763
3 changed files with 76 additions and 35 deletions
|
@ -1,6 +1,7 @@
|
|||
using System.Collections.Specialized;
|
||||
using FrostFS.SDK.ModelsV2;
|
||||
using System.Collections.Specialized;
|
||||
using System.IO;
|
||||
using FrostFS.SDK.ModelsV2;
|
||||
|
||||
namespace FrostFS.SDK.ClientV2.Parameters;
|
||||
|
||||
public sealed class PrmObjectPut : IContext, ISessionToken
|
||||
|
@ -31,6 +32,11 @@ public sealed class PrmObjectPut : IContext, ISessionToken
|
|||
/// <value>Size of the buffer</value>
|
||||
public int BufferMaxSize { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Allows to define a buffer for chunks to manage by the memory allocation and releasing.
|
||||
/// </summary>
|
||||
public byte[]? CustomBuffer { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// FrostFS request X-Headers
|
||||
/// </summary>
|
||||
|
|
|
@ -306,46 +306,68 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
||||
|
||||
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
||||
|
||||
bool isRentBuffer = false;
|
||||
byte[]? chunkBuffer = null;
|
||||
|
||||
var chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
var sentBytes = 0;
|
||||
|
||||
// 0 means no limit from client, so server side cut is performed
|
||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||
|
||||
var stream = await GetUploadStream(args, ctx);
|
||||
|
||||
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
||||
try
|
||||
{
|
||||
// send chanks limited to default or user's settings
|
||||
var bufferSize = objectLimitSize > 0 ?
|
||||
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
||||
: chunkSize;
|
||||
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
|
||||
|
||||
if (bytesCount == 0)
|
||||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
chunkBuffer = args.CustomBuffer;
|
||||
}
|
||||
else
|
||||
{
|
||||
chunkBuffer = env.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
|
||||
isRentBuffer = true;
|
||||
}
|
||||
|
||||
var sentBytes = 0;
|
||||
|
||||
// 0 means no limit from client, so server side cut is performed
|
||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||
|
||||
var stream = await GetUploadStream(args, ctx);
|
||||
|
||||
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
||||
{
|
||||
// send chanks limited to default or user's settings
|
||||
var bufferSize = objectLimitSize > 0 ?
|
||||
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
||||
: chunkSize;
|
||||
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
|
||||
|
||||
if (bytesCount == 0)
|
||||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||
}
|
||||
};
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||
}
|
||||
};
|
||||
|
||||
chunkRequest.Sign(Context.Key.ECDsaKey);
|
||||
|
||||
chunkRequest.Sign(Context.Key.ECDsaKey);
|
||||
await stream.Write(chunkRequest);
|
||||
}
|
||||
|
||||
await stream.Write(chunkRequest);
|
||||
var response = await stream.Close();
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (isRentBuffer && chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
var response = await stream.Close();
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
||||
}
|
||||
|
||||
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
|
||||
|
|
|
@ -4,11 +4,14 @@ using Grpc.Net.Client;
|
|||
using System;
|
||||
using System.Security.Cryptography;
|
||||
using FrostFS.SDK.Cryptography;
|
||||
using System.Buffers;
|
||||
|
||||
namespace FrostFS.SDK.ClientV2;
|
||||
|
||||
public class ClientEnvironment(Client client, ECDsa key, OwnerId owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable
|
||||
{
|
||||
private ArrayPool<byte> _arrayPool;
|
||||
|
||||
internal OwnerId Owner { get; } = owner;
|
||||
internal GrpcChannel Channel { get; private set; } = channel;
|
||||
internal ModelsV2.Version Version { get; } = version;
|
||||
|
@ -18,6 +21,16 @@ public class ClientEnvironment(Client client, ECDsa key, OwnerId owner, GrpcChan
|
|||
|
||||
internal ClientKey Key { get; } = new ClientKey(key);
|
||||
|
||||
/// <summary>
|
||||
/// Custom pool is used for predefined sizes of buffers like grpc chunk
|
||||
/// </summary>
|
||||
internal ArrayPool<byte> GetArrayPool(int size)
|
||||
{
|
||||
_arrayPool ??= ArrayPool<byte>.Create(size, 256);
|
||||
|
||||
return _arrayPool;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
|
|
Loading…
Reference in a new issue