Compare commits

...
Sign in to create a new pull request.

6 commits

Author SHA1 Message Date
0bc4f6871c
m
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-28 10:51:56 +03:00
984b51a86d
pilorama: Allow to filter keys by prefix
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-28 10:51:56 +03:00
3d7c49c17b
WIP
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-28 10:51:56 +03:00
1d4467043e
pilorama: Add TreeListKeys()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-28 10:51:56 +03:00
09a4469677
services/tree: Move server stream relay to a separate function
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-28 10:51:56 +03:00
47e03b8b1e
services/tree: Simplify loop condition in getSortedSubTree()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-28 10:51:56 +03:00
14 changed files with 1730 additions and 39 deletions

View file

@ -468,3 +468,33 @@ func (e *StorageEngine) getTreeShard(ctx context.Context, cid cidSDK.ID, treeID
return 0, lst, pilorama.ErrTreeNotFound return 0, lst, pilorama.ErrTreeNotFound
} }
func (e *StorageEngine) TreeListKeys(ctx context.Context, cid cidSDK.ID, treeID string, cursor pilorama.ListKeysCursor, count int) ([]pilorama.KeyInfo, pilorama.ListKeysCursor, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeListKeys",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
var err error
var res []pilorama.KeyInfo
for _, sh := range e.sortShards(cid) {
res, cursor, err = sh.TreeListKeys(ctx, cid, treeID, cursor, count)
if err != nil {
if err == shard.ErrPiloramaDisabled {
break
}
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(ctx, sh, "can't read tree synchronization height", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
return res, cursor, err
}
return res, cursor, err
}

View file

@ -11,6 +11,7 @@ import (
"path/filepath" "path/filepath"
"slices" "slices"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -1166,6 +1167,206 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
return res, last, metaerr.Wrap(err) return res, last, metaerr.Wrap(err)
} }
type ListKeysCursor struct {
prefix []string
finished bool
stack stack
}
func (c *ListKeysCursor) SetPrefix(prefix string) {
c.prefix = strings.Split(prefix, "/")
}
func (t *boltForest) TreeListKeys(_ context.Context, cid cidSDK.ID, treeID string, cursor ListKeysCursor, count int) ([]KeyInfo, ListKeysCursor, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, cursor, ErrDegradedMode
}
if cursor.finished {
return nil, cursor, nil
}
var result []KeyInfo
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot == nil {
return ErrTreeNotFound
}
b := treeRoot.Bucket(dataBucket)
var err error
result, err = t.listKeys(b, &cursor, count, MultiNode{RootID})
return err
})
if cursor.stack.empty() {
cursor.finished = true
}
return result, cursor, metaerr.Wrap(err)
}
func (t *boltForest) listKeys(b *bbolt.Bucket, cursor *ListKeysCursor, count int, root MultiNode) ([]KeyInfo, error) {
var result []KeyInfo
if cursor.stack.empty() {
cursor.stack.push(treeLevel{
parentPrefix: "",
parent: root,
children: []heapInfo{{id: root, filename: "/"}},
})
} else {
cursor.stack.refillLastLevel(count, func(last string, p, c MultiNode, count int) []heapInfo {
return t.nextBatch(b, last, p, c, count)
})
}
// // If the node is a leaf, we could scan all filenames in the tree.
// // To prevent this we first count the number of children: if it is less than
// // the number of nodes we need to return, fallback to TreeGetChildren() implementation.
// if fewChildren = t.hasFewChildren(b, nodeIDs, count); fewChildren {
// var err error
// result, err = t.getChildren(b, nodeIDs)
// return err
// }
for !cursor.stack.empty() {
depth := len(cursor.stack.values)
node, _, prefix := cursor.stack.popSingle()
if strings.HasSuffix(node.filename, "/") {
h := newHeap(nil, count*5)
h.withSlash = true
if depth < len(cursor.prefix) {
t.fillSortedChildrenExact(b, node.id, h, cursor.prefix[depth-1])
} else if depth == len(cursor.prefix) {
t.fillSortedChildrenWithPrefix(b, node.id, h, cursor.prefix[depth-1])
} else {
t.fillSortedChildren(b, node.id, h)
}
var children []heapInfo
for hi, ok := h.pop(); ok; hi, ok = h.pop() {
children = append(children, hi)
}
if len(children) != 0 {
cursor.stack.push(treeLevel{
parentPrefix: prefix + node.filename,
parent: node.id,
children: children,
})
}
cursor.stack.trim()
} else if len(cursor.prefix) < depth {
treeKey := strings.TrimPrefix(prefix, "/") + node.filename
meta, err := t.getLatestMeta(b, node.id)
if err != nil {
return nil, err
}
result = append(result, KeyInfo{Key: treeKey, Meta: meta.Items})
if len(result) == count {
return result, nil
}
}
}
return result, nil
}
func (t *boltForest) getLatestMeta(b *bbolt.Bucket, ids MultiNode) (Meta, error) {
keySlice := make([]byte, 9)
var meta Meta
for _, id := range ids {
info, err := t.getChildInfo(b, keySlice, id)
if err != nil {
return Meta{}, err
}
if !isInternal(info.Meta.Items) && info.Meta.Time > meta.Time {
meta = info.Meta
}
}
return meta, nil
}
func (t *boltForest) fillSorted(b *bbolt.Bucket, full bool, filename string, parentIDs, nodeIDs MultiNode, h *fixedHeap) {
c := b.Cursor()
if full {
prefix := internalKeyPrefix(nil, AttributeFilename)
if filename != "" {
h.push(nodeIDs, filename+"/")
}
for i := 1; i < len(filename)-1; i++ {
l := i
prefix = append(prefix, byte(l), byte(l>>8))
prefix = append(prefix, filename[:i]...)
var children MultiNode
for k, _ := c.Seek(prefix); len(k) == len(prefix)+16 && k[0] == 'i' && bytes.HasPrefix(k[len(prefix)+2:], []byte(filename[:i])); k, _ = c.Next() {
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if !slices.Contains(nodeIDs, parentID) {
continue
}
children = append(children, binary.LittleEndian.Uint64(k[len(k)-8:]))
}
h.push(children, filename[:i]+"/")
}
}
length := uint16(0)
count := 0
prefix := internalKeyPrefix(nil, AttributeFilename)
var nodes []uint64
var lastFilename *string
for k, _ := c.Seek(prefix); len(k) > 0 && k[0] == 'i'; k, _ = c.Next() {
if len(k) < len(prefix)+2+16 {
continue
}
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if !slices.Contains(parentIDs, parentID) {
continue
}
actualLength := binary.LittleEndian.Uint16(k[len(prefix):])
childID := binary.LittleEndian.Uint64(k[len(k)-8:])
filename := string(k[len(prefix)+2 : len(k)-16])
if lastFilename == nil {
lastFilename = &filename
nodes = append(nodes, childID)
} else if *lastFilename == filename {
nodes = append(nodes, childID)
} else {
processed1 := h.push(nodes, *lastFilename)
processed2 := h.push(nodes, *lastFilename+"/")
nodes = MultiNode{childID}
lastFilename = &filename
if actualLength != length {
length = actualLength
count = 1
} else if processed1 || processed2 {
if count++; count > h.count {
lastFilename = nil
nodes = nil
length = actualLength + 1
count = 0
c.Seek(append(prefix, byte(length), byte(length>>8)))
c.Prev() // c.Next() will be performed by for loop
}
}
}
}
if len(nodes) != 0 && lastFilename != nil {
h.push(nodes, *lastFilename)
h.push(nodes, *lastFilename+"/")
}
}
func sortByFilename(nodes []NodeInfo) { func sortByFilename(nodes []NodeInfo) {
slices.SortFunc(nodes, func(a, b NodeInfo) int { slices.SortFunc(nodes, func(a, b NodeInfo) int {
return bytes.Compare(a.Meta.GetAttr(AttributeFilename), b.Meta.GetAttr(AttributeFilename)) return bytes.Compare(a.Meta.GetAttr(AttributeFilename), b.Meta.GetAttr(AttributeFilename))
@ -1199,6 +1400,102 @@ func (t *boltForest) getChildInfo(b *bbolt.Bucket, key []byte, childID Node) (No
return childInfo, nil return childInfo, nil
} }
func (t *boltForest) fillSortedChildrenExact(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap, pp string) {
c := b.Cursor()
length := uint16(len(pp))
prefix := internalKeyPrefix(nil, AttributeFilename)
prefix = binary.LittleEndian.AppendUint16(prefix, length)
prefix = append(prefix, pp...)
var nodes []uint64
for k, _ := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = c.Next() {
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if !slices.Contains(nodeIDs, parentID) {
continue
}
childID := binary.LittleEndian.Uint64(k[len(k)-8:])
nodes = append(nodes, childID)
}
if len(nodes) != 0 {
h.push(nodes, pp)
}
}
func (t *boltForest) fillSortedChildrenWithPrefix(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap, pp string) {
c := b.Cursor()
length := uint16(len(pp))
prefix := internalKeyPrefix(nil, AttributeFilename)
offset := len(prefix)
prefix = binary.LittleEndian.AppendUint16(prefix, length)
prefix = append(prefix, pp...)
count := 0
var nodes []uint64
var lastFilename *string
for k, _ := c.Seek(prefix); len(k) > 0 && k[0] == 'i'; k, _ = c.Next() {
if len(k) < len(prefix)+16 {
continue
}
actualLength := binary.LittleEndian.Uint16(k[offset:])
childID := binary.LittleEndian.Uint64(k[len(k)-8:])
filename := string(k[offset+2 : len(k)-16])
if !strings.HasPrefix(filename, pp) {
if lastFilename != nil {
h.push(nodes, *lastFilename)
}
lastFilename = nil
nodes = nil
length = actualLength + 1
count = 0
binary.LittleEndian.PutUint16(prefix[offset:], length)
c.Seek(prefix)
c.Prev() // c.Next() will be performed by for loop
continue
}
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if !slices.Contains(nodeIDs, parentID) {
continue
}
if lastFilename == nil {
lastFilename = &filename
nodes = append(nodes, childID)
} else if *lastFilename == filename {
nodes = append(nodes, childID)
} else {
processed := h.push(nodes, *lastFilename)
nodes = MultiNode{childID}
lastFilename = &filename
if actualLength != length {
length = actualLength
count = 1
} else if processed {
if count++; count > h.count {
lastFilename = nil
nodes = nil
length = actualLength + 1
count = 0
binary.LittleEndian.PutUint16(prefix[offset:], length)
c.Seek(prefix)
c.Prev() // c.Next() will be performed by for loop
}
}
}
}
if len(nodes) != 0 && lastFilename != nil {
h.push(nodes, *lastFilename)
}
}
func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap) { func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap) {
c := b.Cursor() c := b.Cursor()
prefix := internalKeyPrefix(nil, AttributeFilename) prefix := internalKeyPrefix(nil, AttributeFilename)

View file

@ -0,0 +1,101 @@
package pilorama
import (
"context"
"fmt"
"slices"
"strings"
"testing"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func TestTreeListKeys(t *testing.T) {
for i := range providers {
if providers[i].name != "bbolt" {
continue
}
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeListKeys(t, providers[i].construct(t).(*boltForest))
})
}
}
func testForestTreeListKeys(t *testing.T, s *boltForest) {
ctx := context.Background()
defer func() { require.NoError(t, s.Close(ctx)) }()
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
keys := []KeyInfo{
{Key: "a"},
{Key: "a-"},
{Key: "a--"},
{Key: "ab"},
{Key: "a-/a"},
{Key: "a-/b"},
{Key: "a/b"},
}
for i := range keys {
path := strings.Split(keys[i].Key, "/")
keys[i].Meta = []KeyValue{
{Key: AttributeVersion, Value: []byte{byte(i)}}, // uuid.New().String())},
{Key: AttributeFilename, Value: []byte(path[len(path)-1])},
}
_, err := s.TreeAddByPath(ctx, d, treeID, AttributeFilename, path[:len(path)-1], keys[i].Meta)
require.NoError(t, err)
}
sortedKeys := slices.Clone(keys)
slices.SortFunc(sortedKeys, func(a, b KeyInfo) int {
return strings.Compare(a.Key, b.Key)
})
listKeys := func(t *testing.T, cursor ListKeysCursor, count int) ([]KeyInfo, ListKeysCursor) {
res, cursor, err := s.TreeListKeys(ctx, cid, treeID, cursor, count)
require.NoError(t, err)
return res, cursor
}
res, _ := listKeys(t, ListKeysCursor{}, 20)
require.Equal(t, sortedKeys, res)
t.Run("multiple invocations", func(t *testing.T) {
var cursor ListKeysCursor
var result []KeyInfo
res, cursor := listKeys(t, cursor, 2)
for len(res) != 0 {
result = append(result, res...)
res, cursor = listKeys(t, cursor, 2)
}
require.Equal(t, sortedKeys, result)
})
t.Run("with prefix", func(t *testing.T) {
for _, prefix := range []string{"a", "a-", "ab", "ab/", "a-/"} {
t.Run(fmt.Sprintf("prefix=%s", prefix), func(t *testing.T) {
expected := slices.Clone(sortedKeys)
for i := 0; i < len(expected); {
if strings.HasPrefix(expected[i].Key, prefix) {
i++
} else {
expected = slices.Delete(expected, i, i+1)
}
}
var cursor ListKeysCursor
cursor.SetPrefix(prefix)
res, _ := listKeys(t, cursor, 20)
require.Len(t, res, len(expected))
if len(expected) != 0 { // Ignore nil vs empty slice comparison.
require.Equal(t, expected, res)
}
})
}
})
}

View file

@ -120,6 +120,10 @@ func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID st
return nil return nil
} }
func (f *memoryForest) TreeListKeys(_ context.Context, _ cid.ID, _ string, _ ListKeysCursor, _ int) ([]KeyInfo, ListKeysCursor, error) {
panic("unimplemented")
}
func (f *memoryForest) Init(context.Context) error { func (f *memoryForest) Init(context.Context) error {
return nil return nil
} }

View file

@ -34,6 +34,8 @@ type fixedHeap struct {
sorted bool sorted bool
count int count int
h *filenameHeap h *filenameHeap
withSlash bool
} }
func newHeap(start *string, count int) *fixedHeap { func newHeap(start *string, count int) *fixedHeap {
@ -50,6 +52,14 @@ func newHeap(start *string, count int) *fixedHeap {
const amortizationMultiplier = 5 const amortizationMultiplier = 5
func (h *fixedHeap) push(id MultiNode, filename string) bool { func (h *fixedHeap) push(id MultiNode, filename string) bool {
res := h.pushAux(id, filename)
if h.withSlash {
return h.pushAux(id, filename+"/") || res
}
return res
}
func (h *fixedHeap) pushAux(id MultiNode, filename string) bool {
if h.start != nil && filename <= *h.start { if h.start != nil && filename <= *h.start {
return false return false
} }

View file

@ -57,6 +57,12 @@ type Forest interface {
TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error)
// TreeHeight returns current tree height. // TreeHeight returns current tree height.
TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error)
TreeListKeys(ctx context.Context, cid cidSDK.ID, treeID string, cursor ListKeysCursor, count int) ([]KeyInfo, ListKeysCursor, error)
}
type KeyInfo struct {
Key string
Meta []KeyValue
} }
type ForestStorage interface { type ForestStorage interface {

View file

@ -0,0 +1,69 @@
package pilorama
import (
"go.etcd.io/bbolt"
)
type treeLevel struct {
parentPrefix string
parent MultiNode
children []heapInfo
}
type stack struct {
values []treeLevel
}
func (s *stack) empty() bool {
for i := range s.values {
if len(s.values[i].children) != 0 {
return false
}
}
return true
}
func (s *stack) popSingle() (heapInfo, MultiNode, string) {
lastLevel := &s.values[len(s.values)-1]
prefix := lastLevel.parentPrefix
parents := lastLevel.parent
head, tail := lastLevel.children[0], lastLevel.children[1:]
lastLevel.children = tail
return head, parents, prefix
}
func (s *stack) trim() {
for len(s.values) > 0 && len(s.values[len(s.values)-1].children) == 0 {
s.values = s.values[:len(s.values)-1]
}
}
func (s *stack) push(level treeLevel) {
s.values = append(s.values, level)
}
func (s *stack) refillLastLevel(count int, nextBatch func(string, MultiNode, MultiNode, int) []heapInfo) {
lastLevel := &s.values[len(s.values)-1]
if len(lastLevel.children) > count {
return
}
parentIDs := lastLevel.parent
childrenIDs := lastLevel.children[len(lastLevel.children)-1].id
filename := lastLevel.children[len(lastLevel.children)-1].filename
batch := nextBatch(filename, parentIDs, childrenIDs, count)
lastLevel.children = append(lastLevel.children, batch...)
}
func (t *boltForest) nextBatch(b *bbolt.Bucket, last string, parentIDs, nodeIDs MultiNode, count int) []heapInfo {
h := newHeap(&last, count)
t.fillSorted(b, true, last, parentIDs, nodeIDs, h)
var result []heapInfo
for info, ok := h.pop(); ok; info, ok = h.pop() {
result = append(result, info)
}
return result
}

View file

@ -454,3 +454,22 @@ func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID strin
} }
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source) return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
} }
func (s *Shard) TreeListKeys(ctx context.Context, cnr cidSDK.ID, treeID string, cursor pilorama.ListKeysCursor, count int) ([]pilorama.KeyInfo, pilorama.ListKeysCursor, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListKeys",
trace.WithAttributes(attribute.String("shard_id", s.ID().String())),
)
defer span.End()
if s.pilorama == nil {
return nil, cursor, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, cursor, ErrDegradedMode
}
return s.pilorama.TreeListKeys(ctx, cnr, treeID, cursor, count)
}

View file

@ -0,0 +1,91 @@
package tree
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
func (s *Service) ListKeys(req *ListKeysRequest, srv TreeService_ListKeysServer) error {
b := req.GetBody()
var cnr cidSDK.ID
if err := cnr.Decode(b.GetContainerId()); err != nil {
return err
}
err := s.verifyClient(srv.Context(), req, cnr, nil, acl.OpObjectGet)
if err != nil {
return err
}
ns, pos, err := s.getContainerNodes(cnr)
if err != nil {
return err
}
if pos < 0 {
return relayStream(s, ns, req, srv, (TreeServiceClient).ListKeys)
}
const engineBatchSize = 1000
responseBatchSize := b.GetBatchSize()
if responseBatchSize > engineBatchSize {
responseBatchSize = engineBatchSize
}
batch := newListBatch(srv, int(responseBatchSize))
res, cursor, err := s.forest.TreeListKeys(srv.Context(), cnr, b.GetTreeId(), pilorama.ListKeysCursor{}, engineBatchSize)
for err == nil && len(res) != 0 {
for i := range res {
if err := batch.add(res[i]); err != nil {
return err
}
}
res, cursor, err = s.forest.TreeListKeys(srv.Context(), cnr, b.GetTreeId(), cursor, engineBatchSize)
}
return batch.commit()
}
type listBatch struct {
items []ListKeysResponse_Body_Info
srv TreeService_ListKeysServer
limit int
}
func newListBatch(srv TreeService_ListKeysServer, count int) *listBatch {
return &listBatch{
srv: srv,
limit: count,
}
}
func (b *listBatch) empty() bool {
return len(b.items) == 0
}
func (b *listBatch) add(info pilorama.KeyInfo) error {
b.items = append(b.items, ListKeysResponse_Body_Info{
Key: info.Key,
Meta: metaToProto(info.Meta),
})
if len(b.items) < b.limit {
return nil
}
return b.commit()
}
func (b *listBatch) commit() error {
if len(b.items) == 0 {
return nil
}
items := b.items
b.items = b.items[:0]
return b.srv.Send(&ListKeysResponse{
Body: &ListKeysResponse_Body{
Keys: items,
},
})
}

View file

@ -30,6 +30,42 @@ func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapS
return resp, outErr return resp, outErr
} }
type treeServiceServer[Resp any] interface {
Context() context.Context
Send(*Resp) error
}
func relayStream[
Req any,
Resp any,
Server treeServiceServer[Resp],
Client interface{ Recv() (*Resp, error) },
](
s *Service,
ns []netmapSDK.NodeInfo,
req *Req,
srv Server,
callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (Client, error),
) error {
var cli Client
var outErr error
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
cli, outErr = callback(c, srv.Context(), req)
return true
})
if err != nil {
return err
} else if outErr != nil {
return outErr
}
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
if err := srv.Send(resp); err != nil {
return err
}
}
return nil
}
// forEachNode executes callback for each node in the container until true is returned. // forEachNode executes callback for each node in the container until true is returned.
// Returns errNoSuitableNode if there was no successful attempt to dial any node. // Returns errNoSuitableNode if there was no successful attempt to dial any node.
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error { func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {

View file

@ -381,23 +381,7 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
return err return err
} }
if pos < 0 { if pos < 0 {
var cli TreeService_GetSubTreeClient return relayStream(s, ns, req, srv, (TreeServiceClient).GetSubTree)
var outErr error
err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
cli, outErr = c.GetSubTree(srv.Context(), req)
return true
})
if err != nil {
return err
} else if outErr != nil {
return outErr
}
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
if err := srv.Send(resp); err != nil {
return err
}
}
return nil
} }
return getSubTree(srv.Context(), srv, cid, b, s.forest) return getSubTree(srv.Context(), srv, cid, b, s.forest)
@ -449,10 +433,8 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
parent: ps, parent: ps,
}} }}
for { for len(stack) != 0 {
if len(stack) == 0 { if item := &stack[len(stack)-1]; len(item.values) == 0 {
break
} else if item := &stack[len(stack)-1]; len(item.values) == 0 {
if len(stack) == 1 { if len(stack) == 1 {
break break
} }
@ -646,23 +628,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
return err return err
} }
if pos < 0 { if pos < 0 {
var cli TreeService_GetOpLogClient return relayStream(s, ns, req, srv, (TreeServiceClient).GetOpLog)
var outErr error
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
cli, outErr = c.GetOpLog(srv.Context(), req)
return true
})
if err != nil {
return err
} else if outErr != nil {
return outErr
}
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
if err := srv.Send(resp); err != nil {
return err
}
}
return nil
} }
h := b.GetHeight() h := b.GetHeight()

View file

@ -41,6 +41,9 @@ service TreeService {
rpc GetSubTree(GetSubTreeRequest) returns (stream GetSubTreeResponse); rpc GetSubTree(GetSubTreeRequest) returns (stream GetSubTreeResponse);
// TreeList return list of the existing trees in the container. // TreeList return list of the existing trees in the container.
rpc TreeList(TreeListRequest) returns (TreeListResponse); rpc TreeList(TreeListRequest) returns (TreeListResponse);
// ListKeys returns the list of all keys in the tree sorted by full
// filepath.
rpc ListKeys(ListKeysRequest) returns (stream ListKeysResponse);
/* Synchronization API */ /* Synchronization API */
@ -300,6 +303,30 @@ message TreeListResponse {
Signature signature = 2; Signature signature = 2;
} }
message ListKeysRequest {
message Body {
bytes container_id = 1;
string tree_id = 2;
uint32 batch_size = 3;
}
Body body = 1;
Signature signature = 2;
}
message ListKeysResponse {
message Body {
message Info {
string key = 1;
repeated KeyValue meta = 2;
}
repeated Info keys = 1;
}
Body body = 1;
Signature signature = 2;
}
message ApplyRequest { message ApplyRequest {
message Body { message Body {
// Container ID in V2 format. // Container ID in V2 format.

View file

@ -6746,6 +6746,973 @@ func (x *TreeListResponse) UnmarshalEasyJSON(in *jlexer.Lexer) {
} }
} }
type ListKeysRequest_Body struct {
ContainerId []byte `json:"containerId"`
TreeId string `json:"treeId"`
BatchSize uint32 `json:"batchSize"`
}
var (
_ encoding.ProtoMarshaler = (*ListKeysRequest_Body)(nil)
_ encoding.ProtoUnmarshaler = (*ListKeysRequest_Body)(nil)
_ json.Marshaler = (*ListKeysRequest_Body)(nil)
_ json.Unmarshaler = (*ListKeysRequest_Body)(nil)
)
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ListKeysRequest_Body) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.BytesSize(1, x.ContainerId)
size += proto.StringSize(2, x.TreeId)
size += proto.UInt32Size(3, x.BatchSize)
return size
}
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
func (x *ListKeysRequest_Body) MarshalProtobuf(dst []byte) []byte {
m := pool.MarshalerPool.Get()
defer pool.MarshalerPool.Put(m)
x.EmitProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
return dst
}
func (x *ListKeysRequest_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) {
if x == nil {
return
}
if len(x.ContainerId) != 0 {
mm.AppendBytes(1, x.ContainerId)
}
if len(x.TreeId) != 0 {
mm.AppendString(2, x.TreeId)
}
if x.BatchSize != 0 {
mm.AppendUint32(3, x.BatchSize)
}
}
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
func (x *ListKeysRequest_Body) UnmarshalProtobuf(src []byte) (err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in %s", "ListKeysRequest_Body")
}
switch fc.FieldNum {
case 1: // ContainerId
data, ok := fc.Bytes()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "ContainerId")
}
x.ContainerId = data
case 2: // TreeId
data, ok := fc.String()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "TreeId")
}
x.TreeId = data
case 3: // BatchSize
data, ok := fc.Uint32()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "BatchSize")
}
x.BatchSize = data
}
}
return nil
}
func (x *ListKeysRequest_Body) GetContainerId() []byte {
if x != nil {
return x.ContainerId
}
return nil
}
func (x *ListKeysRequest_Body) SetContainerId(v []byte) {
x.ContainerId = v
}
func (x *ListKeysRequest_Body) GetTreeId() string {
if x != nil {
return x.TreeId
}
return ""
}
func (x *ListKeysRequest_Body) SetTreeId(v string) {
x.TreeId = v
}
func (x *ListKeysRequest_Body) GetBatchSize() uint32 {
if x != nil {
return x.BatchSize
}
return 0
}
func (x *ListKeysRequest_Body) SetBatchSize(v uint32) {
x.BatchSize = v
}
// MarshalJSON implements the json.Marshaler interface.
func (x *ListKeysRequest_Body) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
x.MarshalEasyJSON(&w)
return w.Buffer.BuildBytes(), w.Error
}
func (x *ListKeysRequest_Body) MarshalEasyJSON(out *jwriter.Writer) {
if x == nil {
out.RawString("null")
return
}
first := true
out.RawByte('{')
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"containerId\":"
out.RawString(prefix)
if x.ContainerId != nil {
out.Base64Bytes(x.ContainerId)
} else {
out.String("")
}
}
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"treeId\":"
out.RawString(prefix)
out.String(x.TreeId)
}
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"batchSize\":"
out.RawString(prefix)
out.Uint32(x.BatchSize)
}
out.RawByte('}')
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (x *ListKeysRequest_Body) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
x.UnmarshalEasyJSON(&r)
return r.Error()
}
func (x *ListKeysRequest_Body) UnmarshalEasyJSON(in *jlexer.Lexer) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "containerId":
{
var f []byte
{
tmp := in.Bytes()
if len(tmp) == 0 {
tmp = nil
}
f = tmp
}
x.ContainerId = f
}
case "treeId":
{
var f string
f = in.String()
x.TreeId = f
}
case "batchSize":
{
var f uint32
r := in.JsonNumber()
n := r.String()
v, err := strconv.ParseUint(n, 10, 32)
if err != nil {
in.AddError(err)
return
}
pv := uint32(v)
f = pv
x.BatchSize = f
}
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
type ListKeysRequest struct {
Body *ListKeysRequest_Body `json:"body"`
Signature *Signature `json:"signature"`
}
var (
_ encoding.ProtoMarshaler = (*ListKeysRequest)(nil)
_ encoding.ProtoUnmarshaler = (*ListKeysRequest)(nil)
_ json.Marshaler = (*ListKeysRequest)(nil)
_ json.Unmarshaler = (*ListKeysRequest)(nil)
)
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ListKeysRequest) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *ListKeysRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *ListKeysRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().MarshalProtobuf(buf), nil
}
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
func (x *ListKeysRequest) MarshalProtobuf(dst []byte) []byte {
m := pool.MarshalerPool.Get()
defer pool.MarshalerPool.Put(m)
x.EmitProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
return dst
}
func (x *ListKeysRequest) EmitProtobuf(mm *easyproto.MessageMarshaler) {
if x == nil {
return
}
if x.Body != nil {
x.Body.EmitProtobuf(mm.AppendMessage(1))
}
if x.Signature != nil {
x.Signature.EmitProtobuf(mm.AppendMessage(2))
}
}
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
func (x *ListKeysRequest) UnmarshalProtobuf(src []byte) (err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in %s", "ListKeysRequest")
}
switch fc.FieldNum {
case 1: // Body
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Body")
}
x.Body = new(ListKeysRequest_Body)
if err := x.Body.UnmarshalProtobuf(data); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
case 2: // Signature
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Signature")
}
x.Signature = new(Signature)
if err := x.Signature.UnmarshalProtobuf(data); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
}
}
return nil
}
func (x *ListKeysRequest) GetBody() *ListKeysRequest_Body {
if x != nil {
return x.Body
}
return nil
}
func (x *ListKeysRequest) SetBody(v *ListKeysRequest_Body) {
x.Body = v
}
func (x *ListKeysRequest) GetSignature() *Signature {
if x != nil {
return x.Signature
}
return nil
}
func (x *ListKeysRequest) SetSignature(v *Signature) {
x.Signature = v
}
// MarshalJSON implements the json.Marshaler interface.
func (x *ListKeysRequest) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
x.MarshalEasyJSON(&w)
return w.Buffer.BuildBytes(), w.Error
}
func (x *ListKeysRequest) MarshalEasyJSON(out *jwriter.Writer) {
if x == nil {
out.RawString("null")
return
}
first := true
out.RawByte('{')
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"body\":"
out.RawString(prefix)
x.Body.MarshalEasyJSON(out)
}
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"signature\":"
out.RawString(prefix)
x.Signature.MarshalEasyJSON(out)
}
out.RawByte('}')
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (x *ListKeysRequest) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
x.UnmarshalEasyJSON(&r)
return r.Error()
}
func (x *ListKeysRequest) UnmarshalEasyJSON(in *jlexer.Lexer) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "body":
{
var f *ListKeysRequest_Body
f = new(ListKeysRequest_Body)
f.UnmarshalEasyJSON(in)
x.Body = f
}
case "signature":
{
var f *Signature
f = new(Signature)
f.UnmarshalEasyJSON(in)
x.Signature = f
}
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
type ListKeysResponse_Body_Info struct {
Key string `json:"key"`
Meta []KeyValue `json:"meta"`
}
var (
_ encoding.ProtoMarshaler = (*ListKeysResponse_Body_Info)(nil)
_ encoding.ProtoUnmarshaler = (*ListKeysResponse_Body_Info)(nil)
_ json.Marshaler = (*ListKeysResponse_Body_Info)(nil)
_ json.Unmarshaler = (*ListKeysResponse_Body_Info)(nil)
)
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ListKeysResponse_Body_Info) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.StringSize(1, x.Key)
for i := range x.Meta {
size += proto.NestedStructureSizeUnchecked(2, &x.Meta[i])
}
return size
}
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
func (x *ListKeysResponse_Body_Info) MarshalProtobuf(dst []byte) []byte {
m := pool.MarshalerPool.Get()
defer pool.MarshalerPool.Put(m)
x.EmitProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
return dst
}
func (x *ListKeysResponse_Body_Info) EmitProtobuf(mm *easyproto.MessageMarshaler) {
if x == nil {
return
}
if len(x.Key) != 0 {
mm.AppendString(1, x.Key)
}
for i := range x.Meta {
x.Meta[i].EmitProtobuf(mm.AppendMessage(2))
}
}
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
func (x *ListKeysResponse_Body_Info) UnmarshalProtobuf(src []byte) (err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in %s", "ListKeysResponse_Body_Info")
}
switch fc.FieldNum {
case 1: // Key
data, ok := fc.String()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Key")
}
x.Key = data
case 2: // Meta
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Meta")
}
x.Meta = append(x.Meta, KeyValue{})
ff := &x.Meta[len(x.Meta)-1]
if err := ff.UnmarshalProtobuf(data); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
}
}
return nil
}
func (x *ListKeysResponse_Body_Info) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
func (x *ListKeysResponse_Body_Info) SetKey(v string) {
x.Key = v
}
func (x *ListKeysResponse_Body_Info) GetMeta() []KeyValue {
if x != nil {
return x.Meta
}
return nil
}
func (x *ListKeysResponse_Body_Info) SetMeta(v []KeyValue) {
x.Meta = v
}
// MarshalJSON implements the json.Marshaler interface.
func (x *ListKeysResponse_Body_Info) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
x.MarshalEasyJSON(&w)
return w.Buffer.BuildBytes(), w.Error
}
func (x *ListKeysResponse_Body_Info) MarshalEasyJSON(out *jwriter.Writer) {
if x == nil {
out.RawString("null")
return
}
first := true
out.RawByte('{')
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"key\":"
out.RawString(prefix)
out.String(x.Key)
}
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"meta\":"
out.RawString(prefix)
out.RawByte('[')
for i := range x.Meta {
if i != 0 {
out.RawByte(',')
}
x.Meta[i].MarshalEasyJSON(out)
}
out.RawByte(']')
}
out.RawByte('}')
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (x *ListKeysResponse_Body_Info) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
x.UnmarshalEasyJSON(&r)
return r.Error()
}
func (x *ListKeysResponse_Body_Info) UnmarshalEasyJSON(in *jlexer.Lexer) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "key":
{
var f string
f = in.String()
x.Key = f
}
case "meta":
{
var f KeyValue
var list []KeyValue
in.Delim('[')
for !in.IsDelim(']') {
f = KeyValue{}
f.UnmarshalEasyJSON(in)
list = append(list, f)
in.WantComma()
}
x.Meta = list
in.Delim(']')
}
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
type ListKeysResponse_Body struct {
Keys []ListKeysResponse_Body_Info `json:"keys"`
}
var (
_ encoding.ProtoMarshaler = (*ListKeysResponse_Body)(nil)
_ encoding.ProtoUnmarshaler = (*ListKeysResponse_Body)(nil)
_ json.Marshaler = (*ListKeysResponse_Body)(nil)
_ json.Unmarshaler = (*ListKeysResponse_Body)(nil)
)
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ListKeysResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
for i := range x.Keys {
size += proto.NestedStructureSizeUnchecked(1, &x.Keys[i])
}
return size
}
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
func (x *ListKeysResponse_Body) MarshalProtobuf(dst []byte) []byte {
m := pool.MarshalerPool.Get()
defer pool.MarshalerPool.Put(m)
x.EmitProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
return dst
}
func (x *ListKeysResponse_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) {
if x == nil {
return
}
for i := range x.Keys {
x.Keys[i].EmitProtobuf(mm.AppendMessage(1))
}
}
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
func (x *ListKeysResponse_Body) UnmarshalProtobuf(src []byte) (err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in %s", "ListKeysResponse_Body")
}
switch fc.FieldNum {
case 1: // Keys
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Keys")
}
x.Keys = append(x.Keys, ListKeysResponse_Body_Info{})
ff := &x.Keys[len(x.Keys)-1]
if err := ff.UnmarshalProtobuf(data); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
}
}
return nil
}
func (x *ListKeysResponse_Body) GetKeys() []ListKeysResponse_Body_Info {
if x != nil {
return x.Keys
}
return nil
}
func (x *ListKeysResponse_Body) SetKeys(v []ListKeysResponse_Body_Info) {
x.Keys = v
}
// MarshalJSON implements the json.Marshaler interface.
func (x *ListKeysResponse_Body) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
x.MarshalEasyJSON(&w)
return w.Buffer.BuildBytes(), w.Error
}
func (x *ListKeysResponse_Body) MarshalEasyJSON(out *jwriter.Writer) {
if x == nil {
out.RawString("null")
return
}
first := true
out.RawByte('{')
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"keys\":"
out.RawString(prefix)
out.RawByte('[')
for i := range x.Keys {
if i != 0 {
out.RawByte(',')
}
x.Keys[i].MarshalEasyJSON(out)
}
out.RawByte(']')
}
out.RawByte('}')
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (x *ListKeysResponse_Body) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
x.UnmarshalEasyJSON(&r)
return r.Error()
}
func (x *ListKeysResponse_Body) UnmarshalEasyJSON(in *jlexer.Lexer) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "keys":
{
var f ListKeysResponse_Body_Info
var list []ListKeysResponse_Body_Info
in.Delim('[')
for !in.IsDelim(']') {
f = ListKeysResponse_Body_Info{}
f.UnmarshalEasyJSON(in)
list = append(list, f)
in.WantComma()
}
x.Keys = list
in.Delim(']')
}
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
type ListKeysResponse struct {
Body *ListKeysResponse_Body `json:"body"`
Signature *Signature `json:"signature"`
}
var (
_ encoding.ProtoMarshaler = (*ListKeysResponse)(nil)
_ encoding.ProtoUnmarshaler = (*ListKeysResponse)(nil)
_ json.Marshaler = (*ListKeysResponse)(nil)
_ json.Unmarshaler = (*ListKeysResponse)(nil)
)
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *ListKeysResponse) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *ListKeysResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *ListKeysResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().MarshalProtobuf(buf), nil
}
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
func (x *ListKeysResponse) MarshalProtobuf(dst []byte) []byte {
m := pool.MarshalerPool.Get()
defer pool.MarshalerPool.Put(m)
x.EmitProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
return dst
}
func (x *ListKeysResponse) EmitProtobuf(mm *easyproto.MessageMarshaler) {
if x == nil {
return
}
if x.Body != nil {
x.Body.EmitProtobuf(mm.AppendMessage(1))
}
if x.Signature != nil {
x.Signature.EmitProtobuf(mm.AppendMessage(2))
}
}
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
func (x *ListKeysResponse) UnmarshalProtobuf(src []byte) (err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in %s", "ListKeysResponse")
}
switch fc.FieldNum {
case 1: // Body
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Body")
}
x.Body = new(ListKeysResponse_Body)
if err := x.Body.UnmarshalProtobuf(data); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
case 2: // Signature
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Signature")
}
x.Signature = new(Signature)
if err := x.Signature.UnmarshalProtobuf(data); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
}
}
return nil
}
func (x *ListKeysResponse) GetBody() *ListKeysResponse_Body {
if x != nil {
return x.Body
}
return nil
}
func (x *ListKeysResponse) SetBody(v *ListKeysResponse_Body) {
x.Body = v
}
func (x *ListKeysResponse) GetSignature() *Signature {
if x != nil {
return x.Signature
}
return nil
}
func (x *ListKeysResponse) SetSignature(v *Signature) {
x.Signature = v
}
// MarshalJSON implements the json.Marshaler interface.
func (x *ListKeysResponse) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
x.MarshalEasyJSON(&w)
return w.Buffer.BuildBytes(), w.Error
}
func (x *ListKeysResponse) MarshalEasyJSON(out *jwriter.Writer) {
if x == nil {
out.RawString("null")
return
}
first := true
out.RawByte('{')
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"body\":"
out.RawString(prefix)
x.Body.MarshalEasyJSON(out)
}
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"signature\":"
out.RawString(prefix)
x.Signature.MarshalEasyJSON(out)
}
out.RawByte('}')
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (x *ListKeysResponse) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
x.UnmarshalEasyJSON(&r)
return r.Error()
}
func (x *ListKeysResponse) UnmarshalEasyJSON(in *jlexer.Lexer) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeFieldName(false)
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "body":
{
var f *ListKeysResponse_Body
f = new(ListKeysResponse_Body)
f.UnmarshalEasyJSON(in)
x.Body = f
}
case "signature":
{
var f *Signature
f = new(Signature)
f.UnmarshalEasyJSON(in)
x.Signature = f
}
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
type ApplyRequest_Body struct { type ApplyRequest_Body struct {
ContainerId []byte `json:"containerId"` ContainerId []byte `json:"containerId"`
TreeId string `json:"treeId"` TreeId string `json:"treeId"`

View file

@ -29,6 +29,7 @@ const (
TreeService_GetNodeByPath_FullMethodName = "/tree.TreeService/GetNodeByPath" TreeService_GetNodeByPath_FullMethodName = "/tree.TreeService/GetNodeByPath"
TreeService_GetSubTree_FullMethodName = "/tree.TreeService/GetSubTree" TreeService_GetSubTree_FullMethodName = "/tree.TreeService/GetSubTree"
TreeService_TreeList_FullMethodName = "/tree.TreeService/TreeList" TreeService_TreeList_FullMethodName = "/tree.TreeService/TreeList"
TreeService_ListKeys_FullMethodName = "/tree.TreeService/ListKeys"
TreeService_Apply_FullMethodName = "/tree.TreeService/Apply" TreeService_Apply_FullMethodName = "/tree.TreeService/Apply"
TreeService_GetOpLog_FullMethodName = "/tree.TreeService/GetOpLog" TreeService_GetOpLog_FullMethodName = "/tree.TreeService/GetOpLog"
TreeService_Healthcheck_FullMethodName = "/tree.TreeService/Healthcheck" TreeService_Healthcheck_FullMethodName = "/tree.TreeService/Healthcheck"
@ -52,6 +53,9 @@ type TreeServiceClient interface {
GetSubTree(ctx context.Context, in *GetSubTreeRequest, opts ...grpc.CallOption) (TreeService_GetSubTreeClient, error) GetSubTree(ctx context.Context, in *GetSubTreeRequest, opts ...grpc.CallOption) (TreeService_GetSubTreeClient, error)
// TreeList return list of the existing trees in the container. // TreeList return list of the existing trees in the container.
TreeList(ctx context.Context, in *TreeListRequest, opts ...grpc.CallOption) (*TreeListResponse, error) TreeList(ctx context.Context, in *TreeListRequest, opts ...grpc.CallOption) (*TreeListResponse, error)
// ListKeys returns the list of all keys in the tree sorted by full
// filepath.
ListKeys(ctx context.Context, in *ListKeysRequest, opts ...grpc.CallOption) (TreeService_ListKeysClient, error)
// Apply pushes log operation from another node to the current. // Apply pushes log operation from another node to the current.
// The request must be signed by a container node. // The request must be signed by a container node.
Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error)
@ -155,6 +159,38 @@ func (c *treeServiceClient) TreeList(ctx context.Context, in *TreeListRequest, o
return out, nil return out, nil
} }
func (c *treeServiceClient) ListKeys(ctx context.Context, in *ListKeysRequest, opts ...grpc.CallOption) (TreeService_ListKeysClient, error) {
stream, err := c.cc.NewStream(ctx, &TreeService_ServiceDesc.Streams[1], TreeService_ListKeys_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &treeServiceListKeysClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type TreeService_ListKeysClient interface {
Recv() (*ListKeysResponse, error)
grpc.ClientStream
}
type treeServiceListKeysClient struct {
grpc.ClientStream
}
func (x *treeServiceListKeysClient) Recv() (*ListKeysResponse, error) {
m := new(ListKeysResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *treeServiceClient) Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) { func (c *treeServiceClient) Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) {
out := new(ApplyResponse) out := new(ApplyResponse)
err := c.cc.Invoke(ctx, TreeService_Apply_FullMethodName, in, out, opts...) err := c.cc.Invoke(ctx, TreeService_Apply_FullMethodName, in, out, opts...)
@ -165,7 +201,7 @@ func (c *treeServiceClient) Apply(ctx context.Context, in *ApplyRequest, opts ..
} }
func (c *treeServiceClient) GetOpLog(ctx context.Context, in *GetOpLogRequest, opts ...grpc.CallOption) (TreeService_GetOpLogClient, error) { func (c *treeServiceClient) GetOpLog(ctx context.Context, in *GetOpLogRequest, opts ...grpc.CallOption) (TreeService_GetOpLogClient, error) {
stream, err := c.cc.NewStream(ctx, &TreeService_ServiceDesc.Streams[1], TreeService_GetOpLog_FullMethodName, opts...) stream, err := c.cc.NewStream(ctx, &TreeService_ServiceDesc.Streams[2], TreeService_GetOpLog_FullMethodName, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -223,6 +259,9 @@ type TreeServiceServer interface {
GetSubTree(*GetSubTreeRequest, TreeService_GetSubTreeServer) error GetSubTree(*GetSubTreeRequest, TreeService_GetSubTreeServer) error
// TreeList return list of the existing trees in the container. // TreeList return list of the existing trees in the container.
TreeList(context.Context, *TreeListRequest) (*TreeListResponse, error) TreeList(context.Context, *TreeListRequest) (*TreeListResponse, error)
// ListKeys returns the list of all keys in the tree sorted by full
// filepath.
ListKeys(*ListKeysRequest, TreeService_ListKeysServer) error
// Apply pushes log operation from another node to the current. // Apply pushes log operation from another node to the current.
// The request must be signed by a container node. // The request must be signed by a container node.
Apply(context.Context, *ApplyRequest) (*ApplyResponse, error) Apply(context.Context, *ApplyRequest) (*ApplyResponse, error)
@ -257,6 +296,9 @@ func (UnimplementedTreeServiceServer) GetSubTree(*GetSubTreeRequest, TreeService
func (UnimplementedTreeServiceServer) TreeList(context.Context, *TreeListRequest) (*TreeListResponse, error) { func (UnimplementedTreeServiceServer) TreeList(context.Context, *TreeListRequest) (*TreeListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TreeList not implemented") return nil, status.Errorf(codes.Unimplemented, "method TreeList not implemented")
} }
func (UnimplementedTreeServiceServer) ListKeys(*ListKeysRequest, TreeService_ListKeysServer) error {
return status.Errorf(codes.Unimplemented, "method ListKeys not implemented")
}
func (UnimplementedTreeServiceServer) Apply(context.Context, *ApplyRequest) (*ApplyResponse, error) { func (UnimplementedTreeServiceServer) Apply(context.Context, *ApplyRequest) (*ApplyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Apply not implemented") return nil, status.Errorf(codes.Unimplemented, "method Apply not implemented")
} }
@ -407,6 +449,27 @@ func _TreeService_TreeList_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _TreeService_ListKeys_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ListKeysRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(TreeServiceServer).ListKeys(m, &treeServiceListKeysServer{stream})
}
type TreeService_ListKeysServer interface {
Send(*ListKeysResponse) error
grpc.ServerStream
}
type treeServiceListKeysServer struct {
grpc.ServerStream
}
func (x *treeServiceListKeysServer) Send(m *ListKeysResponse) error {
return x.ServerStream.SendMsg(m)
}
func _TreeService_Apply_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _TreeService_Apply_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ApplyRequest) in := new(ApplyRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -510,6 +573,11 @@ var TreeService_ServiceDesc = grpc.ServiceDesc{
Handler: _TreeService_GetSubTree_Handler, Handler: _TreeService_GetSubTree_Handler,
ServerStreams: true, ServerStreams: true,
}, },
{
StreamName: "ListKeys",
Handler: _TreeService_ListKeys_Handler,
ServerStreams: true,
},
{ {
StreamName: "GetOpLog", StreamName: "GetOpLog",
Handler: _TreeService_GetOpLog_Handler, Handler: _TreeService_GetOpLog_Handler,