400 lines
12 KiB
C#
400 lines
12 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
|
|
using Google.Protobuf;
|
|
|
|
using FrostFS.Object;
|
|
using FrostFS.Refs;
|
|
using FrostFS.SDK.ClientV2.Mappers.GRPC;
|
|
using FrostFS.SDK.Cryptography;
|
|
using FrostFS.Session;
|
|
using FrostFS.SDK.ModelsV2;
|
|
using FrostFS.SDK.ClientV2.Extensions;
|
|
|
|
namespace FrostFS.SDK.ClientV2;
|
|
|
|
/// <summary>
/// Object operations (head, get, delete, search, put) issued through the generated
/// gRPC <c>ObjectService.ObjectServiceClient</c>. Requests are signed with
/// <c>Context.Key</c> taken from the client environment passed to the base accessor.
/// </summary>
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment ctx) : ContextAccessor(ctx)
|
|
{
|
|
// Helper used by the put methods to build gRPC objects and to calculate object ids.
readonly ObjectTools tools = new(ctx);
|
|
|
|
/// <summary>
/// Requests the header of a single object through the Head RPC.
/// </summary>
/// <param name="cid">Container that holds the object.</param>
/// <param name="oid">Identifier of the object.</param>
/// <param name="ctx">Per-call context; its deadline and cancellation token are passed to the RPC.</param>
/// <returns>The header from the response body, converted to the SDK model.</returns>
internal async Task<ObjectHeader> GetObjectHeadAsync(ContainerId cid, ObjectId oid, Context ctx)
{
    var address = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };

    var headRequest = new HeadRequest
    {
        Body = new HeadRequest.Types.Body { Address = address }
    };

    // The meta header must be attached before signing so that it is covered by the signature.
    headRequest.AddMetaHeader();
    headRequest.Sign(Context.Key);

    var headResponse = await client!.HeadAsync(headRequest, null, ctx.Deadline, ctx.CancellationToken);

    Verifier.CheckResponse(headResponse);

    var grpcHeader = headResponse.Body.Header.Header;

    return grpcHeader.ToModel();
}
|
|
|
|
/// <summary>
/// Builds, authorizes and signs a Get request for one object and hands it to <c>GetObject</c>.
/// </summary>
/// <param name="cid">Container that holds the object.</param>
/// <param name="oid">Identifier of the object.</param>
/// <param name="ctx">Per-call context; also used to obtain the session token.</param>
/// <returns>The object produced by <c>GetObject</c> for the signed request.</returns>
internal async Task<FrostFsObject> GetObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
{
    var session = await GetOrCreateSession(ctx);

    var address = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };

    var getRequest = new GetRequest
    {
        Body = new GetRequest.Types.Body { Address = address }
    };

    getRequest.AddMetaHeader();

    // The session token is bound to this container/object pair and to the Get verb.
    getRequest.AddObjectSessionToken(
        session,
        cid.ToGrpcMessage(),
        oid.ToGrpcMessage(),
        ObjectSessionContext.Types.Verb.Get,
        Context.Key);

    // Signing comes last so that it covers the meta header and the session token.
    getRequest.Sign(Context.Key);

    return await GetObject(getRequest, ctx);
}
|
|
|
|
/// <summary>
/// Sends a signed Delete request for one object and verifies the response.
/// </summary>
/// <param name="cid">Container that holds the object.</param>
/// <param name="oid">Identifier of the object to delete.</param>
/// <param name="ctx">Per-call context; its deadline and cancellation token are passed to the RPC.</param>
internal async Task DeleteObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
{
    var address = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };

    var deleteRequest = new DeleteRequest
    {
        Body = new DeleteRequest.Types.Body { Address = address }
    };

    // NOTE(review): unlike the Get and Put paths, no object session token is attached here - confirm this is intended.
    deleteRequest.AddMetaHeader();
    deleteRequest.Sign(Context.Key);

    var deleteResponse = await client.DeleteAsync(deleteRequest, null, ctx.Deadline, ctx.CancellationToken);

    Verifier.CheckResponse(deleteResponse);
}
|
|
|
|
/// <summary>
/// Searches a container for objects matching the given filters and streams their identifiers.
/// </summary>
/// <param name="cid">Container to search in.</param>
/// <param name="filters">Filters converted to gRPC messages and added to the request body.</param>
/// <param name="ctx">Per-call context forwarded to the underlying search stream.</param>
/// <returns>An asynchronous sequence of SDK object identifiers, produced lazily as the sequence is enumerated.</returns>
internal async IAsyncEnumerable<ObjectId> SearchObjectsAsync(
    ContainerId cid,
    IEnumerable<ObjectFilter> filters,
    Context ctx)
{
    var searchBody = new SearchRequest.Types.Body
    {
        ContainerId = cid.ToGrpcMessage(),
        Version = 1 // TODO: clarify this param
    };

    var grpcFilters = filters.Select(filter => filter.ToGrpcMessage());
    searchBody.Filters.AddRange(grpcFilters);

    var searchRequest = new SearchRequest { Body = searchBody };

    searchRequest.AddMetaHeader();
    searchRequest.Sign(Context.Key);

    await foreach (var grpcId in SearchObjects(searchRequest, ctx))
    {
        // Convert each gRPC identifier to the SDK model from its raw bytes.
        yield return ObjectId.FromHash(grpcId.Value.ToByteArray());
    }
}
|
|
|
|
/// <summary>
/// Validates the put parameters and dispatches to the client-cut or the streaming upload.
/// </summary>
/// <param name="parameters">Upload parameters; <c>Header</c> and <c>Payload</c> must be set.</param>
/// <param name="ctx">Per-call context forwarded to the selected upload method.</param>
/// <returns>The task returned by the selected upload method.</returns>
/// <exception cref="ArgumentException">Thrown synchronously when <c>Header</c> or <c>Payload</c> is null.</exception>
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
{
    if (parameters.Header == null)
    {
        throw new ArgumentException("Value cannot be null", nameof(parameters.Header));
    }

    if (parameters.Payload == null)
    {
        throw new ArgumentException("Value cannot be null", nameof(parameters.Payload));
    }

    // ClientCut selects splitting on the client side; otherwise the payload is streamed in chunks.
    return parameters.ClientCut
        ? PutClientCutObject(parameters, ctx)
        : PutStreamObject(parameters, ctx);
}
|
|
|
|
/// <summary>
/// Uploads a complete object in one PutSingle RPC.
/// </summary>
/// <param name="modelObject">SDK object that is converted to its gRPC form by <c>tools.CreateObject</c>.</param>
/// <param name="ctx">Per-call context; also used to obtain the session token.</param>
/// <returns>The identifier taken from the gRPC object that was sent.</returns>
internal async Task<ObjectId> PutSingleObjectAsync(FrostFsObject modelObject, Context ctx)
{
    var session = await GetOrCreateSession(ctx);

    var grpcPayloadObject = tools.CreateObject(modelObject);

    var putRequest = new PutSingleRequest
    {
        Body = new PutSingleRequest.Types.Body { Object = grpcPayloadObject }
    };

    putRequest.AddMetaHeader();

    // The session token is bound to the object's own container and id, with the Put verb.
    putRequest.AddObjectSessionToken(
        session,
        grpcPayloadObject.Header.ContainerId,
        grpcPayloadObject.ObjectId,
        ObjectSessionContext.Types.Verb.Put,
        Context.Key);

    putRequest.Sign(Context.Key);

    var putResponse = await client.PutSingleAsync(putRequest, null, ctx.Deadline, ctx.CancellationToken);

    Verifier.CheckResponse(putResponse);

    // The id comes from the locally built object, not from the response.
    var idBytes = grpcPayloadObject.ObjectId.Value.ToByteArray();

    return ObjectId.FromHash(idBytes);
}
|
|
|
|
/// <summary>
/// Uploads a payload by cutting it into parts on the client. Each part is sent with
/// <c>PutSingleObjectAsync</c>. When more than one part is produced, the last part
/// carries the parent (large) object and a link object listing all parts is sent afterwards.
/// </summary>
/// <param name="parameters">Upload parameters; <c>Header</c> and <c>Payload</c> are expected to be non-null (validated by <c>PutObjectAsync</c>).</param>
/// <param name="ctx">Per-call context; its cancellation token is also used for reading the payload stream.</param>
/// <returns>
/// The id of the single uploaded object when the payload fits into one part; otherwise the id
/// calculated from the large object's header.
/// </returns>
/// <exception cref="ArgumentException">The total length is unknown: the stream is not seekable (or is empty) and <c>PayloadLength</c> is 0.</exception>
/// <exception cref="System.IO.EndOfStreamException">The stream ended before the expected payload length was read.</exception>
private async Task<ObjectId> PutClientCutObject(PutObjectParameters parameters, Context ctx)
{
    var payloadStream = parameters.Payload!;
    var header = parameters.Header!;

    List<ObjectId> sentObjectIds = [];

    FrostFsObject? currentObject;

    var networkSettings = await Context.Client.GetNetworkSettingsAsync(ctx);

    // Upper bound for the payload of one part.
    var objectSize = (int)networkSettings.MaxObjectSize;

    var fullLength = header.PayloadLength;

    if (payloadStream.CanSeek)
    {
        // No need for a buffer larger than the whole stream.
        objectSize = (int)Math.Min(objectSize, payloadStream.Length);

        if (fullLength == 0)
            fullLength = (ulong)payloadStream.Length;
    }

    if (fullLength == 0)
        throw new ArgumentException("Payload stream must be able to seek or PayloadLength must be specified");

    var buffer = new byte[objectSize];

    var largeObject = new LargeObject(header.ContainerId);

    var split = new Split();

    while (true)
    {
        // Never read past the expected total length: an overshoot would make the
        // equality check below unreachable and the loop would never terminate.
        // Relies on largeObject.PayloadLength reflecting the bytes appended so far,
        // exactly as the termination check below does.
        var remaining = fullLength - largeObject.PayloadLength;
        var bytesToRead = (int)Math.Min((ulong)objectSize, remaining);

        // Stream.ReadAsync may return fewer bytes than requested, so keep reading
        // until the part is full or the stream is exhausted.
        var bytesCount = 0;
        while (bytesCount < bytesToRead)
        {
            var read = await payloadStream.ReadAsync(buffer, bytesCount, bytesToRead - bytesCount, ctx.CancellationToken);

            if (read == 0)
                break;

            bytesCount += read;
        }

        // A short part here means the stream ended before the expected length was reached;
        // without this check the loop would keep uploading empty parts forever.
        if (bytesCount < bytesToRead)
            throw new System.IO.EndOfStreamException("Payload stream ended before the expected payload length was reached");

        split.Previous = sentObjectIds.LastOrDefault();

        largeObject.AppendBlock(buffer, bytesCount);

        // A full buffer is passed as is; a partially filled one is copied to its exact size.
        currentObject = new FrostFsObject(header.ContainerId, bytesCount < objectSize ? buffer[..bytesCount] : buffer)
            .SetSplit(split);

        // The last part is not sent inside the loop: it is finalized below.
        if (largeObject.PayloadLength == fullLength)
            break;

        var partId = await PutSingleObjectAsync(currentObject, ctx);

        sentObjectIds.Add(partId);
    }

    if (sentObjectIds.Count != 0)
    {
        // Multi-part upload: user attributes go to the large (parent) object.
        largeObject.AddAttributes(header.Attributes);
        largeObject.CalculateHash();

        currentObject.SetParent(largeObject);

        var lastPartId = await PutSingleObjectAsync(currentObject, ctx);
        sentObjectIds.Add(lastPartId);

        var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
            .AddChildren(sentObjectIds);

        linkObject.Header.Attributes.Clear();

        _ = await PutSingleObjectAsync(linkObject, ctx);

        return tools.CalculateObjectId(largeObject.Header);
    }

    // Single-part upload: the only object carries the user attributes itself.
    currentObject.AddAttributes(header.Attributes);

    return await PutSingleObjectAsync(currentObject, ctx);
}
|
|
|
|
/// <summary>
/// Uploads an object through the streaming Put RPC: one signed init request carrying the
/// header, followed by signed chunk requests until the payload stream is exhausted.
/// </summary>
/// <param name="parameters">Upload parameters; <c>Header</c> and <c>Payload</c> are expected to be non-null (validated by <c>PutObjectAsync</c>).</param>
/// <param name="ctx">Per-call context; its cancellation token is used for reading the payload stream.</param>
/// <returns>The object id taken from the response received when the stream is closed.</returns>
private async Task<ObjectId> PutStreamObject(PutObjectParameters parameters, Context ctx)
{
    var payload = parameters.Payload!;
    var header = parameters.Header!;

    var sessionToken = await GetOrCreateSession(ctx);

    var hdr = header.ToGrpcMessage();
    hdr.OwnerId = Context.Owner.ToGrpcMessage();
    hdr.Version = Context.Version.ToGrpcMessage();

    // Id used for the session token: the hash of the header as built above.
    var oid = new ObjectID { Value = hdr.Sha256() };

    var initRequest = new PutRequest
    {
        Body = new PutRequest.Types.Body
        {
            Init = new PutRequest.Types.Body.Types.Init
            {
                Header = hdr
            }
        }
    };

    initRequest.AddMetaHeader();
    initRequest.AddObjectSessionToken(
        sessionToken,
        hdr.ContainerId,
        oid,
        ObjectSessionContext.Types.Verb.Put,
        Context.Key
    );

    initRequest.Sign(Context.Key);

    using var stream = await PutObjectInit(initRequest, ctx);

    var bufferSize = parameters.BufferMaxSize > 0 ? parameters.BufferMaxSize : Constants.ObjectChunkSize;

    // Do not allocate a buffer larger than the payload when its length is known.
    if (payload.CanSeek)
    {
        bufferSize = (int)Math.Min(payload.Length, bufferSize);
    }
    else if (header.PayloadLength > 0)
    {
        bufferSize = (int)Math.Min((long)header.PayloadLength, bufferSize);
    }

    var buffer = new byte[bufferSize];

    while (true)
    {
        var bytesCount = await payload.ReadAsync(buffer, 0, bufferSize, ctx.CancellationToken);

        if (bytesCount == 0)
            break;

        // Each chunk request starts as a copy of the init request; the body is replaced by
        // the chunk and the verification header is reset so the request is signed afresh.
        var chunkRequest = new PutRequest(initRequest)
        {
            Body = new PutRequest.Types.Body
            {
                // Copy straight from the buffer range; slicing the array first would
                // allocate and copy a temporary array for every chunk.
                Chunk = ByteString.CopyFrom(buffer, 0, bytesCount),
            },
            VerifyHeader = null
        };

        chunkRequest.Sign(Context.Key);
        await stream.Write(chunkRequest);
    }

    var response = await stream.Close();
    Verifier.CheckResponse(response);

    return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
|
|
|
|
/// <summary>
/// Opens the Get stream, reads the object header from it and returns the SDK object
/// with the reader attached.
/// </summary>
/// <param name="request">Prepared (signed) Get request.</param>
/// <param name="ctx">Per-call context forwarded to the stream setup.</param>
/// <returns>The SDK object built from the header message, with <c>ObjectReader</c> set to the open reader.</returns>
private async Task<FrostFsObject> GetObject(GetRequest request, Context ctx)
{
    var objectReader = GetObjectInit(request, ctx);

    var result = (await objectReader.ReadHeader()).ToModel();

    // The reader is kept on the returned object - presumably so the caller can read the payload later.
    result.ObjectReader = objectReader;

    return result;
}
|
|
|
|
/// <summary>
/// Starts the Get call and wraps it in an <c>ObjectReader</c>.
/// </summary>
/// <param name="initRequest">Request that opens the stream; must not be null.</param>
/// <param name="ctx">Per-call context; its deadline and cancellation token are passed to the call.</param>
/// <returns>A reader over the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
{
    if (initRequest is null)
    {
        throw new ArgumentNullException(nameof(initRequest));
    }

    return new ObjectReader(client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken));
}
|
|
|
|
/// <summary>
/// Starts the Put call, writes the init request to its request stream and wraps the call
/// in an <c>ObjectStreamer</c>.
/// </summary>
/// <param name="initRequest">First request of the stream; must not be null.</param>
/// <param name="ctx">Per-call context; its deadline and cancellation token are passed to the call.</param>
/// <returns>A streamer over the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, Context ctx)
{
    if (initRequest is null)
        throw new ArgumentNullException(nameof(initRequest));

    var putCall = client.Put(null, ctx.Deadline, ctx.CancellationToken);

    // The init request must be on the wire before any chunk is written by the caller.
    await putCall.RequestStream.WriteAsync(initRequest);

    return new ObjectStreamer(putCall);
}
|
|
|
|
/// <summary>
/// Reads batches of object ids from the search stream and yields them one by one
/// until the reader returns null.
/// </summary>
/// <param name="request">Prepared (signed) search request.</param>
/// <param name="ctx">Per-call context; its cancellation token is passed to every read.</param>
/// <returns>An asynchronous sequence of gRPC object identifiers.</returns>
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, Context ctx)
{
    // Disposed when enumeration finishes or the enumerator is disposed.
    using var searchReader = GetSearchReader(request, ctx);

    while (true)
    {
        var batch = await searchReader.Read(ctx.CancellationToken);

        // A null batch ends the enumeration.
        if (batch == null)
        {
            yield break;
        }

        foreach (var grpcId in batch)
        {
            yield return grpcId;
        }
    }
}
|
|
|
|
/// <summary>
/// Starts the Search call and wraps it in a <c>SearchReader</c>.
/// </summary>
/// <param name="initRequest">Request that opens the stream; must not be null.</param>
/// <param name="ctx">Per-call context; its deadline and cancellation token are passed to the call.</param>
/// <returns>A reader over the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private SearchReader GetSearchReader(SearchRequest initRequest, Context ctx)
{
    if (initRequest is null)
        throw new ArgumentNullException(nameof(initRequest));

    return new SearchReader(client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken));
}
|
|
|
|
/// <summary>
/// Returns the session token to use for a call: the one supplied in the context when present,
/// otherwise a newly created one.
/// </summary>
/// <param name="ctx">Per-call context; <c>SessionToken</c> is expected to be a Base64 string when set.</param>
/// <returns>The deserialized or newly created session token.</returns>
private async Task<Session.SessionToken> GetOrCreateSession(Context ctx)
{
    if (!string.IsNullOrEmpty(ctx.SessionToken))
    {
        // A caller-supplied token is Base64-decoded and deserialized as is.
        var rawToken = Convert.FromBase64String(ctx.SessionToken);

        return rawToken.DeserializeSessionToken();
    }

    // NOTE(review): uint.MaxValue is presumably the session expiration - confirm against CreateSessionInternalAsync.
    return await Context.Client.CreateSessionInternalAsync(uint.MaxValue, ctx);
}
|
|
}
|