[#28] Clients: Make immutable parameters
Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
parent
749000a090
commit
9bb7b5eff8
62 changed files with 2742 additions and 963 deletions
|
@ -43,10 +43,8 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
return token;
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
|
||||
internal async Task<FrostFsHeaderResult> GetObjectHeadAsync(PrmObjectHeadGet args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new HeadRequest
|
||||
{
|
||||
Body = new HeadRequest.Types.Body
|
||||
|
@ -55,7 +53,8 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
{
|
||||
ContainerId = args.ContainerId.ContainerID,
|
||||
ObjectId = args.ObjectId.ToMessage()
|
||||
}
|
||||
},
|
||||
Raw = args.Raw
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -70,17 +69,27 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
request.Sign(ClientContext.Key.ECDsaKey);
|
||||
|
||||
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
|
||||
var response = await client!.HeadAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return response.Body.Header.Header.ToModel();
|
||||
var result = new FrostFsHeaderResult();
|
||||
|
||||
if (response.Body.Header != null)
|
||||
{
|
||||
result.HeaderInfo = response.Body.Header?.Header.ToModel();
|
||||
}
|
||||
|
||||
if (response.Body.SplitInfo != null)
|
||||
{
|
||||
result.SplitInfo = new FrostFsSplitInfo(response.Body.SplitInfo);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
|
||||
internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new GetRequest
|
||||
{
|
||||
Body = new GetRequest.Types.Body
|
||||
|
@ -107,10 +116,8 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
return await GetObject(request, ctx).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
internal async Task<RangeReader> GetRangeAsync(PrmRangeGet args)
|
||||
internal async Task<RangeReader> GetRangeAsync(PrmRangeGet args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new GetRangeRequest
|
||||
{
|
||||
Body = new GetRangeRequest.Types.Body
|
||||
|
@ -140,14 +147,12 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
request.Sign(ClientContext.Key.ECDsaKey);
|
||||
|
||||
var call = client.GetRange(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.GetRange(request, null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
return new RangeReader(call);
|
||||
}
|
||||
|
||||
internal async Task<ReadOnlyMemory<byte>[]> GetRangeHashAsync(PrmRangeHashGet args)
|
||||
internal async Task<ReadOnlyMemory<byte>[]> GetRangeHashAsync(PrmRangeHashGet args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new GetRangeHashRequest
|
||||
{
|
||||
Body = new GetRangeHashRequest.Types.Body
|
||||
|
@ -182,7 +187,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
request.Sign(ClientContext.Key.ECDsaKey);
|
||||
|
||||
var response = await client.GetRangeHashAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var response = await client.GetRangeHashAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
|
@ -191,11 +196,8 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
return hashCollection;
|
||||
}
|
||||
|
||||
|
||||
internal async Task DeleteObjectAsync(PrmObjectDelete args)
|
||||
internal async Task DeleteObjectAsync(PrmObjectDelete args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new DeleteRequest
|
||||
{
|
||||
Body = new DeleteRequest.Types.Body
|
||||
|
@ -218,15 +220,13 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
request.AddMetaHeader(args.XHeaders, protoToken);
|
||||
request.Sign(ClientContext.Key.ECDsaKey);
|
||||
|
||||
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var response = await client.DeleteAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
}
|
||||
|
||||
internal async IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
|
||||
internal async IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var request = new SearchRequest
|
||||
{
|
||||
Body = new SearchRequest.Types.Body
|
||||
|
@ -265,11 +265,8 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
}
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
|
||||
internal async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args, CallContext ctx)
|
||||
{
|
||||
if (args is null)
|
||||
throw new ArgumentNullException(nameof(args));
|
||||
|
||||
if (args.Header == null)
|
||||
throw new ArgumentNullException(nameof(args), "Header is null");
|
||||
|
||||
|
@ -278,27 +275,25 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
if (args.ClientCut)
|
||||
{
|
||||
return await PutClientCutObject(args).ConfigureAwait(false);
|
||||
return await PutClientCutObject(args, ctx).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (args.Header.PayloadLength > 0)
|
||||
args.FullLength = args.Header.PayloadLength;
|
||||
args.PutObjectContext.FullLength = args.Header.PayloadLength;
|
||||
else if (args.Payload.CanSeek)
|
||||
args.FullLength = (ulong)args.Payload.Length;
|
||||
args.PutObjectContext.FullLength = (ulong)args.Payload.Length;
|
||||
else
|
||||
throw new ArgumentException("The stream does not have a length and payload length is not defined");
|
||||
|
||||
var response = await PutStreamObject(args).ConfigureAwait(false);
|
||||
var response = await PutStreamObject(args, ctx).ConfigureAwait(false);
|
||||
|
||||
return response.ObjectId;
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
|
||||
internal async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ClientContext);
|
||||
|
||||
var request = new PutSingleRequest
|
||||
|
@ -317,21 +312,19 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
|
||||
request.Sign(ClientContext.Key.ECDsaKey);
|
||||
|
||||
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
|
||||
var response = await client.PutSingleAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.Span);
|
||||
}
|
||||
|
||||
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args)
|
||||
internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var chunkSize = args.MaxPayloadPatchChunkLength;
|
||||
var chunkSize = args.MaxChunkLength;
|
||||
Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");
|
||||
|
||||
var call = client.Patch(null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
byte[]? chunkBuffer = null;
|
||||
try
|
||||
|
@ -415,36 +408,32 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
return response.Body.ObjectId.ToModel();
|
||||
}
|
||||
|
||||
private async Task<FrostFsObjectId> PutClientCutObject(PrmObjectPut args)
|
||||
private async Task<FrostFsObjectId> PutClientCutObject(PrmObjectPut args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
args.SessionToken ??= await GetDefaultSession(args, ctx).ConfigureAwait(false);
|
||||
|
||||
var payloadStream = args.Payload!;
|
||||
var header = args.Header!;
|
||||
|
||||
if (header.PayloadLength > 0)
|
||||
args.FullLength = header.PayloadLength;
|
||||
args.PutObjectContext.FullLength = header.PayloadLength;
|
||||
else if (payloadStream.CanSeek)
|
||||
args.FullLength = (ulong)payloadStream.Length;
|
||||
args.PutObjectContext.FullLength = (ulong)payloadStream.Length;
|
||||
else
|
||||
throw new ArgumentException("The stream does not have a length and payload length is not defined");
|
||||
|
||||
if (args.MaxObjectSizeCache == 0)
|
||||
if (args.PutObjectContext.MaxObjectSizeCache == 0)
|
||||
{
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(new PrmNetworkSettings(ctx))
|
||||
var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
|
||||
args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
|
||||
}
|
||||
|
||||
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
||||
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;
|
||||
var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;
|
||||
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes) : args.PutObjectContext.MaxObjectSizeCache;
|
||||
|
||||
//define collection capacity
|
||||
var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
|
||||
var objectsCount = args.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
|
||||
var objectsCount = args.PutObjectContext.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;
|
||||
|
||||
List<FrostFsObjectId> sentObjectIds = new(objectsCount);
|
||||
|
||||
|
@ -456,13 +445,13 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
args.Header!.Attributes = null;
|
||||
|
||||
// send all parts except the last one as separate Objects
|
||||
while (restBytes > (ulong)args.MaxObjectSizeCache)
|
||||
while (restBytes > (ulong)args.PutObjectContext.MaxObjectSizeCache)
|
||||
{
|
||||
split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault());
|
||||
|
||||
args.Header!.Split = split;
|
||||
|
||||
var result = await PutStreamObject(args).ConfigureAwait(false);
|
||||
var result = await PutStreamObject(args, default).ConfigureAwait(false);
|
||||
|
||||
sentObjectIds.Add(result.ObjectId);
|
||||
|
||||
|
@ -474,18 +463,18 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
{
|
||||
var largeObjectHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, [.. attributes])
|
||||
{
|
||||
PayloadLength = args.FullLength,
|
||||
PayloadLength = args.PutObjectContext.FullLength,
|
||||
};
|
||||
|
||||
args.Header.Split!.ParentHeader = largeObjectHeader;
|
||||
|
||||
var result = await PutStreamObject(args).ConfigureAwait(false);
|
||||
var result = await PutStreamObject(args, default).ConfigureAwait(false);
|
||||
|
||||
sentObjectIds.Add(result.ObjectId);
|
||||
|
||||
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
|
||||
|
||||
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject, args.Context)).ConfigureAwait(false);
|
||||
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);
|
||||
|
||||
var parentHeader = args.Header.GetHeader();
|
||||
|
||||
|
@ -495,7 +484,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
// We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
|
||||
args.Header!.Attributes = attributes;
|
||||
|
||||
var singlePartResult = await PutStreamObject(args).ConfigureAwait(false);
|
||||
var singlePartResult = await PutStreamObject(args, default).ConfigureAwait(false);
|
||||
|
||||
return singlePartResult.ObjectId;
|
||||
}
|
||||
|
@ -506,15 +495,13 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
public int ObjectSize = objectSize;
|
||||
}
|
||||
|
||||
private async Task<PutObjectResult> PutStreamObject(PrmObjectPut args)
|
||||
private async Task<PutObjectResult> PutStreamObject(PrmObjectPut args, CallContext ctx)
|
||||
{
|
||||
var ctx = args.Context!;
|
||||
|
||||
var payload = args.Payload!;
|
||||
|
||||
var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
|
||||
|
||||
var restBytes = args.FullLength - args.CurrentStreamPosition;
|
||||
var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;
|
||||
|
||||
chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);
|
||||
|
||||
|
@ -524,7 +511,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
try
|
||||
{
|
||||
// 0 means no limit from client, so server side cut is performed
|
||||
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
|
||||
var objectLimitSize = args.ClientCut ? args.PutObjectContext.MaxObjectSizeCache : 0;
|
||||
|
||||
if (args.CustomBuffer != null)
|
||||
{
|
||||
|
@ -634,7 +621,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Put(null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);
|
||||
|
||||
|
@ -658,7 +645,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
if (initRequest is null)
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
|
||||
var call = client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Get(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
return new ObjectReader(call);
|
||||
}
|
||||
|
@ -670,7 +657,7 @@ internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient cl
|
|||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
|
||||
var call = client.Search(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken);
|
||||
|
||||
return new SearchReader(call);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue