using FrostFS.SDK; using FrostFS.SDK.Client; using FrostFS.SDK.Client.Interfaces; using FrostFS.SDK.Client.Mappers.GRPC; namespace client; public class UploadProgressInfo(SplitId splitId) { public SplitId SplitId { get; } = splitId; public List Parts { get; } = []; } public readonly struct ObjectPartInfo(long offset, int length, FrostFsObjectId objectId) { public long Offset { get; } = offset; public int Length { get; } = length; public FrostFsObjectId ObjectId { get; } = objectId; } public class LargeFileUploader(IFrostFSClient client, UploadProgressInfo? progress = null) { private readonly IFrostFSClient _client = client; private readonly int _partSize = 64 * 1024 * 1024; private readonly int _chunkSize = 3 * 1024 * 1024; private long _offset = 0; private byte[]? chunkBuffer; private bool _fail = progress == null; public UploadProgressInfo? ProgressInfo { get; private set; } = progress; public async Task PutObject(FrostFsContainerId container, Stream stream) { // use any optimization here according to memory usage strategy (ArrayPool, external buffer etc.) chunkBuffer = new byte[_chunkSize]; var fullLength = stream.Length; ProgressInfo ??= new(new SplitId()); var lastPart = ProgressInfo.Parts.LastOrDefault(); _offset = lastPart.Offset + lastPart.Length; stream.Seek(_offset, SeekOrigin.Begin); var i = 0; while (fullLength - _offset > _partSize) { var partInfo = await PutObjectPart(container, ProgressInfo.SplitId, stream); ProgressInfo.Parts.Add(partInfo); if (_fail && i++ == 3) { throw new Exception("Upload error"); } } // send the last part with additional attributes if (ProgressInfo.Parts.Count > 0) { var largeObjectHeader = new FrostFsObjectHeader( container, FrostFsObjectType.Regular) { PayloadLength = (ulong)fullLength }; // last part object header var header = new FrostFsObjectHeader( containerId: container, type: FrostFsObjectType.Regular, attributes: [], split: new FrostFsSplit(ProgressInfo.SplitId, ProgressInfo.Parts.Last().ObjectId, null, largeObjectHeader)); var param = new PrmObjectPut(header); var chunkStream = await _client.PutObjectAsync(param, default).ConfigureAwait(true); var lastPartInfo = await UploadObjectPart(stream, chunkStream, fullLength - _offset); ProgressInfo.Parts.Add(lastPartInfo); var linkObject = new FrostFsLinkObject(header.ContainerId, ProgressInfo.SplitId, largeObjectHeader, [.. ProgressInfo.Parts.Select(p => p.ObjectId)]); _ = await _client.PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), default); var parentHeader = param.Header!.GetHeader(); return parentHeader.Split!.Parent.ToModel(); } var simpleObject = await PutObjectPart(container, ProgressInfo.SplitId, stream); return simpleObject.ObjectId; } private async Task PutObjectPart(FrostFsContainerId container, SplitId splitId, Stream stream) { var lastPart = ProgressInfo!.Parts.LastOrDefault(); var header = new FrostFsObjectHeader( containerId: container, type: FrostFsObjectType.Regular, attributes: [], new FrostFsSplit(splitId, lastPart.ObjectId)); var param = new PrmObjectPut(header); var chunkStream = await _client.PutObjectAsync(param, default).ConfigureAwait(true); return await UploadObjectPart(stream, chunkStream, _partSize); } private async Task UploadObjectPart(Stream stream, IObjectWriter chunkStream, long partSize) { int uploaded = 0; while (true) { var restBytes = partSize - uploaded; var nextChunkSize = Math.Min(restBytes, _chunkSize); if (nextChunkSize == 0) { var objectId = await chunkStream.CompleteAsync(); var part = new ObjectPartInfo(_offset, uploaded, objectId); _offset += uploaded; return part; } var size = stream.Read(chunkBuffer!, 0, (int)nextChunkSize); await chunkStream.WriteAsync(chunkBuffer.AsMemory()[..size]); uploaded += size; } } }