using System; using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using FrostFS.Object; using FrostFS.Refs; using FrostFS.SDK.Client; using FrostFS.SDK.Client.Interfaces; using FrostFS.SDK.Client.Mappers.GRPC; using FrostFS.Session; using Google.Protobuf; namespace FrostFS.SDK.Client; internal sealed class ObjectServiceProvider(ObjectService.ObjectServiceClient client, ClientContext clientCtx) : ContextAccessor(clientCtx) { private SessionProvider? sessions; private readonly ObjectService.ObjectServiceClient client = client; public async ValueTask GetDefaultSession(ISessionToken args, CallContext ctx) { sessions ??= new(ClientContext); if (!ClientContext.SessionCache!.TryGetValue(ClientContext.SessionCacheKey, out var token)) { var protoToken = await sessions.GetDefaultSession(args, ctx).ConfigureAwait(false); token = new FrostFsSessionToken(protoToken); ClientContext.SessionCache.SetValue(ClientContext.SessionCacheKey, token); } if (token == null) { throw new FrostFsException("Cannot create session"); } return token; } internal async Task GetObjectHeadAsync(PrmObjectHeadGet args, CallContext ctx) { var request = new HeadRequest { Body = new HeadRequest.Types.Body { Address = new Address { ContainerId = args.ContainerId.GetContainerID(), ObjectId = args.ObjectId.ToMessage() }, Raw = args.Raw } }; var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( request.Body.Address, ObjectSessionContext.Types.Verb.Head, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); var response = await client!.HeadAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false); Verifier.CheckResponse(response); var result = new FrostFsHeaderResult(); if (response.Body.Header != null) { result.HeaderInfo = response.Body.Header?.Header.ToModel(); } if (response.Body.SplitInfo != null) { result.SplitInfo = new FrostFsSplitInfo(response.Body.SplitInfo); } return result; } internal async Task GetObjectAsync(PrmObjectGet args, CallContext ctx) { var request = new GetRequest { Body = new GetRequest.Types.Body { Address = new Address { ContainerId = args.ContainerId.GetContainerID(), ObjectId = args.ObjectId.ToMessage() } } }; var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( request.Body.Address, ObjectSessionContext.Types.Verb.Get, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); return await GetObject(request, ctx).ConfigureAwait(false); } internal async Task GetRangeAsync(PrmRangeGet args, CallContext ctx) { var request = new GetRangeRequest { Body = new GetRangeRequest.Types.Body { Address = new Address { ContainerId = args.ContainerId.GetContainerID(), ObjectId = args.ObjectId.ToMessage() }, Range = new Object.Range { Offset = args.Range.Offset, Length = args.Range.Length }, Raw = args.Raw } }; var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( request.Body.Address, ObjectSessionContext.Types.Verb.Range, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); var call = client.GetRange(request, null, ctx.GetDeadline(), ctx.CancellationToken); return new RangeReader(call); } internal async Task[]> GetRangeHashAsync(PrmRangeHashGet args, CallContext ctx) { var request = new GetRangeHashRequest { Body = new GetRangeHashRequest.Types.Body { Address = new Address { ContainerId = args.ContainerId.GetContainerID(), ObjectId = args.ObjectId.ToMessage() }, Type = ChecksumType.Sha256, Salt = ByteString.CopyFrom(args.Salt) // TODO: create a type with calculated cashed ByteString inside } }; foreach (var range in args.Ranges) { request.Body.Ranges.Add(new Object.Range { Length = range.Length, Offset = range.Offset }); } var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( request.Body.Address, ObjectSessionContext.Types.Verb.Rangehash, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); var response = await client.GetRangeHashAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken); Verifier.CheckResponse(response); var hashCollection = response.Body.HashList.Select(h => h.Memory).ToArray(); return hashCollection; } internal async Task DeleteObjectAsync(PrmObjectDelete args, CallContext ctx) { var request = new DeleteRequest { Body = new DeleteRequest.Types.Body { Address = new Address { ContainerId = args.ContainerId.GetContainerID(), ObjectId = args.ObjectId.ToMessage() } } }; var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( request.Body.Address, ObjectSessionContext.Types.Verb.Delete, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); var response = await client.DeleteAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken); Verifier.CheckResponse(response); } internal async IAsyncEnumerable SearchObjectsAsync(PrmObjectSearch args, CallContext ctx) { var request = new SearchRequest { Body = new SearchRequest.Types.Body { ContainerId = args.ContainerId.GetContainerID(), Version = 1 // TODO: clarify this param } }; request.Body.Filters.AddRange(args.Filters.Select(f => f.ToMessage())); var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( new Address { ContainerId = request.Body.ContainerId }, ObjectSessionContext.Types.Verb.Search, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); using var stream = GetSearchReader(request, ctx); while (true) { var ids = await stream.Read(ctx.CancellationToken).ConfigureAwait(false); if (ids == null) yield break; foreach (var oid in ids) { yield return FrostFsObjectId.FromHash(oid.Value.Span); } } } internal async Task PutSingleObjectAsync(PrmSingleObjectPut args, CallContext ctx) { var grpcObject = ObjectTools.CreateSingleObject(args.FrostFsObject, ClientContext); var request = new PutSingleRequest { Body = new() { Object = grpcObject } }; var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( new Address { ContainerId = grpcObject.Header.ContainerId }, ObjectSessionContext.Types.Verb.Put, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); request.Sign(ClientContext.Key); var response = await client.PutSingleAsync(request, null, ctx.GetDeadline(), ctx.CancellationToken).ConfigureAwait(false); Verifier.CheckResponse(response); return FrostFsObjectId.FromHash(grpcObject.ObjectId.Value.Span); } internal async Task PatchObjectAsync(PrmObjectPatch args, CallContext ctx) { var chunkSize = args.MaxChunkLength; Stream payload = args.Payload ?? throw new ArgumentNullException(nameof(args), "Stream parameter is null"); var call = client.Patch(null, ctx.GetDeadline(), ctx.CancellationToken); byte[]? chunkBuffer = null; try { chunkBuffer = ArrayPool.Shared.Rent(chunkSize); bool isFirstChunk = true; ulong currentPos = args.Range.Offset; var address = new Address { ObjectId = args.Address.ObjectId, ContainerId = args.Address.ContainerId }; while (true) { var bytesCount = await payload.ReadAsync(chunkBuffer, 0, chunkSize, ctx.CancellationToken).ConfigureAwait(false); if (bytesCount == 0) { break; } var request = new PatchRequest() { Body = new() { Address = address, Patch = new PatchRequest.Types.Body.Types.Patch { Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)), SourceRange = new Range { Offset = currentPos, Length = (ulong)bytesCount } } } }; if (isFirstChunk) { var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( address, ObjectSessionContext.Types.Verb.Patch, ClientContext.Key); request.AddMetaHeader(args.XHeaders, protoToken); if (args.NewAttributes != null && args.NewAttributes.Length > 0) { foreach (var attr in args.NewAttributes) { request.Body.NewAttributes.Add(attr.ToMessage()); request.Body.ReplaceAttributes = args.ReplaceAttributes; } } isFirstChunk = false; } else { request.AddMetaHeader(args.XHeaders); } request.Sign(ClientContext.Key); await call.RequestStream.WriteAsync(request).ConfigureAwait(false); currentPos += (ulong)bytesCount; } } finally { if (chunkBuffer != null) { ArrayPool.Shared.Return(chunkBuffer); } } await call.RequestStream.CompleteAsync().ConfigureAwait(false); var response = await call.ResponseAsync.ConfigureAwait(false); Verifier.CheckResponse(response); return response.Body.ObjectId.ToModel(); } internal async Task PutClientCutObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { if (args.Payload == null) throw new ArgumentException(nameof(args.Payload)); if (args.Header == null) throw new ArgumentException(nameof(args.Header)); var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false); int partSize = (int)networkSettings.MaxObjectSize; int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize; ulong fullLength; // Information about the uploaded parts. var progressInfo = args.Progress; // var offset = 0L; if (progressInfo != null && progressInfo.GetParts().Count > 0) { if (!args.Payload.CanSeek) { throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported."); } var lastPart = progressInfo.GetLast(); args.Payload.Position = lastPart.Offset + lastPart.Length; fullLength = (ulong)(args.Payload.Length - args.Payload.Position); offset = args.Payload.Position; } else { if (args.Header.PayloadLength > 0) fullLength = args.Header.PayloadLength; else if (args.Payload.CanSeek) fullLength = (ulong)args.Payload.Length; else throw new ArgumentException("The stream does not have a length and payload length is not defined"); } //define collection capacity var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0; var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0; progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount); var remain = fullLength; byte[]? buffer = null; bool isRentBuffer = false; try { if (args.CustomBuffer != null) { if (args.CustomBuffer.Length < chunkSize) throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required"); buffer = args.CustomBuffer; } else { buffer = ArrayPool.Shared.Rent(chunkSize); isRentBuffer = true; } FrostFsObjectId? resultObjectId = null; FrostFsObjectHeader? parentHeader = null; while (remain > 0) { var bytesToWrite = Math.Min((ulong)partSize, remain); var isLastPart = remain <= (ulong)partSize; // When the last part of the object is uploaded, all metadata for the object must be added if (isLastPart && objectsCount > 1) { parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular) { Attributes = args.Header.Attributes, PayloadLength = fullLength }; } // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter var header = objectsCount == 1 ? args.Header : new FrostFsObjectHeader( args.Header.ContainerId, FrostFsObjectType.Regular, [], new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader)); var prm = new PrmObjectPut(header); using var stream = await PutStreamObjectAsync(prm, ctx).ConfigureAwait(false); var uploaded = 0; // If an error occurs while uploading a part of the object, there is no need to re-upload the parts // that were successfully uploaded before. It is sufficient to re-upload only the failed part var thisPartRest = (int)Math.Min((ulong)partSize, remain); while (thisPartRest > 0) { var nextChunkSize = Math.Min(thisPartRest, chunkSize); var size = await args.Payload.ReadAsync(buffer, 0, nextChunkSize).ConfigureAwait(false); if (size == 0) break; await stream.WriteAsync(buffer.AsMemory(0, size)).ConfigureAwait(false); uploaded += size; thisPartRest -= size; } var objectId = await stream.CompleteAsync().ConfigureAwait(false); var part = new ObjectPartInfo(offset, uploaded, objectId); offset += uploaded; progressInfo.AddPart(part); remain -= bytesToWrite; if (isLastPart) { if (objectsCount == 1) { return progressInfo.GetPart(0).ObjectId; } if (parentHeader == null) continue; // Once all parts of the object are uploaded, they must be linked into a single entity var linkObject = new FrostFsLinkObject(header.ContainerId, progressInfo.SplitId, parentHeader, [.. progressInfo.GetParts().Select(p => p.ObjectId)]); await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false); // Retrieve the ID of the linked object resultObjectId = FrostFsObjectId.FromHash(prm.Header!.GetHeader().Split!.Parent.Value.Span); return resultObjectId; } } throw new FrostFsException("Unexpected error: cannot send object"); } finally { if (isRentBuffer && buffer != null) { ArrayPool.Shared.Return(buffer); } } } internal async Task PutClientCutSingleObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { if (args.Payload == null) throw new ArgumentException(nameof(args.Payload)); if (args.Header == null) throw new ArgumentException(nameof(args.Header)); var networkSettings = await ClientContext.Client.GetNetworkSettingsAsync(ctx).ConfigureAwait(false); int partSize = (int)networkSettings.MaxObjectSize; int chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize; ulong fullLength; // Information about the uploaded parts. var progressInfo = args.Progress; // var offset = 0L; if (progressInfo != null && progressInfo.GetParts().Count > 0) { if (!args.Payload.CanSeek) { throw new FrostFsException("Cannot resume client cut upload for this stream. Seek must be supported."); } var lastPart = progressInfo.GetLast(); args.Payload.Position = lastPart.Offset + lastPart.Length; fullLength = (ulong)(args.Payload.Length - args.Payload.Position); offset = args.Payload.Position; } else { if (args.Header.PayloadLength > 0) fullLength = args.Header.PayloadLength; else if (args.Payload.CanSeek) fullLength = (ulong)args.Payload.Length; else throw new ArgumentException("The stream does not have a length and payload length is not defined"); } //define collection capacity var restPart = (fullLength % (ulong)partSize) > 0 ? 1 : 0; var objectsCount = fullLength > 0 ? (int)(fullLength / (ulong)partSize) + restPart : 0; // if the object fits one part, it can be loaded as non-complex object, but if it is not upload resuming if (objectsCount == 1 && progressInfo != null && progressInfo.GetLast().Length == 0) { args.PutObjectContext.MaxObjectSizeCache = partSize; args.PutObjectContext.FullLength = fullLength; var singlePartResult = await PutMultipartStreamObjectAsync(args, default).ConfigureAwait(false); return singlePartResult.ObjectId; } progressInfo ??= new UploadProgressInfo(Guid.NewGuid(), objectsCount); var remain = fullLength; byte[]? buffer = null; bool isRentBuffer = false; try { if (args.CustomBuffer != null) { if (args.CustomBuffer.Length < partSize) { throw new ArgumentException($"Buffer size is too small. A buffer with capacity {partSize} is required"); } buffer = args.CustomBuffer; } else { buffer = ArrayPool.Shared.Rent(partSize); isRentBuffer = true; } FrostFsObjectHeader? parentHeader = null; for (int i = 0; i < objectsCount;) { i++; var bytesToWrite = Math.Min((ulong)partSize, remain); var size = await args.Payload.ReadAsync(buffer, 0, (int)bytesToWrite).ConfigureAwait(false); if (i == objectsCount) { parentHeader = new FrostFsObjectHeader(args.Header.ContainerId, FrostFsObjectType.Regular) { PayloadLength = args.PutObjectContext.FullLength, Attributes = args.Header.Attributes }; } // Uploading the next part of the object. Note: the request must contain a non-null SplitId parameter var partHeader = new FrostFsObjectHeader( args.Header.ContainerId, FrostFsObjectType.Regular, [], new FrostFsSplit(progressInfo.SplitId, progressInfo.GetLast().ObjectId, parentHeader: parentHeader)) { PayloadLength = (ulong)size }; var obj = new FrostFsObject(partHeader) { SingleObjectPayload = buffer.AsMemory(0, size) }; var prm = new PrmSingleObjectPut(obj); var objectId = await PutSingleObjectAsync(prm, ctx).ConfigureAwait(false); var part = new ObjectPartInfo(offset, size, objectId); progressInfo.AddPart(part); offset += size; if (i < objectsCount) continue; // Once all parts of the object are uploaded, they must be linked into a single entity var linkObject = new FrostFsLinkObject(args.Header.ContainerId, progressInfo.SplitId, parentHeader!, [.. progressInfo.GetParts().Select(p => p.ObjectId)]); _ = await PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), ctx).ConfigureAwait(false); // Retrieve the ID of the linked object return partHeader.GetHeader().Split!.Parent.ToModel(); } throw new FrostFsException("Unexpected error"); } finally { if (isRentBuffer && buffer != null) { ArrayPool.Shared.Return(buffer); } } } struct PutObjectResult(FrostFsObjectId objectId, int objectSize) { public FrostFsObjectId ObjectId = objectId; public int ObjectSize = objectSize; } private async Task PutMultipartStreamObjectAsync(PrmObjectClientCutPut args, CallContext ctx) { var payload = args.Payload!; var chunkSize = args.BufferMaxSize > 0 ? args.BufferMaxSize : Constants.ObjectChunkSize; var restBytes = args.PutObjectContext.FullLength - args.PutObjectContext.CurrentStreamPosition; chunkSize = (int)Math.Min(restBytes, (ulong)chunkSize); bool isRentBuffer = false; byte[]? chunkBuffer = null; try { // 0 means no limit from client, so server side cut is performed var objectLimitSize = args.PutObjectContext.MaxObjectSizeCache; if (args.CustomBuffer != null) { if (args.CustomBuffer.Length < chunkSize) { throw new ArgumentException($"Buffer size is too small. At least {chunkSize} required"); } chunkBuffer = args.CustomBuffer; } else { chunkBuffer = ArrayPool.Shared.Rent(chunkSize); isRentBuffer = true; } var sentBytes = 0; using var stream = await GetUploadStream(args, ctx).ConfigureAwait(false); while (objectLimitSize == 0 || sentBytes < objectLimitSize) { // send chunks limited to default or user's settings var bufferSize = objectLimitSize > 0 ? Math.Min(objectLimitSize - sentBytes, chunkSize) : chunkSize; var bytesCount = await payload.ReadAsync(chunkBuffer, 0, bufferSize, ctx.CancellationToken).ConfigureAwait(false); if (bytesCount == 0) break; sentBytes += bytesCount; var chunkRequest = new PutRequest { Body = new PutRequest.Types.Body { Chunk = UnsafeByteOperations.UnsafeWrap(chunkBuffer.AsMemory(0, bytesCount)) } }; chunkRequest.AddMetaHeader(args.XHeaders); chunkRequest.Sign(ClientContext.Key); await stream.Write(chunkRequest).ConfigureAwait(false); } args.PutObjectContext.CurrentStreamPosition += (ulong)sentBytes; var response = await stream.Close().ConfigureAwait(false); Verifier.CheckResponse(response); return new PutObjectResult(FrostFsObjectId.FromHash(response.Body.ObjectId.Value.Span), sentBytes); } finally { if (isRentBuffer && chunkBuffer != null) { ArrayPool.Shared.Return(chunkBuffer); } } } internal async Task PutStreamObjectAsync(PrmObjectPutBase args, CallContext ctx) { var stream = await GetUploadStream(args, ctx).ConfigureAwait(false); return new ObjectWriter(ClientContext, args, stream); } private async Task> GetUploadStream(PrmObjectPutBase args, CallContext ctx) { var header = args.Header!; header.OwnerId ??= ClientContext.Owner; header.Version ??= ClientContext.Version; var grpcHeader = header.GetHeader(); if (header.Split != null) { ObjectTools.SetSplitValues(grpcHeader, header.Split, ClientContext.Owner, ClientContext.Version, ClientContext.Key); } var initRequest = new PutRequest { Body = new PutRequest.Types.Body { Init = new PutRequest.Types.Body.Types.Init { Header = grpcHeader, } } }; var sessionToken = args.SessionToken ?? await GetDefaultSession(args, ctx).ConfigureAwait(false); var protoToken = sessionToken.CreateObjectTokenContext( new Address { ContainerId = grpcHeader.ContainerId }, ObjectSessionContext.Types.Verb.Put, ClientContext.Key); initRequest.AddMetaHeader(args.XHeaders, protoToken); initRequest.Sign(ClientContext.Key); return await PutObjectInit(initRequest, ctx).ConfigureAwait(false); } private async Task> PutObjectInit(PutRequest initRequest, CallContext ctx) { if (initRequest is null) { throw new ArgumentNullException(nameof(initRequest)); } var call = client.Put(null, ctx.GetDeadline(), ctx.CancellationToken); await call.RequestStream.WriteAsync(initRequest).ConfigureAwait(false); return new ObjectStreamer(call); } private async Task GetObject(GetRequest request, CallContext ctx) { var reader = GetObjectInit(request, ctx); var grpcObject = await reader.ReadHeader().ConfigureAwait(false); var modelObject = grpcObject.ToModel(); modelObject.ObjectReader = reader; return modelObject; } private ObjectReader GetObjectInit(GetRequest initRequest, CallContext ctx) { if (initRequest is null) throw new ArgumentNullException(nameof(initRequest)); var call = client.Get(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken); return new ObjectReader(call); } private SearchReader GetSearchReader(SearchRequest initRequest, CallContext ctx) { if (initRequest is null) { throw new ArgumentNullException(nameof(initRequest)); } var call = client.Search(initRequest, null, ctx.GetDeadline(), ctx.CancellationToken); return new SearchReader(call); } }