frostfs-sdk-csharp/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
Pavel Gross f5d1899dd2
All checks were successful
DCO / DCO (pull_request) Successful in 42s
[#13] Change GetObject result to stream
Signed-off-by: Pavel Gross <p.gross@yadro.com>
2024-06-26 15:15:58 +03:00

493 lines
14 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using Google.Protobuf;
using System.Threading.Tasks;
using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using FrostFS.Session;
using FrostFS.SDK.ModelsV2;
using FrostFS.SDK.ClientV2.Extensions;
namespace FrostFS.SDK.ClientV2;
internal class ObjectServiceProvider : ContextAccessor
{
/// <summary>gRPC object service client that every call of this provider goes through.</summary>
private readonly ObjectService.ObjectServiceClient objectServiceClient;

/// <summary>
/// Creates the provider on top of an already constructed object service client.
/// </summary>
/// <param name="objectServiceClient">Client used to send object service requests.</param>
/// <param name="context">Client environment forwarded to the <see cref="ContextAccessor"/> base class.</param>
internal ObjectServiceProvider(ObjectService.ObjectServiceClient objectServiceClient, ClientEnvironment context)
    : base(context)
    => this.objectServiceClient = objectServiceClient;
/// <summary>
/// Sends a Head request for one object and returns its header as an SDK model.
/// </summary>
/// <param name="cid">Container that holds the object.</param>
/// <param name="oid">Identifier of the object whose header is requested.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>The header taken from the response body, converted with <c>ToModel</c>.</returns>
internal async Task<ObjectHeader> GetObjectHeadAsync(ContainerId cid, ObjectId oid, Context ctx)
{
    var address = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };
    var headRequest = new HeadRequest
    {
        Body = new HeadRequest.Types.Body { Address = address }
    };

    // The meta header has to be attached before signing so that the signature covers it.
    headRequest.AddMetaHeader();
    headRequest.Sign(Context.Key);

    var headResponse = await objectServiceClient.HeadAsync(headRequest, null, ctx.Deadline, ctx.CancellationToken);
    Verifier.CheckResponse(headResponse);

    return headResponse.Body.Header.Header.ToModel();
}
/// <summary>
/// Builds, session-binds and signs a Get request for one object and starts reading it.
/// </summary>
/// <param name="cid">Container that holds the object.</param>
/// <param name="oid">Identifier of the object to fetch.</param>
/// <param name="ctx">Per-call context; also the source of an optional pre-made session token.</param>
/// <returns>The object model produced by <see cref="GetObject"/>.</returns>
internal async Task<ModelsV2.Object> GetObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
{
    var token = await GetOrCreateSession(ctx);

    var getRequest = new GetRequest
    {
        Body = new GetRequest.Types.Body
        {
            Address = new Address
            {
                ContainerId = cid.ToGrpcMessage(),
                ObjectId = oid.ToGrpcMessage()
            }
        }
    };

    getRequest.AddMetaHeader();
    // Bind the session token to this exact object and to the Get verb.
    getRequest.AddObjectSessionToken(
        token,
        cid.ToGrpcMessage(),
        oid.ToGrpcMessage(),
        ObjectSessionContext.Types.Verb.Get,
        Context.Key);
    getRequest.Sign(Context.Key);

    return await GetObject(getRequest, ctx);
}
/// <summary>
/// Validates the put parameters and dispatches to the client-cut or the streaming upload.
/// </summary>
/// <param name="parameters">Header, payload and upload mode of the object to store.</param>
/// <param name="ctx">Per-call context passed on to the selected upload routine.</param>
/// <returns>The task produced by the selected upload routine.</returns>
/// <exception cref="ArgumentException">
/// Thrown synchronously (the method is not <c>async</c>) when the header or the payload is missing.
/// </exception>
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
{
    if (parameters.Header == null)
    {
        throw new ArgumentException("Value cannot be null", nameof(parameters.Header));
    }

    if (parameters.Payload == null)
    {
        throw new ArgumentException("Value cannot be null", nameof(parameters.Payload));
    }

    return parameters.ClientCut
        ? PutClientCutObject(parameters, ctx)
        : PutStreamObject(parameters, ctx);
}
/// <summary>
/// Stores a complete object with a single PutSingle request.
/// </summary>
/// <param name="object">Object model (header and payload) to store.</param>
/// <param name="ctx">Per-call context supplying deadline, cancellation and optional session token.</param>
/// <returns>
/// The identifier computed locally by <see cref="CreateObject"/>; it is not read from the response.
/// </returns>
internal async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object @object, Context ctx)
{
    var token = await GetOrCreateSession(ctx);
    var grpcObject = CreateObject(@object);

    var putRequest = new PutSingleRequest
    {
        Body = new PutSingleRequest.Types.Body { Object = grpcObject }
    };

    putRequest.AddMetaHeader();
    putRequest.AddObjectSessionToken(
        token,
        grpcObject.Header.ContainerId,
        grpcObject.ObjectId,
        ObjectSessionContext.Types.Verb.Put,
        Context.Key);
    putRequest.Sign(Context.Key);

    var putResponse = await objectServiceClient.PutSingleAsync(putRequest, null, ctx.Deadline, ctx.CancellationToken);
    Verifier.CheckResponse(putResponse);

    return ObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
}
/// <summary>
/// Derives an object identifier from a header without sending anything.
/// </summary>
/// <param name="header">Header model to derive the identifier from.</param>
/// <returns>
/// The identifier whose value is the <c>Sha256()</c> of the header built by
/// <see cref="CreateHeader"/> with an empty payload array.
/// </returns>
internal ObjectId CalculateObjectId(ObjectHeader header)
{
    var headerHash = CreateHeader(header, []).Sha256();
    var grpcId = new ObjectID { Value = headerHash };
    return grpcId.ToModel();
}
/// <summary>
/// Sends a signed Delete request for one object and verifies the response.
/// </summary>
/// <param name="cid">Container that holds the object.</param>
/// <param name="oid">Identifier of the object to delete.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
internal async Task DeleteObjectAsync(ContainerId cid, ObjectId oid, Context ctx)
{
    var target = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };
    var deleteRequest = new DeleteRequest
    {
        Body = new DeleteRequest.Types.Body { Address = target }
    };

    // NOTE(review): unlike Get/Put, no object session token is attached here — confirm this is intended.
    deleteRequest.AddMetaHeader();
    deleteRequest.Sign(Context.Key);

    var deleteResponse = await objectServiceClient.DeleteAsync(deleteRequest, null, ctx.Deadline, ctx.CancellationToken);
    Verifier.CheckResponse(deleteResponse);
}
/// <summary>
/// Searches a container and lazily yields the identifiers of matching objects.
/// </summary>
/// <param name="cid">Container to search in.</param>
/// <param name="filters">Filters converted to gRPC messages and added to the request body.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>An asynchronous sequence of object identifiers, produced as the response stream is read.</returns>
internal async IAsyncEnumerable<ObjectId> SearchObjectsAsync(
    ContainerId cid,
    IEnumerable<ObjectFilter> filters,
    Context ctx)
{
    var searchBody = new SearchRequest.Types.Body
    {
        ContainerId = cid.ToGrpcMessage(),
        Version = 1 // TODO: clarify this param
    };
    searchBody.Filters.AddRange(filters.Select(filter => filter.ToGrpcMessage()));

    var searchRequest = new SearchRequest { Body = searchBody };
    searchRequest.AddMetaHeader();
    searchRequest.Sign(Context.Key);

    await foreach (var grpcId in SearchObjects(searchRequest, ctx))
    {
        yield return ObjectId.FromHash(grpcId.Value.ToByteArray());
    }
}
/// <summary>
/// Uploads a payload by cutting it on the client into parts and storing every part
/// with <see cref="PutSingleObjectAsync"/>.
/// </summary>
/// <remarks>
/// The part size is the network's <c>MaxObjectSize</c>. Every part references the previously
/// sent part through the shared split descriptor. When more than one part is needed, the last
/// part additionally carries the parent (large) object, and a link object listing all parts
/// is stored afterwards. A payload that fits into one part is stored as a single object.
/// </remarks>
/// <param name="parameters">Put parameters; header and payload must already be validated as non-null.</param>
/// <param name="ctx">Per-call context supplying deadline, cancellation and optional session token.</param>
/// <returns>
/// For a multi-part upload, the identifier calculated from the large object's header;
/// otherwise the identifier of the single stored object.
/// </returns>
/// <exception cref="InvalidOperationException">
/// No more data can be read although the stream reported more bytes remaining.
/// </exception>
private async Task<ObjectId> PutClientCutObject(PutObjectParameters parameters, Context ctx)
{
    var payloadStream = parameters.Payload!;
    var header = parameters.Header!;

    List<ObjectId> sentObjectIds = [];
    ModelsV2.Object? currentObject;

    var networkSettings = await Context.NetmapService!.GetNetworkSettingsAsync(ctx);

    // NOTE(review): the cast silently truncates if MaxObjectSize does not fit into an int — confirm the value range.
    var partSize = (int)networkSettings.MaxObjectSize;
    var buffer = new byte[partSize];

    var largeObject = new LargeObject(header.ContainerId);
    var split = new Split();

    // Only the bytes after the current position can still be read. Using Length alone made the
    // termination check below unreachable for a stream that is not positioned at its start.
    var fullLength = (ulong)(payloadStream.Length - payloadStream.Position);

    while (true)
    {
        // A single ReadAsync call may return fewer bytes than requested, so keep reading until
        // the part buffer is full or the stream is exhausted.
        var bytesCount = 0;
        while (bytesCount < partSize)
        {
            var read = await payloadStream.ReadAsync(buffer, bytesCount, partSize - bytesCount, ctx.CancellationToken);
            if (read == 0)
                break;

            bytesCount += read;
        }

        // Without this guard the loop would keep storing empty parts forever.
        if (bytesCount == 0 && largeObject.PayloadLength != fullLength)
            throw new InvalidOperationException("No payload data could be read although the stream reports remaining bytes");

        split.Previous = sentObjectIds.LastOrDefault();
        largeObject.AppendBlock(buffer, bytesCount);

        currentObject = new ModelsV2.Object(header.ContainerId, bytesCount < partSize ? buffer[..bytesCount] : buffer)
            .AddAttributes(header.Attributes)
            .SetSplit(split);

        // The last part is not sent inside the loop: how it is sent depends on whether
        // earlier parts exist (see below).
        if (largeObject.PayloadLength == fullLength)
            break;

        var partId = await PutSingleObjectAsync(currentObject, ctx);
        sentObjectIds.Add(partId);
    }

    if (sentObjectIds.Count != 0)
    {
        // Multi-part upload: the final part carries the parent object.
        largeObject.CalculateHash();
        currentObject.SetParent(largeObject);

        var lastPartId = await PutSingleObjectAsync(currentObject, ctx);
        sentObjectIds.Add(lastPartId);

        var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
            .AddChildren(sentObjectIds);
        _ = await PutSingleObjectAsync(linkObject, ctx);

        return CalculateObjectId(largeObject.Header);
    }

    // The whole payload fitted into one part.
    return await PutSingleObjectAsync(currentObject, ctx);
}
/// <summary>
/// Uploads an object through the streaming Put call: one init message carrying the header,
/// followed by payload chunks of at most <c>Constants.ObjectChunkSize</c> bytes.
/// </summary>
/// <param name="parameters">Put parameters; header and payload must already be validated as non-null.</param>
/// <param name="ctx">Per-call context supplying deadline, cancellation and optional session token.</param>
/// <returns>The object identifier reported in the response body.</returns>
private async Task<ObjectId> PutStreamObject(PutObjectParameters parameters, Context ctx)
{
    var payload = parameters.Payload!;
    var header = parameters.Header!;

    var sessionToken = await GetOrCreateSession(ctx);

    var hdr = header.ToGrpcMessage();
    hdr.OwnerId = Context.Owner.ToGrpcMessage();
    hdr.Version = Context.Version.ToGrpcMessage();

    // Identifier used only to bind the session token; the returned id comes from the response.
    var oid = new ObjectID
    {
        Value = hdr.Sha256()
    };

    var request = new PutRequest
    {
        Body = new PutRequest.Types.Body
        {
            Init = new PutRequest.Types.Body.Types.Init
            {
                Header = hdr
            },
        }
    };

    request.AddMetaHeader();
    request.AddObjectSessionToken(
        sessionToken,
        hdr.ContainerId,
        oid,
        ObjectSessionContext.Types.Verb.Put,
        Context.Key
    );
    request.Sign(Context.Key);

    using var stream = await PutObjectInit(request, ctx);

    var buffer = new byte[Constants.ObjectChunkSize];

    while (true)
    {
        var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize, ctx.CancellationToken);

        if (bufferLength == 0)
            break;

        // The same request instance is reused for every chunk: replace the body, drop the
        // previous verification header and sign again.
        request.Body = new PutRequest.Types.Body
        {
            // Copy straight out of the read buffer; slicing first would allocate a
            // temporary array for every chunk only to copy it a second time.
            Chunk = ByteString.CopyFrom(buffer, 0, bufferLength),
        };
        request.VerifyHeader = null;
        request.Sign(Context.Key);

        await stream.Write(request);
    }

    var response = await stream.Close();
    Verifier.CheckResponse(response);

    return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
// TODO: add implementation with stream writer!
/// <summary>
/// Starts a Get call, reads the object header from it and returns the object model
/// with the reader attached.
/// </summary>
/// <remarks>
/// The payload is not read here. Presumably the caller pulls it from the attached
/// <c>ObjectReader</c> — confirm against the reader's API.
/// </remarks>
/// <param name="request">Fully prepared and signed Get request.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>The model built from the header read off the stream, with <c>ObjectReader</c> set.</returns>
private async Task<ModelsV2.Object> GetObject(GetRequest request, Context ctx)
{
    var objectReader = GetObjectInit(request, ctx);

    var model = (await objectReader.ReadHeader()).ToModel();
    model.ObjectReader = objectReader;

    return model;
}
/// <summary>
/// Starts the Get call and wraps it into an <c>ObjectReader</c>.
/// </summary>
/// <param name="initRequest">Prepared Get request; must not be null.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>A reader constructed over the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
{
    if (initRequest is null)
    {
        throw new ArgumentNullException(nameof(initRequest));
    }

    return new ObjectReader(
        objectServiceClient.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken));
}
/// <summary>
/// Opens the streaming Put call, writes the init request to it and wraps the call
/// into an <c>ObjectStreamer</c>.
/// </summary>
/// <param name="initRequest">Prepared init request; must not be null.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>A streamer constructed over the opened call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest, Context ctx)
{
    if (initRequest is null)
    {
        throw new ArgumentNullException(nameof(initRequest));
    }

    var call = objectServiceClient!.Put(null, ctx.Deadline, ctx.CancellationToken);

    try
    {
        await call.RequestStream.WriteAsync(initRequest);
    }
    catch
    {
        // Nobody else holds the call yet, so release it here; otherwise a failed
        // init write would leave the opened call undisposed.
        call.Dispose();
        throw;
    }

    return new ObjectStreamer(call);
}
/// <summary>
/// Reads the search response stream batch by batch and yields every identifier in it.
/// </summary>
/// <param name="request">Prepared and signed search request.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>The gRPC object identifiers in the order they are read; ends when the reader returns null.</returns>
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, Context ctx)
{
    using var reader = GetSearchReader(request, ctx);

    for (var batch = await reader.Read(ctx.CancellationToken);
         batch != null;
         batch = await reader.Read(ctx.CancellationToken))
    {
        foreach (var id in batch)
        {
            yield return id;
        }
    }
}
/// <summary>
/// Starts the Search call and wraps it into a <c>SearchReader</c>.
/// </summary>
/// <param name="initRequest">Prepared search request; must not be null.</param>
/// <param name="ctx">Per-call context supplying the deadline and the cancellation token.</param>
/// <returns>A reader constructed over the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private SearchReader GetSearchReader(SearchRequest initRequest, Context ctx)
{
    if (initRequest is null)
        throw new ArgumentNullException(nameof(initRequest));

    var searchCall = objectServiceClient.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
    return new SearchReader(searchCall);
}
/// <summary>
/// Converts an SDK object model into a signed gRPC object ready to be sent.
/// </summary>
/// <remarks>
/// Owner and version are taken from the client context. Payload length and hash are filled in
/// when a payload is present. When the model carries split information it is copied into the
/// header; a parent header, if any, is built, hashed and signed, and the resulting parent id
/// is also written back into the model's split (<c>split.Parent</c>).
/// </remarks>
/// <param name="object">Object model to convert.</param>
/// <returns>The gRPC object with header, identifier, payload and signature set.</returns>
private Object.Object CreateObject(ModelsV2.Object @object)
{
    var grpcHeader = @object.Header.ToGrpcMessage();

    grpcHeader.OwnerId = Context.Owner.ToGrpcMessage();
    grpcHeader.Version = Context.Version.ToGrpcMessage();

    var payload = @object.Payload;
    if (payload != null)
    {
        grpcHeader.PayloadLength = (ulong)payload.Length;
        grpcHeader.PayloadHash = Sha256Checksum(payload);
    }

    var split = @object.Header.Split;
    if (split != null)
    {
        grpcHeader.Split = new Header.Types.Split();

        // Generated protobuf setters for bytes fields reject null, so the field is left
        // at its default when the model has no split id.
        if (split.SplitId != null)
            grpcHeader.Split.SplitId = ByteString.CopyFrom(split.SplitId.ToBinary());

        if (split.Children != null && split.Children.Count != 0)
            grpcHeader.Split.Children.AddRange(split.Children.Select(id => id.ToGrpcMessage()));

        if (split.ParentHeader is not null)
        {
            var grpcParentHeader = CreateHeader(split.ParentHeader, []);

            grpcHeader.Split.Parent = new ObjectID { Value = grpcParentHeader.Sha256() };
            grpcHeader.Split.ParentHeader = grpcParentHeader;
            grpcHeader.Split.ParentSignature = new Refs.Signature
            {
                Key = ByteString.CopyFrom(Context.Key.PublicKey()),
                Sign = ByteString.CopyFrom(Context.Key.SignData(grpcHeader.Split.Parent.ToByteArray())),
            };

            // Side effect on the input model: expose the computed parent id to the caller.
            split.Parent = grpcHeader.Split.Parent.ToModel();
        }

        grpcHeader.Split.Previous = split.Previous?.ToGrpcMessage();
    }

    // The identifier is derived from the completed header, so it is computed only after
    // all header fields above have been set.
    var obj = new Object.Object
    {
        Header = grpcHeader,
        ObjectId = new ObjectID { Value = grpcHeader.Sha256() },
        // The header logic above already tolerates a missing payload; do the same here
        // instead of failing inside ByteString.CopyFrom.
        Payload = payload != null ? ByteString.CopyFrom(payload) : ByteString.Empty
    };

    obj.Signature = new Refs.Signature
    {
        Key = ByteString.CopyFrom(Context.Key.PublicKey()),
        Sign = ByteString.CopyFrom(Context.Key.SignData(obj.ObjectId.ToByteArray())),
    };

    return obj;
}
/// <summary>
/// Converts a header model into a gRPC header, stamping owner and version from the client
/// context and filling in the payload hash when a source for it is available.
/// </summary>
/// <param name="header">Header model to convert.</param>
/// <param name="payload">Payload used for the hash when the header has no <c>PayloadCheckSum</c>; may be null.</param>
/// <returns>The gRPC header; its payload hash is left untouched when neither source is present.</returns>
private Header CreateHeader(ObjectHeader header, byte[]? payload)
{
    var grpcHeader = header.ToGrpcMessage();

    grpcHeader.OwnerId = Context.Owner.ToGrpcMessage();
    grpcHeader.Version = Context.Version.ToGrpcMessage();

    // The checksum stored on the header model takes precedence over the payload argument.
    if (header.PayloadCheckSum != null)
    {
        // NOTE(review): PayloadCheckSum is passed through Sha256Checksum, i.e. hashed again
        // rather than copied as-is — confirm this is intended.
        grpcHeader.PayloadHash = Sha256Checksum(header.PayloadCheckSum);
        return grpcHeader;
    }

    if (payload != null)
    {
        grpcHeader.PayloadHash = Sha256Checksum(payload);
    }

    return grpcHeader;
}
/// <summary>
/// Wraps the <c>Sha256()</c> of <paramref name="data"/> into a checksum message of type Sha256.
/// </summary>
/// <param name="data">Bytes to hash.</param>
/// <returns>A new checksum message holding a copy of the computed hash.</returns>
private static Checksum Sha256Checksum(byte[] data) => new()
{
    Type = ChecksumType.Sha256,
    Sum = ByteString.CopyFrom(data.Sha256())
};
/// <summary>
/// Returns the session token to use for a call: the one supplied in the context when present,
/// otherwise a newly created one.
/// </summary>
/// <param name="ctx">Per-call context; <c>SessionToken</c> is expected to be Base64 text when set.</param>
/// <returns>The deserialized context token, or the token returned by the session service.</returns>
private async Task<Session.SessionToken> GetOrCreateSession(Context ctx)
{
    if (!string.IsNullOrEmpty(ctx.SessionToken))
    {
        var rawToken = Convert.FromBase64String(ctx.SessionToken);
        return rawToken.DeserializeSessionToken();
    }

    // NOTE(review): uint.MaxValue is presumably the session expiration — confirm against CreateSessionAsync.
    return await Context.SessionService!.CreateSessionAsync(uint.MaxValue, ctx);
}
}