[#13] Change GetObject result to stream #12

Merged
PavelGrossSpb merged 3 commits from PavelGrossSpb/frostfs-sdk-csharp:ObjectDownloader into master 2024-09-04 19:51:24 +00:00
6 changed files with 60 additions and 39 deletions

View file

@ -4,14 +4,18 @@ using System.Threading.Tasks;
using Grpc.Core;
using FrostFS.Object;
using FrostFS.SDK.ModelsV2;
using System.Threading;
namespace FrostFS.SDK.ClientV2;
internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDisposable
public class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IObjectReader
{
private bool disposed = false;
public AsyncServerStreamingCall<GetResponse> Call { get; private set; } = call;
public async Task<Object.Object> ReadHeader()
internal async Task<Object.Object> ReadHeader()
{
if (!await Call.ResponseStream.MoveNext())
throw new InvalidOperationException("unexpected end of stream");
@ -29,9 +33,9 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
};
}
public async Task<byte[]?> ReadChunk()
public async Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default)
{
if (!await Call.ResponseStream.MoveNext())
if (!await Call.ResponseStream.MoveNext(cancellationToken))
return null;
var response = Call.ResponseStream.Current;
@ -44,7 +48,13 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
}
public void Dispose()
{
if (!disposed)
{
Call.Dispose();
GC.SuppressFinalize(this);
disposed = true;
}
}
}

View file

@ -80,7 +80,7 @@ internal class ObjectServiceProvider : ContextAccessor
var obj = await GetObject(request, ctx);
return obj.ToModel();
return obj;
}
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
@ -310,28 +310,17 @@ internal class ObjectServiceProvider : ContextAccessor
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
// TODO: add implementation with stream writer!
private async Task<Object.Object> GetObject(GetRequest request, Context ctx)
private async Task<ModelsV2.Object> GetObject(GetRequest request, Context ctx)
{
using var stream = GetObjectInit(request, ctx);
var reader = GetObjectInit(request, ctx);
var obj = await stream.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
var offset = 0L;
var chunk = await stream.ReadChunk();
var obj = await reader.ReadHeader();
while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
{
var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
var @object = obj.ToModel();
Array.Copy(chunk, 0, payload, offset, length);
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
@object.ObjectReader = reader;
obj.Payload = ByteString.CopyFrom(payload);
return obj;
return @object;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)

View file

@ -1,9 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace FrostFS.SDK.ModelsV2;
public interface IObjectReader : IDisposable
{
Task<byte[]?> ReadChunk();
Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default);
}

View file

@ -28,6 +28,8 @@ public class Object
public byte[] Payload { get; set; }
public IObjectReader? ObjectReader { get; set; }
public Signature? Signature { get; set; }
public void SetParent(LargeObject largeObject)

View file

@ -1,13 +1,7 @@
namespace FrostFS.SDK.ModelsV2;
public class ObjectAttribute
public class ObjectAttribute(string key, string value)
{
public string Key { get; set; }
public string Value { get; set; }
public ObjectAttribute(string key, string value)
{
Key = key;
Value = value;
}
public string Key { get; set; } = key;
public string Value { get; set; } = value;
}

View file

@ -1,3 +1,4 @@
using System.Security.Cryptography;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ModelsV2;
@ -82,13 +83,17 @@ public class ClientTestLive
Assert.NotNull(container);
Random rnd = new();
var bytes = new byte[6*1024*1024 + 100];
rnd.NextBytes(bytes);
var param = new PutObjectParameters
{
Header = new ObjectHeader(
containerId: containerId,
type: ObjectType.Regular,
new ObjectAttribute("fileName", "test")),
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
Payload = new MemoryStream(bytes),
ClientCut = false
};
@ -102,7 +107,7 @@ public class ClientTestLive
hasObject = true;
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
Assert.Equal(10u, objHeader.PayloadLength);
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value);
@ -112,7 +117,16 @@ public class ClientTestLive
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
byte[]? chunk = null;
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
{
ms.Write(chunk);
}
Assert.Equal(MD5.HashData(bytes), MD5.HashData(downloadedBytes));
await Cleanup(fsClient);
@ -145,13 +159,17 @@ public class ClientTestLive
Assert.NotNull(container);
Random rnd = new();
var bytes = new byte[6*1024*1024 + 100];
rnd.NextBytes(bytes);
var param = new PutObjectParameters
{
Header = new ObjectHeader(
containerId: containerId,
type: ObjectType.Regular,
new ObjectAttribute("fileName", "test")),
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
Payload = new MemoryStream(bytes),
ClientCut = true
};
@ -165,7 +183,7 @@ public class ClientTestLive
hasObject = true;
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
Assert.Equal(10u, objHeader.PayloadLength);
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value);
@ -175,7 +193,14 @@ public class ClientTestLive
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
byte[]? chunk = null;
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
{
ms.Write(chunk);
}
await Cleanup(fsClient);