[#13] Change GetObject result to stream #12
6 changed files with 60 additions and 39 deletions
|
@ -4,14 +4,18 @@ using System.Threading.Tasks;
|
|||
using Grpc.Core;
|
||||
|
||||
using FrostFS.Object;
|
||||
using FrostFS.SDK.ModelsV2;
|
||||
using System.Threading;
|
||||
|
||||
namespace FrostFS.SDK.ClientV2;
|
||||
|
||||
internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDisposable
|
||||
public class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IObjectReader
|
||||
{
|
||||
private bool disposed = false;
|
||||
|
||||
public AsyncServerStreamingCall<GetResponse> Call { get; private set; } = call;
|
||||
|
||||
public async Task<Object.Object> ReadHeader()
|
||||
internal async Task<Object.Object> ReadHeader()
|
||||
{
|
||||
if (!await Call.ResponseStream.MoveNext())
|
||||
throw new InvalidOperationException("unexpected end of stream");
|
||||
|
@ -29,9 +33,9 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
|
|||
};
|
||||
}
|
||||
|
||||
public async Task<byte[]?> ReadChunk()
|
||||
public async Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!await Call.ResponseStream.MoveNext())
|
||||
if (!await Call.ResponseStream.MoveNext(cancellationToken))
|
||||
return null;
|
||||
|
||||
var response = Call.ResponseStream.Current;
|
||||
|
@ -44,7 +48,13 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
|
|||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (!disposed)
|
||||
{
|
||||
Call.Dispose();
|
||||
GC.SuppressFinalize(this);
|
||||
|
||||
disposed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
|
||||
var obj = await GetObject(request, ctx);
|
||||
|
||||
return obj.ToModel();
|
||||
return obj;
|
||||
}
|
||||
|
||||
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
|
||||
|
@ -310,28 +310,17 @@ internal class ObjectServiceProvider : ContextAccessor
|
|||
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
|
||||
}
|
||||
|
||||
// TODO: add implementation with stream writer!
|
||||
private async Task<Object.Object> GetObject(GetRequest request, Context ctx)
|
||||
private async Task<ModelsV2.Object> GetObject(GetRequest request, Context ctx)
|
||||
{
|
||||
using var stream = GetObjectInit(request, ctx);
|
||||
var reader = GetObjectInit(request, ctx);
|
||||
|
||||
var obj = await stream.ReadHeader();
|
||||
var payload = new byte[obj.Header.PayloadLength];
|
||||
var offset = 0L;
|
||||
var chunk = await stream.ReadChunk();
|
||||
var obj = await reader.ReadHeader();
|
||||
|
||||
while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
|
||||
{
|
||||
var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
|
||||
var @object = obj.ToModel();
|
||||
|
||||
Array.Copy(chunk, 0, payload, offset, length);
|
||||
offset += chunk.Length;
|
||||
chunk = await stream.ReadChunk();
|
||||
}
|
||||
@object.ObjectReader = reader;
|
||||
|
||||
obj.Payload = ByteString.CopyFrom(payload);
|
||||
|
||||
return obj;
|
||||
return @object;
|
||||
}
|
||||
|
||||
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace FrostFS.SDK.ModelsV2;
|
||||
|
||||
public interface IObjectReader : IDisposable
|
||||
{
|
||||
Task<byte[]?> ReadChunk();
|
||||
Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ public class Object
|
|||
|
||||
public byte[] Payload { get; set; }
|
||||
|
||||
public IObjectReader? ObjectReader { get; set; }
|
||||
|
||||
public Signature? Signature { get; set; }
|
||||
|
||||
public void SetParent(LargeObject largeObject)
|
||||
|
|
|
@ -1,13 +1,7 @@
|
|||
namespace FrostFS.SDK.ModelsV2;
|
||||
|
||||
public class ObjectAttribute
|
||||
public class ObjectAttribute(string key, string value)
|
||||
{
|
||||
public string Key { get; set; }
|
||||
public string Value { get; set; }
|
||||
|
||||
public ObjectAttribute(string key, string value)
|
||||
{
|
||||
Key = key;
|
||||
Value = value;
|
||||
}
|
||||
public string Key { get; set; } = key;
|
||||
public string Value { get; set; } = value;
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
using System.Security.Cryptography;
|
||||
using FrostFS.SDK.ClientV2;
|
||||
using FrostFS.SDK.ClientV2.Interfaces;
|
||||
using FrostFS.SDK.ModelsV2;
|
||||
|
@ -82,13 +83,17 @@ public class ClientTestLive
|
|||
|
||||
Assert.NotNull(container);
|
||||
|
||||
Random rnd = new();
|
||||
var bytes = new byte[6*1024*1024 + 100];
|
||||
rnd.NextBytes(bytes);
|
||||
|
||||
var param = new PutObjectParameters
|
||||
{
|
||||
Header = new ObjectHeader(
|
||||
containerId: containerId,
|
||||
type: ObjectType.Regular,
|
||||
new ObjectAttribute("fileName", "test")),
|
||||
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
|
||||
Payload = new MemoryStream(bytes),
|
||||
ClientCut = false
|
||||
};
|
||||
|
||||
|
@ -102,7 +107,7 @@ public class ClientTestLive
|
|||
hasObject = true;
|
||||
|
||||
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
|
||||
Assert.Equal(10u, objHeader.PayloadLength);
|
||||
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
|
||||
Assert.Single(objHeader.Attributes);
|
||||
Assert.Equal("fileName", objHeader.Attributes.First().Key);
|
||||
Assert.Equal("test", objHeader.Attributes.First().Value);
|
||||
|
@ -112,7 +117,16 @@ public class ClientTestLive
|
|||
|
||||
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
|
||||
|
||||
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
|
||||
var downloadedBytes = new byte[@object.Header.PayloadLength];
|
||||
MemoryStream ms = new(downloadedBytes);
|
||||
|
||||
byte[]? chunk = null;
|
||||
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
|
||||
{
|
||||
ms.Write(chunk);
|
||||
}
|
||||
|
||||
Assert.Equal(MD5.HashData(bytes), MD5.HashData(downloadedBytes));
|
||||
|
||||
await Cleanup(fsClient);
|
||||
|
||||
|
@ -145,13 +159,17 @@ public class ClientTestLive
|
|||
|
||||
Assert.NotNull(container);
|
||||
|
||||
Random rnd = new();
|
||||
var bytes = new byte[6*1024*1024 + 100];
|
||||
rnd.NextBytes(bytes);
|
||||
|
||||
var param = new PutObjectParameters
|
||||
{
|
||||
Header = new ObjectHeader(
|
||||
containerId: containerId,
|
||||
type: ObjectType.Regular,
|
||||
new ObjectAttribute("fileName", "test")),
|
||||
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
|
||||
Payload = new MemoryStream(bytes),
|
||||
ClientCut = true
|
||||
};
|
||||
|
||||
|
@ -165,7 +183,7 @@ public class ClientTestLive
|
|||
hasObject = true;
|
||||
|
||||
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
|
||||
Assert.Equal(10u, objHeader.PayloadLength);
|
||||
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
|
||||
Assert.Single(objHeader.Attributes);
|
||||
Assert.Equal("fileName", objHeader.Attributes.First().Key);
|
||||
Assert.Equal("test", objHeader.Attributes.First().Value);
|
||||
|
@ -175,7 +193,14 @@ public class ClientTestLive
|
|||
|
||||
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
|
||||
|
||||
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
|
||||
var downloadedBytes = new byte[@object.Header.PayloadLength];
|
||||
MemoryStream ms = new(downloadedBytes);
|
||||
|
||||
byte[]? chunk = null;
|
||||
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
|
||||
{
|
||||
ms.Write(chunk);
|
||||
}
|
||||
|
||||
await Cleanup(fsClient);
|
||||
|
||||
|
|
Loading…
Reference in a new issue