Compare commits
3 commits
master
...
update_pat
Author | SHA1 | Date | |
---|---|---|---|
|
586703636a | ||
|
e179c32a3c | ||
7c66b4cbe2 |
6 changed files with 156 additions and 91 deletions
|
@ -24,25 +24,14 @@ public static class ObjectHeaderMapper
|
|||
_ => throw new ArgumentException($"Unknown ObjectType. Value: '{header.ObjectType}'.")
|
||||
};
|
||||
|
||||
FrostFsSplit? split = null;
|
||||
|
||||
if (header.Split != null)
|
||||
{
|
||||
var children = header.Split.Children.Count != 0 ? new ReadOnlyCollection<FrostFsObjectId>(
|
||||
header.Split.Children.Select(x => x.ToModel()).ToList()) : null;
|
||||
|
||||
split = new FrostFsSplit(new SplitId(header.Split.SplitId.ToUuid()),
|
||||
header.Split.Previous?.ToModel(),
|
||||
header.Split.Parent?.ToModel(),
|
||||
header.Split.ParentHeader?.ToModel(),
|
||||
null,
|
||||
children);
|
||||
}
|
||||
FrostFsSplit? split = header!.Split != null
|
||||
? header.Split.ToModel()
|
||||
: null;
|
||||
|
||||
var model = new FrostFsObjectHeader(
|
||||
new FrostFsContainerId(Base58.Encode(header.ContainerId.Value.Span)),
|
||||
objTypeName,
|
||||
header.Attributes.Select(attribute => attribute.ToModel()).ToArray(),
|
||||
[.. header.Attributes.Select(attribute => attribute.ToModel())],
|
||||
split,
|
||||
header.OwnerId.ToModel(),
|
||||
header.Version.ToModel())
|
||||
|
@ -52,4 +41,18 @@ public static class ObjectHeaderMapper
|
|||
|
||||
return model;
|
||||
}
|
||||
|
||||
public static FrostFsSplit ToModel(this Header.Types.Split split)
|
||||
{
|
||||
var children = split!.Children.Count != 0
|
||||
? new ReadOnlyCollection<FrostFsObjectId>([.. split.Children.Select(x => x.ToModel())])
|
||||
: null;
|
||||
|
||||
return new FrostFsSplit(new SplitId(split.SplitId.ToUuid()),
|
||||
split.Previous?.ToModel(),
|
||||
split.Parent?.ToModel(),
|
||||
split.ParentHeader?.ToModel(),
|
||||
null,
|
||||
children);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
using System.Collections.ObjectModel;
|
||||
using System.Linq;
|
||||
using FrostFS.Object;
|
||||
using FrostFS.SDK.Client.Mappers.GRPC;
|
||||
|
||||
namespace FrostFS.SDK;
|
||||
|
||||
|
@ -9,6 +12,8 @@ public class FrostFsSplit(SplitId splitId,
|
|||
FrostFsSignature? parentSignature = null,
|
||||
ReadOnlyCollection<FrostFsObjectId>? children = null)
|
||||
{
|
||||
private Header.Types.Split? _split;
|
||||
|
||||
public FrostFsSplit() : this(new SplitId())
|
||||
{
|
||||
}
|
||||
|
@ -17,11 +22,33 @@ public class FrostFsSplit(SplitId splitId,
|
|||
|
||||
public FrostFsObjectId? Previous { get; } = previous;
|
||||
|
||||
public FrostFsObjectId? Parent { get; } = parent;
|
||||
public FrostFsObjectId? Parent { get; internal set; } = parent;
|
||||
|
||||
public FrostFsSignature? ParentSignature { get; } = parentSignature;
|
||||
|
||||
public FrostFsObjectHeader? ParentHeader { get; set; } = parentHeader;
|
||||
|
||||
public ReadOnlyCollection<FrostFsObjectId>? Children { get; } = children;
|
||||
|
||||
public Header.Types.Split GetSplit()
|
||||
{
|
||||
if (_split == null)
|
||||
{
|
||||
_split = new Header.Types.Split
|
||||
{
|
||||
SplitId = SplitId?.GetSplitId(),
|
||||
Parent = Parent?.ToMessage(),
|
||||
ParentHeader = ParentHeader?.GetHeader(),
|
||||
ParentSignature = ParentSignature?.ToMessage(),
|
||||
Previous = Previous?.ToMessage(),
|
||||
};
|
||||
|
||||
if (Children != null)
|
||||
{
|
||||
_split.Children.AddRange(Children.Select(x => x.ToMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
return _split;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ public readonly struct PrmObjectPatch(
|
|||
FrostFsSessionToken? sessionToken = null,
|
||||
bool replaceAttributes = false,
|
||||
FrostFsAttributePair[]? newAttributes = null,
|
||||
FrostFsSplit? newSplitHeader = null,
|
||||
string[]? xheaders = null) : ISessionToken, System.IEquatable<PrmObjectPatch>
|
||||
{
|
||||
public FrostFsAddress Address { get; } = address;
|
||||
|
@ -23,6 +24,8 @@ public readonly struct PrmObjectPatch(
|
|||
|
||||
public FrostFsAttributePair[]? NewAttributes { get; } = newAttributes;
|
||||
|
||||
public FrostFsSplit? NewSplitHeader { get; } = newSplitHeader;
|
||||
|
||||
public bool ReplaceAttributes { get; } = replaceAttributes;
|
||||
|
||||
public int MaxChunkLength { get; } = maxChunkLength;
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using FrostFS.Object;
|
||||
using FrostFS.Refs;
|
||||
using FrostFS.SDK.Client;
|
||||
using FrostFS.SDK.Client.Interfaces;
|
||||
using FrostFS.SDK.Client.Mappers.GRPC;
|
||||
using FrostFS.Session;
|
||||
|
@ -295,86 +293,96 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
|
||||
{
|
||||
var chunkSize = args.MaxChunkLength;
|
||||
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
|
||||
|
||||
|
||||
var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
var address = new Address
|
||||
{
|
||||
// common
|
||||
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
|
||||
bool isFirstChunk = true;
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
var address = new Address
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
var request = new PatchRequest
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
Address = address,
|
||||
}
|
||||
};
|
||||
var protoToken = sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ClientContext.Key);
|
||||
|
||||
while (true)
|
||||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
|
||||
if (args.NewAttributes is { Length: > 0 })
|
||||
{
|
||||
request.Body.ReplaceAttributes = args.ReplaceAttributes;
|
||||
foreach (var attr in args.NewAttributes)
|
||||
{
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (bytesCount == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
var request = new PatchRequest()
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
Address = address,
|
||||
Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
|
||||
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (isFirstChunk)
|
||||
{
|
||||
var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
var protoToken = sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ClientContext.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
|
||||
if (args.NewAttributes != null && args.NewAttributes.Length > 0)
|
||||
{
|
||||
foreach (var attr in args.NewAttributes)
|
||||
{
|
||||
request.Body.NewAttributes.Add(attr.ToMessage());
|
||||
request.Body.ReplaceAttributes = args.ReplaceAttributes;
|
||||
}
|
||||
}
|
||||
|
||||
isFirstChunk = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
request.AddMetaHeader(args.XHeaders);
|
||||
}
|
||||
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
request.Body.NewAttributes.Add(attr.ToMessage());
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
if (args.NewSplitHeader != null)
|
||||
{
|
||||
if (chunkBuffer != null)
|
||||
request.Body.NewSplitHeader = ObjectTools.CreateSplit(args.NewSplitHeader, ClientContext.Key,
|
||||
ClientContext.Owner, ClientContext.Version);
|
||||
if (request.Body.NewSplitHeader.Parent != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
args.NewSplitHeader.Parent = request.Body.NewSplitHeader.Parent.ToModel();
|
||||
}
|
||||
}
|
||||
request.Sign(ClientContext.Key);
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
if (args.Payload != null)
|
||||
{
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
{
|
||||
// common
|
||||
chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
while (true)
|
||||
{
|
||||
var bytesCount = await args.Payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (bytesCount == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
request = new PatchRequest()
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
Address = address,
|
||||
Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
|
||||
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader(args.XHeaders);
|
||||
request.Sign(ClientContext.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -385,7 +393,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
return response.Body.ObjectId.ToModel();
|
||||
}
|
||||
|
||||
|
||||
internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
|
||||
{
|
||||
var stream = args.Payload!;
|
||||
|
@ -567,7 +575,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
|
@ -622,7 +630,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
var initRequest = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
{
|
||||
Init = new PutRequest.Types.Body.Types.Init
|
||||
{
|
||||
Header = grpcHeader,
|
||||
|
|
|
@ -131,6 +131,26 @@ public static class ObjectTools
|
|||
grpcHeader.Split.Previous = split.Previous?.ToMessage();
|
||||
}
|
||||
|
||||
internal static Header.Types.Split CreateSplit(FrostFsSplit split, ClientKey key, FrostFsOwner owner, FrostFsVersion version)
|
||||
{
|
||||
if (split.ParentHeader == null)
|
||||
{
|
||||
return split.GetSplit();
|
||||
}
|
||||
split.ParentHeader.PayloadCheckSum ??= Array.Empty<byte>().Sha256();
|
||||
var grpcParentHeader = CreateHeader(split.ParentHeader, split.ParentHeader.PayloadCheckSum, owner, version);
|
||||
|
||||
var res = split.GetSplit();
|
||||
res.ParentHeader = grpcParentHeader;
|
||||
res.Parent ??= new ObjectID { Value = grpcParentHeader.Sha256() };
|
||||
res.ParentSignature ??= new Signature
|
||||
{
|
||||
Key = key.PublicKeyProto,
|
||||
Sign = key.ECDsaKey.SignData(res.Parent.ToByteArray()),
|
||||
};
|
||||
return res;
|
||||
}
|
||||
|
||||
internal static Header CreateHeader(
|
||||
FrostFsObjectHeader header,
|
||||
ReadOnlyMemory<byte> payloadChecksum,
|
||||
|
|
|
@ -887,6 +887,10 @@ message PatchRequest {
|
|||
// key, then it just replaces it while merging the lists.
|
||||
bool replace_attributes = 3;
|
||||
|
||||
// New split header for the object. This defines how the object will relate
|
||||
// to other objects in a split operation.
|
||||
neo.fs.v2.object.Header.Split new_split_header = 5;
|
||||
|
||||
// The patch for the object's payload.
|
||||
message Patch {
|
||||
// The range of the source object for which the payload is replaced by the
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue