From f5d1899dd24696c7c3dc3825c2f42e3b375e9291 Mon Sep 17 00:00:00 2001 From: Pavel Gross Date: Wed, 26 Jun 2024 15:15:58 +0300 Subject: [PATCH 1/3] [#13] Change GetObject result to stream Signed-off-by: Pavel Gross --- .../Services/ObjectReader.cs | 15 +++++-- .../Services/ObjectServiceProvider.cs | 43 ++++++++++++------- src/FrostFS.SDK.ModelsV2/Object/Object.cs | 2 + .../Object/ObjectAttribute.cs | 12 ++---- src/FrostFS.SDK.Tests/ClientTestLive.cs | 37 +++++++++++++--- 5 files changed, 75 insertions(+), 34 deletions(-) diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs index 73e982a..19c65d8 100644 --- a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs +++ b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs @@ -4,14 +4,17 @@ using System.Threading.Tasks; using Grpc.Core; using FrostFS.Object; +using FrostFS.SDK.ModelsV2; namespace FrostFS.SDK.ClientV2; -internal class ObjectReader(AsyncServerStreamingCall call) : IDisposable +public class ObjectReader(AsyncServerStreamingCall call) : IObjectReader { + private bool disposed = false; + public AsyncServerStreamingCall Call { get; private set; } = call; - public async Task ReadHeader() + internal async Task ReadHeader() { if (!await Call.ResponseStream.MoveNext()) throw new InvalidOperationException("unexpected end of stream"); @@ -45,6 +48,12 @@ internal class ObjectReader(AsyncServerStreamingCall call) : IDispo public void Dispose() { - Call.Dispose(); + if (!disposed) + { + Call.Dispose(); + GC.SuppressFinalize(this); + + disposed = true; + } } } diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs index 8619cdd..bf35ad7 100644 --- a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs +++ b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs @@ -80,7 +80,7 @@ internal class ObjectServiceProvider : ContextAccessor var obj = await GetObject(request, ctx); - return obj.ToModel(); + return obj; } internal Task PutObjectAsync(PutObjectParameters parameters, Context ctx) @@ -311,27 +311,38 @@ internal class ObjectServiceProvider : ContextAccessor } // TODO: add implementation with stream writer! - private async Task GetObject(GetRequest request, Context ctx) + private async Task GetObject(GetRequest request, Context ctx) { - using var stream = GetObjectInit(request, ctx); + var reader = GetObjectInit(request, ctx); - var obj = await stream.ReadHeader(); - var payload = new byte[obj.Header.PayloadLength]; - var offset = 0L; - var chunk = await stream.ReadChunk(); + var obj = await reader.ReadHeader(); - while (chunk is not null && (ulong)offset < obj.Header.PayloadLength) - { - var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length); + var @object = obj.ToModel(); - Array.Copy(chunk, 0, payload, offset, length); - offset += chunk.Length; - chunk = await stream.ReadChunk(); - } + @object.ObjectReader = reader; - obj.Payload = ByteString.CopyFrom(payload); + return @object; + + // obj. - return obj; + // return obj.ToModel(); + + // var payload = new byte[obj.Header.PayloadLength]; + // var offset = 0L; + // var chunk = await stream.ReadChunk(); + + // while (chunk is not null && (ulong)offset < obj.Header.PayloadLength) + // { + // var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length); + + // Array.Copy(chunk, 0, payload, offset, length); + // offset += chunk.Length; + // chunk = await stream.ReadChunk(); + // } + + // obj.Payload = ByteString.CopyFrom(payload); + + // return obj; } private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx) diff --git a/src/FrostFS.SDK.ModelsV2/Object/Object.cs b/src/FrostFS.SDK.ModelsV2/Object/Object.cs index 51c4630..e129601 100644 --- a/src/FrostFS.SDK.ModelsV2/Object/Object.cs +++ b/src/FrostFS.SDK.ModelsV2/Object/Object.cs @@ -27,6 +27,8 @@ public class Object } public byte[] Payload { get; set; } + + public IObjectReader? ObjectReader { get; set; } public Signature? Signature { get; set; } diff --git a/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs b/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs index b114f2b..92d38b2 100644 --- a/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs +++ b/src/FrostFS.SDK.ModelsV2/Object/ObjectAttribute.cs @@ -1,13 +1,7 @@ namespace FrostFS.SDK.ModelsV2; -public class ObjectAttribute +public class ObjectAttribute(string key, string value) { - public string Key { get; set; } - public string Value { get; set; } - - public ObjectAttribute(string key, string value) - { - Key = key; - Value = value; - } + public string Key { get; set; } = key; + public string Value { get; set; } = value; } diff --git a/src/FrostFS.SDK.Tests/ClientTestLive.cs b/src/FrostFS.SDK.Tests/ClientTestLive.cs index 70dda66..f596472 100644 --- a/src/FrostFS.SDK.Tests/ClientTestLive.cs +++ b/src/FrostFS.SDK.Tests/ClientTestLive.cs @@ -1,3 +1,4 @@ +using System.Security.Cryptography; using FrostFS.SDK.ClientV2; using FrostFS.SDK.ClientV2.Interfaces; using FrostFS.SDK.ModelsV2; @@ -82,13 +83,17 @@ public class ClientTestLive Assert.NotNull(container); + Random rnd = new(); + var bytes = new byte[6*1024*1024 + 100]; + rnd.NextBytes(bytes); + var param = new PutObjectParameters { Header = new ObjectHeader( containerId: containerId, type: ObjectType.Regular, new ObjectAttribute("fileName", "test")), - Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + Payload = new MemoryStream(bytes), ClientCut = false }; @@ -102,7 +107,7 @@ public class ClientTestLive hasObject = true; var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId); - Assert.Equal(10u, objHeader.PayloadLength); + Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength); Assert.Single(objHeader.Attributes); Assert.Equal("fileName", objHeader.Attributes.First().Key); Assert.Equal("test", objHeader.Attributes.First().Value); @@ -112,7 +117,16 @@ public class ClientTestLive var @object = await fsClient.GetObjectAsync(containerId, objectId!); - Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload); + var downloadedBytes = new byte[@object.Header.PayloadLength]; + MemoryStream ms = new(downloadedBytes); + + byte[]? chunk = null; + while ((chunk = await @object.ObjectReader.ReadChunk()) != null) + { + ms.Write(chunk); + } + + Assert.Equal(MD5.HashData(bytes), MD5.HashData(downloadedBytes)); await Cleanup(fsClient); @@ -145,13 +159,17 @@ public class ClientTestLive Assert.NotNull(container); + Random rnd = new(); + var bytes = new byte[6*1024*1024 + 100]; + rnd.NextBytes(bytes); + var param = new PutObjectParameters { Header = new ObjectHeader( containerId: containerId, type: ObjectType.Regular, new ObjectAttribute("fileName", "test")), - Payload = new MemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + Payload = new MemoryStream(bytes), ClientCut = true }; @@ -165,7 +183,7 @@ public class ClientTestLive hasObject = true; var objHeader = await fsClient.GetObjectHeadAsync(containerId, objectId); - Assert.Equal(10u, objHeader.PayloadLength); + Assert.Equal((ulong)bytes.Length, objHeader.PayloadLength); Assert.Single(objHeader.Attributes); Assert.Equal("fileName", objHeader.Attributes.First().Key); Assert.Equal("test", objHeader.Attributes.First().Value); @@ -175,7 +193,14 @@ public class ClientTestLive var @object = await fsClient.GetObjectAsync(containerId, objectId!); - Assert.Equal([1, 2, 3, 4, 5, 6, 7, 8, 9, 0], @object.Payload); + var downloadedBytes = new byte[@object.Header.PayloadLength]; + MemoryStream ms = new(downloadedBytes); + + byte[]? chunk = null; + while ((chunk = await @object.ObjectReader.ReadChunk()) != null) + { + ms.Write(chunk); + } await Cleanup(fsClient); -- 2.45.2 From 17492ee871b5479d2a1946218f6102b7c5ff28f6 Mon Sep 17 00:00:00 2001 From: Pavel Gross Date: Wed, 26 Jun 2024 15:24:15 +0300 Subject: [PATCH 2/3] [#13] Add cancellation token to GetChank method Signed-off-by: Pavel Gross --- src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs | 7 ++++--- src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs index 19c65d8..647a7ad 100644 --- a/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs +++ b/src/FrostFS.SDK.ClientV2/Services/ObjectReader.cs @@ -5,13 +5,14 @@ using Grpc.Core; using FrostFS.Object; using FrostFS.SDK.ModelsV2; +using System.Threading; namespace FrostFS.SDK.ClientV2; public class ObjectReader(AsyncServerStreamingCall call) : IObjectReader { private bool disposed = false; - + public AsyncServerStreamingCall Call { get; private set; } = call; internal async Task ReadHeader() @@ -32,9 +33,9 @@ public class ObjectReader(AsyncServerStreamingCall call) : IObjectR }; } - public async Task ReadChunk() + public async Task ReadChunk(CancellationToken cancellationToken = default) { - if (!await Call.ResponseStream.MoveNext()) + if (!await Call.ResponseStream.MoveNext(cancellationToken)) return null; var response = Call.ResponseStream.Current; diff --git a/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs b/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs index 3822dce..4c5ce3d 100644 --- a/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs +++ b/src/FrostFS.SDK.ModelsV2/Object/IObjectReader.cs @@ -1,9 +1,10 @@ using System; +using System.Threading; using System.Threading.Tasks; namespace FrostFS.SDK.ModelsV2; public interface IObjectReader : IDisposable { - Task ReadChunk(); + Task ReadChunk(CancellationToken cancellationToken = default); } -- 2.45.2 From 605463ec24f98d1245717e12e5257bcdc129977b Mon Sep 17 00:00:00 2001 From: Pavel Gross Date: Thu, 27 Jun 2024 12:38:14 +0300 Subject: [PATCH 3/3] [#13] Drop comments Signed-off-by: Pavel Gross --- .../Services/ObjectServiceProvider.cs | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs index bf35ad7..01999a5 100644 --- a/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs +++ b/src/FrostFS.SDK.ClientV2/Services/ObjectServiceProvider.cs @@ -310,7 +310,6 @@ internal class ObjectServiceProvider : ContextAccessor return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray()); } - // TODO: add implementation with stream writer! private async Task GetObject(GetRequest request, Context ctx) { var reader = GetObjectInit(request, ctx); @@ -321,28 +320,7 @@ internal class ObjectServiceProvider : ContextAccessor @object.ObjectReader = reader; - return @object; - - // obj. - - // return obj.ToModel(); - - // var payload = new byte[obj.Header.PayloadLength]; - // var offset = 0L; - // var chunk = await stream.ReadChunk(); - - // while (chunk is not null && (ulong)offset < obj.Header.PayloadLength) - // { - // var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length); - - // Array.Copy(chunk, 0, payload, offset, length); - // offset += chunk.Length; - // chunk = await stream.ReadChunk(); - // } - - // obj.Payload = ByteString.CopyFrom(payload); - - // return obj; + return @object; } private ObjectReader GetObjectInit(GetRequest initRequest, Context ctx) -- 2.45.2