Compare commits
3 commits
2aa1b4382f
...
c2894168d5
Author | SHA1 | Date | |
---|---|---|---|
c2894168d5 | |||
|
549ec510c8 | ||
|
5d34ac9cab |
3 changed files with 69 additions and 35 deletions
|
@ -1,4 +1,4 @@
|
||||||
using FrostFS.SDK.ModelsV2;
|
using System.Collections.Specialized;
|
||||||
using System.IO;
|
using System.IO;
|
||||||
|
|
||||||
namespace FrostFS.SDK.ClientV2.Parameters;
|
namespace FrostFS.SDK.ClientV2.Parameters;
|
||||||
|
@ -29,7 +29,7 @@ public sealed class PrmObjectPut : PrmBase, ISessionToken
|
||||||
/// Overrides default size of the buffer for stream transferring.
|
/// Overrides default size of the buffer for stream transferring.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <value>Size of the buffer</value>
|
/// <value>Size of the buffer</value>
|
||||||
public int BufferMaxSize { get; set; }
|
public int BufferMaxSize { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Allows to define a buffer for chunks to manage by the memory allocation and releasing.
|
/// Allows to define a buffer for chunks to manage by the memory allocation and releasing.
|
||||||
|
|
|
@ -306,46 +306,68 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
||||||
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
||||||
|
|
||||||
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
||||||
|
|
||||||
|
bool isRentBuffer = false;
|
||||||
|
byte[]? chunkBuffer = null;
|
||||||
|
|
||||||
var chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
try
|
||||||
var sentBytes = 0;
|
|
||||||
|
|
||||||
// 0 means no limit from client, so server side cut is performed
|
|
||||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
|
||||||
|
|
||||||
var stream = await GetUploadStream(args, ctx);
|
|
||||||
|
|
||||||
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
|
||||||
{
|
{
|
||||||
// send chanks limited to default or user's settings
|
if (args.CustomBuffer != null)
|
||||||
var bufferSize = objectLimitSize > 0 ?
|
|
||||||
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
|
||||||
: chunkSize;
|
|
||||||
|
|
||||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
|
|
||||||
|
|
||||||
if (bytesCount == 0)
|
|
||||||
break;
|
|
||||||
|
|
||||||
sentBytes += bytesCount;
|
|
||||||
|
|
||||||
var chunkRequest = new PutRequest
|
|
||||||
{
|
{
|
||||||
Body = new PutRequest.Types.Body
|
chunkBuffer = args.CustomBuffer;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
chunkBuffer = env.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
|
||||||
|
isRentBuffer = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
var sentBytes = 0;
|
||||||
|
|
||||||
|
// 0 means no limit from client, so server side cut is performed
|
||||||
|
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||||
|
|
||||||
|
var stream = await GetUploadStream(args, ctx);
|
||||||
|
|
||||||
|
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
||||||
|
{
|
||||||
|
// send chunks limited to default or user's settings
|
||||||
|
var bufferSize = objectLimitSize > 0 ?
|
||||||
|
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
||||||
|
: chunkSize;
|
||||||
|
|
||||||
|
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
|
||||||
|
|
||||||
|
if (bytesCount == 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
sentBytes += bytesCount;
|
||||||
|
|
||||||
|
var chunkRequest = new PutRequest
|
||||||
{
|
{
|
||||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
Body = new PutRequest.Types.Body
|
||||||
}
|
{
|
||||||
};
|
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
chunkRequest.Sign(ctx.Key);
|
||||||
|
|
||||||
chunkRequest.Sign(ctx.Key);
|
await stream.Write(chunkRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
var response = await stream.Close();
|
||||||
|
Verifier.CheckResponse(response);
|
||||||
|
|
||||||
await stream.Write(chunkRequest);
|
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (isRentBuffer && chunkBuffer != null)
|
||||||
|
{
|
||||||
|
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var response = await stream.Close();
|
|
||||||
Verifier.CheckResponse(response);
|
|
||||||
|
|
||||||
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
|
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
|
||||||
|
|
|
@ -7,6 +7,8 @@ namespace FrostFS.SDK.ClientV2;
|
||||||
|
|
||||||
public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable
|
public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable
|
||||||
{
|
{
|
||||||
|
private ArrayPool<byte> _arrayPool;
|
||||||
|
|
||||||
internal OwnerId? Owner { get; } = owner;
|
internal OwnerId? Owner { get; } = owner;
|
||||||
|
|
||||||
internal GrpcChannel Channel { get; private set; } = channel;
|
internal GrpcChannel Channel { get; private set; } = channel;
|
||||||
|
@ -19,6 +21,16 @@ public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcCh
|
||||||
|
|
||||||
internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null;
|
internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Custom pool is used for predefined sizes of buffers like grpc chunk
|
||||||
|
/// </summary>
|
||||||
|
internal ArrayPool<byte> GetArrayPool(int size)
|
||||||
|
{
|
||||||
|
_arrayPool ??= ArrayPool<byte>.Create(size, 256);
|
||||||
|
|
||||||
|
return _arrayPool;
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Dispose(true);
|
Dispose(true);
|
||||||
|
|
Loading…
Reference in a new issue