Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Aleksey Kravchenko
586703636a Add previous field handling in FrostFsSplit grpc export 2025-03-28 14:02:07 +03:00
Aleksey Kravchenko
e179c32a3c Add attr-only patch without body modification 2025-03-28 09:24:44 +03:00
7c66b4cbe2 [42] Client: add splitId patch
Signed-off-by: Pavel Gross <p.gross@yadro.com>
2025-03-26 16:51:42 +03:00
6 changed files with 156 additions and 91 deletions

View file

@ -24,25 +24,14 @@ public static class ObjectHeaderMapper
_ => throw new ArgumentException($"Unknown ObjectType. Value: '{header.ObjectType}'.")
};
FrostFsSplit? split = null;
if (header.Split != null)
{
var children = header.Split.Children.Count != 0 ? new ReadOnlyCollection<FrostFsObjectId>(
header.Split.Children.Select(x => x.ToModel()).ToList()) : null;
split = new FrostFsSplit(new SplitId(header.Split.SplitId.ToUuid()),
header.Split.Previous?.ToModel(),
header.Split.Parent?.ToModel(),
header.Split.ParentHeader?.ToModel(),
null,
children);
}
FrostFsSplit? split = header!.Split != null
? header.Split.ToModel()
: null;
var model = new FrostFsObjectHeader(
new FrostFsContainerId(Base58.Encode(header.ContainerId.Value.Span)),
objTypeName,
header.Attributes.Select(attribute => attribute.ToModel()).ToArray(),
[.. header.Attributes.Select(attribute => attribute.ToModel())],
split,
header.OwnerId.ToModel(),
header.Version.ToModel())
@ -52,4 +41,18 @@ public static class ObjectHeaderMapper
return model;
}
public static FrostFsSplit ToModel(this Header.Types.Split split)
{
var children = split!.Children.Count != 0
? new ReadOnlyCollection<FrostFsObjectId>([.. split.Children.Select(x => x.ToModel())])
: null;
return new FrostFsSplit(new SplitId(split.SplitId.ToUuid()),
split.Previous?.ToModel(),
split.Parent?.ToModel(),
split.ParentHeader?.ToModel(),
null,
children);
}
}

View file

@ -1,4 +1,7 @@
using System.Collections.ObjectModel;
using System.Linq;
using FrostFS.Object;
using FrostFS.SDK.Client.Mappers.GRPC;
namespace FrostFS.SDK;
@ -9,6 +12,8 @@ public class FrostFsSplit(SplitId splitId,
FrostFsSignature? parentSignature = null,
ReadOnlyCollection<FrostFsObjectId>? children = null)
{
private Header.Types.Split? _split;
public FrostFsSplit() : this(new SplitId())
{
}
@ -17,11 +22,33 @@ public class FrostFsSplit(SplitId splitId,
public FrostFsObjectId? Previous { get; } = previous;
public FrostFsObjectId? Parent { get; } = parent;
public FrostFsObjectId? Parent { get; internal set; } = parent;
public FrostFsSignature? ParentSignature { get; } = parentSignature;
public FrostFsObjectHeader? ParentHeader { get; set; } = parentHeader;
public ReadOnlyCollection<FrostFsObjectId>? Children { get; } = children;
public Header.Types.Split GetSplit()
{
if (_split == null)
{
_split = new Header.Types.Split
{
SplitId = SplitId?.GetSplitId(),
Parent = Parent?.ToMessage(),
ParentHeader = ParentHeader?.GetHeader(),
ParentSignature = ParentSignature?.ToMessage(),
Previous = Previous?.ToMessage(),
};
if (Children != null)
{
_split.Children.AddRange(Children.Select(x => x.ToMessage()));
}
}
return _split;
}
}

View file

@ -10,6 +10,7 @@ public readonly struct PrmObjectPatch(
FrostFsSessionToken? sessionToken = null,
bool replaceAttributes = false,
FrostFsAttributePair[]? newAttributes = null,
FrostFsSplit? newSplitHeader = null,
string[]? xheaders = null) : ISessionToken, System.IEquatable<PrmObjectPatch>
{
public FrostFsAddress Address { get; } = address;
@ -23,6 +24,8 @@ public readonly struct PrmObjectPatch(
public FrostFsAttributePair[]? NewAttributes { get; } = newAttributes;
public FrostFsSplit? NewSplitHeader { get; } = newSplitHeader;
public bool ReplaceAttributes { get; } = replaceAttributes;
public int MaxChunkLength { get; } = maxChunkLength;

View file

@ -1,13 +1,11 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.Client;
using FrostFS.SDK.Client.Interfaces;
using FrostFS.SDK.Client.Mappers.GRPC;
using FrostFS.Session;
@ -295,86 +293,96 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
{
var chunkSize = args.MaxChunkLength;
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);
byte[]? chunkBuffer = null;
try
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
var address = new Address
{
// common
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
bool isFirstChunk = true;
ulong currentPos = args.Range.Offset;
var address = new Address
ObjectId = args.Address.ObjectId,
ContainerId = args.Address.ContainerId
};
var request = new PatchRequest
{
Body = new()
{
ObjectId = args.Address.ObjectId,
ContainerId = args.Address.ContainerId
};
Address = address,
}
};
var protoToken = sessionToken.CreateObjectTokenContext(
address,
ObjectSessionContext.Types.Verb.Patch,
ClientContext.Key);
while (true)
request.AddMetaHeader(args.XHeaders, protoToken);
if (args.NewAttributes is { Length: > 0 })
{
request.Body.ReplaceAttributes = args.ReplaceAttributes;
foreach (var attr in args.NewAttributes)
{
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
if (bytesCount == 0)
{
break;
}
var request = new PatchRequest()
{
Body = new()
{
Address = address,
Patch = new PatchRequest.Types.Body.Types.Patch
{
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
}
}
};
if (isFirstChunk)
{
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
var protoToken = sessionToken.CreateObjectTokenContext(
address,
ObjectSessionContext.Types.Verb.Patch,
ClientContext.Key);
request.AddMetaHeader(args.XHeaders, protoToken);
if (args.NewAttributes != null && args.NewAttributes.Length > 0)
{
foreach (var attr in args.NewAttributes)
{
request.Body.NewAttributes.Add(attr.ToMessage());
request.Body.ReplaceAttributes = args.ReplaceAttributes;
}
}
isFirstChunk = false;
}
else
{
request.AddMetaHeader(args.XHeaders);
}
request.Sign(ClientContext.Key);
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
currentPos += (ulong)bytesCount;
request.Body.NewAttributes.Add(attr.ToMessage());
}
}
finally
if (args.NewSplitHeader != null)
{
if (chunkBuffer != null)
request.Body.NewSplitHeader = ObjectTools.CreateSplit(args.NewSplitHeader, ClientContext.Key,
ClientContext.Owner, ClientContext.Version);
if (request.Body.NewSplitHeader.Parent != null)
{
ArrayPool<byte>.Shared.Return(chunkBuffer);
args.NewSplitHeader.Parent = request.Body.NewSplitHeader.Parent.ToModel();
}
}
request.Sign(ClientContext.Key);
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
if (args.Payload != null)
{
byte[]? chunkBuffer = null;
try
{
// common
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
ulong currentPos = args.Range.Offset;
while (true)
{
var bytesCount = await args.Payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken)
.ConfigureAwait(false);
if (bytesCount == 0)
{
break;
}
request = new PatchRequest()
{
Body = new()
{
Address = address,
Patch = new PatchRequest.Types.Body.Types.Patch
{
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
}
}
};
request.AddMetaHeader(args.XHeaders);
request.Sign(ClientContext.Key);
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
currentPos += (ulong)bytesCount;
}
}
finally
{
if (chunkBuffer != null)
{
ArrayPool<byte>.Shared.Return(chunkBuffer);
}
}
}
@ -385,7 +393,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
return response.Body.ObjectId.ToModel();
}
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
{
var stream = args.Payload!;
@ -567,7 +575,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
break;
sentBytes += bytesCount;
var chunkRequest = new PutRequest
{
Body = new PutRequest.Types.Body
@ -622,7 +630,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
var initRequest = new PutRequest
{
Body = new PutRequest.Types.Body
{
{
Init = new PutRequest.Types.Body.Types.Init
{
Header = grpcHeader,

View file

@ -131,6 +131,26 @@ public static class ObjectTools
grpcHeader.Split.Previous = split.Previous?.ToMessage();
}
internal static Header.Types.Split CreateSplit(FrostFsSplit split, ClientKey key, FrostFsOwner owner, FrostFsVersion version)
{
if (split.ParentHeader == null)
{
return split.GetSplit();
}
split.ParentHeader.PayloadCheckSum ??= Array.Empty<byte>().Sha256();
var grpcParentHeader = CreateHeader(split.ParentHeader, split.ParentHeader.PayloadCheckSum, owner, version);
var res = split.GetSplit();
res.ParentHeader = grpcParentHeader;
res.Parent ??= new ObjectID { Value = grpcParentHeader.Sha256() };
res.ParentSignature ??= new Signature
{
Key = key.PublicKeyProto,
Sign = key.ECDsaKey.SignData(res.Parent.ToByteArray()),
};
return res;
}
internal static Header CreateHeader(
FrostFsObjectHeader header,
ReadOnlyMemory<byte> payloadChecksum,

View file

@ -887,6 +887,10 @@ message PatchRequest {
// key, then it just replaces it while merging the lists.
bool replace_attributes = 3;
// New split header for the object. This defines how the object will relate
// to other objects in a split operation.
neo.fs.v2.object.Header.Split new_split_header = 5;
// The patch for the object's payload.
message Patch {
// The range of the source object for which the payload is replaced by the