Compare commits

...

3 commits

Author SHA1 Message Date
c2894168d5 mulitenant client 2024-08-16 11:15:48 +03:00
Pavel Gross
549ec510c8 [#21] Client: Allows multinenant client
Using one client for several owners

Signed-off-by: Pavel Gross <p.gross@yando.com>
2024-08-16 11:15:39 +03:00
Pavel Gross
5d34ac9cab [#16] Optimize memory usage
Provide custom buffer and use ArrayPool

Signed-off-by: Pavel Gross <p.gross@yando.com>
2024-08-16 11:11:36 +03:00
3 changed files with 69 additions and 35 deletions

View file

@ -1,4 +1,4 @@
using FrostFS.SDK.ModelsV2; using System.Collections.Specialized;
using System.IO; using System.IO;
namespace FrostFS.SDK.ClientV2.Parameters; namespace FrostFS.SDK.ClientV2.Parameters;
@ -29,7 +29,7 @@ public sealed class PrmObjectPut : PrmBase, ISessionToken
/// Overrides default size of the buffer for stream transferring. /// Overrides default size of the buffer for stream transferring.
/// </summary> /// </summary>
/// <value>Size of the buffer</value> /// <value>Size of the buffer</value>
public int BufferMaxSize { get; set; } public int BufferMaxSize { get; set; }
/// <summary> /// <summary>
/// Allows to define a buffer for chunks to manage by the memory allocation and releasing. /// Allows to define a buffer for chunks to manage by the memory allocation and releasing.

View file

@ -306,46 +306,68 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
var restBytes = args.FullLength - args.CurrentStreamPosition; var restBytes = args.FullLength - args.CurrentStreamPosition;
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize); chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
bool isRentBuffer = false;
byte[]? chunkBuffer = null;
var chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize); try
var sentBytes = 0;
// 0 means no limit from client, so server side cut is performed
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
var stream = await GetUploadStream(args, ctx);
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
{ {
// send chanks limited to default or user's settings if (args.CustomBuffer != null)
var bufferSize = objectLimitSize > 0 ?
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
: chunkSize;
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
if (bytesCount == 0)
break;
sentBytes += bytesCount;
var chunkRequest = new PutRequest
{ {
Body = new PutRequest.Types.Body chunkBuffer = args.CustomBuffer;
}
else
{
chunkBuffer = env.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
isRentBuffer = true;
}
var sentBytes = 0;
// 0 means no limit from client, so server side cut is performed
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
var stream = await GetUploadStream(args, ctx);
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
{
// send chunks limited to default or user's settings
var bufferSize = objectLimitSize > 0 ?
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
: chunkSize;
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
if (bytesCount == 0)
break;
sentBytes += bytesCount;
var chunkRequest = new PutRequest
{ {
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount) Body = new PutRequest.Types.Body
} {
}; Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
}
};
chunkRequest.Sign(ctx.Key);
chunkRequest.Sign(ctx.Key); await stream.Write(chunkRequest);
}
var response = await stream.Close();
Verifier.CheckResponse(response);
await stream.Write(chunkRequest); return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
}
finally
{
if (isRentBuffer && chunkBuffer != null)
{
ArrayPool<byte>.Shared.Return(chunkBuffer);
}
} }
var response = await stream.Close();
Verifier.CheckResponse(response);
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
} }
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx) private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)

View file

@ -7,6 +7,8 @@ namespace FrostFS.SDK.ClientV2;
public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable
{ {
private ArrayPool<byte> _arrayPool;
internal OwnerId? Owner { get; } = owner; internal OwnerId? Owner { get; } = owner;
internal GrpcChannel Channel { get; private set; } = channel; internal GrpcChannel Channel { get; private set; } = channel;
@ -19,6 +21,16 @@ public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcCh
internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null; internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null;
/// <summary>
/// Custom pool is used for predefined sizes of buffers like grpc chunk
/// </summary>
internal ArrayPool<byte> GetArrayPool(int size)
{
_arrayPool ??= ArrayPool<byte>.Create(size, 256);
return _arrayPool;
}
public void Dispose() public void Dispose()
{ {
Dispose(true); Dispose(true);