[#13] Change GetObject result to stream #12

Merged
PavelGrossSpb merged 3 commits from PavelGrossSpb/frostfs-sdk-csharp:ObjectDownloader into master 2024-09-04 19:51:24 +00:00
6 changed files with 60 additions and 39 deletions

View file

@ -4,14 +4,18 @@ using System.Threading.Tasks;
using Grpc.Core; using Grpc.Core;
using FrostFS.Object; using FrostFS.Object;
using FrostFS.SDK.ModelsV2;
using System.Threading;
namespace FrostFS.SDK.ClientV2; namespace FrostFS.SDK.ClientV2;
internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDisposable public class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IObjectReader
{ {
private bool disposed = false;
public AsyncServerStreamingCall<GetResponse> Call { get; private set; } = call; public AsyncServerStreamingCall<GetResponse> Call { get; private set; } = call;
public async Task<Object.Object> ReadHeader() internal async Task<Object.Object> ReadHeader()
{ {
if (!await Call.ResponseStream.MoveNext()) if (!await Call.ResponseStream.MoveNext())
throw new InvalidOperationException("unexpected end of stream"); throw new InvalidOperationException("unexpected end of stream");
@ -29,9 +33,9 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
}; };
} }
public async Task<byte[]?> ReadChunk() public async Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default)
{ {
if (!await Call.ResponseStream.MoveNext()) if (!await Call.ResponseStream.MoveNext(cancellationToken))
return null; return null;
var response = Call.ResponseStream.Current; var response = Call.ResponseStream.Current;
@ -45,6 +49,12 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
public void Dispose() public void Dispose()
{ {
Call.Dispose(); if (!disposed)
{
Call.Dispose();
GC.SuppressFinalize(this);
disposed = true;
}
} }
} }

View file

@ -80,7 +80,7 @@ internal class ObjectServiceProvider : ContextAccessor
var obj = await GetObject(request, ctx); var obj = await GetObject(request, ctx);
return obj.ToModel(); return obj;
} }
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx) internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
@ -310,28 +310,17 @@ internal class ObjectServiceProvider : ContextAccessor
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()); return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
} }
// TODO: add implementation with stream writer! private async Task<ModelsV2.Object> GetObject(GetRequest request, Context ctx)
private async Task<Object.Object> GetObject(GetRequest request, Context ctx)
{ {
using var stream = GetObjectInit(request, ctx); var reader = GetObjectInit(request, ctx);
var obj = await stream.ReadHeader(); var obj = await reader.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
var offset = 0L;
var chunk = await stream.ReadChunk();
while (chunk is not null && (ulong)offset < obj.Header.PayloadLength) var @object = obj.ToModel();
{
var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
Array.Copy(chunk, 0, payload, offset, length); @object.ObjectReader = reader;
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
obj.Payload = ByteString.CopyFrom(payload); return @object;
return obj;
} }
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx) private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)

View file

@ -1,9 +1,10 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace FrostFS.SDK.ModelsV2; namespace FrostFS.SDK.ModelsV2;
public interface IObjectReader : IDisposable public interface IObjectReader : IDisposable
{ {
Task<byte[]?> ReadChunk(); Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default);
} }

View file

@ -28,6 +28,8 @@ public class Object
public byte[] Payload { get; set; } public byte[] Payload { get; set; }
public IObjectReader? ObjectReader { get; set; }
public Signature? Signature { get; set; } public Signature? Signature { get; set; }
public void SetParent(LargeObject largeObject) public void SetParent(LargeObject largeObject)

View file

@ -1,13 +1,7 @@
namespace FrostFS.SDK.ModelsV2; namespace FrostFS.SDK.ModelsV2;
public class ObjectAttribute public class ObjectAttribute(string key, string value)
{ {
public string Key { get; set; } public string Key { get; set; } = key;
public string Value { get; set; } public string Value { get; set; } = value;
public ObjectAttribute(string key, string value)
{
Key = key;
Value = value;
}
} }

View file

@ -1,3 +1,4 @@
using System.Security.Cryptography;
using FrostFS.SDK.ClientV2; using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces; using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ModelsV2; using FrostFS.SDK.ModelsV2;
@ -82,13 +83,17 @@ public class ClientTestLive
Assert.NotNull(container); Assert.NotNull(container);
Random rnd = new();
var bytes = new byte[6*1024*1024 + 100];
rnd.NextBytes(bytes);
var param = new PutObjectParameters var param = new PutObjectParameters
{ {
Header = new ObjectHeader( Header = new ObjectHeader(
containerId: containerId, containerId: containerId,
type: ObjectType.Regular, type: ObjectType.Regular,
new ObjectAttribute("fileName", "test")), new ObjectAttribute("fileName", "test")),
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), Payload = new MemoryStream(bytes),
ClientCut = false ClientCut = false
}; };
@ -102,7 +107,7 @@ public class ClientTestLive
hasObject = true; hasObject = true;
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId); var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
Assert.Equal(10u, objHeader.PayloadLength); Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.Single(objHeader.Attributes); Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key); Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value); Assert.Equal("test", objHeader.Attributes.First().Value);
@ -112,7 +117,16 @@ public class ClientTestLive
var @object = await fsClient.GetObjectAsync(containerId, objectId!); var @object = await fsClient.GetObjectAsync(containerId, objectId!);
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload); var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
byte[]? chunk = null;
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
{
ms.Write(chunk);
}
Assert.Equal(MD5.HashData(bytes), MD5.HashData(downloadedBytes));
await Cleanup(fsClient); await Cleanup(fsClient);
@ -145,13 +159,17 @@ public class ClientTestLive
Assert.NotNull(container); Assert.NotNull(container);
Random rnd = new();
var bytes = new byte[6*1024*1024 + 100];
rnd.NextBytes(bytes);
var param = new PutObjectParameters var param = new PutObjectParameters
{ {
Header = new ObjectHeader( Header = new ObjectHeader(
containerId: containerId, containerId: containerId,
type: ObjectType.Regular, type: ObjectType.Regular,
new ObjectAttribute("fileName", "test")), new ObjectAttribute("fileName", "test")),
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), Payload = new MemoryStream(bytes),
ClientCut = true ClientCut = true
}; };
@ -165,7 +183,7 @@ public class ClientTestLive
hasObject = true; hasObject = true;
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId); var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
Assert.Equal(10u, objHeader.PayloadLength); Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.Single(objHeader.Attributes); Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key); Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value); Assert.Equal("test", objHeader.Attributes.First().Value);
@ -175,7 +193,14 @@ public class ClientTestLive
var @object = await fsClient.GetObjectAsync(containerId, objectId!); var @object = await fsClient.GetObjectAsync(containerId, objectId!);
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload); var downloadedBytes = new byte[@object.Header.PayloadLength];
MemoryStream ms = new(downloadedBytes);
byte[]? chunk = null;
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
{
ms.Write(chunk);
}
await Cleanup(fsClient); await Cleanup(fsClient);