[#1] Add object Search operation

Signed-off-by: Ivan Pchelintsev <i.pchelintsev@yadro.com>
This commit is contained in:
Ivan Pchelintsev 2024-05-17 11:52:38 +03:00
parent 63f91ac627
commit bb55d093fa
5 changed files with 156 additions and 4 deletions

View file

@ -72,17 +72,18 @@ public partial class Client
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
obj.Payload = ByteString.CopyFrom(payload);
return obj;
}
private ObjectReader GetObjectInit(GetRequest initRequest)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
return new ObjectReader
{
Call = _objectServiceClient.Get(initRequest)
@ -90,6 +91,16 @@ public partial class Client
}
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, Stream payload)
{
return await PutObject(header, payload);
}
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, byte[] payload)
{
return await PutObject(header, new MemoryStream(payload));
}
private async Task<ObjectId> PutObject(ObjectHeader header, Stream payload)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
var hdr = header.ToGrpcMessage();
@ -167,6 +178,56 @@ public partial class Client
request.Sign(_key);
await _objectServiceClient.DeleteAsync(request);
}
public async Task<ObjectId[]> SearchObjectAsync(ContainerId cid, params ObjectFilter[] filters)
{
var request = new SearchRequest
{
Body = new SearchRequest.Types.Body
{
ContainerId = cid.ToGrpcMessage(),
Filters = { },
Version = 1
}
};
foreach (var filter in filters)
{
request.Body.Filters.Add(filter.ToGrpcMessage());
}
;
request.AddMetaHeader();
request.Sign(_key);
var ids = await SearchObject(request);
return ids.Select(oid => ObjectId.FromHash(oid.Value.ToByteArray())).ToArray();
}
private async Task<List<ObjectID>> SearchObject(SearchRequest request)
{
var objectsIds = new List<ObjectID> { };
using var stream = SearchObjectInit(request);
var ids = await stream.Read();
while (ids is not null)
{
objectsIds.AddRange(ids);
ids = await stream.Read();
}
return objectsIds;
}
private SearchReader SearchObjectInit(SearchRequest initRequest)
{
if (initRequest is null)
{
throw new ArgumentNullException(nameof(initRequest));
}
return new SearchReader
{
Call = _objectServiceClient.Search(initRequest)
};
}
}
internal class ObjectReader : IDisposable
@ -179,6 +240,7 @@ internal class ObjectReader : IDisposable
{
throw new InvalidOperationException("unexpect end of stream");
}
var response = Call.ResponseStream.Current;
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Init)
throw new InvalidOperationException("unexpect message type");
@ -195,6 +257,7 @@ internal class ObjectReader : IDisposable
{
return null;
}
var response = Call.ResponseStream.Current;
if (response.Body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Chunk)
throw new InvalidOperationException("unexpect message type");
@ -226,9 +289,30 @@ internal class ObjectStreamer : IDisposable
await Call.RequestStream.CompleteAsync();
return await Call.ResponseAsync;
}
public void Dispose()
{
Call.Dispose();
}
}
internal class SearchReader : IDisposable
{
public AsyncServerStreamingCall<SearchResponse> Call { get; init; }
public async Task<List<ObjectID>?> Read()
{
if (!await Call.ResponseStream.MoveNext())
{
return null;
}
var response = Call.ResponseStream.Current;
return response.Body?.IdList.ToList();
}
public void Dispose()
{
Call.Dispose();
}
}