frostfs-sdk-csharp/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
Pavel Gross 35fe791406 [#19] Client: Use specific classes for search
Signed-off-by: Pavel Gross <p.gross@yando.com>
2024-07-25 14:37:58 +03:00

429 lines
13 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Google.Protobuf;
using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using FrostFS.Session;
using FrostFS.SDK.ModelsV2;
using FrostFS.SDK.ClientV2.Extensions;
using FrostFS.SDK.ClientV2.Parameters;
namespace FrostFS.SDK.ClientV2;
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment ctx) : ContextAccessor(ctx), ISessionProvider
{
readonly ObjectTools tools = new(ctx);
readonly SessionProvider sessions = new (ctx);
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
{
return await sessions.GetOrCreateSession(args, ctx);
}
internal async Task<ObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var ctx = args.Context!;
var request = new HeadRequest
{
Body = new HeadRequest.Types.Body
{
Address = new Address
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ObjectId = args.ObjectId.ToGrpcMessage()
}
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Head,
Context.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
return response.Body.Header.Header.ToModel();
}
internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
{
var ctx = args.Context!;
var request = new GetRequest
{
Body = new GetRequest.Types.Body
{
Address = new Address
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ObjectId = args.ObjectId.ToGrpcMessage()
}
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Get,
Context.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
return await GetObject(request, ctx);
}
internal async Task DeleteObjectAsync(PrmObjectDelete args)
{
var ctx = args.Context!;
var request = new DeleteRequest
{
Body = new DeleteRequest.Types.Body
{
Address = new Address
{
ContainerId = args.ContainerId.ToGrpcMessage(),
ObjectId = args.ObjectId.ToGrpcMessage()
}
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
ObjectSessionContext.Types.Verb.Delete,
Context.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
}
internal async IAsyncEnumerable<ObjectId> SearchObjectsAsync(PrmObjectSearch args)
{
var ctx = args.Context!;
var request = new SearchRequest
{
Body = new SearchRequest.Types.Body
{
ContainerId = args.ContainerId.ToGrpcMessage(),
Filters = { },
Version = 1 // TODO: clarify this param
}
};
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToGrpcMessage()));
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = request.Body.ContainerId },
ObjectSessionContext.Types.Verb.Search,
Context.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
var objectsIds = SearchObjects(request, ctx);
await foreach (var oid in objectsIds)
{
yield return ObjectId.FromHash(oid.Value.ToByteArray());
}
}
internal Task<ObjectId> PutObjectAsync(PrmObjectPut args)
{
if (args.Header == null)
throw new ArgumentException("Value cannot be null", nameof(args.Header));
if (args.Payload == null)
throw new ArgumentException("Value cannot be null", nameof(args.Payload));
if (args.ClientCut)
return PutClientCutObject(args);
else
return PutStreamObject(args);
}
internal async Task<ObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
var ctx = args.Context!;
var grpcObject = tools.CreateObject(args.FrostFsObject);
var request = new PutSingleRequest
{
Body = new PutSingleRequest.Types.Body()
{
Object = grpcObject
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId},
ObjectSessionContext.Types.Verb.Put,
Context.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(Context.Key);
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
return ObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
}
private async Task<ObjectId> PutClientCutObject(PrmObjectPut args)
{
var ctx = args.Context!;
var tokenRaw = await GetOrCreateSession(args, ctx);
var token = new ModelsV2.SessionToken(tokenRaw.Serialize());
var payloadStream = args.Payload!;
var header = args.Header!;
ObjectId? objectId;
List<ObjectId> sentObjectIds = [];
FrostFsObject? currentObject;
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx));
var objectSize = (int)networkSettings.MaxObjectSize;
var fullLength = header.PayloadLength;
if (payloadStream.CanSeek)
{
objectSize = (int)Math.Min(objectSize, payloadStream.Length);
if (fullLength == 0)
fullLength = (ulong)payloadStream.Length;
}
if (fullLength == 0)
throw new ArgumentException("Payload stream must be able to seek or PayloadLength must be specified");
var buffer = new byte[objectSize];
var largeObject = new LargeObject(header.ContainerId);
var split = new Split();
while (true)
{
var bytesCount = await payloadStream.ReadAsync(buffer, 0, objectSize);
split.Previous = sentObjectIds.LastOrDefault();
largeObject.Header.PayloadLength += (ulong)bytesCount;
currentObject = new FrostFsObject(header.ContainerId)
.SetPayload(bytesCount < objectSize ? buffer[..bytesCount] : buffer)
.SetSplit(split);
if (largeObject.PayloadLength == fullLength)
break;
objectId = await PutSingleObjectAsync(new PrmSingleObjectPut(currentObject, ctx) { SessionToken = token });
sentObjectIds.Add(objectId!);
}
if (sentObjectIds.Count != 0)
{
largeObject.AddAttributes(args.Header!.Attributes);
currentObject.SetParent(largeObject);
var putSingleObjectParams = new PrmSingleObjectPut(currentObject, ctx) { SessionToken = token };
objectId = await PutSingleObjectAsync(putSingleObjectParams);
sentObjectIds.Add(objectId);
var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
.AddChildren(sentObjectIds);
linkObject.Header.Attributes.Clear();
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject, ctx){ SessionToken = token });
return tools.CalculateObjectId(largeObject.Header);
}
currentObject
.SetSplit(null)
.AddAttributes(args.Header!.Attributes);
return await PutSingleObjectAsync(new PrmSingleObjectPut(currentObject, ctx));
}
private async Task<ObjectId> PutStreamObject(PrmObjectPut args)
{
var ctx = args.Context!;
var payload = args.Payload!;
var header = args.Header!;
var hdr = header.ToGrpcMessage();
hdr.OwnerId = Context.Owner.ToGrpcMessage();
hdr.Version = Context.Version.ToGrpcMessage();
var oid = new ObjectID { Value = hdr.Sha256() };
var initRequest = new PutRequest
{
Body = new PutRequest.Types.Body
{
Init = new PutRequest.Types.Body.Types.Init
{
Header = hdr
}
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = hdr.ContainerId, ObjectId = oid },
ObjectSessionContext.Types.Verb.Put,
Context.Key
);
initRequest.AddMetaHeader(args.XHeaders, sessionToken);
initRequest.Sign(Context.Key);
using var stream = await PutObjectInit(initRequest, ctx);
var bufferSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
if (payload.CanSeek)
{
bufferSize = (int)Math.Min(payload.Length, bufferSize);
}
else if (header.PayloadLength > 0)
{
bufferSize = (int)Math.Min((long)header.PayloadLength, bufferSize);
}
var buffer = new byte[bufferSize];
while (true)
{
var bytesCount = await payload.ReadAsync(buffer, 0, bufferSize, ctx.CancellationToken);
if (bytesCount == 0)
break;
var chunkRequest = new PutRequest(initRequest)
{
Body = new PutRequest.Types.Body
{
Chunk = ByteString.CopyFrom(buffer.AsSpan()[..bytesCount]),
},
VerifyHeader = null
};
chunkRequest.Sign(Context.Key);
await stream.Write(chunkRequest);
}
var response = await stream.Close();
Verifier.CheckResponse(response);
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
private async Task<FrostFsObject> GetObject(GetRequest request, Context ctx)
{
var reader = GetObjectInit(request, ctx);
var grpcObject = await reader.ReadHeader();
var modelObject = grpcObject.ToModel();
modelObject.ObjectReader = reader;
return modelObject;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
{
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
var call = client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);
return new ObjectReader(call);
}
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, Context ctx)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
await call.RequestStream.WriteAsync(initRequest);
return new ObjectStreamer(call);
}
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, Context ctx)
{
using var stream = GetSearchReader(request, ctx);
while (true)
{
var ids = await stream.Read(ctx.CancellationToken);
if (ids == null)
break;
foreach (var oid in ids)
{
yield return oid;
}
}
}
private SearchReader GetSearchReader(SearchRequest initRequest, Context ctx)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
var call = client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
return new SearchReader(call);
}
}