[#69] Fix for Patch and uint types
Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
parent
eebba7665b
commit
0816be732a
20 changed files with 212 additions and 153 deletions
|
@ -8,7 +8,7 @@ using System.Runtime.CompilerServices;
|
|||
"e15ab287e6239c98d5dfa91615bd77485d523a3a3f65a4e5028454cedd5ac4d9eca6da18b81985"+
|
||||
"ac6905d33cc64b5a2587050c16f67b71ef8889dbd3c90ef7cc0b06bbbe09886601d195f5db179a"+
|
||||
"3c2a25b1")]
|
||||
[assembly: AssemblyFileVersion("1.0.6.0")]
|
||||
[assembly: AssemblyFileVersion("1.0.7.0")]
|
||||
[assembly: AssemblyProduct("FrostFS.SDK.Client")]
|
||||
[assembly: AssemblyTitle("FrostFS.SDK.Client")]
|
||||
[assembly: AssemblyVersion("1.0.6")]
|
||||
[assembly: AssemblyVersion("1.0.7")]
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
<Nullable>enable</Nullable>
|
||||
<AnalysisMode>AllEnabledByDefault</AnalysisMode>
|
||||
<PackageId>FrostFS.SDK.Client</PackageId>
|
||||
<Version>1.0.6</Version>
|
||||
<Version>1.0.7</Version>
|
||||
<Description>
|
||||
C# SDK for FrostFS gRPC native protocol
|
||||
</Description>
|
||||
|
|
|
@ -16,8 +16,8 @@ public static class MetaHeaderMapper
|
|||
return new RequestMetaHeader
|
||||
{
|
||||
Version = metaHeader.Version.ToMessage(),
|
||||
Epoch = (uint)metaHeader.Epoch,
|
||||
Ttl = (uint)metaHeader.Ttl
|
||||
Epoch = metaHeader.Epoch,
|
||||
Ttl = metaHeader.Ttl
|
||||
};
|
||||
}
|
||||
}
|
|
@ -11,7 +11,7 @@ public static class PolicyMapper
|
|||
{
|
||||
return new Replica
|
||||
{
|
||||
Count = (uint)replica.Count,
|
||||
Count = replica.Count,
|
||||
Selector = replica.Selector,
|
||||
EcDataCount = replica.EcDataCount,
|
||||
EcParityCount = replica.EcParityCount
|
||||
|
@ -25,7 +25,7 @@ public static class PolicyMapper
|
|||
throw new ArgumentNullException(nameof(replica));
|
||||
}
|
||||
|
||||
return new FrostFsReplica((int)replica.Count, replica.Selector)
|
||||
return new FrostFsReplica(replica.Count, replica.Selector)
|
||||
{
|
||||
EcDataCount = replica.EcDataCount,
|
||||
EcParityCount = replica.EcParityCount
|
||||
|
|
|
@ -18,7 +18,7 @@ public static class VersionMapper
|
|||
throw new System.ArgumentNullException(nameof(model));
|
||||
}
|
||||
|
||||
var key = model.Major << 16 + model.Minor;
|
||||
var key = (int)model.Major << 16 + (int)model.Minor;
|
||||
|
||||
if (!_cacheMessages.ContainsKey(key))
|
||||
{
|
||||
|
@ -28,8 +28,8 @@ public static class VersionMapper
|
|||
_spinlock.Enter(ref lockTaken);
|
||||
var message = new Version
|
||||
{
|
||||
Major = (uint)model.Major,
|
||||
Minor = (uint)model.Minor
|
||||
Major = model.Major,
|
||||
Minor = model.Minor
|
||||
};
|
||||
|
||||
_cacheMessages.Add(key, message);
|
||||
|
@ -64,7 +64,7 @@ public static class VersionMapper
|
|||
try
|
||||
{
|
||||
_spinlock.Enter(ref lockTaken);
|
||||
var model = new FrostFsVersion((int)message.Major, (int)message.Minor);
|
||||
var model = new FrostFsVersion(message.Major, message.Minor);
|
||||
|
||||
_cacheModels.Add(key, model);
|
||||
return model;
|
||||
|
|
|
@ -4,12 +4,12 @@ namespace FrostFS.SDK;
|
|||
|
||||
public struct FrostFsReplica : IEquatable<FrostFsReplica>
|
||||
{
|
||||
public int Count { get; set; }
|
||||
public uint Count { get; set; }
|
||||
public string Selector { get; set; }
|
||||
public uint EcDataCount { get; set; }
|
||||
public uint EcParityCount { get; set; }
|
||||
|
||||
public FrostFsReplica(int count, string? selector = null)
|
||||
public FrostFsReplica(uint count, string? selector = null)
|
||||
{
|
||||
selector ??= string.Empty;
|
||||
|
||||
|
@ -31,12 +31,12 @@ public struct FrostFsReplica : IEquatable<FrostFsReplica>
|
|||
|
||||
public readonly uint CountNodes()
|
||||
{
|
||||
return Count != 0 ? (uint)Count : EcDataCount + EcParityCount;
|
||||
return Count != 0 ? Count : EcDataCount + EcParityCount;
|
||||
}
|
||||
|
||||
public override readonly int GetHashCode()
|
||||
{
|
||||
return (Count + Selector.GetHashCode()) ^ (int)EcDataCount ^ (int)EcParityCount;
|
||||
return Count.GetHashCode() ^ Selector.GetHashCode() ^ (int)EcDataCount ^ (int)EcParityCount;
|
||||
}
|
||||
|
||||
public static bool operator ==(FrostFsReplica left, FrostFsReplica right)
|
||||
|
|
|
@ -3,12 +3,12 @@ using FrostFS.SDK.Client.Mappers.GRPC;
|
|||
|
||||
namespace FrostFS.SDK;
|
||||
|
||||
public class FrostFsVersion(int major, int minor)
|
||||
public class FrostFsVersion(uint major, uint minor)
|
||||
{
|
||||
private Version? version;
|
||||
|
||||
public int Major { get; set; } = major;
|
||||
public int Minor { get; set; } = minor;
|
||||
public uint Major { get; set; } = major;
|
||||
public uint Minor { get; set; } = minor;
|
||||
|
||||
internal Version VersionID
|
||||
{
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
namespace FrostFS.SDK;
|
||||
|
||||
public class MetaHeader(FrostFsVersion version, int epoch, int ttl)
|
||||
public class MetaHeader(FrostFsVersion version, ulong epoch, uint ttl)
|
||||
{
|
||||
public FrostFsVersion Version { get; set; } = version;
|
||||
public int Epoch { get; set; } = epoch;
|
||||
public int Ttl { get; set; } = ttl;
|
||||
public ulong Epoch { get; set; } = epoch;
|
||||
public uint Ttl { get; set; } = ttl;
|
||||
|
||||
public static MetaHeader Default()
|
||||
{
|
||||
|
|
|
@ -295,86 +295,88 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
|
||||
{
|
||||
var chunkSize = args.MaxChunkLength;
|
||||
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
|
||||
|
||||
|
||||
var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
var address = new Address
|
||||
{
|
||||
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
|
||||
bool isFirstChunk = true;
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
var address = new Address
|
||||
if (args.Payload != null && args.Payload.Length > 0)
|
||||
{
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
{
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
|
||||
while (true)
|
||||
{
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
|
||||
bool isFirstChunk = true;
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
if (bytesCount == 0)
|
||||
while (true)
|
||||
{
|
||||
break;
|
||||
}
|
||||
var bytesCount = await args.Payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
var request = new PatchRequest()
|
||||
{
|
||||
Body = new()
|
||||
if (bytesCount == 0)
|
||||
{
|
||||
Address = address,
|
||||
Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
break;
|
||||
}
|
||||
|
||||
PatchRequest request;
|
||||
|
||||
if (isFirstChunk)
|
||||
{
|
||||
request = await CreateFirstRequest(args, ctx, address).ConfigureAwait(false);
|
||||
|
||||
request.Body.Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
|
||||
SourceRange = new Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
}
|
||||
};
|
||||
|
||||
isFirstChunk = false;
|
||||
}
|
||||
};
|
||||
|
||||
if (isFirstChunk)
|
||||
{
|
||||
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
var protoToken = sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ClientContext.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
|
||||
if (args.NewAttributes != null && args.NewAttributes.Length > 0)
|
||||
else
|
||||
{
|
||||
foreach (var attr in args.NewAttributes)
|
||||
request = new PatchRequest()
|
||||
{
|
||||
request.Body.NewAttributes.Add(attr.ToMessage());
|
||||
request.Body.ReplaceAttributes = args.ReplaceAttributes;
|
||||
}
|
||||
Body = new()
|
||||
{
|
||||
Address = address,
|
||||
Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
|
||||
SourceRange = new Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader(args.XHeaders);
|
||||
}
|
||||
|
||||
isFirstChunk = false;
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
}
|
||||
else
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (chunkBuffer != null)
|
||||
{
|
||||
request.AddMetaHeader(args.XHeaders);
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
}
|
||||
}
|
||||
finally
|
||||
else if (args.NewAttributes != null && args.NewAttributes.Length > 0)
|
||||
{
|
||||
if (chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
PatchRequest request = await CreateFirstRequest(args, ctx, address).ConfigureAwait(false);
|
||||
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
|
||||
|
@ -383,9 +385,36 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
Verifier.CheckResponse(response);
|
||||
|
||||
return response.Body.ObjectId.ToModel();
|
||||
|
||||
async Task<PatchRequest> CreateFirstRequest(PrmObjectPatch args, CallContext ctx, Address address)
|
||||
{
|
||||
var body = new PatchRequest.Types.Body() { Address = address };
|
||||
|
||||
if (args.NewAttributes != null)
|
||||
{
|
||||
body.ReplaceAttributes = args.ReplaceAttributes;
|
||||
|
||||
foreach (var attr in args.NewAttributes!)
|
||||
{
|
||||
body.NewAttributes.Add(attr.ToMessage());
|
||||
}
|
||||
}
|
||||
|
||||
var request = new PatchRequest() { Body = body };
|
||||
|
||||
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
var protoToken = sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ClientContext.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
return request;
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
{
|
||||
if (args.Payload == null)
|
||||
throw new ArgumentException(nameof(args.Payload));
|
||||
|
@ -431,9 +460,9 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
|
||||
|
||||
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
|
||||
|
||||
|
||||
var remain = fullLength;
|
||||
|
||||
|
||||
byte[]? buffer = null;
|
||||
bool isRentBuffer = false;
|
||||
|
||||
|
@ -443,7 +472,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
{
|
||||
if (args.CustomBuffer.Length < chunkSize)
|
||||
throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
|
||||
|
||||
|
||||
buffer = args.CustomBuffer;
|
||||
}
|
||||
else
|
||||
|
@ -457,7 +486,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
while (remain > 0)
|
||||
{
|
||||
var bytesToWrite = Math.Min((ulong)partSize, remain);
|
||||
var bytesToWrite = Math.Min((ulong)partSize, remain);
|
||||
var isLastPart = remain <= (ulong)partSize;
|
||||
|
||||
// When the last part of the object is uploaded, all metadata for the object must be added
|
||||
|
@ -502,7 +531,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
var part = new ObjectPartInfo(offset, uploaded, objectId);
|
||||
offset += uploaded;
|
||||
progressInfo.AddPart(part);
|
||||
|
||||
|
||||
remain -= bytesToWrite;
|
||||
|
||||
if (isLastPart)
|
||||
|
@ -582,16 +611,16 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0;
|
||||
var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0;
|
||||
|
||||
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
|
||||
|
||||
// if the object fits one part, it can be loaded as non-complex object, but if it is not upload resuming
|
||||
if (objectsCount == 1 && progressInfo != null && progressInfo.GetLast().Length == 0)
|
||||
if (objectsCount == 1 && progressInfo.GetLast().Length == 0)
|
||||
{
|
||||
args.PutObjectContext.MaxObjectSizeCache = partSize;
|
||||
args.PutObjectContext.FullLength = fullLength;
|
||||
var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);
|
||||
return singlePartResult.ObjectId;
|
||||
}
|
||||
|
||||
progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount);
|
||||
|
||||
var remain = fullLength;
|
||||
|
||||
|
@ -659,8 +688,10 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
offset += size;
|
||||
|
||||
if (i < objectsCount)
|
||||
{
|
||||
continue;
|
||||
|
||||
}
|
||||
|
||||
// Once all parts of the object are uploaded, they must be linked into a single entity
|
||||
var linkObject = new FrostFsLinkObject(args.Header.ContainerId, progressInfo.SplitId, parentHeader!, [.. progressInfo.GetParts().Select(p => p.ObjectId)]);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue