139 lines
4.6 KiB
C#
139 lines
4.6 KiB
C#
using FrostFS.SDK;
|
|
using FrostFS.SDK.Client;
|
|
using FrostFS.SDK.Client.Interfaces;
|
|
using FrostFS.SDK.Client.Mappers.GRPC;
|
|
|
|
namespace client;
|
|
|
|
/// <summary>
/// Progress record of a multi-part upload: the split identifier shared by the parts
/// and the list of parts stored so far.
/// </summary>
public class UploadProgressInfo
{
    /// <summary>
    /// Creates a progress record for the given split with an empty list of parts.
    /// </summary>
    /// <param name="splitId">Split identifier to associate with this upload.</param>
    public UploadProgressInfo(SplitId splitId)
    {
        SplitId = splitId;
        Parts = new List<ObjectPartInfo>();
    }

    /// <summary>Split identifier supplied at construction.</summary>
    public SplitId SplitId { get; }

    /// <summary>Mutable list of recorded parts; starts empty.</summary>
    public List<ObjectPartInfo> Parts { get; }
}
|
|
|
|
/// <summary>
/// Immutable description of one stored part: its offset, its length and the
/// identifier of the object that holds it.
/// </summary>
public readonly struct ObjectPartInfo
{
    /// <summary>Initializes all three values of the part description.</summary>
    /// <param name="offset">Offset of the part.</param>
    /// <param name="length">Length of the part.</param>
    /// <param name="objectId">Identifier of the object holding the part.</param>
    public ObjectPartInfo(long offset, int length, FrostFsObjectId objectId)
    {
        Offset = offset;
        Length = length;
        ObjectId = objectId;
    }

    /// <summary>Offset of the part.</summary>
    public long Offset { get; }

    /// <summary>Length of the part.</summary>
    public int Length { get; }

    /// <summary>Identifier of the object holding the part.</summary>
    public FrostFsObjectId ObjectId { get; }
}
|
|
|
|
/// <summary>
/// Uploads a seekable stream through an <see cref="IFrostFSClient"/> as a sequence of
/// split parts followed by a link object, recording every stored part in
/// <see cref="ProgressInfo"/> so that a later call can continue from the last stored part.
/// </summary>
/// <param name="client">Client used to send the parts and the link object.</param>
/// <param name="progress">
/// Progress of a previous attempt to continue from; <c>null</c> starts a new upload.
/// </param>
public class LargeFileUploader(IFrostFSClient client, UploadProgressInfo? progress = null)
{
    private readonly IFrostFSClient _client = client;

    // Number of payload bytes stored in every part except the last one (64 MiB).
    private readonly int _partSize = 64 * 1024 * 1024;

    // Number of bytes read from the source stream and written to the object writer at once (3 MiB).
    private readonly int _chunkSize = 3 * 1024 * 1024;

    // Position in the source stream of the first byte that has not been uploaded yet.
    private long _offset = 0;

    private byte[]? _chunkBuffer;

    // True only when no previous progress was supplied. While set, PutObject throws after the
    // fourth full-size part of the call has been stored.
    // NOTE(review): this looks like deliberate failure injection for exercising resume - confirm
    // before reusing this class outside a demo.
    private readonly bool _fail = progress == null;

    /// <summary>
    /// Progress of the current upload; created on the first <see cref="PutObject"/> call
    /// when none was supplied to the constructor.
    /// </summary>
    public UploadProgressInfo? ProgressInfo { get; private set; } = progress;

    /// <summary>
    /// Uploads <paramref name="stream"/> into <paramref name="container"/>, continuing after
    /// the last part recorded in <see cref="ProgressInfo"/>.
    /// </summary>
    /// <param name="container">Container that receives the object.</param>
    /// <param name="stream">Source data; must support <c>Length</c> and <c>Seek</c>.</param>
    /// <returns>
    /// The parent object identifier when the payload was split into several parts, otherwise
    /// the identifier of the single stored object.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The recorded progress points past the end of <paramref name="stream"/>.
    /// </exception>
    /// <exception cref="EndOfStreamException">
    /// The stream ended before the number of bytes implied by its length could be read.
    /// </exception>
    /// <exception cref="Exception">Thrown by the failure injection described on <c>_fail</c>.</exception>
    public async Task<FrostFsObjectId> PutObject(FrostFsContainerId container, Stream stream)
    {
        ArgumentNullException.ThrowIfNull(stream);

        // use any optimization here according to memory usage strategy (ArrayPool, external buffer etc.)
        _chunkBuffer = new byte[_chunkSize];

        var fullLength = stream.Length;

        ProgressInfo ??= new(new SplitId());

        // ObjectPartInfo is a struct, so an empty list yields default(ObjectPartInfo) and offset 0.
        var lastPart = ProgressInfo.Parts.LastOrDefault();
        _offset = lastPart.Offset + lastPart.Length;

        if (_offset > fullLength)
        {
            throw new InvalidOperationException(
                $"Recorded progress ({_offset} bytes) exceeds the stream length ({fullLength} bytes).");
        }

        stream.Seek(_offset, SeekOrigin.Begin);

        // Send full-size parts while more than one part worth of data remains;
        // whatever is left afterwards becomes the last part.
        var i = 0;
        while (fullLength - _offset > _partSize)
        {
            var partInfo = await PutObjectPart(container, ProgressInfo.SplitId, stream, _partSize);
            ProgressInfo.Parts.Add(partInfo);

            if (_fail && i++ == 3)
            {
                throw new Exception("Upload error");
            }
        }

        // send the last part with additional attributes
        if (ProgressInfo.Parts.Count > 0)
        {
            var largeObjectHeader = new FrostFsObjectHeader(
                container,
                FrostFsObjectType.Regular)
            {
                PayloadLength = (ulong)fullLength
            };

            // last part object header: links to the previous part and carries the header
            // that describes the whole payload
            var header = new FrostFsObjectHeader(
                containerId: container,
                type: FrostFsObjectType.Regular,
                attributes: [],
                split: new FrostFsSplit(ProgressInfo.SplitId, ProgressInfo.Parts[^1].ObjectId, null, largeObjectHeader));

            var param = new PrmObjectPut(header);

            var chunkStream = await _client.PutObjectAsync(param, default).ConfigureAwait(true);

            var lastPartInfo = await UploadObjectPart(stream, chunkStream, fullLength - _offset);

            ProgressInfo.Parts.Add(lastPartInfo);

            // The link object lists the identifiers of all parts in upload order.
            var linkObject = new FrostFsLinkObject(header.ContainerId, ProgressInfo.SplitId, largeObjectHeader, [.. ProgressInfo.Parts.Select(p => p.ObjectId)]);

            _ = await _client.PutSingleObjectAsync(new PrmSingleObjectPut(linkObject), default);

            // NOTE(review): the parent id is presumably filled in by the SDK while the last
            // part is sent - confirm against the SDK.
            var parentHeader = param.Header!.GetHeader();

            return parentHeader.Split!.Parent.ToModel();
        }

        // Everything fits into one part. Request exactly the bytes the stream holds: asking for
        // a full _partSize here would make the upload loop wait for data that never arrives.
        var simpleObject = await PutObjectPart(container, ProgressInfo.SplitId, stream, fullLength - _offset);

        return simpleObject.ObjectId;
    }

    /// <summary>
    /// Opens an object writer for a part that links to the last recorded part and uploads
    /// <paramref name="partSize"/> bytes from <paramref name="stream"/> into it.
    /// </summary>
    /// <param name="container">Container that receives the part.</param>
    /// <param name="splitId">Split identifier shared by the parts of this upload.</param>
    /// <param name="stream">Source data, positioned at the first byte of the part.</param>
    /// <param name="partSize">Exact number of bytes to store in the part.</param>
    /// <returns>Offset, length and object identifier of the stored part.</returns>
    private async Task<ObjectPartInfo> PutObjectPart(FrostFsContainerId container, SplitId splitId, Stream stream, long partSize)
    {
        // default(ObjectPartInfo) when nothing has been stored yet, so ObjectId is null then.
        var lastPart = ProgressInfo!.Parts.LastOrDefault();

        var header = new FrostFsObjectHeader(
            containerId: container,
            type: FrostFsObjectType.Regular,
            attributes: [],
            new FrostFsSplit(splitId, lastPart.ObjectId));

        var param = new PrmObjectPut(header);

        var chunkStream = await _client.PutObjectAsync(param, default).ConfigureAwait(true);

        return await UploadObjectPart(stream, chunkStream, partSize);
    }

    /// <summary>
    /// Copies exactly <paramref name="partSize"/> bytes from <paramref name="stream"/> to
    /// <paramref name="chunkStream"/> in chunks of at most <c>_chunkSize</c> bytes, completes
    /// the writer and advances <c>_offset</c> by the number of bytes copied.
    /// </summary>
    /// <param name="stream">Source data, positioned at the first byte of the part.</param>
    /// <param name="chunkStream">Writer of the object that stores the part.</param>
    /// <param name="partSize">Exact number of bytes to copy; callers keep it within <c>_partSize</c>.</param>
    /// <returns>Offset, length and object identifier of the stored part.</returns>
    /// <exception cref="EndOfStreamException">
    /// The stream ended before <paramref name="partSize"/> bytes were read.
    /// </exception>
    private async Task<ObjectPartInfo> UploadObjectPart(Stream stream, IObjectWriter chunkStream, long partSize)
    {
        var buffer = _chunkBuffer!;
        int uploaded = 0;

        while (uploaded < partSize)
        {
            var nextChunkSize = (int)Math.Min(partSize - uploaded, _chunkSize);

            // A read may return fewer bytes than requested; only zero means end of stream.
            var size = await stream.ReadAsync(buffer.AsMemory(0, nextChunkSize));
            if (size == 0)
            {
                // Without this check the loop would never terminate on a short stream.
                throw new EndOfStreamException(
                    $"Stream ended after {uploaded} of {partSize} bytes of the current part.");
            }

            await chunkStream.WriteAsync(buffer.AsMemory(0, size));

            uploaded += size;
        }

        var objectId = await chunkStream.CompleteAsync();
        var part = new ObjectPartInfo(_offset, uploaded, objectId);
        _offset += uploaded;

        return part;
    }
}
|