using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.Client;
using FrostFS.SDK.Client.Interfaces;
using FrostFS.SDK.Client.Mappers.GRPC;
using FrostFS.Session;

using Google.Protobuf;

namespace FrostFS.SDK.Client;

/// <summary>
/// Builds, signs and sends object-service gRPC requests (head, get, range, range hash,
/// delete, search, put, patch) through the wrapped <c>ObjectServiceClient</c>.
/// </summary>
internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx)
    : ContextAccessor(clientCtx)
{
    // Created lazily on the first GetDefaultSession call.
    private SessionProvider? sessions;
    private readonly ObjectService.ObjectServiceClient client = client;

    /// <summary>
    /// Returns the session token stored in the client session cache under
    /// <c>ClientContext.SessionCacheKey</c>; on a cache miss requests a new one from the
    /// session provider and stores it in the cache.
    /// </summary>
    /// <exception cref="FrostFsException">The resulting token is null.</exception>
    public async ValueTask<FrostFsSessionToken> GetDefaultSession(ISessionToken args, CallContext ctx)
    {
        sessions ??= new(ClientContext);

        if (!ClientContext.SessionCache!.TryGetValue(ClientContext.SessionCacheKey, out var token))
        {
            var protoToken = await sessions.GetDefaultSession(args, ctx).ConfigureAwait(false);

            token = new FrostFsSessionToken(protoToken);

            ClientContext.SessionCache.SetValue(ClientContext.SessionCacheKey, token);
        }

        if (token == null)
        {
            throw new FrostFsException("Cannot create session");
        }

        return token;
    }

    /// <summary>
    /// Sends a HEAD request for the addressed object and returns the header and/or split
    /// info found in the response body (each is set only when present).
    /// </summary>
    internal async Task<FrostFsHeaderResult> GetObjectHeadAsync(PrmObjectHeadGet args, CallContext ctx)
    {
        var request = new HeadRequest
        {
            Body = new HeadRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.GetContainerID(),
                    ObjectId = args.ObjectId.ToMessage()
                },
                Raw = args.Raw
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Head,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        var response = await client.HeadAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);

        var result = new FrostFsHeaderResult();

        if (response.Body.Header != null)
        {
            result.HeaderInfo = response.Body.Header.Header.ToModel();
        }

        if (response.Body.SplitInfo != null)
        {
            result.SplitInfo = new FrostFsSplitInfo(response.Body.SplitInfo);
        }

        return result;
    }

    /// <summary>
    /// Sends a GET request for the addressed object. The returned model carries the reader
    /// that was used to read the header (see <see cref="GetObject"/>).
    /// </summary>
    internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args, CallContext ctx)
    {
        var request = new GetRequest
        {
            Body = new GetRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Get,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        return await GetObject(request, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Starts a GetRange call for the requested offset/length of the addressed object and
    /// returns a reader wrapping that call.
    /// </summary>
    internal async Task<RangeReader> GetRangeAsync(PrmRangeGet args, CallContext ctx)
    {
        var request = new GetRangeRequest
        {
            Body = new GetRangeRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                },
                Range = new Object.Range
                {
                    Offset = args.Range.Offset,
                    Length = args.Range.Length
                },
                Raw = args.Raw
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Range,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        var call = client.GetRange(request, null, ctx.GetDeadline(), ctx.CancellationToken);

        return new RangeReader(call);
    }

    /// <summary>
    /// Requests SHA-256 hashes (salted with <c>args.Salt</c>) for the given ranges of the
    /// addressed object and returns the entries of the response hash list as an array.
    /// </summary>
    internal async Task<ReadOnlyMemory<byte>[]> GetRangeHashAsync(PrmRangeHashGet args, CallContext ctx)
    {
        var request = new GetRangeHashRequest
        {
            Body = new GetRangeHashRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                },
                Type = ChecksumType.Sha256,
                Salt = ByteString.CopyFrom(args.Salt) // TODO: create a type with calculated cached ByteString inside
            }
        };

        foreach (var range in args.Ranges)
        {
            request.Body.Ranges.Add(new Object.Range
            {
                Length = range.Length,
                Offset = range.Offset
            });
        }

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Rangehash,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        var response = await client.GetRangeHashAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);

        var hashCollection = response.Body.HashList.Select(h => h.Memory).ToArray();

        return hashCollection;
    }

    /// <summary>
    /// Sends a DELETE request for the addressed object and verifies the response.
    /// </summary>
    internal async Task DeleteObjectAsync(PrmObjectDelete args, CallContext ctx)
    {
        var request = new DeleteRequest
        {
            Body = new DeleteRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Delete,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        var response = await client.DeleteAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);
    }

    /// <summary>
    /// Sends a SEARCH request with the given filters for the container and yields the
    /// object ids of every batch read from the search stream until a null batch is read.
    /// </summary>
    internal async IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args, CallContext ctx)
    {
        var request = new SearchRequest
        {
            Body = new SearchRequest.Types.Body
            {
                ContainerId = args.ContainerId.ToMessage(),
                Version = 1 // TODO: clarify this param
            }
        };

        request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = request.Body.ContainerId },
            ObjectSessionContext.Types.Verb.Search,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        using var stream = GetSearchReader(request, ctx);

        while (true)
        {
            var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false);

            // A null batch terminates the enumeration.
            if (ids == null)
            {
                yield break;
            }

            foreach (var oid in ids)
            {
                yield return FrostFsObjectId.FromHash(oid.Value.Span);
            }
        }
    }

    /// <summary>
    /// Uploads the object in a single PutSingle request and returns the id taken from the
    /// locally created gRPC object.
    /// </summary>
    internal async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)
    {
        var grpcObject = ObjectTools.CreateSingleObject(args.FrostFsObject, ClientContext);

        var request = new PutSingleRequest
        {
            Body = new() { Object = grpcObject }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = grpcObject.Header.ContainerId },
            ObjectSessionContext.Types.Verb.Put,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key.ECDsaKey);

        var response = await client.PutSingleAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);

        return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.Span);
    }

    /// <summary>
    /// Streams <c>args.Payload</c> to the Patch call in chunks of up to
    /// <c>args.MaxChunkLength</c> bytes. The source range of the first chunk starts at
    /// <c>args.Range.Offset</c> and advances by the number of bytes read. Only the first
    /// request carries the session token and the new attributes.
    /// </summary>
    /// <returns>The object id from the patch response.</returns>
    /// <exception cref="ArgumentNullException"><c>args.Payload</c> is null.</exception>
    internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
    {
        var chunkSize = args.MaxChunkLength;

        Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");

        // Dispose the streaming call on every exit path so that a failure while reading
        // or writing does not leave it open.
        using var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);

        byte[]? chunkBuffer = null;

        try
        {
            // common
            chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);

            bool isFirstChunk = true;
            ulong currentPos = args.Range.Offset;

            var address = new Address
            {
                ObjectId = args.Address.ObjectId,
                ContainerId = args.Address.ContainerId
            };

            while (true)
            {
                var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);

                if (bytesCount == 0)
                {
                    break;
                }

                var request = new PatchRequest()
                {
                    Body = new()
                    {
                        Address = address,
                        Patch = new PatchRequest.Types.Body.Types.Patch
                        {
                            Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount),
                            SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
                        }
                    }
                };

                if (isFirstChunk)
                {
                    var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

                    var protoToken = sessionToken.CreateObjectTokenContext(
                        address,
                        ObjectSessionContext.Types.Verb.Patch,
                        ClientContext.Key);

                    request.AddMetaHeader(args.XHeaders, protoToken);

                    if (args.NewAttributes != null && args.NewAttributes.Length > 0)
                    {
                        foreach (var attr in args.NewAttributes)
                        {
                            request.Body.NewAttributes.Add(attr.ToMessage());
                        }

                        // The flag is sent only together with a non-empty attribute list.
                        request.Body.ReplaceAttributes = args.ReplaceAttributes;
                    }

                    isFirstChunk = false;
                }
                else
                {
                    request.AddMetaHeader(args.XHeaders);
                }

                request.Sign(ClientContext.Key.ECDsaKey);

                await call.RequestStream.WriteAsync(request).ConfigureAwait(false);

                currentPos += (ulong)bytesCount;
            }
        }
        finally
        {
            if (chunkBuffer != null)
            {
                ArrayPool<byte>.Shared.Return(chunkBuffer);
            }
        }

        await call.RequestStream.CompleteAsync().ConfigureAwait(false);
        var response = await call.ResponseAsync.ConfigureAwait(false);

        Verifier.CheckResponse(response);

        return response.Body.ObjectId.ToModel();
    }

    /// <summary>
    /// Uploads the payload, cutting it on the client side into parts no larger than the
    /// maximum object size (taken from <c>args.PutObjectContext.MaxObjectSizeCache</c>, or
    /// from the network settings when that value is 0). When more than one part is sent,
    /// a link object listing all parts is uploaded and the parent object id is returned;
    /// otherwise the payload is uploaded as one object and its id is returned.
    /// </summary>
    /// <exception cref="ArgumentException">
    /// The header has no payload length and the stream is not seekable.
    /// </exception>
    internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
    {
        var payloadStream = args.Payload!;
        var header = args.Header!;

        if (header.PayloadLength > 0)
        {
            args.PutObjectContext.FullLength = header.PayloadLength;
        }
        else if (payloadStream.CanSeek)
        {
            args.PutObjectContext.FullLength = (ulong)payloadStream.Length;
        }
        else
        {
            throw new ArgumentException("The stream does not have a length and payload length is not defined");
        }

        if (args.PutObjectContext.MaxObjectSizeCache == 0)
        {
            var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx)
                .ConfigureAwait(false);

            args.PutObjectContext.MaxObjectSizeCache = (int)networkSettings.MaxObjectSize;
        }

        var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;

        var objectSize = restBytes > 0
            ? (int)Math.Min((ulong)args.PutObjectContext.MaxObjectSizeCache, restBytes)
            : args.PutObjectContext.MaxObjectSizeCache;

        // define collection capacity
        var restPart = (restBytes % (ulong)objectSize) > 0 ? 1 : 0;
        var objectsCount = args.PutObjectContext.FullLength > 0 ? (int)(restBytes / (ulong)objectSize) + restPart : 0;

        List<FrostFsObjectId> sentObjectIds = new(objectsCount);

        FrostFsSplit? split = null;
        SplitId splitId = new();

        // keep attributes for the large object
        var attributes = args.Header!.Attributes;
        args.Header!.Attributes = null;

        // NOTE(review): the part uploads below receive `default` instead of `ctx`, so the
        // caller's deadline and cancellation token are not forwarded to them - confirm
        // whether this is intentional.

        // send all parts except the last one as separate Objects
        while (restBytes > (ulong)args.PutObjectContext.MaxObjectSizeCache)
        {
            // Each part references the previously sent part (none for the first one).
            split = new FrostFsSplit(splitId, sentObjectIds.LastOrDefault());

            args.Header!.Split = split;

            var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);

            sentObjectIds.Add(result.ObjectId);

            restBytes -= (ulong)result.ObjectSize;
        }

        // send the last part and create linkObject
        if (sentObjectIds.Count > 0)
        {
            var largeObjectHeader = new FrostFsObjectHeader(
                header.ContainerId,
                FrostFsObjectType.Regular,
                attributes != null ? [.. attributes] : [])
            {
                PayloadLength = args.PutObjectContext.FullLength,
            };

            args.Header.Split!.ParentHeader = largeObjectHeader;

            var result = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);

            sentObjectIds.Add(result.ObjectId);

            var linkObject = new FrostFsLinkObject(header.ContainerId, split!.SplitId, largeObjectHeader, sentObjectIds);

            _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);

            var parentHeader = args.Header.GetHeader();

            return parentHeader.Split!.Parent.ToModel();
        }

        // We are here if the payload is placed to one Object. It means no cut action, just simple PUT.
        args.Header!.Attributes = attributes;

        var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false);

        return singlePartResult.ObjectId;
    }

    /// <summary>
    /// Id of an uploaded object together with the number of payload bytes sent for it.
    /// </summary>
    struct PutObjectResult(FrostFsObjectId objectId, int objectSize)
    {
        public FrostFsObjectId ObjectId = objectId;
        public int ObjectSize = objectSize;
    }

    /// <summary>
    /// Opens an upload stream for <c>args.Header</c> and sends payload chunks read from
    /// <c>args.Payload</c> until the payload is exhausted or, when the object size limit is
    /// non-zero, until that many bytes have been sent.
    /// </summary>
    /// <exception cref="ArgumentException">
    /// <c>args.CustomBuffer</c> is supplied but smaller than the computed chunk size.
    /// </exception>
    private async Task<PutObjectResult> PutMultipartStreamObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
    {
        var payload = args.Payload!;

        var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;

        var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;

        // Never use a chunk larger than what is left to send.
        chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);

        bool isRentBuffer = false;
        byte[]? chunkBuffer = null;

        try
        {
            // 0 means no limit from client, so server side cut is performed
            var objectLimitSize = args.PutObjectContext.MaxObjectSizeCache;

            if (args.CustomBuffer != null)
            {
                if (args.CustomBuffer.Length < chunkSize)
                {
                    throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
                }

                chunkBuffer = args.CustomBuffer;
            }
            else
            {
                chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
                isRentBuffer = true;
            }

            var sentBytes = 0;

            using var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);

            while (objectLimitSize == 0 || sentBytes < objectLimitSize)
            {
                // send chunks limited to default or user's settings
                var bufferSize = objectLimitSize > 0
                    ? Math.Min(objectLimitSize - sentBytes, chunkSize)
                    : chunkSize;

                var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken).ConfigureAwait(false);

                if (bytesCount == 0)
                {
                    break;
                }

                sentBytes += bytesCount;

                var chunkRequest = new PutRequest
                {
                    Body = new PutRequest.Types.Body
                    {
                        Chunk = ByteString.CopyFrom(chunkBuffer, 0, bytesCount)
                    }
                };

                chunkRequest.AddMetaHeader(args.XHeaders);
                chunkRequest.Sign(ClientContext.Key.ECDsaKey);

                await stream.Write(chunkRequest).ConfigureAwait(false);
            }

            var response = await stream.Close().ConfigureAwait(false);

            Verifier.CheckResponse(response);

            return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.Span), sentBytes);
        }
        finally
        {
            // Only a rented buffer goes back to the pool; a caller-supplied one is left alone.
            if (isRentBuffer && chunkBuffer != null)
            {
                ArrayPool<byte>.Shared.Return(chunkBuffer);
            }
        }
    }

    /// <summary>
    /// Opens an upload stream for <c>args.Header</c> and wraps it in an object writer.
    /// </summary>
    internal async Task<IObjectWriter> PutStreamObjectAsync(PrmObjectPutBase args, CallContext ctx)
    {
        var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);

        return new ObjectWriter(ClientContext, args, stream);
    }

    /// <summary>
    /// Fills the missing owner/version of <c>args.Header</c> from the client context,
    /// applies split values when the header has a split, and sends the signed Init
    /// request that opens the Put stream.
    /// </summary>
    private async Task<ObjectStreamer<PutRequest, PutResponse>> GetUploadStream(PrmObjectPutBase args, CallContext ctx)
    {
        var header = args.Header!;

        header.OwnerId ??= ClientContext.Owner;
        header.Version ??= ClientContext.Version;

        var grpcHeader = header.GetHeader();

        if (header.Split != null)
        {
            ObjectTools.SetSplitValues(grpcHeader, header.Split, ClientContext.Owner, ClientContext.Version, ClientContext.Key);
        }

        var initRequest = new PutRequest
        {
            Body = new PutRequest.Types.Body
            {
                Init = new PutRequest.Types.Body.Types.Init
                {
                    Header = grpcHeader
                }
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = grpcHeader.ContainerId },
            ObjectSessionContext.Types.Verb.Put,
            ClientContext.Key);

        initRequest.AddMetaHeader(args.XHeaders, protoToken);
        initRequest.Sign(ClientContext.Key.ECDsaKey);

        return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Starts the Put client-streaming call, writes the init request to it and returns a
    /// streamer wrapping the call.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private async Task<ObjectStreamer<PutRequest, PutResponse>> PutObjectInit(PutRequest initRequest, CallContext ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Put(null, ctx.GetDeadline(), ctx.CancellationToken);

        await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);

        return new ObjectStreamer<PutRequest, PutResponse>(call);
    }

    /// <summary>
    /// Starts the Get call, reads the object header from it, converts it to the model and
    /// attaches the reader to the model's <c>ObjectReader</c> property.
    /// </summary>
    private async Task<FrostFsObject> GetObject(GetRequest request, CallContext ctx)
    {
        var reader = GetObjectInit(request, ctx);

        var grpcObject = await reader.ReadHeader().ConfigureAwait(false);
        var modelObject = grpcObject.ToModel();

        modelObject.ObjectReader = reader;

        return modelObject;
    }

    /// <summary>
    /// Starts the Get call and returns a reader wrapping it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private ObjectReader GetObjectInit(GetRequest initRequest, CallContext ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Get(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken);

        return new ObjectReader(call);
    }

    /// <summary>
    /// Starts the Search call and returns a reader wrapping it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Search(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken);

        return new SearchReader(call);
    }
}