using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Extensions;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using FrostFS.Session;

using Google.Protobuf;

namespace FrostFS.SDK.ClientV2;

/// <summary>
/// Wraps the gRPC object service client: builds, session-binds and signs HEAD, GET, DELETE,
/// SEARCH and PUT requests, and drives the streaming calls behind them.
/// </summary>
/// <remarks>
/// NOTE(review): several declarations below (<c>Task</c>, <c>ValueTask</c>, <c>IAsyncEnumerable</c>,
/// <c>List</c>) carry no generic type arguments although their bodies return or store values.
/// The signatures are kept exactly as found; confirm them against the intended declarations.
/// </remarks>
internal class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientEnvironment env)
    : ContextAccessor(env), ISessionProvider
{
    private readonly SessionProvider sessions = new(env);

    /// <summary>
    /// Forwards the call to the internal <see cref="SessionProvider"/> and returns its result.
    /// </summary>
    /// <param name="args">Call parameters that may carry a session token.</param>
    /// <param name="ctx">Call context.</param>
    public async ValueTask GetOrCreateSession(ISessionToken args, Context ctx)
    {
        return await sessions.GetOrCreateSession(args, ctx);
    }

    /// <summary>
    /// Sends a signed HEAD request for the addressed object and returns the response header
    /// converted with <c>ToModel()</c>.
    /// </summary>
    /// <param name="args">Container id, object id, extra headers and call context.</param>
    internal async Task GetObjectHeadAsync(PrmObjectHeadGet args)
    {
        var ctx = args.Context!;

        var request = new HeadRequest
        {
            Body = new HeadRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ContainerID,
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        // Bind the session token to this address and verb before it goes into the meta header.
        var sessionToken = await GetOrCreateSession(args, ctx);

        sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Head,
            ctx.Key);

        request.AddMetaHeader(args.XHeaders, sessionToken);
        request.Sign(ctx.Key);

        var response = await client.HeadAsync(request, null, ctx.Deadline, ctx.CancellationToken);

        Verifier.CheckResponse(response);

        return response.Body.Header.Header.ToModel();
    }

    /// <summary>
    /// Sends a signed GET request for the addressed object and returns the object model produced
    /// by <see cref="GetObject"/>, which has the stream reader attached.
    /// </summary>
    /// <param name="args">Container id, object id, extra headers and call context.</param>
    internal async Task GetObjectAsync(PrmObjectGet args)
    {
        var ctx = args.Context!;

        var request = new GetRequest
        {
            Body = new GetRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        var sessionToken = await GetOrCreateSession(args, ctx);

        sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Get,
            ctx.Key);

        request.AddMetaHeader(args.XHeaders, sessionToken);
        request.Sign(ctx.Key);

        return await GetObject(request, ctx);
    }

    /// <summary>
    /// Sends a signed DELETE request for the addressed object and verifies the response.
    /// </summary>
    /// <param name="args">Container id, object id, extra headers and call context.</param>
    internal async Task DeleteObjectAsync(PrmObjectDelete args)
    {
        var ctx = args.Context!;

        var request = new DeleteRequest
        {
            Body = new DeleteRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        var sessionToken = await GetOrCreateSession(args, ctx);

        sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Delete,
            ctx.Key);

        request.AddMetaHeader(args.XHeaders, sessionToken);
        request.Sign(ctx.Key);

        var response = await client.DeleteAsync(request, null, ctx.Deadline, ctx.CancellationToken);

        Verifier.CheckResponse(response);
    }

    /// <summary>
    /// Sends a signed SEARCH request for the container with the given filters and yields each
    /// returned id converted with <c>FrostFsObjectId.FromHash</c>.
    /// </summary>
    /// <param name="args">Container id, search filters, extra headers and call context.</param>
    internal async IAsyncEnumerable SearchObjectsAsync(PrmObjectSearch args)
    {
        var ctx = args.Context!;

        var request = new SearchRequest
        {
            Body = new SearchRequest.Types.Body
            {
                ContainerId = args.ContainerId.ToMessage(),
                Version = 1 // TODO: clarify this param
            }
        };

        request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));

        var sessionToken = await GetOrCreateSession(args, ctx);

        // Search is scoped to a container, so the token context address carries no object id.
        sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = request.Body.ContainerId },
            ObjectSessionContext.Types.Verb.Search,
            ctx.Key);

        request.AddMetaHeader(args.XHeaders, sessionToken);
        request.Sign(ctx.Key);

        var objectsIds = SearchObjects(request, ctx);

        await foreach (var oid in objectsIds)
        {
            yield return FrostFsObjectId.FromHash(oid.Value.ToByteArray());
        }
    }

    /// <summary>
    /// Uploads an object from a payload stream, either cut into parts on the client
    /// (<c>args.ClientCut</c>) or streamed as a single object.
    /// </summary>
    /// <param name="args">Header, payload stream and upload options.</param>
    /// <exception cref="ArgumentException">The header or the payload is null.</exception>
    internal async Task PutObjectAsync(PrmObjectPut args)
    {
        if (args.Header == null)
        {
            throw new ArgumentException("Value cannot be null", nameof(args.Header));
        }

        if (args.Payload == null)
        {
            throw new ArgumentException("Value cannot be null", nameof(args.Payload));
        }

        if (args.ClientCut)
        {
            return await PutClientCutObject(args);
        }

        // The declared payload length wins; otherwise fall back to the stream length when it is seekable.
        if (args.Header.PayloadLength > 0)
        {
            args.FullLength = args.Header.PayloadLength;
        }
        else if (args.Payload.CanSeek)
        {
            args.FullLength = (ulong)args.Payload.Length;
        }

        return (await PutStreamObject(args)).ObjectId;
    }

    /// <summary>
    /// Uploads a complete object in one signed PutSingle request and returns the id derived from
    /// the object id of the gRPC object that was sent.
    /// </summary>
    /// <param name="args">The object to upload, extra headers and call context.</param>
    internal async Task PutSingleObjectAsync(PrmSingleObjectPut args)
    {
        var ctx = args.Context!;

        var grpcObject = ObjectTools.CreateObject(args.FrostFsObject, ctx);

        var request = new PutSingleRequest
        {
            Body = new() { Object = grpcObject }
        };

        var sessionToken = await GetOrCreateSession(args, ctx);

        sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = grpcObject.Header.ContainerId, ObjectId = grpcObject.ObjectId },
            ObjectSessionContext.Types.Verb.Put,
            ctx.Key);

        request.AddMetaHeader(args.XHeaders, sessionToken);
        request.Sign(ctx.Key);

        var response = await client.PutSingleAsync(request, null, ctx.Deadline, ctx.CancellationToken);

        Verifier.CheckResponse(response);

        return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.ToByteArray());
    }

    /// <summary>
    /// Client-side cut upload: while more than <c>MaxObjectSizeCache</c> bytes remain, each part is
    /// uploaded as its own object chained through the split header. The last part then carries the
    /// parent header, and a link object listing all parts is uploaded. A payload that fits into
    /// one object is uploaded with a single plain PUT.
    /// </summary>
    /// <param name="args">Header, payload stream and upload options; mutated during the upload.</param>
    private async Task PutClientCutObject(PrmObjectPut args)
    {
        var ctx = args.Context!;

        // Pin one session token on the args so every part upload sees the same one.
        var tokenRaw = await GetOrCreateSession(args, ctx);
        var token = new FrostFsSessionToken(tokenRaw.Serialize());

        args.SessionToken = token;

        var payloadStream = args.Payload!;
        var header = args.Header!;

        var fullLength = header.PayloadLength;

        if (payloadStream.CanSeek && fullLength == 0)
        {
            fullLength = (ulong)payloadStream.Length;
        }

        args.FullLength = fullLength;

        // Ask the network for the object size limit only when it has not been cached on the args yet.
        if (args.MaxObjectSizeCache == 0)
        {
            var networkSettings = await Context.Client.GetNetworkSettingsAsync(new PrmNetworkSettings() { Context = ctx });

            args.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
        }

        var restBytes = fullLength - args.CurrentStreamPosition;
        var objectSize = restBytes > 0 ? (int)Math.Min((ulong)args.MaxObjectSizeCache, restBytes) : args.MaxObjectSizeCache;

        //define collection capacity
        var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
        var objectsCount = fullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;

        List sentObjectIds = new(objectsCount);

        FrostFsSplit? split = null;

        // keep attributes for the large object
        var attributes = args.Header!.Attributes;

        // send all parts except the last one as separate Objects
        while (restBytes > (ulong)args.MaxObjectSizeCache)
        {
            if (split == null)
            {
                split = new FrostFsSplit();
                args.Header!.Attributes = [];
            }

            split!.Previous = sentObjectIds.LastOrDefault();
            args.Header!.Split = split;

            var result = await PutStreamObject(args);

            sentObjectIds.Add(result.ObjectId);

            restBytes -= (ulong)result.ObjectSize;
        }

        // send the last part and create linkObject
        if (sentObjectIds.Count > 0)
        {
            var largeObjectHeader = new FrostFsObjectHeader(header.ContainerId) { PayloadLength = fullLength };

            largeObjectHeader.Attributes.AddRange(attributes);

            args.Header.Split!.ParentHeader = largeObjectHeader;

            var result = await PutStreamObject(args);

            sentObjectIds.Add(result.ObjectId);

            var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader)
                .AddChildren(sentObjectIds);

            _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject) { Context = args.Context });

            // NOTE(review): split.Parent is not assigned in this method; presumably it is filled in
            // while the part headers are built (see ObjectTools.SetSplitValues) - confirm.
            return split.Parent!;
        }

        // We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
        var singlePartResult = await PutStreamObject(args);

        return singlePartResult.ObjectId;
    }

    /// <summary>Outcome of one streamed upload: the stored object's id and the payload bytes sent.</summary>
    struct PutObjectResult(FrostFsObjectId objectId, int objectSize)
    {
        public FrostFsObjectId ObjectId = objectId;
        public int ObjectSize = objectSize;
    }

    /// <summary>
    /// Streams payload chunks into one PUT call and returns the resulting object id together with
    /// the number of payload bytes sent. With client cut enabled, at most
    /// <c>MaxObjectSizeCache</c> bytes are read from the payload.
    /// </summary>
    /// <param name="args">Header, payload stream, buffer options and call context.</param>
    private async Task PutStreamObject(PrmObjectPut args)
    {
        var ctx = args.Context!;
        var payload = args.Payload!;

        var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;

        // NOTE(review): when the remaining length computes to 0 the chunk size collapses to 0 and the
        // loop below ends on its first read - confirm callers always set FullLength for non-empty payloads.
        var restBytes = args.FullLength - args.CurrentStreamPosition;

        chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);

        // Remember which pool the buffer was rented from: an array must be returned to the same
        // pool instance that handed it out. Stays null when the caller supplies its own buffer.
        ArrayPool<byte>? rentedFrom = null;
        byte[]? chunkBuffer = null;

        try
        {
            if (args.CustomBuffer != null)
            {
                chunkBuffer = args.CustomBuffer;
            }
            else
            {
                rentedFrom = env.GetArrayPool(Constants.ObjectChunkSize);
                chunkBuffer = rentedFrom.Rent(chunkSize);
            }

            var sentBytes = 0;

            // 0 means no limit from client, so server side cut is performed
            var objectLimitSize = args.ClientCut ? args.MaxObjectSizeCache : 0;

            var stream = await GetUploadStream(args, ctx);

            while (objectLimitSize == 0 || sentBytes < objectLimitSize)
            {
                // send chunks limited to default or user's settings
                var bufferSize = objectLimitSize > 0
                    ? (int)Math.Min(objectLimitSize - sentBytes, chunkSize)
                    : chunkSize;

                var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken);

                if (bytesCount == 0)
                {
                    break;
                }

                sentBytes += bytesCount;

                var chunkRequest = new PutRequest
                {
                    Body = new PutRequest.Types.Body
                    {
                        Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
                    }
                };

                chunkRequest.Sign(ctx.Key);

                await stream.Write(chunkRequest);
            }

            var response = await stream.Close();

            Verifier.CheckResponse(response);

            return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()), sentBytes);
        }
        finally
        {
            if (rentedFrom != null && chunkBuffer != null)
            {
                rentedFrom.Return(chunkBuffer);
            }
        }
    }

    /// <summary>
    /// Builds and signs the PUT init request from the header (filling in owner and version from
    /// the context when they are missing) and opens the upload stream with it.
    /// </summary>
    /// <param name="args">Header, extra headers and session information.</param>
    /// <param name="ctx">Call context supplying key, owner, version, deadline and cancellation.</param>
    private async Task GetUploadStream(PrmObjectPut args, Context ctx)
    {
        var header = args.Header!;

        header.OwnerId ??= ctx.OwnerId;
        header.Version ??= ctx.Version;

        var grpcHeader = header.GetHeader();

        if (header.Split != null)
        {
            ObjectTools.SetSplitValues(grpcHeader, header.Split, ctx);
        }

        // The id placed into the token context is the SHA-256 of the gRPC header.
        var oid = new ObjectID { Value = grpcHeader.Sha256() };

        var initRequest = new PutRequest
        {
            Body = new PutRequest.Types.Body
            {
                Init = new PutRequest.Types.Body.Types.Init
                {
                    Header = grpcHeader
                }
            }
        };

        var sessionToken = await GetOrCreateSession(args, ctx);

        sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = grpcHeader.ContainerId, ObjectId = oid },
            ObjectSessionContext.Types.Verb.Put,
            ctx.Key);

        initRequest.AddMetaHeader(args.XHeaders, sessionToken);
        initRequest.Sign(ctx.Key);

        return await PutObjectInit(initRequest, ctx);
    }

    /// <summary>
    /// Opens the GET stream, reads the header message, converts it with <c>ToModel()</c> and
    /// attaches the reader to the resulting model as <c>ObjectReader</c>.
    /// </summary>
    private async Task GetObject(GetRequest request, Context ctx)
    {
        var reader = GetObjectInit(request, ctx);

        var grpcObject = await reader.ReadHeader();
        var modelObject = grpcObject.ToModel();

        modelObject.ObjectReader = reader;

        return modelObject;
    }

    /// <summary>Starts the GET call and wraps it in an <see cref="ObjectReader"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Get(initRequest, null, ctx.Deadline, ctx.CancellationToken);

        return new ObjectReader(call);
    }

    /// <summary>
    /// Starts the PUT call, writes the init request to its request stream and wraps the call in
    /// an <c>ObjectStreamer</c>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private async Task PutObjectInit(PutRequest initRequest, Context ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Put(null, ctx.Deadline, ctx.CancellationToken);

        try
        {
            await call.RequestStream.WriteAsync(initRequest);
        }
        catch
        {
            // No streamer has been created yet, so nothing else would release the call on failure.
            call.Dispose();
            throw;
        }

        return new ObjectStreamer(call);
    }

    /// <summary>
    /// Reads batches of ids from the search stream until it returns null and yields the ids one
    /// by one. The reader is disposed when enumeration ends.
    /// </summary>
    private async IAsyncEnumerable SearchObjects(SearchRequest request, Context ctx)
    {
        using var stream = GetSearchReader(request, ctx);

        while (true)
        {
            var ids = await stream.Read(ctx.CancellationToken);

            if (ids == null)
            {
                break;
            }

            foreach (var oid in ids)
            {
                yield return oid;
            }
        }
    }

    /// <summary>Starts the SEARCH call and wraps it in a <see cref="SearchReader"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private SearchReader GetSearchReader(SearchRequest initRequest, Context ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Search(initRequest, null, ctx.Deadline, ctx.CancellationToken);

        return new SearchReader(call);
    }
}