[#1] Add object Get operation + code quality

Signed-off-by: Ivan Pchelintsev <i.pchelintsev@yadro.com>
This commit is contained in:
Ivan Pchelintsev 2024-05-16 17:31:48 +03:00
parent 9aa93d123d
commit b307c2c899
17 changed files with 182 additions and 99 deletions

View file

@ -29,11 +29,64 @@ public partial class Client
return await _objectServiceClient.HeadAsync(request);
}
// public async Task<GetResponse> GetObjectAsync(ContainerID cid, ObjectID oid)
// {
//
// }
public async Task<Object.Object> GetObjectAsync(ContainerID cid, ObjectID oid)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
var request = new GetRequest
{
Body = new GetRequest.Types.Body
{
Raw = false,
Address = new Address
{
ContainerId = cid,
ObjectId = oid
},
}
};
request.AddMetaHeader();
request.AddObjectSessionToken(
sessionToken,
cid,
oid,
ObjectSessionContext.Types.Verb.Get,
_key
);
request.Sign(_key);
return await GetObject(request);
}
private async Task<Object.Object> GetObject(GetRequest request)
{
using var stream = GetObjectInit(request);
var obj = await stream.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
var offset = 0;
var chunk = await stream.ReadChunk();
while (chunk is not null)
{
chunk.CopyTo(payload, offset);
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
obj.Payload = ByteString.CopyFrom(payload);
return obj;
}
private ObjectReader GetObjectInit(GetRequest initRequest)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
return new ObjectReader
{
Call = _objectServiceClient.Get(initRequest)
};
}
public async Task<PutResponse> PutObjectAsync(Header header, Stream payload)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
@ -63,7 +116,7 @@ public partial class Client
);
request.Sign(_key);
using var stream = await InitObject(request);
using var stream = await PutObjectInit(request);
var buffer = new byte[Constants.ObjectChunkSize];
var bufferLength = payload.Read(buffer, 0, Constants.ObjectChunkSize);
while (bufferLength > 0)
@ -81,7 +134,7 @@ public partial class Client
return await stream.Close();
}
private async Task<ObjectStreamer> InitObject(PutRequest initRequest)
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest)
{
if (initRequest is null)
{
@ -112,14 +165,47 @@ public partial class Client
}
}
internal class ObjectStreamer : IDisposable
internal class ObjectReader : IDisposable
{
public AsyncClientStreamingCall<PutRequest, PutResponse> Call { get; init; }
public AsyncServerStreamingCall<GetResponse> Call { get; init; }
public async Task<Object.Object> ReadHeader()
{
if (!await Call.ResponseStream.MoveNext())
{
throw new InvalidOperationException("unexpect end of stream");
}
var response = Call.ResponseStream.Current;
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Init)
throw new InvalidOperationException("unexpect message type");
return new Object.Object
{
ObjectId = response.Body.Init.ObjectId,
Header = response.Body.Init.Header,
};
}
public async Task<byte[]?> ReadChunk()
{
if (!await Call.ResponseStream.MoveNext())
{
return null;
}
var response = Call.ResponseStream.Current;
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Chunk)
throw new InvalidOperationException("unexpect message type");
return response.Body.Chunk.ToByteArray();
}
public void Dispose()
{
Call.Dispose();
}
}
internal class ObjectStreamer : IDisposable
{
public AsyncClientStreamingCall<PutRequest, PutResponse> Call { get; init; }
public async Task Write(PutRequest request)
{
@ -136,4 +222,9 @@ internal class ObjectStreamer : IDisposable
await Call.RequestStream.CompleteAsync();
return await Call.ResponseAsync;
}
}
public void Dispose()
{
Call.Dispose();
}
}