frostfs-sdk-csharp/src/FrostFS.SDK.ClientV2/Services/Object.cs
Ivan Pchelintsev 8cacbcc8e9 [#1] Return iterator from ListContainersAsync and SearchObjectsAsync
Signed-off-by: Ivan Pchelintsev <i.pchelintsev@yadro.com>
2024-05-22 14:29:20 +03:00

321 lines
No EOL
8.9 KiB
C#

using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using FrostFS.SDK.ModelsV2;
using FrostFS.Session;
using Google.Protobuf;
using Grpc.Core;
namespace FrostFS.SDK.ClientV2;
public partial class Client
{
/// <summary>
/// Requests the header of a single object through the object service Head call.
/// </summary>
/// <param name="cid">Container identifier placed into the request address.</param>
/// <param name="oid">Object identifier placed into the request address.</param>
/// <returns>The header found in the response body, converted with <c>ToModel()</c>.</returns>
public async Task<ObjectHeader> GetObjectHeadAsync(ContainerId cid, ObjectId oid)
{
    var address = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };
    var headRequest = new HeadRequest
    {
        Body = new HeadRequest.Types.Body { Address = address },
    };

    // Attach the meta header first, then sign the request with the client key.
    headRequest.AddMetaHeader();
    headRequest.Sign(_key);

    var headResponse = await _objectServiceClient.HeadAsync(headRequest);

    // The response body wraps the header in an outer "Header" part; unwrap both levels.
    var headerPart = headResponse.Body.Header;
    return headerPart.Header.ToModel();
}
/// <summary>
/// Downloads an object (header and payload) addressed by container and object identifiers.
/// </summary>
/// <param name="cid">Container identifier of the object.</param>
/// <param name="oid">Identifier of the object to fetch.</param>
/// <returns>The received object converted with <c>ToModel()</c>.</returns>
public async Task<ModelsV2.Object> GetObjectAsync(ContainerId cid, ObjectId oid)
{
    // A session is opened before the request is built; uint.MaxValue is passed through
    // to CreateSessionAsync as-is (its meaning is defined there).
    var token = await CreateSessionAsync(uint.MaxValue);

    var address = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };
    var body = new GetRequest.Types.Body
    {
        Raw = false,
        Address = address,
    };
    var getRequest = new GetRequest { Body = body };

    getRequest.AddMetaHeader();

    // The identifiers are converted again so the session token receives its own message instances.
    getRequest.AddObjectSessionToken(
        token,
        cid.ToGrpcMessage(),
        oid.ToGrpcMessage(),
        ObjectSessionContext.Types.Verb.Get,
        _key
    );
    getRequest.Sign(_key);

    var received = await GetObject(getRequest);
    return received.ToModel();
}
/// <summary>
/// Runs a Get call and assembles the streamed parts into one gRPC object.
/// </summary>
/// <param name="request">Prepared Get request.</param>
/// <returns>
/// The object built from the initial stream message, with its payload set to the
/// concatenation of all received chunks.
/// </returns>
private async Task<Object.Object> GetObject(GetRequest request)
{
    using var reader = GetObjectInit(request);

    // The first message carries the header; the buffer is sized from its declared payload length.
    var result = await reader.ReadHeader();
    var buffer = new byte[result.Header.PayloadLength];
    var written = 0;

    // ReadChunk returns null once the response stream has no more messages.
    for (var part = await reader.ReadChunk(); part is not null; part = await reader.ReadChunk())
    {
        part.CopyTo(buffer, written);
        written += part.Length;
    }

    result.Payload = ByteString.CopyFrom(buffer);
    return result;
}
/// <summary>
/// Starts the server-streaming Get call and wraps it in an <see cref="ObjectReader"/>.
/// </summary>
/// <param name="initRequest">Request used to start the call.</param>
/// <returns>A reader that owns the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private ObjectReader GetObjectInit(GetRequest initRequest)
{
    _ = initRequest ?? throw new ArgumentNullException(nameof(initRequest));

    var call = _objectServiceClient.Get(initRequest);
    return new ObjectReader { Call = call };
}
/// <summary>
/// Uploads an object whose payload is read from a stream.
/// </summary>
/// <param name="header">Header describing the object to store.</param>
/// <param name="payload">Stream the payload is read from.</param>
/// <returns>The identifier of the stored object.</returns>
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, Stream payload)
    => await PutObject(header, payload);
/// <summary>
/// Uploads an object whose payload is supplied as an in-memory byte array.
/// </summary>
/// <param name="header">Header describing the object to store.</param>
/// <param name="payload">Payload bytes; wrapped in a <see cref="MemoryStream"/> for upload.</param>
/// <returns>The identifier of the stored object.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="payload"/> is null (thrown by the <see cref="MemoryStream"/> constructor).
/// </exception>
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, byte[] payload)
{
    // The stream is created here, so it is disposed here once the upload has finished.
    using var payloadStream = new MemoryStream(payload);
    return await PutObject(header, payloadStream);
}
/// <summary>
/// Uploads an object: sends an init message carrying the header, then streams the payload
/// in chunks of at most <c>Constants.ObjectChunkSize</c> bytes, and finally closes the stream.
/// </summary>
/// <param name="header">Header describing the object; owner and version are filled in from the client.</param>
/// <param name="payload">Stream the payload is read from until it reports no more data.</param>
/// <returns>The object identifier taken from the Put response body.</returns>
/// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
private async Task<ObjectId> PutObject(ObjectHeader header, Stream payload)
{
    // Reject a missing payload before a session or a streaming call is opened.
    if (payload is null)
    {
        throw new ArgumentNullException(nameof(payload));
    }

    var sessionToken = await CreateSessionAsync(uint.MaxValue);

    var hdr = header.ToGrpcMessage();
    hdr.OwnerId = OwnerId.ToGrpcMessage();
    hdr.Version = Version.ToGrpcMessage();

    // The identifier handed to the session token is the SHA-256 of the gRPC header.
    var oid = new ObjectID
    {
        Value = hdr.Sha256()
    };

    var request = new PutRequest
    {
        Body = new PutRequest.Types.Body
        {
            Init = new PutRequest.Types.Body.Types.Init
            {
                Header = hdr
            },
        }
    };
    request.AddMetaHeader();
    request.AddObjectSessionToken(
        sessionToken,
        hdr.ContainerId,
        oid,
        ObjectSessionContext.Types.Verb.Put,
        _key
    );
    request.Sign(_key);

    using var stream = await PutObjectInit(request);

    var buffer = new byte[Constants.ObjectChunkSize];
    int bytesRead;
    while ((bytesRead = await payload.ReadAsync(buffer.AsMemory(0, Constants.ObjectChunkSize))) > 0)
    {
        // The same request instance is reused for every chunk; only the body is replaced.
        request.Body = new PutRequest.Types.Body
        {
            // Copy straight from the read buffer: slicing the array with a range first
            // would allocate and copy the chunk a second time.
            Chunk = ByteString.CopyFrom(buffer, 0, bytesRead),
        };

        // NOTE(review): presumably cleared so Sign produces a verification header
        // for the new body — confirm against Sign.
        request.VerifyHeader = null;
        request.Sign(_key);
        await stream.Write(request);
    }

    var response = await stream.Close();
    return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
/// <summary>
/// Opens the client-streaming Put call, sends the initial request, and wraps the call
/// in an <see cref="ObjectStreamer"/>.
/// </summary>
/// <param name="initRequest">First message written to the request stream.</param>
/// <returns>A streamer that owns the opened call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest)
{
    if (initRequest is null)
    {
        throw new ArgumentNullException(nameof(initRequest));
    }

    var call = _objectServiceClient.Put();
    try
    {
        await call.RequestStream.WriteAsync(initRequest);
    }
    catch
    {
        // The call has not been handed to an ObjectStreamer yet, so nobody else
        // would dispose it if the initial write fails.
        call.Dispose();
        throw;
    }

    return new ObjectStreamer { Call = call };
}
/// <summary>
/// Sends a Delete request for the object addressed by container and object identifiers.
/// </summary>
/// <param name="cid">Container identifier of the object.</param>
/// <param name="oid">Identifier of the object to delete.</param>
public async Task DeleteObjectAsync(ContainerId cid, ObjectId oid)
{
    var target = new Address
    {
        ContainerId = cid.ToGrpcMessage(),
        ObjectId = oid.ToGrpcMessage()
    };
    var deleteRequest = new DeleteRequest
    {
        Body = new DeleteRequest.Types.Body { Address = target }
    };

    deleteRequest.AddMetaHeader();
    deleteRequest.Sign(_key);

    // The call is awaited for completion only; the response content is not used.
    _ = await _objectServiceClient.DeleteAsync(deleteRequest);
}
/// <summary>
/// Searches a container for objects matching the given filters and streams their identifiers.
/// </summary>
/// <param name="cid">Container to search in.</param>
/// <param name="filters">
/// Filters added to the request in the given order. An empty or null array sends no filters.
/// </param>
/// <returns>
/// An asynchronous sequence of object identifiers. The request is built and sent only
/// when the sequence is enumerated.
/// </returns>
public async IAsyncEnumerable<ObjectId> SearchObjectsAsync(ContainerId cid, params ObjectFilter[] filters)
{
    var request = new SearchRequest
    {
        Body = new SearchRequest.Types.Body
        {
            ContainerId = cid.ToGrpcMessage(),
            Version = 1
        }
    };

    // A `params` array can still be passed as an explicit null; treat that as "no filters".
    if (filters is not null)
    {
        foreach (var filter in filters)
        {
            request.Body.Filters.Add(filter.ToGrpcMessage());
        }
    }

    request.AddMetaHeader();
    request.Sign(_key);

    await foreach (var oid in SearchObjects(request))
    {
        yield return ObjectId.FromHash(oid.Value.ToByteArray());
    }
}
/// <summary>
/// Runs a Search call and flattens the received identifier batches into one sequence.
/// </summary>
/// <param name="request">Prepared Search request.</param>
/// <returns>Every identifier from every batch, in the order received.</returns>
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request)
{
    using var reader = SearchObjectsInit(request);

    // A null batch from the reader ends the enumeration.
    for (var batch = await reader.Read(); batch is not null; batch = await reader.Read())
    {
        foreach (var id in batch)
        {
            yield return id;
        }
    }
}
/// <summary>
/// Starts the server-streaming Search call and wraps it in a <see cref="SearchReader"/>.
/// </summary>
/// <param name="initRequest">Request used to start the call.</param>
/// <returns>A reader that owns the started call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="initRequest"/> is null.</exception>
private SearchReader SearchObjectsInit(SearchRequest initRequest)
{
    _ = initRequest ?? throw new ArgumentNullException(nameof(initRequest));

    var call = _objectServiceClient.Search(initRequest);
    return new SearchReader { Call = call };
}
}
/// <summary>
/// Reads the parts of a server-streaming Get call: one init message followed by payload chunks.
/// </summary>
internal class ObjectReader : IDisposable
{
    /// <summary>The underlying streaming call; disposed together with this reader.</summary>
    public AsyncServerStreamingCall<GetResponse> Call { get; init; }

    /// <summary>
    /// Reads the next stream message and builds an object from its init part.
    /// </summary>
    /// <returns>An object carrying the identifier and header from the init part.</returns>
    /// <exception cref="InvalidOperationException">
    /// The stream has no message, or the message is not an init part.
    /// </exception>
    public async Task<Object.Object> ReadHeader()
    {
        var responses = Call.ResponseStream;
        var hasMessage = await responses.MoveNext();
        if (!hasMessage)
        {
            throw new InvalidOperationException("unexpect end of stream");
        }

        var body = responses.Current.Body;
        if (body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Init)
        {
            throw new InvalidOperationException("unexpect message type");
        }

        var init = body.Init;
        return new Object.Object
        {
            ObjectId = init.ObjectId,
            Header = init.Header,
        };
    }

    /// <summary>
    /// Reads the next stream message and returns its chunk bytes.
    /// </summary>
    /// <returns>The chunk as a byte array, or null when the stream has no more messages.</returns>
    /// <exception cref="InvalidOperationException">The message is not a chunk part.</exception>
    public async Task<byte[]?> ReadChunk()
    {
        var responses = Call.ResponseStream;
        var hasMessage = await responses.MoveNext();
        if (!hasMessage)
        {
            return null;
        }

        var body = responses.Current.Body;
        if (body.ObjectPartCase != GetResponse.Types.Body.ObjectPartOneofCase.Chunk)
        {
            throw new InvalidOperationException("unexpect message type");
        }

        return body.Chunk.ToByteArray();
    }

    /// <summary>Disposes the underlying call.</summary>
    public void Dispose() => Call.Dispose();
}
/// <summary>
/// Writes requests to a client-streaming Put call and retrieves its single response.
/// </summary>
internal class ObjectStreamer : IDisposable
{
    /// <summary>The underlying streaming call; disposed together with this streamer.</summary>
    public AsyncClientStreamingCall<PutRequest, PutResponse> Call { get; init; }

    /// <summary>Writes one request to the call's request stream.</summary>
    /// <param name="request">Message to send.</param>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    public async Task Write(PutRequest request)
    {
        _ = request ?? throw new ArgumentNullException(nameof(request));

        var requests = Call.RequestStream;
        await requests.WriteAsync(request);
    }

    /// <summary>Completes the request stream and awaits the call's response.</summary>
    /// <returns>The response of the Put call.</returns>
    public async Task<PutResponse> Close()
    {
        var requests = Call.RequestStream;
        await requests.CompleteAsync();

        var response = await Call.ResponseAsync;
        return response;
    }

    /// <summary>Disposes the underlying call.</summary>
    public void Dispose() => Call.Dispose();
}
/// <summary>
/// Reads batches of object identifiers from a server-streaming Search call.
/// </summary>
internal class SearchReader : IDisposable
{
    /// <summary>The underlying streaming call; disposed together with this reader.</summary>
    public AsyncServerStreamingCall<SearchResponse> Call { get; init; }

    /// <summary>
    /// Reads the next stream message and returns the identifiers it carries.
    /// </summary>
    /// <returns>
    /// A copy of the identifier list from the message body; an empty list when the message
    /// has no body; null only when the stream has no more messages.
    /// </returns>
    public async Task<List<ObjectID>?> Read()
    {
        if (!await Call.ResponseStream.MoveNext())
        {
            return null;
        }

        var response = Call.ResponseStream.Current;

        // Null is reserved for end of stream. A message without a body yields an empty
        // batch so that a consumer looping until null does not stop early.
        return response.Body is { } body
            ? body.IdList.ToList()
            : new List<ObjectID>();
    }

    /// <summary>Disposes the underlying call.</summary>
    public void Dispose()
    {
        Call.Dispose();
    }
}