[#26] All: Remove V2 from naming
Rename project, namespaces and class names Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
parent
c406df1a78
commit
766f61a5f7
219 changed files with 219 additions and 974 deletions
713
src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
Normal file
713
src/FrostFS.SDK.Client/Services/ObjectServiceProvider.cs
Normal file
|
@ -0,0 +1,713 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using FrostFS.Object;
|
||||
using FrostFS.Refs;
|
||||
using FrostFS.SDK.Client;
|
||||
using FrostFS.SDK.Client.Mappers.GRPC;
|
||||
using FrostFS.SDK.Cryptography;
|
||||
using FrostFS.Session;
|
||||
|
||||
using Google.Protobuf;
|
||||
|
||||
namespace FrostFS.SDK.Client;
|
||||
|
||||
internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx)
|
||||
: ContextAccessor(clientCtx), ISessionProvider
|
||||
{
|
||||
private SessionProvider? sessions;
|
||||
private readonly ObjectService.ObjectServiceClient client = client;
|
||||
|
||||
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, CallContext ctx)
|
||||
{
|
||||
sessions ??= new(ClientContext);
|
||||
|
||||
if (ClientContext.SessionCache.Cache != null &&
|
||||
ClientContext.SessionCache.Cache.ContainsKey(ClientContext.SessionCacheKey))
|
||||
{
|
||||
return (SessionToken)ClientContext.SessionCache.Cache[ClientContext.SessionCacheKey];
|
||||
}
|
||||
|
||||
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
ctx.Key ??= ClientContext.Key?.ECDsaKey;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var request = new HeadRequest
|
||||
{
|
||||
Body = new HeadRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = args.ContainerId.ContainerID,
|
||||
ObjectId = args.ObjectId.ToMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Head,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return response.Body.Header.Header.ToModel();
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
ctx.Key ??= ClientContext.Key?.ECDsaKey;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var request = new GetRequest
|
||||
{
|
||||
Body = new GetRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = args.ContainerId.ToMessage(),
|
||||
ObjectId = args.ObjectId.ToMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Get,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
return await GetObject(request, ctx).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
internal async Task<RangeReader> GetRangeAsync(PrmRangeGet args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
ctx.Key ??= ClientContext.Key?.ECDsaKey;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var request = new GetRangeRequest
|
||||
{
|
||||
Body = new GetRangeRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = args.ContainerId.ToMessage(),
|
||||
ObjectId = args.ObjectId.ToMessage()
|
||||
},
|
||||
Range = new Object.Range
|
||||
{
|
||||
Offset = args.Range.Offset,
|
||||
Length = args.Range.Length
|
||||
},
|
||||
Raw = args.Raw
|
||||
}
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Range,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
var call = client.GetRange(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
return new RangeReader(call);
|
||||
}
|
||||
|
||||
internal async Task<ReadOnlyMemory<byte>[]> GetRangeHashAsync(PrmRangeHashGet args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
ctx.Key ??= ClientContext.Key?.ECDsaKey;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var request = new GetRangeHashRequest
|
||||
{
|
||||
Body = new GetRangeHashRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = args.ContainerId.ToMessage(),
|
||||
ObjectId = args.ObjectId.ToMessage()
|
||||
},
|
||||
Type = ChecksumType.Sha256,
|
||||
Salt = ByteString.CopyFrom(args.Salt) // TODO: create a type with calculated cashed ByteString inside
|
||||
}
|
||||
};
|
||||
|
||||
foreach (var range in args.Ranges)
|
||||
{
|
||||
request.Body.Ranges.Add(new Object.Range
|
||||
{
|
||||
Length = range.Length,
|
||||
Offset = range.Offset
|
||||
});
|
||||
}
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Rangehash,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
var response = await client.GetRangeHashAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
var hashCollection = response.Body.HashList.Select(h => h.Memory).ToArray();
|
||||
|
||||
return hashCollection;
|
||||
}
|
||||
|
||||
|
||||
internal async Task DeleteObjectAsync(PrmObjectDelete args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
ctx.Key ??= ClientContext.Key?.ECDsaKey;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var request = new DeleteRequest
|
||||
{
|
||||
Body = new DeleteRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = args.ContainerId.ToMessage(),
|
||||
ObjectId = args.ObjectId.ToMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
request.Body.Address,
|
||||
ObjectSessionContext.Types.Verb.Delete,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
}
|
||||
|
||||
internal async IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var request = new SearchRequest
|
||||
{
|
||||
Body = new SearchRequest.Types.Body
|
||||
{
|
||||
ContainerId = args.ContainerId.ToMessage(),
|
||||
Version = 1 // TODO: clarify this param
|
||||
}
|
||||
};
|
||||
|
||||
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
new Address { ContainerId = request.Body.ContainerId },
|
||||
ObjectSessionContext.Types.Verb.Search,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
var objectsIds = SearchObjects(request, ctx);
|
||||
|
||||
await foreach (var oid in objectsIds)
|
||||
{
|
||||
yield return FrostFsObjectId.FromHash(oid.Value.ToByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
|
||||
{
|
||||
if (args is null)
|
||||
throw new ArgumentNullException(nameof(args));
|
||||
|
||||
if (args.Header == null)
|
||||
throw new ArgumentNullException(nameof(args), "Header is null");
|
||||
|
||||
if (args.Payload == null)
|
||||
throw new ArgumentNullException(nameof(args), "Payload is null");
|
||||
|
||||
if (args.ClientCut)
|
||||
{
|
||||
return await PutClientCutObject(args).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (args.Header.PayloadLength > 0)
|
||||
args.FullLength = args.Header.PayloadLength;
|
||||
else if (args.Payload.CanSeek)
|
||||
args.FullLength = (ulong)args.Payload.Length;
|
||||
|
||||
var response = await PutStreamObject(args).ConfigureAwait(false);
|
||||
|
||||
return response.ObjectId;
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ctx);
|
||||
|
||||
var request = new PutSingleRequest
|
||||
{
|
||||
Body = new() { Object = grpcObject }
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId },
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
ctx.Key);
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var chunkSize = args.MaxPayloadPatchChunkLength;
|
||||
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
|
||||
|
||||
var call = client.Patch(null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
{
|
||||
// common
|
||||
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
|
||||
|
||||
var address = new Address
|
||||
{
|
||||
ObjectId = args.Address.ObjectId,
|
||||
ContainerId = args.Address.ContainerId
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
address,
|
||||
ObjectSessionContext.Types.Verb.Patch,
|
||||
ctx.Key
|
||||
);
|
||||
|
||||
var request = new PatchRequest()
|
||||
{
|
||||
Body = new()
|
||||
{
|
||||
Address = address,
|
||||
ReplaceAttributes = args.ReplaceAttributes,
|
||||
}
|
||||
};
|
||||
|
||||
bool isFirstChunk = true;
|
||||
ulong currentPos = args.Range.Offset;
|
||||
|
||||
while (true)
|
||||
{
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (bytesCount == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (isFirstChunk && args.NewAttributes != null && args.NewAttributes.Length > 0)
|
||||
{
|
||||
foreach (var attr in args.NewAttributes)
|
||||
{
|
||||
request.Body.NewAttributes.Add(attr.ToMessage());
|
||||
}
|
||||
}
|
||||
|
||||
request.Body.Patch = new PatchRequest.Types.Body.Types.Patch
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount),
|
||||
SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
|
||||
};
|
||||
|
||||
currentPos += (ulong)bytesCount;
|
||||
|
||||
request.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
request.Sign(ctx.Key);
|
||||
|
||||
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
|
||||
|
||||
isFirstChunk = false;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
|
||||
|
||||
if (chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
var response = await call.ResponseAsync.ConfigureAwait(false);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return response.Body.ObjectId.ToModel();
|
||||
}
|
||||
|
||||
private async Task<FrostFsObjectId> PutClientCutObject(PrmObjectPut args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var tokenRaw = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
var token = new FrostFsSessionToken(tokenRaw.Serialize(), tokenRaw.Body.Id.ToUuid());
|
||||
|
||||
args.SessionToken = token;
|
||||
|
||||
var payloadStream = args.Payload!;
|
||||
var header = args.Header!;
|
||||
|
||||
var fullLength = header.PayloadLength;
|
||||
|
||||
if (payloadStream.CanSeek && fullLength == 0)
|
||||
fullLength = (ulong)payloadStream.Length;
|
||||
|
||||
args.FullLength = fullLength;
|
||||
|
||||
if (args.MaxObjectSizeCache == 0)
|
||||
{
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx))
|
||||
.ConfigureAwait(false);
|
||||
|
||||
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
|
||||
}
|
||||
|
||||
var restBytes = fullLength - args.CurrentStreamPosition;
|
||||
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;
|
||||
|
||||
//define collection capacity
|
||||
var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
|
||||
var objectsCount = fullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
|
||||
|
||||
List<FrostFsObjectId> sentObjectIds = new(objectsCount);
|
||||
|
||||
FrostFsSplit? split = null;
|
||||
SplitId splitId = new();
|
||||
|
||||
// keep attributes for the large object
|
||||
var attributes = args.Header!.Attributes;
|
||||
args.Header!.Attributes = null;
|
||||
|
||||
// send all parts except the last one as separate Objects
|
||||
while (restBytes > (ulong)args.MaxObjectSizeCache)
|
||||
{
|
||||
split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault());
|
||||
|
||||
args.Header!.Split = split;
|
||||
|
||||
var result = await PutStreamObject(args).ConfigureAwait(false);
|
||||
|
||||
sentObjectIds.Add(result.ObjectId);
|
||||
|
||||
restBytes -= (ulong)result.ObjectSize;
|
||||
}
|
||||
|
||||
// send the last part and create linkObject
|
||||
if (sentObjectIds.Count > 0)
|
||||
{
|
||||
var largeObjectHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, [.. attributes])
|
||||
{
|
||||
PayloadLength = fullLength,
|
||||
};
|
||||
|
||||
args.Header.Split!.ParentHeader = largeObjectHeader;
|
||||
|
||||
var result = await PutStreamObject(args).ConfigureAwait(false);
|
||||
|
||||
sentObjectIds.Add(result.ObjectId);
|
||||
|
||||
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
|
||||
|
||||
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject, args.Context)).ConfigureAwait(false);
|
||||
|
||||
var parentHeader = args.Header.GetHeader();
|
||||
|
||||
return parentHeader.Split!.Parent.ToModel();
|
||||
}
|
||||
|
||||
// We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
|
||||
args.Header!.Attributes = attributes;
|
||||
|
||||
var singlePartResult = await PutStreamObject(args).ConfigureAwait(false);
|
||||
|
||||
return singlePartResult.ObjectId;
|
||||
}
|
||||
|
||||
struct PutObjectResult(FrostFsObjectId objectId, int objectSize)
|
||||
{
|
||||
public FrostFsObjectId ObjectId = objectId;
|
||||
public int ObjectSize = objectSize;
|
||||
}
|
||||
|
||||
private async Task<PutObjectResult> PutStreamObject(PrmObjectPut args)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
var payload = args.Payload!;
|
||||
|
||||
var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
|
||||
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
||||
|
||||
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
||||
|
||||
bool isRentBuffer = false;
|
||||
byte[]? chunkBuffer = null;
|
||||
|
||||
try
|
||||
{
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
chunkBuffer = args.CustomBuffer;
|
||||
}
|
||||
else
|
||||
{
|
||||
chunkBuffer = ClientContext.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
|
||||
isRentBuffer = true;
|
||||
}
|
||||
|
||||
var sentBytes = 0;
|
||||
|
||||
// 0 means no limit from client, so server side cut is performed
|
||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||
|
||||
using var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);
|
||||
|
||||
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
|
||||
{
|
||||
// send chunks limited to default or user's settings
|
||||
var bufferSize = objectLimitSize > 0 ?
|
||||
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
|
||||
: chunkSize;
|
||||
|
||||
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (bytesCount == 0)
|
||||
break;
|
||||
|
||||
sentBytes += bytesCount;
|
||||
|
||||
var chunkRequest = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
|
||||
}
|
||||
};
|
||||
|
||||
chunkRequest.Sign(ctx.Key);
|
||||
|
||||
await stream.Write(chunkRequest).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
var response = await stream.Close().ConfigureAwait(false);
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (isRentBuffer && chunkBuffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(chunkBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<ObjectStreamer<PutRequest, PutResponse>> GetUploadStream(PrmObjectPut args, CallContext ctx)
|
||||
{
|
||||
var header = args.Header!;
|
||||
|
||||
if (ctx.Key == null)
|
||||
throw new ArgumentNullException(nameof(args), "Key is null");
|
||||
|
||||
header.OwnerId ??= ctx.OwnerId;
|
||||
header.Version ??= ctx.Version;
|
||||
|
||||
var grpcHeader = header.GetHeader();
|
||||
|
||||
if (header.Split != null)
|
||||
{
|
||||
ObjectTools.SetSplitValues(grpcHeader, header.Split, ctx);
|
||||
}
|
||||
|
||||
var oid = new ObjectID { Value = grpcHeader.Sha256() };
|
||||
|
||||
var initRequest = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Init = new PutRequest.Types.Body.Types.Init
|
||||
{
|
||||
Header = grpcHeader
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
sessionToken.CreateObjectTokenContext(
|
||||
new Address { ContainerId = grpcHeader.ContainerId, ObjectId = oid },
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
ctx.Key
|
||||
);
|
||||
|
||||
initRequest.AddMetaHeader(args.XHeaders, sessionToken);
|
||||
|
||||
initRequest.Sign(ctx.Key);
|
||||
|
||||
return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<ObjectStreamer<PutRequest, PutResponse>> PutObjectInit(PutRequest initRequest, CallContext ctx)
|
||||
{
|
||||
if (initRequest is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);
|
||||
|
||||
return new ObjectStreamer<PutRequest, PutResponse>(call);
|
||||
}
|
||||
|
||||
private async Task<FrostFsObject> GetObject(GetRequest request, CallContext ctx)
|
||||
{
|
||||
var reader = GetObjectInit(request, ctx);
|
||||
|
||||
var grpcObject = await reader.ReadHeader().ConfigureAwait(false);
|
||||
var modelObject = grpcObject.ToModel();
|
||||
|
||||
modelObject.ObjectReader = reader;
|
||||
|
||||
return modelObject;
|
||||
}
|
||||
|
||||
private ObjectReader GetObjectInit(GetRequest initRequest, CallContext ctx)
|
||||
{
|
||||
if (initRequest is null)
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
|
||||
var call = client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
return new ObjectReader(call);
|
||||
}
|
||||
|
||||
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, CallContext ctx)
|
||||
{
|
||||
using var stream = GetSearchReader(request, ctx);
|
||||
|
||||
while (true)
|
||||
{
|
||||
var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (ids == null)
|
||||
break;
|
||||
|
||||
foreach (var oid in ids)
|
||||
{
|
||||
yield return oid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx)
|
||||
{
|
||||
if (initRequest is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
|
||||
return new SearchReader(call);
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue