[#4] infrastructure and sample Client Cut

Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
Pavel Gross 2024-06-10 11:31:36 +03:00
parent 0c4723c705
commit 545e647d7b
22 changed files with 717 additions and 193 deletions

View file

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Google.Protobuf;
@ -9,9 +10,10 @@ using FrostFS.Object;
using FrostFS.Refs;
using FrostFS.SDK.ClientV2.Mappers.GRPC;
using FrostFS.SDK.Cryptography;
using FrostFS.SDK.ModelsV2;
using FrostFS.Session;
using FrostFS.SDK.ModelsV2;
namespace FrostFS.SDK.ClientV2;
public partial class Client
@ -20,7 +22,7 @@ public partial class Client
{
var request = new HeadRequest
{
Body = new HeadRequest.Types.Body
Body = new HeadRequest.Types.Body
{
Address = new Address
{
@ -29,12 +31,12 @@ public partial class Client
}
}
};
request.AddMetaHeader();
request.Sign(_key);
var response = await _objectServiceClient.HeadAsync(request);
var response = await _objectServiceClient!.HeadAsync(request);
Verifier.CheckResponse(response);
return response.Body.Header.Header.ToModel();
}
@ -45,7 +47,6 @@ public partial class Client
{
Body = new GetRequest.Types.Body
{
Raw = false,
Address = new Address
{
ContainerId = cid.ToGrpcMessage(),
@ -76,85 +77,8 @@ public partial class Client
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, byte[] payload)
{
return await PutObject(header, new MemoryStream(payload));
}
public async Task DeleteObjectAsync(ContainerId cid, ObjectId oid)
{
var request = new DeleteRequest
{
Body = new DeleteRequest.Types.Body
{
Address = new Address
{
ContainerId = cid.ToGrpcMessage(),
ObjectId = oid.ToGrpcMessage()
}
}
};
request.AddMetaHeader();
request.Sign(_key);
var response = await _objectServiceClient.DeleteAsync(request);
Verifier.CheckResponse(response);
}
public async IAsyncEnumerable<ObjectId> SearchObjectsAsync(ContainerId cid, params ObjectFilter[] filters)
{
var request = new SearchRequest
{
Body = new SearchRequest.Types.Body
{
ContainerId = cid.ToGrpcMessage(),
Filters = { },
Version = 1
}
};
foreach (var filter in filters)
{
request.Body.Filters.Add(filter.ToGrpcMessage());
}
request.AddMetaHeader();
request.Sign(_key);
var objectsIds = SearchObjects(request);
await foreach (var oid in objectsIds)
{
yield return ObjectId.FromHash(oid.Value.ToByteArray());
}
}
private async Task<Object.Object> GetObject(GetRequest request)
{
using var stream = GetObjectInit(request);
var obj = await stream.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
var offset = 0;
var chunk = await stream.ReadChunk();
while (chunk is not null)
{
chunk.CopyTo(payload, offset);
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
obj.Payload = ByteString.CopyFrom(payload);
return obj;
}
private ObjectReader GetObjectInit(GetRequest initRequest)
{
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
return new ObjectReader
{
Call = _objectServiceClient.Get(initRequest)
};
using var stream = new MemoryStream(payload);
return await PutObject(header, stream);
}
private async Task<ObjectId> PutObject(ObjectHeader header, Stream payload)
@ -163,12 +87,12 @@ public partial class Client
var hdr = header.ToGrpcMessage();
hdr.OwnerId = OwnerId.ToGrpcMessage();
hdr.Version = Version.ToGrpcMessage();
var oid = new ObjectID
{
Value = hdr.Sha256()
};
var request = new PutRequest
{
Body = new PutRequest.Types.Body
@ -188,23 +112,27 @@ public partial class Client
ObjectSessionContext.Types.Verb.Put,
_key
);
request.Sign(_key);
using var stream = await PutObjectInit(request);
var buffer = new byte[Constants.ObjectChunkSize];
var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize);
while (bufferLength > 0)
while (true)
{
var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize);
if (bufferLength == 0)
break;
request.Body = new PutRequest.Types.Body
{
Chunk = ByteString.CopyFrom(buffer[..bufferLength]),
};
request.VerifyHeader = null;
request.Sign(_key);
await stream.Write(request);
bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize);
}
var response = await stream.Close();
@ -213,6 +141,192 @@ public partial class Client
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
}
public async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object @object)
{
var sessionToken = await CreateSessionAsync(uint.MaxValue);
var obj = CreateObject(@object);
var request = new PutSingleRequest
{
Body = new () { Object = obj }
};
request.AddMetaHeader();
request.AddObjectSessionToken(
sessionToken,
obj.Header.ContainerId,
obj.ObjectId,
ObjectSessionContext.Types.Verb.Put,
_key
);
request.Sign(_key);
var response = await _objectServiceClient!.PutSingleAsync(request);
Verifier.CheckResponse(response);
return ObjectId.FromHash(obj.ObjectId.Value.ToByteArray());
}
public Object.Object CreateObject(ModelsV2.Object @object)
{
var grpcHeader = @object.Header.ToGrpcMessage();
grpcHeader.OwnerId = OwnerId.ToGrpcMessage();
grpcHeader.Version = Version.ToGrpcMessage();
if (@object.Payload != null)
{
grpcHeader.PayloadLength = (ulong)@object.Payload.Length;
grpcHeader.PayloadHash = Sha256Checksum(@object.Payload);
}
var split = @object.Header.Split;
if (split != null)
{
grpcHeader.Split = new Header.Types.Split
{
SplitId = split.SplitId != null ? ByteString.CopyFrom(split.SplitId.ToBinary()) : null
};
if (split.Children != null && split.Children.Any())
grpcHeader.Split.Children.AddRange(split.Children.Select(id => id.ToGrpcMessage()));
if (split.ParentHeader is not null)
{
var grpcParentHeader = CreateHeader(split.ParentHeader, []);
grpcHeader.Split.Parent = new ObjectID { Value = grpcParentHeader.Sha256() };
grpcHeader.Split.ParentHeader = grpcParentHeader;
grpcHeader.Split.ParentSignature = new Refs.Signature
{
Key = ByteString.CopyFrom(_key.PublicKey()),
Sign = ByteString.CopyFrom(_key.SignData(grpcHeader.Split.Parent.ToByteArray())),
};
split.Parent = grpcHeader.Split.Parent.ToModel();
}
}
var obj = new Object.Object
{
Header = grpcHeader,
ObjectId = new ObjectID { Value = grpcHeader.Sha256() },
Payload = ByteString.CopyFrom(@object.Payload)
};
obj.Signature = new Refs.Signature
{
Key = ByteString.CopyFrom(_key.PublicKey()),
Sign = ByteString.CopyFrom(_key.SignData(obj.ObjectId.ToByteArray())),
};
return obj;
}
public Header CreateHeader(ObjectHeader header, byte[]? payload)
{
var grpcHeader = header.ToGrpcMessage();
grpcHeader.OwnerId = OwnerId.ToGrpcMessage();
grpcHeader.Version = Version.ToGrpcMessage();
if (header.PayloadCheckSum != null)
{
grpcHeader.PayloadHash = new Checksum
{
Type = ChecksumType.Sha256,
Sum = ByteString.CopyFrom(header.PayloadCheckSum)
};
}
else
{
if (payload != null)
grpcHeader.PayloadHash = Sha256Checksum(payload);
}
return grpcHeader;
}
public async Task DeleteObjectAsync(ContainerId cid, ObjectId oid)
{
var request = new DeleteRequest
{
Body = new DeleteRequest.Types.Body
{
Address = new Address
{
ContainerId = cid.ToGrpcMessage(),
ObjectId = oid.ToGrpcMessage()
}
}
};
request.AddMetaHeader();
request.Sign(_key);
var response = await _objectServiceClient!.DeleteAsync(request);
Verifier.CheckResponse(response);
}
public async IAsyncEnumerable<ObjectId> SearchObjectsAsync(ContainerId cid, params ObjectFilter[] filters)
{
var request = new SearchRequest
{
Body = new SearchRequest.Types.Body
{
ContainerId = cid.ToGrpcMessage(),
Filters = { },
Version = 1
}
};
foreach (var filter in filters)
{
request.Body.Filters.Add(filter.ToGrpcMessage());
}
request.AddMetaHeader();
request.Sign(_key);
var objectsIds = SearchObjects(request);
await foreach (var oid in objectsIds)
{
yield return ObjectId.FromHash(oid.Value.ToByteArray());
}
}
private async Task<Object.Object> GetObject(GetRequest request)
{
using var stream = GetObjectInit(request);
var obj = await stream.ReadHeader();
var payload = new byte[obj.Header.PayloadLength];
var offset = 0;
var chunk = await stream.ReadChunk();
while (chunk is not null)
{
chunk.CopyTo(payload, offset);
offset += chunk.Length;
chunk = await stream.ReadChunk();
}
obj.Payload = ByteString.CopyFrom(payload);
return obj;
}
private ObjectReader GetObjectInit(GetRequest initRequest)
{
if (initRequest is null)
throw new ArgumentNullException(nameof(initRequest));
return new ObjectReader
{
Call = _objectServiceClient!.Get(initRequest)
};
}
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest)
{
if (initRequest is null)
@ -220,9 +334,9 @@ public partial class Client
throw new ArgumentNullException(nameof(initRequest));
}
var call = _objectServiceClient.Put();
var call = _objectServiceClient!.Put();
await call.RequestStream.WriteAsync(initRequest);
return new ObjectStreamer(call);
}
@ -236,7 +350,7 @@ public partial class Client
{
yield return oid;
}
ids = await stream.Read();
}
}
@ -248,6 +362,17 @@ public partial class Client
throw new ArgumentNullException(nameof(initRequest));
}
return new SearchReader(_objectServiceClient.Search(initRequest));
return new SearchReader(_objectServiceClient!.Search(initRequest));
}
public Checksum Sha256Checksum(byte[] data)
{
return new Checksum
{
Type = ChecksumType.Sha256,
Sum = ByteString.CopyFrom(data.Sha256())
};
}
}