Add attr-only patch without body modification
This commit is contained in:
parent
7c66b4cbe2
commit
e179c32a3c
3 changed files with 104 additions and 81 deletions
|
@ -22,7 +22,7 @@ public class FrostFsSplit(SplitId splitId,
|
|||
|
||||
public FrostFsObjectId? Previous { get; } = previous;
|
||||
|
||||
public FrostFsObjectId? Parent { get; } = parent;
|
||||
public FrostFsObjectId? Parent { get; internal set; } = parent;
|
||||
|
||||
public FrostFsSignature? ParentSignature { get; } = parentSignature;
|
||||
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using FrostFS.Object;
|
||||
using FrostFS.Refs;
|
||||
using FrostFS.SDK.Client;
|
||||
using FrostFS.SDK.Client.Interfaces;
|
||||
using FrostFS.SDK.Client.Mappers.GRPC;
|
||||
using FrostFS.Session;
|
||||
|
@ -295,91 +293,96 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
|
||||
{
|
||||
var chunkSize = args.MaxChunkLength;
|
||||
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
|
||||
|
||||
var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
var address = new Address
|
||||
{
|
||||
// common
|
||||
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
|
||||
bool isFirstChunk = true;
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
var address = new Address
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
var request = new PatchRequest
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
Address = address,
|
||||
}
|
||||
};
|
||||
var protoToken = sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ClientContext.Key);
|
||||
|
||||
while (true)
|
||||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
|
||||
if (args.NewAttributes is { Length: > 0 })
|
||||
{
|
||||
request.Body.ReplaceAttributes = args.ReplaceAttributes;
|
||||
foreach (var attr in args.NewAttributes)
|
||||
{
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (bytesCount == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
var request = new PatchRequest()
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
Address = address,
|
||||
Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
|
||||
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (isFirstChunk)
|
||||
{
|
||||
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
var protoToken = sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ClientContext.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
|
||||
if (args.NewAttributes != null && args.NewAttributes.Length > 0)
|
||||
{
|
||||
foreach (var attr in args.NewAttributes)
|
||||
{
|
||||
request.Body.NewAttributes.Add(attr.ToMessage());
|
||||
request.Body.ReplaceAttributes = args.ReplaceAttributes;
|
||||
}
|
||||
}
|
||||
|
||||
if (args.NewSplitHeader != null)
|
||||
{
|
||||
request.Body.NewSplitHeader = args.NewSplitHeader.GetSplit();
|
||||
}
|
||||
|
||||
isFirstChunk = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
request.AddMetaHeader(args.XHeaders);
|
||||
}
|
||||
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
request.Body.NewAttributes.Add(attr.ToMessage());
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
if (args.NewSplitHeader != null)
|
||||
{
|
||||
if (chunkBuffer != null)
|
||||
request.Body.NewSplitHeader = ObjectTools.CreateSplit(args.NewSplitHeader, ClientContext.Key,
|
||||
ClientContext.Owner, ClientContext.Version);
|
||||
if (request.Body.NewSplitHeader.Parent != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
args.NewSplitHeader.Parent = request.Body.NewSplitHeader.Parent.ToModel();
|
||||
}
|
||||
}
|
||||
request.Sign(ClientContext.Key);
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
if (args.Payload != null)
|
||||
{
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
{
|
||||
// common
|
||||
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
while (true)
|
||||
{
|
||||
var bytesCount = await args.Payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (bytesCount == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
request = new PatchRequest()
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
Address = address,
|
||||
Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
|
||||
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader(args.XHeaders);
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -131,6 +131,26 @@ public static class ObjectTools
|
|||
grpcHeader.Split.Previous = split.Previous?.ToMessage();
|
||||
}
|
||||
|
||||
internal static Header.Types.Split CreateSplit(FrostFsSplit split, ClientKey key, FrostFsOwner owner, FrostFsVersion version)
|
||||
{
|
||||
if (split.ParentHeader == null)
|
||||
{
|
||||
return split.GetSplit();
|
||||
}
|
||||
split.ParentHeader.PayloadCheckSum ??= Array.Empty<byte>().Sha256();
|
||||
var grpcParentHeader = CreateHeader(split.ParentHeader, split.ParentHeader.PayloadCheckSum, owner, version);
|
||||
|
||||
var res = split.GetSplit();
|
||||
res.ParentHeader = grpcParentHeader;
|
||||
res.Parent ??= new ObjectID { Value = grpcParentHeader.Sha256() };
|
||||
res.ParentSignature ??= new Signature
|
||||
{
|
||||
Key = key.PublicKeyProto,
|
||||
Sign = key.ECDsaKey.SignData(res.Parent.ToByteArray()),
|
||||
};
|
||||
return res;
|
||||
}
|
||||
|
||||
internal static Header CreateHeader(
|
||||
FrostFsObjectHeader header,
|
||||
ReadOnlyMemory<byte> payloadChecksum,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue