[#17] Client: Add extra parameter
API methods' parameters types with optional session, polling settings, xHeaders etc. and corresponding handlers have been added Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
parent
00a1e9412f
commit
7b9c19f37c
42 changed files with 1054 additions and 386 deletions
|
@ -12,6 +12,8 @@ using FrostFS.SDK.Cryptography;
|
|||
using FrostFS.Session;
|
||||
using FrostFS.SDK.ModelsV2;
|
||||
using FrostFS.SDK.ClientV2.Extensions;
|
||||
using System.Threading;
|
||||
using FrostFS.SDK.ClientV2.Parameters;
|
||||
|
||||
namespace FrostFS.SDK.ClientV2;
|
||||
|
||||
|
@ -19,21 +21,30 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
{
|
||||
readonly ObjectTools tools = new(ctx);
|
||||
|
||||
internal async Task<ObjectHeader> GetObjectHeadAsync(ContainerId cid, ObjectId oid, Context ctx)
|
||||
internal async Task<ObjectHeader> GetObjectHeadAsync(PrmGetObjectHead args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
var request = new HeadRequest
|
||||
{
|
||||
Body = new HeadRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ObjectId = oid.ToGrpcMessage()
|
||||
ContainerId = args.ContainerId.ToGrpcMessage(),
|
||||
ObjectId = args.ObjectId.ToGrpcMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
var sessionToken = await GetOrCreateSession(args, ctx);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Head,
|
||||
Context.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(Context.Key);
|
||||
|
||||
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
@ -43,51 +54,59 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
return response.Body.Header.Header.ToModel();
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObject> GetObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
|
||||
internal async Task<FrostFsObject> GetObjectAsync(PrmGetObject args)
|
||||
{
|
||||
var sessionToken = await GetOrCreateSession(ctx);
|
||||
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new GetRequest
|
||||
{
|
||||
Body = new GetRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ObjectId = oid.ToGrpcMessage()
|
||||
ContainerId = args.ContainerId.ToGrpcMessage(),
|
||||
ObjectId = args.ObjectId.ToGrpcMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
cid.ToGrpcMessage(),
|
||||
oid.ToGrpcMessage(),
|
||||
var sessionToken = await GetOrCreateSession(args, ctx);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Get,
|
||||
Context.Key
|
||||
);
|
||||
Context.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(Context.Key);
|
||||
|
||||
return await GetObject(request, ctx);
|
||||
}
|
||||
|
||||
internal async Task DeleteObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
|
||||
internal async Task DeleteObjectAsync(PrmDeleteObject args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
var request = new DeleteRequest
|
||||
{
|
||||
Body = new DeleteRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ObjectId = oid.ToGrpcMessage()
|
||||
ContainerId = args.ContainerId.ToGrpcMessage(),
|
||||
ObjectId = args.ObjectId.ToGrpcMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
var sessionToken = await GetOrCreateSession(args, ctx);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Delete,
|
||||
Context.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
request.Sign(Context.Key);
|
||||
|
||||
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
@ -95,24 +114,30 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
Verifier.CheckResponse(response);
|
||||
}
|
||||
|
||||
internal async IAsyncEnumerable<ObjectId> SearchObjectsAsync(
|
||||
ContainerId cid,
|
||||
IEnumerable<ObjectFilter> filters,
|
||||
Context ctx)
|
||||
internal async IAsyncEnumerable<ObjectId> SearchObjectsAsync(PrmSearchObject args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
var request = new SearchRequest
|
||||
{
|
||||
Body = new SearchRequest.Types.Body
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ContainerId = args.ContainerId.ToGrpcMessage(),
|
||||
Filters = { },
|
||||
Version = 1 // TODO: clarify this param
|
||||
}
|
||||
};
|
||||
|
||||
request.Body.Filters.AddRange(filters.Select(f => f.ToGrpcMessage()));
|
||||
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToGrpcMessage()));
|
||||
|
||||
request.AddMetaHeader();
|
||||
var sessionToken = await GetOrCreateSession(args, ctx);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
new Address { ContainerId = request.Body.ContainerId },
|
||||
ObjectSessionContext.Types.Verb.Search,
|
||||
Context.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(Context.Key);
|
||||
|
||||
var objectsIds = SearchObjects(request, ctx);
|
||||
|
@ -123,25 +148,24 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
}
|
||||
}
|
||||
|
||||
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
|
||||
internal Task<ObjectId> PutObjectAsync(PrmPutObject args)
|
||||
{
|
||||
if (parameters.Header == null)
|
||||
throw new ArgumentException("Value cannot be null", nameof(parameters.Header));
|
||||
if (args.Header == null)
|
||||
throw new ArgumentException("Value cannot be null", nameof(args.Header));
|
||||
|
||||
if (parameters.Payload == null)
|
||||
throw new ArgumentException("Value cannot be null", nameof(parameters.Payload));
|
||||
if (args.Payload == null)
|
||||
throw new ArgumentException("Value cannot be null", nameof(args.Payload));
|
||||
|
||||
if (parameters.ClientCut)
|
||||
return PutClientCutObject(parameters, ctx);
|
||||
if (args.ClientCut)
|
||||
return PutClientCutObject(args);
|
||||
else
|
||||
return PutStreamObject(parameters, ctx);
|
||||
return PutStreamObject(args);
|
||||
}
|
||||
|
||||
internal async Task<ObjectId> PutSingleObjectAsync(FrostFsObject modelObject, Context ctx)
|
||||
internal async Task<ObjectId> PutSingleObjectAsync(PrmPutSingleObject args)
|
||||
{
|
||||
var sessionToken = await GetOrCreateSession(ctx);
|
||||
|
||||
var grpcObject = tools.CreateObject(modelObject);
|
||||
var ctx = args.Context!;
|
||||
var grpcObject = tools.CreateObject(args.FrostFsObject);
|
||||
|
||||
var request = new PutSingleRequest
|
||||
{
|
||||
|
@ -151,14 +175,14 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
grpcObject.Header.ContainerId,
|
||||
grpcObject.ObjectId,
|
||||
var sessionToken = await GetOrCreateSession(args, ctx);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId},
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
Context.Key
|
||||
);
|
||||
Context.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(Context.Key);
|
||||
|
||||
|
@ -169,17 +193,23 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
return ObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
|
||||
}
|
||||
|
||||
private async Task<ObjectId> PutClientCutObject(PutObjectParameters parameters, Context ctx)
|
||||
static readonly AsyncLocal<Session.SessionToken> asyncLocalSession = new ();
|
||||
|
||||
private async Task<ObjectId> PutClientCutObject(PrmPutObject args)
|
||||
{
|
||||
var payloadStream = parameters.Payload!;
|
||||
var header = parameters.Header!;
|
||||
var ctx = args.Context!;
|
||||
var tokenRaw = await GetOrCreateSession(args, ctx);
|
||||
var token = new ModelsV2.SessionToken(tokenRaw.Serialize());
|
||||
|
||||
var payloadStream = args.Payload!;
|
||||
var header = args.Header!;
|
||||
|
||||
ObjectId? objectId;
|
||||
List<ObjectId> sentObjectIds = [];
|
||||
|
||||
FrostFsObject? currentObject;
|
||||
|
||||
var networkSettings = await Context.Client.GetNetworkSettingsAsync(ctx);
|
||||
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmGetNetworkSettings(ctx));
|
||||
|
||||
var objectSize = (int)networkSettings.MaxObjectSize;
|
||||
|
||||
|
@ -208,27 +238,30 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
|
||||
split.Previous = sentObjectIds.LastOrDefault();
|
||||
|
||||
largeObject.AppendBlock(buffer, bytesCount);
|
||||
largeObject.Header.PayloadLength += (ulong)bytesCount;
|
||||
|
||||
currentObject = new FrostFsObject(header.ContainerId, bytesCount < objectSize ? buffer[..bytesCount] : buffer)
|
||||
currentObject = new FrostFsObject(header.ContainerId)
|
||||
.SetPayload(bytesCount < objectSize ? buffer[..bytesCount] : buffer)
|
||||
.SetSplit(split);
|
||||
|
||||
if (largeObject.PayloadLength == fullLength)
|
||||
break;
|
||||
|
||||
objectId = await PutSingleObjectAsync(currentObject, ctx);
|
||||
|
||||
objectId = await PutSingleObjectAsync(new PrmPutSingleObject(currentObject, ctx) { SessionToken = token });
|
||||
|
||||
sentObjectIds.Add(objectId!);
|
||||
}
|
||||
|
||||
if (sentObjectIds.Count != 0)
|
||||
{
|
||||
largeObject.AddAttributes(parameters.Header!.Attributes);
|
||||
largeObject.CalculateHash();
|
||||
|
||||
largeObject.AddAttributes(args.Header!.Attributes);
|
||||
|
||||
currentObject.SetParent(largeObject);
|
||||
|
||||
objectId = await PutSingleObjectAsync(currentObject, ctx);
|
||||
var putSingleObjectParams = new PrmPutSingleObject(currentObject, ctx) { SessionToken = token };
|
||||
|
||||
objectId = await PutSingleObjectAsync(putSingleObjectParams);
|
||||
|
||||
sentObjectIds.Add(objectId);
|
||||
|
||||
var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
|
||||
|
@ -236,22 +269,21 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
|
||||
linkObject.Header.Attributes.Clear();
|
||||
|
||||
_ = await PutSingleObjectAsync(linkObject, ctx);
|
||||
_ = await PutSingleObjectAsync(new PrmPutSingleObject(linkObject, ctx){ SessionToken = token });
|
||||
|
||||
return tools.CalculateObjectId(largeObject.Header);
|
||||
}
|
||||
|
||||
currentObject.AddAttributes(parameters.Header!.Attributes);
|
||||
currentObject.AddAttributes(args.Header!.Attributes);
|
||||
|
||||
return await PutSingleObjectAsync(currentObject, ctx);
|
||||
return await PutSingleObjectAsync(new PrmPutSingleObject(currentObject, ctx));
|
||||
}
|
||||
|
||||
private async Task<ObjectId> PutStreamObject(PutObjectParameters parameters, Context ctx)
|
||||
private async Task<ObjectId> PutStreamObject(PrmPutObject args)
|
||||
{
|
||||
var payload = parameters.Payload!;
|
||||
var header = parameters.Header!;
|
||||
|
||||
var sessionToken = await GetOrCreateSession(ctx);
|
||||
var ctx = args.Context!;
|
||||
var payload = args.Payload!;
|
||||
var header = args.Header!;
|
||||
|
||||
var hdr = header.ToGrpcMessage();
|
||||
hdr.OwnerId = Context.Owner.ToGrpcMessage();
|
||||
|
@ -270,20 +302,21 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
}
|
||||
};
|
||||
|
||||
initRequest.AddMetaHeader();
|
||||
initRequest.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
hdr.ContainerId,
|
||||
oid,
|
||||
var sessionToken = await GetOrCreateSession(args, ctx);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
new Address { ContainerId = hdr.ContainerId, ObjectId = oid },
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
Context.Key
|
||||
);
|
||||
|
||||
initRequest.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
initRequest.Sign(Context.Key);
|
||||
|
||||
using var stream = await PutObjectInit(initRequest, ctx);
|
||||
|
||||
var bufferSize = parameters.BufferMaxSize > 0 ? parameters.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
var bufferSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
|
||||
if (payload.CanSeek)
|
||||
{
|
||||
|
@ -307,12 +340,13 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(buffer[..bytesCount]),
|
||||
Chunk = ByteString.CopyFrom(buffer.AsSpan()[..bytesCount]),
|
||||
},
|
||||
VerifyHeader = null
|
||||
};
|
||||
|
||||
chunkRequest.Sign(Context.Key);
|
||||
|
||||
await stream.Write(chunkRequest);
|
||||
}
|
||||
|
||||
|
@ -388,13 +422,13 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
|
|||
return new SearchReader(call);
|
||||
}
|
||||
|
||||
private async Task<Session.SessionToken> GetOrCreateSession(Context ctx)
|
||||
private async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
|
||||
{
|
||||
if (string.IsNullOrEmpty(ctx.SessionToken))
|
||||
if (args.SessionToken is null)
|
||||
{
|
||||
return await Context.Client.CreateSessionInternalAsync(uint.MaxValue, ctx);
|
||||
return await Context.Client.CreateSessionInternalAsync(new PrmCreateSession(uint.MaxValue, ctx));
|
||||
}
|
||||
|
||||
return Convert.FromBase64String(ctx.SessionToken).DeserializeSessionToken();
|
||||
|
||||
return new Session.SessionToken().Deserialize(args.SessionToken.Token);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue