[#11] Add Network Snapshot
Signed-off-by: Pavel Gross <p.gross@yadro.com>
This commit is contained in:
parent
b69d22966f
commit
c988ff3c76
84 changed files with 2238 additions and 933 deletions
|
@ -1,457 +0,0 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Google.Protobuf;
|
||||
|
||||
using FrostFS.Object;
|
||||
using FrostFS.Refs;
|
||||
using FrostFS.SDK.ClientV2.Mappers.GRPC;
|
||||
using FrostFS.SDK.Cryptography;
|
||||
using FrostFS.Session;
|
||||
|
||||
using FrostFS.SDK.ModelsV2;
|
||||
using FrostFS.SDK.ClientV2.Extensions;
|
||||
using System.Threading;
|
||||
|
||||
namespace FrostFS.SDK.ClientV2;
|
||||
|
||||
public partial class Client
|
||||
{
|
||||
public async Task<ObjectHeader> GetObjectHeadAsync(ContainerId cid, ObjectId oid)
|
||||
{
|
||||
var request = new HeadRequest
|
||||
{
|
||||
Body = new HeadRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ObjectId = oid.ToGrpcMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.Sign(_key);
|
||||
var response = await _objectServiceClient!.HeadAsync(request);
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return response.Body.Header.Header.ToModel();
|
||||
}
|
||||
|
||||
public async Task<ModelsV2.Object> GetObjectAsync(ContainerId cid, ObjectId oid)
|
||||
{
|
||||
var sessionToken = await CreateSessionAsync(uint.MaxValue);
|
||||
var request = new GetRequest
|
||||
{
|
||||
Body = new GetRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ObjectId = oid.ToGrpcMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
cid.ToGrpcMessage(),
|
||||
oid.ToGrpcMessage(),
|
||||
ObjectSessionContext.Types.Verb.Get,
|
||||
_key
|
||||
);
|
||||
|
||||
request.Sign(_key);
|
||||
var obj = await GetObject(request);
|
||||
|
||||
return obj.ToModel();
|
||||
}
|
||||
|
||||
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, Stream payload, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return await PutObject(header, payload, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<ObjectId> PutObjectAsync(ObjectHeader header, byte[] payload, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var stream = new MemoryStream(payload);
|
||||
return await PutObject(header, stream, cancellationToken);
|
||||
}
|
||||
|
||||
private Task<ObjectId> PutObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken)
|
||||
{
|
||||
if (header.ClientCut)
|
||||
return PutClientCutObject(header, payload, cancellationToken);
|
||||
else
|
||||
return PutStreamObject(header, payload, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task<ObjectId> PutClientCutObject(ObjectHeader header, Stream payloadStream, CancellationToken cancellationToken)
|
||||
{
|
||||
ObjectId? objectId = null;
|
||||
List<ObjectId> sentObjectIds = [];
|
||||
ModelsV2.Object? currentObject;
|
||||
|
||||
var partSize = (int)NetworkSettings["MaxObjectSize"];
|
||||
var buffer = new byte[partSize];
|
||||
|
||||
var largeObject = new LargeObject(header.ContainerId);
|
||||
|
||||
var split = new Split();
|
||||
|
||||
var fullLength = (ulong)payloadStream.Length;
|
||||
|
||||
while (true)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var bytesCount = await payloadStream.ReadAsync(buffer, 0, partSize);
|
||||
|
||||
split.Previous = sentObjectIds.LastOrDefault();
|
||||
|
||||
largeObject.AppendBlock(buffer, bytesCount);
|
||||
|
||||
currentObject = new ModelsV2.Object(header.ContainerId, bytesCount < partSize ? buffer.Take(bytesCount).ToArray() : buffer)
|
||||
.AddAttributes(header.Attributes)
|
||||
.SetSplit(split);
|
||||
|
||||
if (largeObject.PayloadLength == fullLength)
|
||||
break;
|
||||
|
||||
objectId = await PutSingleObjectAsync(currentObject, cancellationToken);
|
||||
|
||||
sentObjectIds.Add(objectId!);
|
||||
}
|
||||
|
||||
if (sentObjectIds.Any())
|
||||
{
|
||||
largeObject.CalculateHash();
|
||||
|
||||
currentObject.SetParent(largeObject);
|
||||
|
||||
objectId = await PutSingleObjectAsync(currentObject, cancellationToken);
|
||||
sentObjectIds.Add(objectId);
|
||||
|
||||
var linkObject = new LinkObject(header.ContainerId, split.SplitId, largeObject)
|
||||
.AddChildren(sentObjectIds);
|
||||
|
||||
_ = await PutSingleObjectAsync(linkObject, cancellationToken);
|
||||
|
||||
return currentObject.GetParentId();
|
||||
}
|
||||
|
||||
return await PutSingleObjectAsync(currentObject, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task<ObjectId> PutStreamObject(ObjectHeader header, Stream payload, CancellationToken cancellationToken)
|
||||
{
|
||||
var sessionToken = await CreateSessionAsync(uint.MaxValue);
|
||||
var hdr = header.ToGrpcMessage();
|
||||
hdr.OwnerId = OwnerId.ToGrpcMessage();
|
||||
hdr.Version = Version.ToGrpcMessage();
|
||||
|
||||
var oid = new ObjectID
|
||||
{
|
||||
Value = hdr.Sha256()
|
||||
};
|
||||
|
||||
var request = new PutRequest
|
||||
{
|
||||
Body = new PutRequest.Types.Body
|
||||
{
|
||||
Init = new PutRequest.Types.Body.Types.Init
|
||||
{
|
||||
Header = hdr
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
hdr.ContainerId,
|
||||
oid,
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
_key
|
||||
);
|
||||
|
||||
request.Sign(_key);
|
||||
|
||||
using var stream = await PutObjectInit(request);
|
||||
var buffer = new byte[Constants.ObjectChunkSize];
|
||||
|
||||
while (true)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var bufferLength = await payload.ReadAsync(buffer, 0, Constants.ObjectChunkSize);
|
||||
|
||||
if (bufferLength == 0)
|
||||
break;
|
||||
|
||||
request.Body = new PutRequest.Types.Body
|
||||
{
|
||||
Chunk = ByteString.CopyFrom(buffer[..bufferLength]),
|
||||
};
|
||||
|
||||
request.VerifyHeader = null;
|
||||
request.Sign(_key);
|
||||
await stream.Write(request);
|
||||
}
|
||||
|
||||
var response = await stream.Close();
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return ObjectId.FromHash(response.Body.ObjectId.Value.ToByteArray());
|
||||
}
|
||||
|
||||
public async Task<ObjectId> PutSingleObjectAsync(ModelsV2.Object @object, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var sessionToken = await CreateSessionAsync(uint.MaxValue);
|
||||
|
||||
var obj = CreateObject(@object);
|
||||
|
||||
var request = new PutSingleRequest
|
||||
{
|
||||
Body = new () { Object = obj }
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.AddObjectSessionToken(
|
||||
sessionToken,
|
||||
obj.Header.ContainerId,
|
||||
obj.ObjectId,
|
||||
ObjectSessionContext.Types.Verb.Put,
|
||||
_key
|
||||
);
|
||||
|
||||
request.Sign(_key);
|
||||
|
||||
var response = await _objectServiceClient!.PutSingleAsync(request, null, null, cancellationToken);
|
||||
Verifier.CheckResponse(response);
|
||||
|
||||
return ObjectId.FromHash(obj.ObjectId.Value.ToByteArray());
|
||||
}
|
||||
|
||||
public Object.Object CreateObject(ModelsV2.Object @object)
|
||||
{
|
||||
var grpcHeader = @object.Header.ToGrpcMessage();
|
||||
|
||||
grpcHeader.OwnerId = OwnerId.ToGrpcMessage();
|
||||
grpcHeader.Version = Version.ToGrpcMessage();
|
||||
|
||||
if (@object.Payload != null)
|
||||
{
|
||||
grpcHeader.PayloadLength = (ulong)@object.Payload.Length;
|
||||
grpcHeader.PayloadHash = Sha256Checksum(@object.Payload);
|
||||
}
|
||||
|
||||
var split = @object.Header.Split;
|
||||
if (split != null)
|
||||
{
|
||||
grpcHeader.Split = new Header.Types.Split
|
||||
{
|
||||
SplitId = split.SplitId != null ? ByteString.CopyFrom(split.SplitId.ToBinary()) : null
|
||||
};
|
||||
|
||||
if (split.Children != null && split.Children.Any())
|
||||
grpcHeader.Split.Children.AddRange(split.Children.Select(id => id.ToGrpcMessage()));
|
||||
|
||||
if (split.ParentHeader is not null)
|
||||
{
|
||||
var grpcParentHeader = CreateHeader(split.ParentHeader, []);
|
||||
|
||||
grpcHeader.Split.Parent = new ObjectID { Value = grpcParentHeader.Sha256() };
|
||||
grpcHeader.Split.ParentHeader = grpcParentHeader;
|
||||
grpcHeader.Split.ParentSignature = new Refs.Signature
|
||||
{
|
||||
Key = ByteString.CopyFrom(_key.PublicKey()),
|
||||
Sign = ByteString.CopyFrom(_key.SignData(grpcHeader.Split.Parent.ToByteArray())),
|
||||
};
|
||||
|
||||
split.Parent = grpcHeader.Split.Parent.ToModel();
|
||||
}
|
||||
|
||||
grpcHeader.Split.Previous = split.Previous?.ToGrpcMessage();
|
||||
}
|
||||
|
||||
var obj = new Object.Object
|
||||
{
|
||||
Header = grpcHeader,
|
||||
ObjectId = new ObjectID { Value = grpcHeader.Sha256() },
|
||||
Payload = ByteString.CopyFrom(@object.Payload)
|
||||
};
|
||||
|
||||
obj.Signature = new Refs.Signature
|
||||
{
|
||||
Key = ByteString.CopyFrom(_key.PublicKey()),
|
||||
Sign = ByteString.CopyFrom(_key.SignData(obj.ObjectId.ToByteArray())),
|
||||
};
|
||||
|
||||
return obj;
|
||||
}
|
||||
|
||||
public Header CreateHeader(ObjectHeader header, byte[]? payload)
|
||||
{
|
||||
var grpcHeader = header.ToGrpcMessage();
|
||||
|
||||
grpcHeader.OwnerId = OwnerId.ToGrpcMessage();
|
||||
grpcHeader.Version = Version.ToGrpcMessage();
|
||||
|
||||
if (header.PayloadCheckSum != null)
|
||||
{
|
||||
grpcHeader.PayloadHash = new Checksum
|
||||
{
|
||||
Type = ChecksumType.Sha256,
|
||||
Sum = ByteString.CopyFrom(header.PayloadCheckSum)
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
if (payload != null)
|
||||
grpcHeader.PayloadHash = Sha256Checksum(payload);
|
||||
}
|
||||
|
||||
return grpcHeader;
|
||||
}
|
||||
|
||||
public async Task DeleteObjectAsync(ContainerId cid, ObjectId oid)
|
||||
{
|
||||
var request = new DeleteRequest
|
||||
{
|
||||
Body = new DeleteRequest.Types.Body
|
||||
{
|
||||
Address = new Address
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
ObjectId = oid.ToGrpcMessage()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.Sign(_key);
|
||||
var response = await _objectServiceClient!.DeleteAsync(request);
|
||||
Verifier.CheckResponse(response);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<ObjectId> SearchObjectsAsync(ContainerId cid, params ObjectFilter[] filters)
|
||||
{
|
||||
var request = new SearchRequest
|
||||
{
|
||||
Body = new SearchRequest.Types.Body
|
||||
{
|
||||
ContainerId = cid.ToGrpcMessage(),
|
||||
Filters = { },
|
||||
Version = 1
|
||||
}
|
||||
};
|
||||
|
||||
foreach (var filter in filters)
|
||||
{
|
||||
request.Body.Filters.Add(filter.ToGrpcMessage());
|
||||
}
|
||||
|
||||
|
||||
request.AddMetaHeader();
|
||||
request.Sign(_key);
|
||||
var objectsIds = SearchObjects(request);
|
||||
|
||||
|
||||
await foreach (var oid in objectsIds)
|
||||
{
|
||||
yield return ObjectId.FromHash(oid.Value.ToByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Object.Object> GetObject(GetRequest request)
|
||||
{
|
||||
using var stream = GetObjectInit(request);
|
||||
var obj = await stream.ReadHeader();
|
||||
var payload = new byte[obj.Header.PayloadLength];
|
||||
var offset = 0L;
|
||||
var chunk = await stream.ReadChunk();
|
||||
|
||||
while (chunk is not null && (ulong)offset < obj.Header.PayloadLength)
|
||||
{
|
||||
var length = Math.Min((long)obj.Header.PayloadLength - offset, chunk.Length);
|
||||
|
||||
Array.Copy(chunk, 0, payload, offset, length);
|
||||
offset += chunk.Length;
|
||||
chunk = await stream.ReadChunk();
|
||||
}
|
||||
|
||||
obj.Payload = ByteString.CopyFrom(payload);
|
||||
|
||||
return obj;
|
||||
}
|
||||
|
||||
private ObjectReader GetObjectInit(GetRequest initRequest)
|
||||
{
|
||||
if (initRequest is null)
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
|
||||
|
||||
return new ObjectReader
|
||||
{
|
||||
Call = _objectServiceClient!.Get(initRequest)
|
||||
};
|
||||
}
|
||||
|
||||
private async Task<ObjectStreamer> PutObjectInit(PutRequest initRequest)
|
||||
{
|
||||
if (initRequest is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
var call = _objectServiceClient!.Put();
|
||||
await call.RequestStream.WriteAsync(initRequest);
|
||||
|
||||
return new ObjectStreamer(call);
|
||||
}
|
||||
|
||||
private async IAsyncEnumerable<ObjectID> SearchObjects(SearchRequest request)
|
||||
{
|
||||
using var stream = GetSearchReader(request);
|
||||
var ids = await stream.Read();
|
||||
while (ids is not null)
|
||||
{
|
||||
foreach (var oid in ids)
|
||||
{
|
||||
yield return oid;
|
||||
}
|
||||
|
||||
ids = await stream.Read();
|
||||
}
|
||||
}
|
||||
|
||||
private SearchReader GetSearchReader(SearchRequest initRequest)
|
||||
{
|
||||
if (initRequest is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(initRequest));
|
||||
}
|
||||
|
||||
return new SearchReader(_objectServiceClient!.Search(initRequest));
|
||||
}
|
||||
|
||||
public Checksum Sha256Checksum(byte[] data)
|
||||
{
|
||||
return new Checksum
|
||||
{
|
||||
Type = ChecksumType.Sha256,
|
||||
Sum = ByteString.CopyFrom(data.Sha256())
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue