[#13] Client: Use code analyzers

Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
Pavel Gross 2024-09-23 18:53:21 +03:00
parent d7dbbf8da8
commit d1271df207
102 changed files with 2168 additions and 733 deletions

View file

@ -6,9 +6,8 @@ using System.Threading.Tasks;
using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Extensions;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using FrostFS.Session;
@ -16,19 +15,30 @@ using Google.Protobuf;
namespace FrostFS.SDK.ClientV2;
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment env)
: ContextAccessor(env), ISessionProvider
internal sealed class ObjectServiceProvider : ContextAccessor, ISessionProvider
{
readonly SessionProvider sessions = new (env);
private readonly SessionProvider sessions;
private ObjectService.ObjectServiceClient client;
public async ValueTask<Session.SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
internal ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment env)
: base(env)
{
return await sessions.GetOrCreateSession(args, ctx);
this.sessions = new(Context);
this.client = client;
}
public async ValueTask<SessionToken> GetOrCreateSession(ISessionToken args, Context ctx)
{
return await sessions.GetOrCreateSession(args, ctx).ConfigureAwait(false);
}
internal async Task<FrostFsObjectHeader> GetObjectHeadAsync(PrmObjectHeadGet args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
var request = new HeadRequest
{
@ -42,7 +52,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
@ -53,8 +63,8 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
request.Sign(ctx.Key);
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);
var response = await client!.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
Verifier.CheckResponse(response);
return response.Body.Header.Header.ToModel();
@ -63,7 +73,12 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
var request = new GetRequest
{
Body = new GetRequest.Types.Body
@ -76,7 +91,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
@ -87,12 +102,17 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
request.Sign(ctx.Key);
return await GetObject(request, ctx);
return await GetObject(request, ctx).ConfigureAwait(false);
}
internal async Task DeleteObjectAsync(PrmObjectDelete args)
{
var ctx = args.Context!;
ctx.Key ??= Context.Key?.ECDsaKey;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
var request = new DeleteRequest
{
Body = new DeleteRequest.Types.Body
@ -105,7 +125,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
request.Body.Address,
@ -116,26 +136,29 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
request.Sign(ctx.Key);
var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);
Verifier.CheckResponse(response);
}
internal async IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args)
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
var request = new SearchRequest
{
Body = new SearchRequest.Types.Body
{
ContainerId = args.ContainerId.ToMessage(),
Filters = { },
Version = 1 // TODO: clarify this param
}
};
request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));
var sessionToken = await GetOrCreateSession(args, ctx);
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = request.Body.ContainerId },
@ -143,7 +166,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
ctx.Key);
request.AddMetaHeader(args.XHeaders, sessionToken);
request.Sign(ctx.Key);
var objectsIds = SearchObjects(request, ctx);
@ -156,14 +179,17 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
internal async Task<FrostFsObjectId> PutObjectAsync(PrmObjectPut args)
{
if (args is null)
throw new ArgumentNullException(nameof(args));
if (args.Header == null)
throw new ArgumentException("Value cannot be null", nameof(args.Header));
throw new ArgumentException(nameof(args.Header));
if (args.Payload == null)
throw new ArgumentException("Value cannot be null", nameof(args.Payload));
throw new ArgumentException(nameof(args.Payload));
if (args.ClientCut)
return await PutClientCutObject(args);
return await PutClientCutObject(args).ConfigureAwait(false);
else
{
if (args.Header.PayloadLength > 0)
@ -171,24 +197,28 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
else if (args.Payload.CanSeek)
args.FullLength = (ulong)args.Payload.Length;
return (await PutStreamObject(args)).ObjectId;
return (await PutStreamObject(args).ConfigureAwait(false)).ObjectId;
}
}
internal async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args)
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ctx);
var request = new PutSingleRequest
{
Body = new () { Object = grpcObject }
Body = new() { Object = grpcObject }
};
var sessionToken = await GetOrCreateSession(args, ctx);
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId},
new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId },
ObjectSessionContext.Types.Verb.Put,
ctx.Key);
@ -196,18 +226,18 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
request.Sign(ctx.Key);
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken);
var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken).ConfigureAwait(false);
Verifier.CheckResponse(response);
return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
}
private async Task<FrostFsObjectId> PutClientCutObject(PrmObjectPut args)
{
var ctx = args.Context!;
var tokenRaw = await GetOrCreateSession(args, ctx);
var tokenRaw = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
var token = new FrostFsSessionToken(tokenRaw.Serialize());
args.SessionToken = token;
@ -216,7 +246,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
var header = args.Header!;
var fullLength = header.PayloadLength;
if (payloadStream.CanSeek && fullLength == 0)
fullLength = (ulong)payloadStream.Length;
@ -224,12 +254,14 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
if (args.MaxObjectSizeCache == 0)
{
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings() { Context = ctx });
var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings() { Context = ctx })
.ConfigureAwait(false);
args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
}
var restBytes = fullLength - args.CurrentStreamPosition;
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;
var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;
//define collection capacity
var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
@ -238,23 +270,20 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
List<FrostFsObjectId> sentObjectIds = new(objectsCount);
FrostFsSplit? split = null;
SplitId splitId = new();
// keep attributes for the large object
var attributes = args.Header!.Attributes;
args.Header!.Attributes = null;
// send all parts except the last one as separate Objects
while (restBytes > (ulong)args.MaxObjectSizeCache)
while (restBytes > (ulong)args.MaxObjectSizeCache)
{
if (split == null)
{
split = new FrostFsSplit();
args.Header!.Attributes = [];
}
split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault());
split!.Previous = sentObjectIds.LastOrDefault();
args.Header!.Split = split;
var result = await PutStreamObject(args);
var result = await PutStreamObject(args).ConfigureAwait(false);
sentObjectIds.Add(result.ObjectId);
@ -264,26 +293,30 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
// send the last part and create linkObject
if (sentObjectIds.Count > 0)
{
var largeObjectHeader = new FrostFsObjectHeader(header.ContainerId) { PayloadLength = fullLength };
largeObjectHeader.Attributes.AddRange(attributes);
var largeObjectHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, [.. attributes])
{
PayloadLength = fullLength,
};
args.Header.Split!.ParentHeader = largeObjectHeader;
var result = await PutStreamObject(args);
var result = await PutStreamObject(args).ConfigureAwait(false);
sentObjectIds.Add(result.ObjectId);
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader)
.AddChildren(sentObjectIds);
var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject) { Context = args.Context});
_ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject) { Context = args.Context }).ConfigureAwait(false);
return split.Parent!;
var parentHeader = args.Header.GetHeader();
return parentHeader.Split!.Parent.ToModel();
}
// We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
var singlePartResult = await PutStreamObject(args);
args.Header!.Attributes = attributes;
var singlePartResult = await PutStreamObject(args).ConfigureAwait(false);
return singlePartResult.ObjectId;
}
@ -297,6 +330,9 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
private async Task<PutObjectResult> PutStreamObject(PrmObjectPut args)
{
var ctx = args.Context!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
var payload = args.Payload!;
var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;
@ -307,7 +343,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
bool isRentBuffer = false;
byte[]? chunkBuffer = null;
try
{
if (args.CustomBuffer != null)
@ -316,7 +352,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
else
{
chunkBuffer = env.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
chunkBuffer = Context.GetArrayPool(Constants.ObjectChunkSize).Rent(chunkSize);
isRentBuffer = true;
}
@ -325,7 +361,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
// 0 means no limit from client, so server side cut is performed
var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;
var stream = await GetUploadStream(args, ctx);
using var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);
while (objectLimitSize == 0 || sentBytes < objectLimitSize)
{
@ -334,7 +370,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
(int)Math.Min(objectLimitSize - sentBytes, chunkSize)
: chunkSize;
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);
var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken).ConfigureAwait(false);
if (bytesCount == 0)
break;
@ -351,10 +387,10 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
chunkRequest.Sign(ctx.Key);
await stream.Write(chunkRequest);
await stream.Write(chunkRequest).ConfigureAwait(false);
}
var response = await stream.Close();
var response = await stream.Close().ConfigureAwait(false);
Verifier.CheckResponse(response);
return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
@ -372,6 +408,9 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
{
var header = args.Header!;
if (ctx.Key == null)
throw new InvalidObjectException(nameof(ctx.Key));
header.OwnerId ??= ctx.OwnerId;
header.Version ??= ctx.Version;
@ -395,7 +434,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
};
var sessionToken = await GetOrCreateSession(args, ctx);
var sessionToken = await GetOrCreateSession(args, ctx).ConfigureAwait(false);
sessionToken.CreateObjectTokenContext(
new Address { ContainerId = grpcHeader.ContainerId, ObjectId = oid },
@ -407,19 +446,19 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
initRequest.Sign(ctx.Key);
return await PutObjectInit(initRequest, ctx);
return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
}
private async Task<FrostFsObject> GetObject(GetRequest request, Context ctx)
{
var reader = GetObjectInit(request, ctx);
var grpcObject = await reader.ReadHeader();
var grpcObject = await reader.ReadHeader().ConfigureAwait(false);
var modelObject = grpcObject.ToModel();
modelObject.ObjectReader = reader;
return modelObject;
return modelObject;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
@ -428,7 +467,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
throw new ArgumentNullException(nameof(initRequest));
var call = client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);
return new ObjectReader(call);
}
@ -441,7 +480,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);
await call.RequestStream.WriteAsync(initRequest);
await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);
return new ObjectStreamer(call);
}
@ -449,14 +488,14 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request, Context ctx)
{
using var stream = GetSearchReader(request, ctx);
while (true)
{
var ids = await stream.Read(ctx.CancellationToken);
var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false);
if (ids == null)
break;
foreach (var oid in ids)
{
yield return oid;
@ -472,7 +511,7 @@ internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, C
}
var call = client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);
return new SearchReader(call);
}
}
}