From f5d1899dd24696c7c3dc3825c2f42e3b375e9291 Mon Sep 17 00:00:00 2001
From: Pavel Gross
Date: Wed, 26 Jun 2024 15:15:58 +0300
Subject: [PATCH 1/3] [#13] Change GetObject result to stream
Signed-off-by: Pavel Gross
---
.../Services/ObjectReader.cs | 15 +++++--
.../Services/ObjectServiceProvider.cs | 43 ++++++++++++-------
src/FrostFS.SDK.ModelsV2/Object/Object.cs | 2 +
.../Object/ObjectAttribute.cs | 12 ++----
src/FrostFS.SDK.Tests/ClientTestLive.cs | 37 +++++++++++++---
5 files changed, 75 insertions(+), 34 deletions(-)
diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs
index 73e982a..19c65d8 100644
--- a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs
+++ b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs
@@ -4,14 +4,17 @@ using System.Threading.Tasks;
using Grpc.Core;
using FrostFS.Object;
+using FrostFS.SDK.ModelsV2;
namespace FrostFS.SDK.ClientV2;
-internal class ObjectReader(AsyncServerStreamingCall call) : IDisposable
+public class ObjectReader(AsyncServerStreamingCall call) : IObjectReader
{
+ private bool disposed = false;
+
public AsyncServerStreamingCall Call { get; private set; } = call;
- public async Task ReadHeader()
+ internal async Task ReadHeader()
{
if (!await Call.ResponseStream.MoveNext())
throw new InvalidOperationException("unexpected end of stream");
@@ -45,6 +48,12 @@ internal class ObjectReader(AsyncServerStreamingCall call) : IDispo
public void Dispose()
{
- Call.Dispose();
+ if (!disposed)
+ {
+ Call.Dispose();
+ GC.SuppressFinalize(this);
+
+ disposed = true;
+ }
}
}
diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
index 8619cdd..bf35ad7 100644
--- a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
+++ b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
@@ -80,7 +80,7 @@ internal class ObjectServiceProvider : ContextAccessor
var obj = await GetObject(request, ctx);
- return obj.ToModel();
+ return obj;
}
internal Task PutObjectAsync(PutObjectParameters parameters, Context ctx)
@@ -311,27 +311,38 @@ internal class ObjectServiceProvider : ContextAccessor
}
// TODO: add implementation with stream writer!
- private async Task GetObject(GetRequest request, Context ctx)
+ private async Task GetObject(GetRequest request, Context ctx)
{
- using var stream = GetObjectInit(request, ctx);
+ var reader = GetObjectInit(request, ctx);
- var obj = await stream.ReadHeader();
- var payload = new byte[obj.Header.PayloadLength];
- var offset = 0L;
- var chunk = await stream.ReadChunk();
+ var obj = await reader.ReadHeader();
- while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
- {
- var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
+ var @object = obj.ToModel();
- Array.Copy(chunk, 0, payload, offset, length);
- offset += chunk.Length;
- chunk = await stream.ReadChunk();
- }
+ @object.ObjectReader = reader;
- obj.Payload = ByteString.CopyFrom(payload);
+ return @object;
+
+ // obj.
- return obj;
+ // return obj.ToModel();
+
+ // var payload = new byte[obj.Header.PayloadLength];
+ // var offset = 0L;
+ // var chunk = await stream.ReadChunk();
+
+ // while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
+ // {
+ // var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
+
+ // Array.Copy(chunk, 0, payload, offset, length);
+ // offset += chunk.Length;
+ // chunk = await stream.ReadChunk();
+ // }
+
+ // obj.Payload = ByteString.CopyFrom(payload);
+
+ // return obj;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
diff --git a/src/FrostFS.SDK.ModelsV2/Object/Object.cs b/src/FrostFS.SDK.ModelsV2/Object/Object.cs
index 51c4630..e129601 100644
--- a/src/FrostFS.SDK.ModelsV2/Object/Object.cs
+++ b/src/FrostFS.SDK.ModelsV2/Object/Object.cs
@@ -27,6 +27,8 @@ public class Object
}
public byte[] Payload { get; set; }
+
+ public IObjectReader? ObjectReader { get; set; }
public Signature? Signature { get; set; }
diff --git a/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs b/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs
index b114f2b..92d38b2 100644
--- a/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs
+++ b/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs
@@ -1,13 +1,7 @@
namespace FrostFS.SDK.ModelsV2;
-public class ObjectAttribute
+public class ObjectAttribute(string key, string value)
{
- public string Key { get; set; }
- public string Value { get; set; }
-
- public ObjectAttribute(string key, string value)
- {
- Key = key;
- Value = value;
- }
+ public string Key { get; set; } = key;
+ public string Value { get; set; } = value;
}
diff --git a/src/FrostFS.SDK.Tests/ClientTestLive.cs b/src/FrostFS.SDK.Tests/ClientTestLive.cs
index 70dda66..f596472 100644
--- a/src/FrostFS.SDK.Tests/ClientTestLive.cs
+++ b/src/FrostFS.SDK.Tests/ClientTestLive.cs
@@ -1,3 +1,4 @@
+using System.Security.Cryptography;
using FrostFS.SDK.ClientV2;
using FrostFS.SDK.ClientV2.Interfaces;
using FrostFS.SDK.ModelsV2;
@@ -82,13 +83,17 @@ public class ClientTestLive
Assert.NotNull(container);
+ Random rnd = new();
+ var bytes = new byte[6*1024*1024 + 100];
+ rnd.NextBytes(bytes);
+
var param = new PutObjectParameters
{
Header = new ObjectHeader(
containerId: containerId,
type: ObjectType.Regular,
new ObjectAttribute("fileName", "test")),
- Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ Payload = new MemoryStream(bytes),
ClientCut = false
};
@@ -102,7 +107,7 @@ public class ClientTestLive
hasObject = true;
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
- Assert.Equal(10u, objHeader.PayloadLength);
+ Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value);
@@ -112,7 +117,16 @@ public class ClientTestLive
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
- Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
+ var downloadedBytes = new byte[@object.Header.PayloadLength];
+ MemoryStream ms = new(downloadedBytes);
+
+ byte[]? chunk = null;
+ while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
+ {
+ ms.Write(chunk);
+ }
+
+ Assert.Equal(MD5.HashData(bytes), MD5.HashData(downloadedBytes));
await Cleanup(fsClient);
@@ -145,13 +159,17 @@ public class ClientTestLive
Assert.NotNull(container);
+ Random rnd = new();
+ var bytes = new byte[6*1024*1024 + 100];
+ rnd.NextBytes(bytes);
+
var param = new PutObjectParameters
{
Header = new ObjectHeader(
containerId: containerId,
type: ObjectType.Regular,
new ObjectAttribute("fileName", "test")),
- Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ Payload = new MemoryStream(bytes),
ClientCut = true
};
@@ -165,7 +183,7 @@ public class ClientTestLive
hasObject = true;
var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId);
- Assert.Equal(10u, objHeader.PayloadLength);
+ Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength);
Assert.Single(objHeader.Attributes);
Assert.Equal("fileName", objHeader.Attributes.First().Key);
Assert.Equal("test", objHeader.Attributes.First().Value);
@@ -175,7 +193,14 @@ public class ClientTestLive
var @object = await fsClient.GetObjectAsync(containerId, objectId!);
- Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload);
+ var downloadedBytes = new byte[@object.Header.PayloadLength];
+ MemoryStream ms = new(downloadedBytes);
+
+ byte[]? chunk = null;
+ while ((chunk = await @object.ObjectReader.ReadChunk()) != null)
+ {
+ ms.Write(chunk);
+ }
await Cleanup(fsClient);
--
2.45.2
From 17492ee871b5479d2a1946218f6102b7c5ff28f6 Mon Sep 17 00:00:00 2001
From: Pavel Gross
Date: Wed, 26 Jun 2024 15:24:15 +0300
Subject: [PATCH 2/3] [#13] Add cancellation token to GetChank method
Signed-off-by: Pavel Gross
---
src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs | 7 ++++---
src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs | 3 ++-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs
index 19c65d8..647a7ad 100644
--- a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs
+++ b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs
@@ -5,13 +5,14 @@ using Grpc.Core;
using FrostFS.Object;
using FrostFS.SDK.ModelsV2;
+using System.Threading;
namespace FrostFS.SDK.ClientV2;
public class ObjectReader(AsyncServerStreamingCall call) : IObjectReader
{
private bool disposed = false;
-
+
public AsyncServerStreamingCall Call { get; private set; } = call;
internal async Task ReadHeader()
@@ -32,9 +33,9 @@ public class ObjectReader(AsyncServerStreamingCall call) : IObjectR
};
}
- public async Task ReadChunk()
+ public async Task ReadChunk(CancellationToken cancellationToken = default)
{
- if (!await Call.ResponseStream.MoveNext())
+ if (!await Call.ResponseStream.MoveNext(cancellationToken))
return null;
var response = Call.ResponseStream.Current;
diff --git a/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs b/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs
index 3822dce..4c5ce3d 100644
--- a/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs
+++ b/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs
@@ -1,9 +1,10 @@
using System;
+using System.Threading;
using System.Threading.Tasks;
namespace FrostFS.SDK.ModelsV2;
public interface IObjectReader : IDisposable
{
- Task ReadChunk();
+ Task ReadChunk(CancellationToken cancellationToken = default);
}
--
2.45.2
From 605463ec24f98d1245717e12e5257bcdc129977b Mon Sep 17 00:00:00 2001
From: Pavel Gross
Date: Thu, 27 Jun 2024 12:38:14 +0300
Subject: [PATCH 3/3] [#13] Drop comments
Signed-off-by: Pavel Gross
---
.../Services/ObjectServiceProvider.cs | 24 +------------------
1 file changed, 1 insertion(+), 23 deletions(-)
diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
index bf35ad7..01999a5 100644
--- a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
+++ b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs
@@ -310,7 +310,6 @@ internal class ObjectServiceProvider : ContextAccessor
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
- // TODO: add implementation with stream writer!
private async Task GetObject(GetRequest request, Context ctx)
{
var reader = GetObjectInit(request, ctx);
@@ -321,28 +320,7 @@ internal class ObjectServiceProvider : ContextAccessor
@object.ObjectReader = reader;
- return @object;
-
- // obj.
-
- // return obj.ToModel();
-
- // var payload = new byte[obj.Header.PayloadLength];
- // var offset = 0L;
- // var chunk = await stream.ReadChunk();
-
- // while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
- // {
- // var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
-
- // Array.Copy(chunk, 0, payload, offset, length);
- // offset += chunk.Length;
- // chunk = await stream.ReadChunk();
- // }
-
- // obj.Payload = ByteString.CopyFrom(payload);
-
- // return obj;
+ return @object;
}
private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx)
--
2.45.2