Compare commits
3 commits
2aa1b4382f
...
c2894168d5
Author | SHA1 | Date | |
---|---|---|---|
c2894168d5 | |||
|
549ec510c8 | ||
|
5d34ac9cab |
3 changed files with 69 additions and 35 deletions
|
@ -1,4 +1,4 @@
|
|||
using FrostFS.SDK.ModelsV2;
|
||||
using System.Collections.Specialized;
|
||||
using System.IO;
|
||||
|
||||
namespace FrostFS.SDK.ClientV2.Parameters;
|
||||
|
@ -29,7 +29,7 @@ public sealed class PrmObjectPut : PrmBase, ISessionToken
|
|||
/// Overrides default size of the buffer for stream transferring.
|
||||
/// </summary>
|
||||
/// <value>Size of the buffer</value>
|
||||
public int BufferMaxSize { get; set; }
|
||||
public int BufferMaxSize { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Allows to define a buffer for chunks to manage by the memory allocation and releasing.
|
||||
|
|
|
@ -306,46 +306,68 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
||||
|
||||
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
||||
|
||||
bool isRentBuffer = false;
|
||||
byte[]? chunkBuffer = null;
|
||||
|
||||
var chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
var sentBytes = 0;
|
||||
|
||||
// 0 means no limit from client, so server side cut is performed
|
||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||
|
||||
var stream = await GetUploadStream(args, ctx);
|
||||
|
||||
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
||||
try
|
||||
{
|
||||
// send chanks limited to default or user's settings
|
||||
var bufferSize = objectLimitSize > 0 ?
|
||||
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
||||
: chunkSize;
|
||||
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
|
||||
|
||||
if (bytesCount == 0)
|
||||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
chunkBuffer = args.CustomBuffer;
|
||||
}
|
||||
else
|
||||
{
|
||||
chunkBuffer = env.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
|
||||
isRentBuffer = true;
|
||||
}
|
||||
|
||||
var sentBytes = 0;
|
||||
|
||||
// 0 means no limit from client, so server side cut is performed
|
||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||
|
||||
var stream = await GetUploadStream(args, ctx);
|
||||
|
||||
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
||||
{
|
||||
// send chunks limited to default or user's settings
|
||||
var bufferSize = objectLimitSize > 0 ?
|
||||
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
||||
: chunkSize;
|
||||
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
|
||||
|
||||
if (bytesCount == 0)
|
||||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||
}
|
||||
};
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||
}
|
||||
};
|
||||
|
||||
chunkRequest.Sign(ctx.Key);
|
||||
|
||||
chunkRequest.Sign(ctx.Key);
|
||||
await stream.Write(chunkRequest);
|
||||
}
|
||||
|
||||
var response = await stream.Close();
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
await stream.Write(chunkRequest);
|
||||
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (isRentBuffer && chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
var response = await stream.Close();
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return new PutObjectResult(ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
||||
}
|
||||
|
||||
private async Task<ObjectStreamer> GetUploadStream(PrmObjectPut args, Context ctx)
|
||||
|
|
|
@ -7,6 +7,8 @@ namespace FrostFS.SDK.ClientV2;
|
|||
|
||||
public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcChannel channel, ModelsV2.Version version) : IDisposable
|
||||
{
|
||||
private ArrayPool<byte> _arrayPool;
|
||||
|
||||
internal OwnerId? Owner { get; } = owner;
|
||||
|
||||
internal GrpcChannel Channel { get; private set; } = channel;
|
||||
|
@ -19,6 +21,16 @@ public class ClientEnvironment(Client client, ECDsa? key, OwnerId? owner, GrpcCh
|
|||
|
||||
internal ClientKey? Key { get; } = key != null ? new ClientKey(key) : null;
|
||||
|
||||
/// <summary>
|
||||
/// Custom pool is used for predefined sizes of buffers like grpc chunk
|
||||
/// </summary>
|
||||
internal ArrayPool<byte> GetArrayPool(int size)
|
||||
{
|
||||
_arrayPool ??= ArrayPool<byte>.Create(size, 256);
|
||||
|
||||
return _arrayPool;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
|
|
Loading…
Reference in a new issue