Sandbox/CustomClientCut.cs
2025-02-19 11:20:29 +03:00

139 lines
4.6 KiB
C#

using FrostFS.SDK;
using FrostFS.SDK.Client;
using FrostFS.SDK.Client.Interfaces;
using FrostFS.SDK.Client.Mappers.GRPC;
namespace client;
/// <summary>
/// Resumable state of a multi-part upload: the split identifier shared by
/// every part and the parts that have been uploaded so far.
/// </summary>
public class UploadProgressInfo
{
    /// <summary>
    /// Creates an empty progress record for the given split.
    /// </summary>
    /// <param name="splitId">Identifier that ties all parts of one large object together.</param>
    public UploadProgressInfo(SplitId splitId)
    {
        SplitId = splitId;
    }

    /// <summary>Split identifier used for every part of this upload.</summary>
    public SplitId SplitId { get; }

    /// <summary>Uploaded parts, in the order they were appended by the uploader.</summary>
    public List<ObjectPartInfo> Parts { get; } = new();
}
/// <summary>
/// Immutable description of one uploaded part of a large object.
/// </summary>
public readonly struct ObjectPartInfo
{
    /// <summary>
    /// Creates a part description.
    /// </summary>
    /// <param name="offset">Position in the source stream at which the part begins.</param>
    /// <param name="length">Number of payload bytes the part contains.</param>
    /// <param name="objectId">Identifier of the stored part object.</param>
    public ObjectPartInfo(long offset, int length, FrostFsObjectId objectId)
    {
        Offset = offset;
        Length = length;
        ObjectId = objectId;
    }

    /// <summary>Position in the source stream at which the part begins.</summary>
    public long Offset { get; }

    /// <summary>Number of payload bytes the part contains.</summary>
    public int Length { get; }

    /// <summary>Identifier of the stored part object.</summary>
    public FrostFsObjectId ObjectId { get; }
}
/// <summary>
/// Uploads a seekable stream as a chain of part objects plus a link object,
/// and can resume from a previously recorded <see cref="UploadProgressInfo"/>.
/// </summary>
/// <remarks>
/// The instance keeps the current stream offset and the chunk buffer in fields,
/// so a single instance must not run several uploads concurrently.
/// </remarks>
/// <param name="client">Client used to store the part and link objects.</param>
/// <param name="progress">
/// Progress of an interrupted upload to continue from, or <c>null</c> to start a new one.
/// </param>
public class LargeFileUploader(IFrostFSClient client, UploadProgressInfo? progress = null)
{
    private readonly IFrostFSClient _client = client;

    // Payload size of every part except the last one.
    private readonly int _partSize = 64 * 1024 * 1024;

    // Size of a single read/write step while streaming one part.
    private readonly int _chunkSize = 3 * 1024 * 1024;

    // Sandbox hook: a fresh upload (no progress supplied) fails on purpose after a
    // few parts so that resuming can be exercised.
    private readonly bool _fail = progress is null;

    // Position in the source stream of the first byte that has not been uploaded yet.
    private long _offset;

    private byte[]? _chunkBuffer;

    /// <summary>
    /// Progress of the current upload; created on the first <see cref="PutObject"/> call
    /// when none was supplied to the constructor.
    /// </summary>
    public UploadProgressInfo? ProgressInfo { get; private set; } = progress;

    /// <summary>
    /// Uploads <paramref name="stream"/> into <paramref name="container"/>, continuing
    /// after the last part recorded in <see cref="ProgressInfo"/> when there is one.
    /// </summary>
    /// <param name="container">Target container.</param>
    /// <param name="stream">Source data; must support <see cref="Stream.Length"/> and seeking.</param>
    /// <returns>
    /// The parent object id for a multi-part upload, or the id of the single stored
    /// object when the whole stream fits into one part.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">The recorded progress covers more bytes than the stream holds.</exception>
    /// <exception cref="EndOfStreamException">The stream ended before its reported length was read.</exception>
    /// <exception cref="Exception">Simulated failure raised by the sandbox hook on a fresh upload.</exception>
    public async Task<FrostFsObjectId> PutObject(FrostFsContainerId container, Stream stream)
    {
        ArgumentNullException.ThrowIfNull(stream);

        // use any optimization here according to memory usage strategy (ArrayPool, external buffer etc.)
        _chunkBuffer = new byte[_chunkSize];

        var fullLength = stream.Length;
        ProgressInfo ??= new(new SplitId());

        // With no recorded parts LastOrDefault yields a zeroed struct, i.e. offset 0.
        var lastPart = ProgressInfo.Parts.LastOrDefault();
        _offset = lastPart.Offset + lastPart.Length;

        if (_offset > fullLength)
        {
            throw new ArgumentException(
                "The recorded upload progress exceeds the length of the supplied stream.",
                nameof(stream));
        }

        stream.Seek(_offset, SeekOrigin.Begin);

        // Upload full-size parts while more than one part of data remains; the
        // remainder is sent separately below because it needs a different header.
        var i = 0;
        while (fullLength - _offset > _partSize)
        {
            var partInfo = await PutObjectPart(container, ProgressInfo.SplitId, stream, _partSize);
            ProgressInfo.Parts.Add(partInfo);

            // Deliberate, simulated failure (see _fail) once the fourth part is stored.
            if (_fail && i++ == 3)
            {
                throw new Exception("Upload error");
            }
        }

        // send the last part with additional attributes
        if (ProgressInfo.Parts.Count > 0)
        {
            var largeObjectHeader = new FrostFsObjectHeader(
                container,
                FrostFsObjectType.Regular)
            {
                PayloadLength = (ulong)fullLength
            };

            // last part object header
            var header = new FrostFsObjectHeader(
                containerId: container,
                type: FrostFsObjectType.Regular,
                attributes: [],
                split: new FrostFsSplit(ProgressInfo.SplitId, ProgressInfo.Parts.Last().ObjectId, null, largeObjectHeader));

            var param = new PrmObjectPut(header);
            var chunkStream = await _client.PutObjectAsync(param, default).ConfigureAwait(true);

            var lastPartInfo = await UploadObjectPart(stream, chunkStream, fullLength - _offset);
            ProgressInfo.Parts.Add(lastPartInfo);

            // The link object lists every part of the large object.
            var linkObject = new FrostFsLinkObject(
                header.ContainerId,
                ProgressInfo.SplitId,
                largeObjectHeader,
                [.. ProgressInfo.Parts.Select(p => p.ObjectId)]);
            _ = await _client.PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), default);

            var parentHeader = param.Header!.GetHeader();
            return parentHeader.Split!.Parent.ToModel();
        }

        // Everything fits into a single part: upload exactly the bytes the stream holds.
        // Requesting a full _partSize here would never complete for a shorter stream.
        // NOTE(review): this single object is still created with a split header - confirm that is intended.
        var simpleObject = await PutObjectPart(container, ProgressInfo.SplitId, stream, fullLength - _offset);
        return simpleObject.ObjectId;
    }

    /// <summary>
    /// Opens a new part object chained to the previously uploaded part and streams
    /// <paramref name="length"/> bytes into it.
    /// </summary>
    private async Task<ObjectPartInfo> PutObjectPart(FrostFsContainerId container, SplitId splitId, Stream stream, long length)
    {
        var lastPart = ProgressInfo!.Parts.LastOrDefault();

        var header = new FrostFsObjectHeader(
            containerId: container,
            type: FrostFsObjectType.Regular,
            attributes: [],
            new FrostFsSplit(splitId, lastPart.ObjectId));

        var param = new PrmObjectPut(header);
        var chunkStream = await _client.PutObjectAsync(param, default).ConfigureAwait(true);

        return await UploadObjectPart(stream, chunkStream, length);
    }

    /// <summary>
    /// Copies <paramref name="partSize"/> bytes from <paramref name="stream"/> to
    /// <paramref name="chunkStream"/> in chunk-sized steps, completes the object and
    /// advances the upload offset.
    /// </summary>
    /// <exception cref="EndOfStreamException">
    /// The stream ran out of data before <paramref name="partSize"/> bytes were read.
    /// </exception>
    private async Task<ObjectPartInfo> UploadObjectPart(Stream stream, IObjectWriter chunkStream, long partSize)
    {
        var buffer = _chunkBuffer!;
        var uploaded = 0;

        while (uploaded < partSize)
        {
            var nextChunkSize = (int)Math.Min(partSize - uploaded, _chunkSize);
            var size = await stream.ReadAsync(buffer.AsMemory(0, nextChunkSize));

            // A zero-byte read means end of stream; without this check the loop would spin forever.
            if (size == 0)
            {
                throw new EndOfStreamException(
                    $"The stream ended after {uploaded} of {partSize} bytes expected for the current part.");
            }

            await chunkStream.WriteAsync(buffer.AsMemory(0, size));
            uploaded += size;
        }

        var objectId = await chunkStream.CompleteAsync();
        var part = new ObjectPartInfo(_offset, uploaded, objectId);
        _offset += uploaded;
        return part;
    }
}