[#13] Change GetObject result to stream #12
6 changed files with 60 additions and 39 deletions
|
@ -4,14 +4,18 @@ using System.Threading.Tasks;
|
||||||
using Grpc.Core;
|
using Grpc.Core;
|
||||||
|
|
||||||
using FrostFS.Object;
|
using FrostFS.Object;
|
||||||
|
using FrostFS.SDK.ModelsV2;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
namespace FrostFS.SDK.ClientV2;
|
namespace FrostFS.SDK.ClientV2;
|
||||||
|
|
||||||
internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDisposable
|
public class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IObjectReader
|
||||||
{
|
{
|
||||||
|
private bool disposed = false;
|
||||||
|
|
||||||
public AsyncServerStreamingCall<GetResponse> Call { get; private set; } = call;
|
public AsyncServerStreamingCall<GetResponse> Call { get; private set; } = call;
|
||||||
|
|
||||||
public async Task<Object.Object> ReadHeader()
|
internal async Task<Object.Object> ReadHeader()
|
||||||
{
|
{
|
||||||
if (!await Call.ResponseStream.MoveNext())
|
if (!await Call.ResponseStream.MoveNext())
|
||||||
throw new InvalidOperationException("unexpected end of stream");
|
throw new InvalidOperationException("unexpected end of stream");
|
||||||
|
@ -29,9 +33,9 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<byte[]?> ReadChunk()
|
public async Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
if (!await Call.ResponseStream.MoveNext())
|
if (!await Call.ResponseStream.MoveNext(cancellationToken))
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
var response = Call.ResponseStream.Current;
|
var response = Call.ResponseStream.Current;
|
||||||
|
@ -45,6 +49,12 @@ internal class ObjectReader(AsyncServerStreamingCall<GetResponse> call) : IDispo
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Call.Dispose();
|
if (!disposed)
|
||||||
|
{
|
||||||
|
Call.Dispose();
|
||||||
|
GC.SuppressFinalize(this);
|
||||||
|
|
||||||
|
disposed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ internal class ObjectServiceProvider : ContextAccessor
|
||||||
|
|
||||||
var obj = await GetObject(request, ctx);
|
var obj = await GetObject(request, ctx);
|
||||||
|
|
||||||
return obj.ToModel();
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
|
internal Task<ObjectId> PutObjectAsync(PutObjectParameters parameters, Context ctx)
|
||||||
|
@ -310,28 +310,17 @@ internal class ObjectServiceProvider : ContextAccessor
|
||||||
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
|
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add implementation with stream writer!
|
private async Task<ModelsV2.Object> GetObject(GetRequest request, Context ctx)
|
||||||
private async Task<Object.Object> GetObject(GetRequest request, Context ctx)
|
|
||||||
{
|
{
|
||||||
using var stream = GetObjectInit(request, ctx);
|
var reader = GetObjectInit(request, ctx);
|
||||||
|
|
||||||
var obj = await stream.ReadHeader();
|
var obj = await reader.ReadHeader();
|
||||||
var payload = new byte[obj.Header.PayloadLength];
|
|
||||||
var offset = 0L;
|
|
||||||
var chunk = await stream.ReadChunk();
|
|
||||||
|
|
||||||
while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
|
var @object = obj.ToModel();
|
||||||
{
|
|
||||||
var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
|
|
||||||
|
|
||||||
Array.Copy(chunk, 0, payload, offset, length);
|
@object.ObjectReader = reader;
|
||||||
offset += chunk.Length;
|
|
||||||
chunk = await stream.ReadChunk();
|
|
||||||
}
|
|
||||||
|
|
||||||
obj.Payload = ByteString.CopyFrom(payload);
|
return @object;
|
||||||
|
|
||||||
return obj;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
|
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
using System;
|
using System;
|
||||||
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace FrostFS.SDK.ModelsV2;
|
namespace FrostFS.SDK.ModelsV2;
|
||||||
|
|
||||||
public interface IObjectReader : IDisposable
|
public interface IObjectReader : IDisposable
|
||||||
{
|
{
|
||||||
Task<byte[]?> ReadChunk();
|
Task<byte[]?> ReadChunk(CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ public class Object
|
||||||
|
|
||||||
public byte[] Payload { get; set; }
|
public byte[] Payload { get; set; }
|
||||||
|
|
||||||
|
public IObjectReader? ObjectReader { get; set; }
|
||||||
|
|
||||||
public Signature? Signature { get; set; }
|
public Signature? Signature { get; set; }
|
||||||
|
|
||||||
public void SetParent(LargeObject largeObject)
|
public void SetParent(LargeObject largeObject)
|
||||||
|
|
|
@ -1,13 +1,7 @@
|
||||||
namespace FrostFS.SDK.ModelsV2;
|
namespace FrostFS.SDK.ModelsV2;
|
||||||
|
|
||||||
public class ObjectAttribute
|
public class ObjectAttribute(string key, string value)
|
||||||
{
|
{
|
||||||
public string Key { get; set; }
|
public string Key { get; set; } = key;
|
||||||
public string Value { get; set; }
|
public string Value { get; set; } = value;
|
||||||
|
|
||||||
public ObjectAttribute(string key, string value)
|
|
||||||
{
|
|
||||||
Key = key;
|
|
||||||
Value = value;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
using System.Security.Cryptography;
|
||||||
using FrostFS.SDK.ClientV2;
|
using FrostFS.SDK.ClientV2;
|
||||||
using FrostFS.SDK.ClientV2.Interfaces;
|
using FrostFS.SDK.ClientV2.Interfaces;
|
||||||
using FrostFS.SDK.ModelsV2;
|
using FrostFS.SDK.ModelsV2;
|
||||||
|
@ -82,13 +83,17 @@ public class ClientTestLive
|
||||||
|
|
||||||
Assert.NotNull(container);
|
Assert.NotNull(container);
|
||||||
|
|
||||||
|
Random rnd = new();
|
||||||
|
var bytes = new byte[6*1024*1024 + 100];
|
||||||
|
rnd.NextBytes(bytes);
|
||||||
|
|
||||||
var param = new PutObjectParameters
|
var param = new PutObjectParameters
|
||||||
{
|
{
|
||||||
Header = new ObjectHeader(
|
Header = new ObjectHeader(
|
||||||
containerId: containerId,
|
containerId: containerId,
|
||||||
type: ObjectType.Regular,
|
type: ObjectType.Regular,
|
||||||
new ObjectAttribute("fileName", "test")),
|
new ObjectAttribute("fileName", "test")),
|
||||||
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
|
Payload = new MemoryStream(bytes),
|
||||||
ClientCut = false
|
ClientCut = false
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -102,7 +107,7 @@ public class ClientTestLive
|
||||||
hasObject = true;
|
hasObject = true;
|
||||||
|
|
||||||
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
|
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
|
||||||
Assert.Equal(10u, objHeader.PayloadLength);
|
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
|
||||||
Assert.Single(objHeader.Attributes);
|
Assert.Single(objHeader.Attributes);
|
||||||
Assert.Equal("fileName", objHeader.Attributes.First().Key);
|
Assert.Equal("fileName", objHeader.Attributes.First().Key);
|
||||||
Assert.Equal("test", objHeader.Attributes.First().Value);
|
Assert.Equal("test", objHeader.Attributes.First().Value);
|
||||||
|
@ -112,7 +117,16 @@ public class ClientTestLive
|
||||||
|
|
||||||
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
|
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
|
||||||
|
|
||||||
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
|
var downloadedBytes = new byte[@object.Header.PayloadLength];
|
||||||
|
MemoryStream ms = new(downloadedBytes);
|
||||||
|
|
||||||
|
byte[]? chunk = null;
|
||||||
|
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
|
||||||
|
{
|
||||||
|
ms.Write(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Equal(MD5.HashData(bytes), MD5.HashData(downloadedBytes));
|
||||||
|
|
||||||
await Cleanup(fsClient);
|
await Cleanup(fsClient);
|
||||||
|
|
||||||
|
@ -145,13 +159,17 @@ public class ClientTestLive
|
||||||
|
|
||||||
Assert.NotNull(container);
|
Assert.NotNull(container);
|
||||||
|
|
||||||
|
Random rnd = new();
|
||||||
|
var bytes = new byte[6*1024*1024 + 100];
|
||||||
|
rnd.NextBytes(bytes);
|
||||||
|
|
||||||
var param = new PutObjectParameters
|
var param = new PutObjectParameters
|
||||||
{
|
{
|
||||||
Header = new ObjectHeader(
|
Header = new ObjectHeader(
|
||||||
containerId: containerId,
|
containerId: containerId,
|
||||||
type: ObjectType.Regular,
|
type: ObjectType.Regular,
|
||||||
new ObjectAttribute("fileName", "test")),
|
new ObjectAttribute("fileName", "test")),
|
||||||
Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
|
Payload = new MemoryStream(bytes),
|
||||||
ClientCut = true
|
ClientCut = true
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -165,7 +183,7 @@ public class ClientTestLive
|
||||||
hasObject = true;
|
hasObject = true;
|
||||||
|
|
||||||
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
|
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
|
||||||
Assert.Equal(10u, objHeader.PayloadLength);
|
Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
|
||||||
Assert.Single(objHeader.Attributes);
|
Assert.Single(objHeader.Attributes);
|
||||||
Assert.Equal("fileName", objHeader.Attributes.First().Key);
|
Assert.Equal("fileName", objHeader.Attributes.First().Key);
|
||||||
Assert.Equal("test", objHeader.Attributes.First().Value);
|
Assert.Equal("test", objHeader.Attributes.First().Value);
|
||||||
|
@ -175,7 +193,14 @@ public class ClientTestLive
|
||||||
|
|
||||||
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
|
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
|
||||||
|
|
||||||
Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
|
var downloadedBytes = new byte[@object.Header.PayloadLength];
|
||||||
|
MemoryStream ms = new(downloadedBytes);
|
||||||
|
|
||||||
|
byte[]? chunk = null;
|
||||||
|
while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
|
||||||
|
{
|
||||||
|
ms.Write(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
await Cleanup(fsClient);
|
await Cleanup(fsClient);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue