parent
605463ec24
commit
ae67b12313
28 changed files with 943 additions and 554 deletions
|
@ -1,12 +1,10 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Google.Protobuf;
|
||||
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using FrostFS.Object;
|
||||
using FrostFS.Refs;
|
||||
using FrostFS.SDK.ClientV2.Mappers.GRPC;
|
||||
|
@ -17,15 +15,9 @@ using FrostFS.SDK.ClientV2.Extensions;
|
|||
|
||||
namespace FrostFS.SDK.ClientV2;
|
||||
|
||||
internal class ObjectServiceProvider : ContextAccessor
|
||||
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment ctx) : ContextAccessor(ctx)
|
||||
{
|
||||
private readonly ObjectService.ObjectServiceClient objectServiceClient;
|
||||
|
||||
internal ObjectServiceProvider(ObjectService.ObjectServiceClient objectServiceClient, ClientEnvironment context)
|
||||
: base (context)
|
||||
{
|
||||
this.objectServiceClient = objectServiceClient;
|
||||
}
|
||||
readonly ObjectTools tools = new(ctx);
|
||||
|
||||
internal async Task<ObjectHeader> GetObjectHeadAsync(ContainerId cid, ObjectId oid, Context ctx)
|
||||
{
|
||||
|
@ -44,7 +36,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
request.AddMetaHeader();
|
||||
request.Sign(Context.Key);
|
||||
|
||||
var response = await objectServiceClient!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
|
@ -97,45 +89,38 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
return PutStreamObject(parameters, ctx);
|
||||
}
|
||||
|
||||
internal async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object @object, Context ctx)
|
||||
internal async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object modelObject, Context ctx)
|
||||
{
|
||||
var sessionToken = await GetOrCreateSession(ctx);
|
||||
|
||||
var obj = CreateObject(@object);
|
||||
var grpcObject = tools.CreateObject(modelObject);
|
||||
|
||||
var request = new PutSingleRequest
|
||||
{
|
||||
Body = new PutSingleRequest.Types.Body()
|
||||
{
|
||||
Object = obj
|
||||
Object = grpcObject
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
obj.Header.ContainerId,
|
||||
obj.ObjectId,
|
||||
grpcObject.Header.ContainerId,
|
||||
grpcObject.ObjectId,
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
Context.Key
|
||||
);
|
||||
|
||||
request.Sign(Context.Key);
|
||||
|
||||
var response = await objectServiceClient!.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return ObjectId.FromHash(obj.ObjectId.Value.ToByteArray());
|
||||
return ObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
|
||||
}
|
||||
|
||||
internal ObjectId CalculateObjectId(ObjectHeader header)
|
||||
{
|
||||
var grpcHeader = CreateHeader(header, []);
|
||||
|
||||
return new ObjectID { Value = grpcHeader.Sha256() }.ToModel();
|
||||
}
|
||||
|
||||
|
||||
internal async Task DeleteObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
|
||||
{
|
||||
var request = new DeleteRequest
|
||||
|
@ -153,7 +138,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
request.AddMetaHeader();
|
||||
request.Sign(Context.Key);
|
||||
|
||||
var response = await objectServiceClient!.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
}
|
||||
|
@ -195,7 +180,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
List<ObjectId> sentObjectIds = [];
|
||||
ModelsV2.Object? currentObject;
|
||||
|
||||
var networkSettings = await Context.NetmapService!.GetNetworkSettingsAsync(ctx);
|
||||
var networkSettings = await Context.Client.GetNetworkSettingsAsync(ctx);
|
||||
|
||||
var partSize = (int)networkSettings.MaxObjectSize;
|
||||
var buffer = new byte[partSize];
|
||||
|
@ -215,7 +200,6 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
largeObject.AppendBlock(buffer, bytesCount);
|
||||
|
||||
currentObject = new ModelsV2.Object(header.ContainerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer)
|
||||
.AddAttributes(header.Attributes)
|
||||
.SetSplit(split);
|
||||
|
||||
if (largeObject.PayloadLength == fullLength)
|
||||
|
@ -228,6 +212,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
|
||||
if (sentObjectIds.Count != 0)
|
||||
{
|
||||
largeObject.AddAttributes(parameters.Header!.Attributes);
|
||||
largeObject.CalculateHash();
|
||||
|
||||
currentObject.SetParent(largeObject);
|
||||
|
@ -238,11 +223,14 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
|
||||
.AddChildren(sentObjectIds);
|
||||
|
||||
linkObject.Header.Attributes.Clear();
|
||||
|
||||
_ = await PutSingleObjectAsync(linkObject, ctx);
|
||||
|
||||
return CalculateObjectId(largeObject.Header);
|
||||
return tools.CalculateObjectId(largeObject.Header);
|
||||
}
|
||||
|
||||
currentObject.AddAttributes(parameters.Header!.Attributes);
|
||||
return await PutSingleObjectAsync(currentObject, ctx);
|
||||
}
|
||||
|
||||
|
@ -257,10 +245,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
hdr.OwnerId = Context.Owner.ToGrpcMessage();
|
||||
hdr.Version = Context.Version.ToGrpcMessage();
|
||||
|
||||
var oid = new ObjectID
|
||||
{
|
||||
Value = hdr.Sha256()
|
||||
};
|
||||
var oid = new ObjectID { Value = hdr.Sha256() };
|
||||
|
||||
var request = new PutRequest
|
||||
{
|
||||
|
@ -269,7 +254,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
Init = new PutRequest.Types.Body.Types.Init
|
||||
{
|
||||
Header = hdr
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -314,13 +299,12 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
{
|
||||
var reader = GetObjectInit(request, ctx);
|
||||
|
||||
var obj = await reader.ReadHeader();
|
||||
var grpcObject = await reader.ReadHeader();
|
||||
var modelObject = grpcObject.ToModel();
|
||||
|
||||
modelObject.ObjectReader = reader;
|
||||
|
||||
var @object = obj.ToModel();
|
||||
|
||||
@object.ObjectReader = reader;
|
||||
|
||||
return @object;
|
||||
return modelObject;
|
||||
}
|
||||
|
||||
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
|
||||
|
@ -328,7 +312,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
if (initRequest is null)
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
|
||||
var call = objectServiceClient!.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
return new ObjectReader(call);
|
||||
}
|
||||
|
@ -340,7 +324,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = objectServiceClient!.Put(null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
await call.RequestStream.WriteAsync(initRequest);
|
||||
|
||||
|
@ -372,98 +356,16 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = objectServiceClient!.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
return new SearchReader(call);
|
||||
}
|
||||
|
||||
private Object.Object CreateObject(ModelsV2.Object @object)
|
||||
{
|
||||
var grpcHeader = @object.Header.ToGrpcMessage();
|
||||
|
||||
grpcHeader.OwnerId = Context.Owner.ToGrpcMessage();
|
||||
grpcHeader.Version = Context.Version.ToGrpcMessage();
|
||||
|
||||
if (@object.Payload != null)
|
||||
{
|
||||
grpcHeader.PayloadLength = (ulong)@object.Payload.Length;
|
||||
grpcHeader.PayloadHash = Sha256Checksum(@object.Payload);
|
||||
}
|
||||
|
||||
var split = @object.Header.Split;
|
||||
if (split != null)
|
||||
{
|
||||
grpcHeader.Split = new Header.Types.Split
|
||||
{
|
||||
SplitId = split.SplitId != null ? ByteString.CopyFrom(split.SplitId.ToBinary()) : null
|
||||
};
|
||||
|
||||
if (split.Children != null && split.Children.Count != 0)
|
||||
grpcHeader.Split.Children.AddRange(split.Children.Select(id => id.ToGrpcMessage()));
|
||||
|
||||
if (split.ParentHeader is not null)
|
||||
{
|
||||
var grpcParentHeader = CreateHeader(split.ParentHeader, []);
|
||||
|
||||
grpcHeader.Split.Parent = new ObjectID { Value = grpcParentHeader.Sha256() };
|
||||
grpcHeader.Split.ParentHeader = grpcParentHeader;
|
||||
grpcHeader.Split.ParentSignature = new Refs.Signature
|
||||
{
|
||||
Key = ByteString.CopyFrom(Context.Key.PublicKey()),
|
||||
Sign = ByteString.CopyFrom(Context.Key.SignData(grpcHeader.Split.Parent.ToByteArray())),
|
||||
};
|
||||
|
||||
split.Parent = grpcHeader.Split.Parent.ToModel();
|
||||
}
|
||||
|
||||
grpcHeader.Split.Previous = split.Previous?.ToGrpcMessage();
|
||||
}
|
||||
|
||||
var obj = new Object.Object
|
||||
{
|
||||
Header = grpcHeader,
|
||||
ObjectId = new ObjectID { Value = grpcHeader.Sha256() },
|
||||
Payload = ByteString.CopyFrom(@object.Payload)
|
||||
};
|
||||
|
||||
obj.Signature = new Refs.Signature
|
||||
{
|
||||
Key = ByteString.CopyFrom(Context.Key.PublicKey()),
|
||||
Sign = ByteString.CopyFrom(Context.Key.SignData(obj.ObjectId.ToByteArray())),
|
||||
};
|
||||
|
||||
return obj;
|
||||
}
|
||||
|
||||
private Header CreateHeader(ObjectHeader header, byte[]? payload)
|
||||
{
|
||||
var grpcHeader = header.ToGrpcMessage();
|
||||
|
||||
grpcHeader.OwnerId = Context.Owner.ToGrpcMessage();
|
||||
grpcHeader.Version = Context.Version.ToGrpcMessage();
|
||||
|
||||
if (header.PayloadCheckSum != null)
|
||||
grpcHeader.PayloadHash = Sha256Checksum(header.PayloadCheckSum);
|
||||
else if (payload != null)
|
||||
grpcHeader.PayloadHash = Sha256Checksum(payload);
|
||||
|
||||
return grpcHeader;
|
||||
}
|
||||
|
||||
private static Checksum Sha256Checksum(byte[] data)
|
||||
{
|
||||
return new Checksum
|
||||
{
|
||||
Type = ChecksumType.Sha256,
|
||||
Sum = ByteString.CopyFrom(data.Sha256())
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Session.SessionToken> GetOrCreateSession(Context ctx)
|
||||
{
|
||||
if (string.IsNullOrEmpty(ctx.SessionToken))
|
||||
{
|
||||
return await Context.SessionService!.CreateSessionAsync(uint.MaxValue, ctx);
|
||||
return await Context.Client.CreateSessionInternalAsync(uint.MaxValue, ctx);
|
||||
}
|
||||
|
||||
return Convert.FromBase64String(ctx.SessionToken).DeserializeSessionToken();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue