using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.Client;
using FrostFS.SDK.Client.Interfaces;
using FrostFS.SDK.Client.Mappers.GRPC;
using FrostFS.Session;

using Google.Protobuf;

namespace FrostFS.SDK.Client;

/// <summary>
/// Builds, signs and sends object-service gRPC requests (head, get, range, range hash,
/// delete, search, put, patch) using the key and settings held by the client context.
/// </summary>
internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx)
    : ContextAccessor(clientCtx)
{
    // Created lazily on the first request that needs a default session.
    private SessionProvider? sessions;

    private readonly ObjectService.ObjectServiceClient client = client;

    /// <summary>
    /// Returns the session token stored in the session cache under the context's cache key,
    /// requesting a new one from the session provider and caching it when no entry exists.
    /// </summary>
    /// <exception cref="FrostFsException">The resulting token is null.</exception>
    public async ValueTask<FrostFsSessionToken> GetDefaultSession(ISessionToken args, CallContext ctx)
    {
        sessions ??= new(ClientContext);

        if (!ClientContext.SessionCache!.TryGetValue(ClientContext.SessionCacheKey, out var token))
        {
            var protoToken = await sessions.GetDefaultSession(args, ctx).ConfigureAwait(false);

            token = new FrostFsSessionToken(protoToken);

            ClientContext.SessionCache.SetValue(ClientContext.SessionCacheKey, token);
        }

        if (token == null)
        {
            throw new FrostFsException("Cannot create session");
        }

        return token;
    }

    /// <summary>
    /// Sends a Head request for the addressed object and maps the returned header and/or
    /// split info (whichever is present in the response) into the result.
    /// </summary>
    internal async Task<FrostFsHeaderResult> GetObjectHeadAsync(PrmObjectHeadGet args, CallContext ctx)
    {
        var request = new HeadRequest
        {
            Body = new HeadRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.GetContainerID(),
                    ObjectId = args.ObjectId.ToMessage()
                },
                Raw = args.Raw
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Head,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        var response = await client.HeadAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);

        var result = new FrostFsHeaderResult();

        if (response.Body.Header != null)
        {
            result.HeaderInfo = response.Body.Header.Header.ToModel();
        }

        if (response.Body.SplitInfo != null)
        {
            result.SplitInfo = new FrostFsSplitInfo(response.Body.SplitInfo);
        }

        return result;
    }

    /// <summary>
    /// Sends a Get request for the addressed object and returns the object model with its
    /// reader attached (see <c>GetObject</c>).
    /// </summary>
    internal async Task<FrostFsObject> GetObjectAsync(PrmObjectGet args, CallContext ctx)
    {
        var request = new GetRequest
        {
            Body = new GetRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Get,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        return await GetObject(request, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Starts a GetRange call for the requested offset/length of the addressed object and
    /// returns a reader wrapping the call.
    /// </summary>
    internal async Task<RangeReader> GetRangeAsync(PrmRangeGet args, CallContext ctx)
    {
        var request = new GetRangeRequest
        {
            Body = new GetRangeRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                },
                Range = new Object.Range
                {
                    Offset = args.Range.Offset,
                    Length = args.Range.Length
                },
                Raw = args.Raw
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Range,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        var call = client.GetRange(request, null, ctx.GetDeadline(), ctx.CancellationToken);

        return new RangeReader(call);
    }

    /// <summary>
    /// Requests SHA-256 hashes for each of the given ranges of the addressed object and
    /// returns the hashes from the response's hash list.
    /// </summary>
    internal async Task<ReadOnlyMemory<byte>[]> GetRangeHashAsync(PrmRangeHashGet args, CallContext ctx)
    {
        var request = new GetRangeHashRequest
        {
            Body = new GetRangeHashRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                },
                Type = ChecksumType.Sha256,
                Salt = ByteString.CopyFrom(args.Salt) // TODO: create a type with calculated cached ByteString inside
            }
        };

        foreach (var range in args.Ranges)
        {
            request.Body.Ranges.Add(new Object.Range
            {
                Length = range.Length,
                Offset = range.Offset
            });
        }

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Rangehash,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        // ConfigureAwait(false) keeps this await consistent with the other calls in this class.
        var response = await client.GetRangeHashAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);

        var hashCollection = response.Body.HashList.Select(h => h.Memory).ToArray();

        return hashCollection;
    }

    /// <summary>
    /// Sends a Delete request for the addressed object and verifies the response.
    /// </summary>
    internal async Task DeleteObjectAsync(PrmObjectDelete args, CallContext ctx)
    {
        var request = new DeleteRequest
        {
            Body = new DeleteRequest.Types.Body
            {
                Address = new Address
                {
                    ContainerId = args.ContainerId.ToMessage(),
                    ObjectId = args.ObjectId.ToMessage()
                }
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            request.Body.Address,
            ObjectSessionContext.Types.Verb.Delete,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        // ConfigureAwait(false) keeps this await consistent with the other calls in this class.
        var response = await client.DeleteAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);
    }

    /// <summary>
    /// Sends a Search request with the given filters for the container and yields object IDs
    /// batch by batch until the search reader returns null.
    /// </summary>
    internal async IAsyncEnumerable<FrostFsObjectId> SearchObjectsAsync(PrmObjectSearch args, CallContext ctx)
    {
        var request = new SearchRequest
        {
            Body = new SearchRequest.Types.Body
            {
                ContainerId = args.ContainerId.ToMessage(),
                Version = 1 // TODO: clarify this param
            }
        };

        request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage()));

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = request.Body.ContainerId },
            ObjectSessionContext.Types.Verb.Search,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        using var stream = GetSearchReader(request, ctx);

        while (true)
        {
            var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false);

            if (ids == null)
            {
                yield break;
            }

            foreach (var oid in ids)
            {
                yield return FrostFsObjectId.FromHash(oid.Value.Span);
            }
        }
    }

    /// <summary>
    /// Uploads the given object in a single PutSingle request and returns the ID of the
    /// locally built gRPC object.
    /// </summary>
    internal async Task<FrostFsObjectId> PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx)
    {
        var grpcObject = ObjectTools.CreateSingleObject(args.FrostFsObject, ClientContext);

        var request = new PutSingleRequest
        {
            Body = new() { Object = grpcObject }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = grpcObject.Header.ContainerId },
            ObjectSessionContext.Types.Verb.Put,
            ClientContext.Key);

        request.AddMetaHeader(args.XHeaders, protoToken);
        request.Sign(ClientContext.Key);

        var response = await client.PutSingleAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false);

        Verifier.CheckResponse(response);

        return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.Span);
    }

    /// <summary>
    /// Streams the payload to the Patch call in chunks of at most <c>args.MaxChunkLength</c>
    /// bytes, starting at <c>args.Range.Offset</c>. The session token and the new attributes
    /// are attached to the first chunk only.
    /// </summary>
    /// <returns>The object ID returned in the patch response.</returns>
    /// <exception cref="ArgumentNullException"><c>args.Payload</c> is null.</exception>
    internal async Task<FrostFsObjectId> PatchObjectAsync(PrmObjectPatch args, CallContext ctx)
    {
        var chunkSize = args.MaxChunkLength;

        Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null");

        // Disposing the call releases it on every exit path, including failures while streaming.
        using var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken);

        byte[]? chunkBuffer = null;
        try
        {
            // common
            chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);

            bool isFirstChunk = true;
            ulong currentPos = args.Range.Offset;

            var address = new Address
            {
                ObjectId = args.Address.ObjectId,
                ContainerId = args.Address.ContainerId
            };

            while (true)
            {
                var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false);

                if (bytesCount == 0)
                {
                    break;
                }

                var request = new PatchRequest()
                {
                    Body = new()
                    {
                        Address = address,
                        Patch = new PatchRequest.Types.Body.Types.Patch
                        {
                            Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)),
                            SourceRange = new Object.Range { Offset = currentPos, Length = (ulong)bytesCount }
                        }
                    }
                };

                if (isFirstChunk)
                {
                    var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

                    var protoToken = sessionToken.CreateObjectTokenContext(
                        address,
                        ObjectSessionContext.Types.Verb.Patch,
                        ClientContext.Key);

                    request.AddMetaHeader(args.XHeaders, protoToken);

                    if (args.NewAttributes != null && args.NewAttributes.Length > 0)
                    {
                        foreach (var attr in args.NewAttributes)
                        {
                            request.Body.NewAttributes.Add(attr.ToMessage());
                        }

                        // Loop-invariant: set once, and (as before) only when there are new attributes.
                        request.Body.ReplaceAttributes = args.ReplaceAttributes;
                    }

                    isFirstChunk = false;
                }
                else
                {
                    request.AddMetaHeader(args.XHeaders);
                }

                request.Sign(ClientContext.Key);

                await call.RequestStream.WriteAsync(request).ConfigureAwait(false);

                currentPos += (ulong)bytesCount;
            }
        }
        finally
        {
            if (chunkBuffer != null)
            {
                ArrayPool<byte>.Shared.Return(chunkBuffer);
            }
        }

        await call.RequestStream.CompleteAsync().ConfigureAwait(false);
        var response = await call.ResponseAsync.ConfigureAwait(false);

        Verifier.CheckResponse(response);

        return response.Body.ObjectId.ToModel();
    }

    /// <summary>
    /// Uploads a payload that is cut into parts on the client side. The part size is the
    /// network's maximum object size. A payload that fits one part is uploaded as a regular
    /// streamed object; otherwise every part is uploaded as a single object carrying the
    /// shared split ID, and a link object is uploaded after the last part.
    /// </summary>
    /// <returns>
    /// The ID of the single uploaded object, or, for a multi-part upload, the parent ID taken
    /// from the last part's split header.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The payload length cannot be determined, is zero, or the custom buffer is smaller than a part.
    /// </exception>
    internal async Task<FrostFsObjectId> PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
    {
        var stream = args.Payload!;
        var header = args.Header!;

        if (header.PayloadLength > 0)
        {
            args.PutObjectContext.FullLength = header.PayloadLength;
        }
        else if (stream.CanSeek)
        {
            args.PutObjectContext.FullLength = (ulong)stream.Length;
        }
        else
        {
            throw new ArgumentException("The stream does not have a length and payload length is not defined");
        }

        if (args.PutObjectContext.FullLength == 0)
        {
            throw new ArgumentException("The stream has zero length");
        }

        var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false);
        var partSize = (int)networkSettings.MaxObjectSize;

        var restBytes = args.PutObjectContext.FullLength;

        var objectSize = (int)Math.Min((ulong)partSize, restBytes);

        // define collection capacity
        var objectsCount = (int)(restBytes / (ulong)objectSize) + ((restBytes % (ulong)objectSize) > 0 ? 1 : 0);

        // if the object fits one part, it can be loaded as non-complex object
        if (objectsCount == 1)
        {
            args.PutObjectContext.MaxObjectSizeCache = partSize;

            // Pass the caller's context so its deadline and cancellation token stay in effect.
            var singlePartResult = await PutMultipartStreamObjectAsync(args, ctx).ConfigureAwait(false);
            return singlePartResult.ObjectId;
        }

        List<FrostFsObjectId> parts = new(objectsCount);

        SplitId splitId = new();

        // keep attributes for the large object
        var attributes = args.Header!.Attributes.ToArray();
        header.Attributes = null;

        var remain = args.PutObjectContext.FullLength;

        FrostFsObjectHeader? parentHeader = null;

        var lastIndex = objectsCount - 1;

        bool rentBuffer = false;
        byte[]? buffer = null;

        try
        {
            // One buffer is reused for all parts; renting per part would leave every
            // buffer but the last one unreturned to the pool.
            if (args.CustomBuffer != null)
            {
                if (args.CustomBuffer.Length < partSize)
                {
                    throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required");
                }

                buffer = args.CustomBuffer;
            }
            else
            {
                buffer = ArrayPool<byte>.Shared.Rent(partSize);
                rentBuffer = true;
            }

            for (int i = 0; i < objectsCount; i++)
            {
                var bytesToWrite = (int)Math.Min((ulong)partSize, remain);

                // A single stream read may return fewer bytes than requested, so fill the part explicitly.
                var size = await ReadPartAsync(stream, buffer, bytesToWrite, ctx).ConfigureAwait(false);
                remain -= (ulong)size;

                if (i == lastIndex)
                {
                    parentHeader = new FrostFsObjectHeader(header.ContainerId, FrostFsObjectType.Regular, attributes)
                    {
                        PayloadLength = args.PutObjectContext.FullLength
                    };
                }

                // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter
                var partHeader = new FrostFsObjectHeader(
                    header.ContainerId,
                    FrostFsObjectType.Regular,
                    [],
                    new FrostFsSplit(splitId, parts.LastOrDefault(), parentHeader: parentHeader))
                {
                    PayloadLength = (ulong)size
                };

                var obj = new FrostFsObject(partHeader)
                {
                    SingleObjectPayload = buffer.AsMemory(0, size)
                };

                var prm = new PrmSingleObjectPut(obj);

                var objId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false);

                parts.Add(objId);

                if (i < lastIndex)
                {
                    continue;
                }

                // Once all parts of the object are uploaded, they must be linked into a single entity
                var linkObject = new FrostFsLinkObject(header.ContainerId, splitId, parentHeader!, parts);

                _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false);

                // Retrieve the ID of the linked object
                return partHeader.GetHeader().Split!.Parent.ToModel();
            }

            throw new FrostFsException("Unexpected error");
        }
        finally
        {
            if (rentBuffer && buffer != null)
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }
    }

    /// <summary>
    /// Reads from <paramref name="source"/> into the start of <paramref name="buffer"/> until
    /// <paramref name="count"/> bytes are read or the stream ends.
    /// </summary>
    /// <returns>The number of bytes actually read.</returns>
    private static async Task<int> ReadPartAsync(Stream source, byte[] buffer, int count, CallContext ctx)
    {
        var total = 0;

        while (total < count)
        {
            var read = await source.ReadAsync(buffer, total, count - total, ctx.CancellationToken).ConfigureAwait(false);

            if (read == 0)
            {
                break;
            }

            total += read;
        }

        return total;
    }

    /// <summary>Object ID and number of payload bytes sent by a streamed upload.</summary>
    struct PutObjectResult(FrostFsObjectId objectId, int objectSize)
    {
        public FrostFsObjectId ObjectId = objectId;
        public int ObjectSize = objectSize;
    }

    /// <summary>
    /// Streams the payload to a Put call in signed chunk requests. Sending stops at end of
    /// stream or, when <c>MaxObjectSizeCache</c> is non-zero, once that many bytes were sent.
    /// Advances <c>CurrentStreamPosition</c> by the number of bytes sent.
    /// </summary>
    /// <exception cref="ArgumentException">The custom buffer is smaller than the chunk size.</exception>
    private async Task<PutObjectResult> PutMultipartStreamObjectAsync(PrmObjectClientCutPut args, CallContext ctx)
    {
        var payload = args.Payload!;

        var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize;

        var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition;

        chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize);

        bool isRentBuffer = false;
        byte[]? chunkBuffer = null;

        try
        {
            // 0 means no limit from client, so server side cut is performed
            var objectLimitSize = args.PutObjectContext.MaxObjectSizeCache;

            if (args.CustomBuffer != null)
            {
                if (args.CustomBuffer.Length < chunkSize)
                {
                    throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required");
                }

                chunkBuffer = args.CustomBuffer;
            }
            else
            {
                chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
                isRentBuffer = true;
            }

            var sentBytes = 0;

            using var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);

            while (objectLimitSize == 0 || sentBytes < objectLimitSize)
            {
                // send chunks limited to default or user's settings
                var bufferSize = objectLimitSize > 0
                    ? Math.Min(objectLimitSize - sentBytes, chunkSize)
                    : chunkSize;

                var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken).ConfigureAwait(false);

                if (bytesCount == 0)
                {
                    break;
                }

                sentBytes += bytesCount;

                var chunkRequest = new PutRequest
                {
                    Body = new PutRequest.Types.Body
                    {
                        Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount))
                    }
                };

                chunkRequest.AddMetaHeader(args.XHeaders);
                chunkRequest.Sign(ClientContext.Key);

                await stream.Write(chunkRequest).ConfigureAwait(false);
            }

            args.PutObjectContext.CurrentStreamPosition += (ulong)sentBytes;

            var response = await stream.Close().ConfigureAwait(false);

            Verifier.CheckResponse(response);

            return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.Span), sentBytes);
        }
        finally
        {
            if (isRentBuffer && chunkBuffer != null)
            {
                ArrayPool<byte>.Shared.Return(chunkBuffer);
            }
        }
    }

    /// <summary>
    /// Opens an upload stream for the header in <paramref name="args"/> and wraps it in an
    /// object writer.
    /// </summary>
    internal async Task<IObjectWriter> PutStreamObjectAsync(PrmObjectPutBase args, CallContext ctx)
    {
        var stream = await GetUploadStream(args, ctx).ConfigureAwait(false);

        return new ObjectWriter(ClientContext, args, stream);
    }

    /// <summary>
    /// Builds and signs the Put init request from the object header (defaulting owner and
    /// version from the client context, and applying split values when a split is set),
    /// then opens the Put stream with it.
    /// </summary>
    private async Task<ObjectStreamer<PutRequest, PutResponse>> GetUploadStream(PrmObjectPutBase args, CallContext ctx)
    {
        var header = args.Header!;

        header.OwnerId ??= ClientContext.Owner;
        header.Version ??= ClientContext.Version;

        var grpcHeader = header.GetHeader();

        if (header.Split != null)
        {
            ObjectTools.SetSplitValues(grpcHeader, header.Split, ClientContext.Owner, ClientContext.Version, ClientContext.Key);
        }

        var initRequest = new PutRequest
        {
            Body = new PutRequest.Types.Body
            {
                Init = new PutRequest.Types.Body.Types.Init
                {
                    Header = grpcHeader,
                }
            }
        };

        var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false);

        var protoToken = sessionToken.CreateObjectTokenContext(
            new Address { ContainerId = grpcHeader.ContainerId },
            ObjectSessionContext.Types.Verb.Put,
            ClientContext.Key);

        initRequest.AddMetaHeader(args.XHeaders, protoToken);
        initRequest.Sign(ClientContext.Key);

        return await PutObjectInit(initRequest, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Opens a Put client-streaming call, writes the init request to it and returns a
    /// streamer wrapping the call.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private async Task<ObjectStreamer<PutRequest, PutResponse>> PutObjectInit(PutRequest initRequest, CallContext ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Put(null, ctx.GetDeadline(), ctx.CancellationToken);

        try
        {
            await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false);
        }
        catch
        {
            // Nobody else owns the call yet, so release it before propagating the failure.
            call.Dispose();
            throw;
        }

        return new ObjectStreamer<PutRequest, PutResponse>(call);
    }

    /// <summary>
    /// Starts the Get call, reads the object header from it and returns the object model
    /// with the reader attached as <c>ObjectReader</c>.
    /// </summary>
    private async Task<FrostFsObject> GetObject(GetRequest request, CallContext ctx)
    {
        var reader = GetObjectInit(request, ctx);

        var grpcObject = await reader.ReadHeader().ConfigureAwait(false);
        var modelObject = grpcObject.ToModel();

        modelObject.ObjectReader = reader;

        return modelObject;
    }

    /// <summary>Starts a Get server-streaming call and wraps it in an object reader.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private ObjectReader GetObjectInit(GetRequest initRequest, CallContext ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Get(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken);

        return new ObjectReader(call);
    }

    /// <summary>Starts a Search server-streaming call and wraps it in a search reader.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
    private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx)
    {
        if (initRequest is null)
        {
            throw new ArgumentNullException(nameof(initRequest));
        }

        var call = client.Search(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken);

        return new SearchReader(call);
    }
}