forked from TrueCloudLab/frostfs-node
91 lines
2 KiB
Go
91 lines
2 KiB
Go
package tree
|
|
|
|
import (
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
)
|
|
|
|
func (s *Service) ListKeys(req *ListKeysRequest, srv TreeService_ListKeysServer) error {
|
|
b := req.GetBody()
|
|
|
|
var cnr cidSDK.ID
|
|
if err := cnr.Decode(b.GetContainerId()); err != nil {
|
|
return err
|
|
}
|
|
|
|
err := s.verifyClient(srv.Context(), req, cnr, nil, acl.OpObjectGet)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ns, pos, err := s.getContainerNodes(cnr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if pos < 0 {
|
|
return relayStream(s, ns, req, srv, (TreeServiceClient).ListKeys)
|
|
}
|
|
|
|
const engineBatchSize = 1000
|
|
|
|
responseBatchSize := b.GetBatchSize()
|
|
if responseBatchSize > engineBatchSize {
|
|
responseBatchSize = engineBatchSize
|
|
}
|
|
|
|
batch := newListBatch(srv, int(responseBatchSize))
|
|
res, cursor, err := s.forest.TreeListKeys(srv.Context(), cnr, b.GetTreeId(), pilorama.ListKeysCursor{}, engineBatchSize)
|
|
for err == nil && len(res) != 0 {
|
|
for i := range res {
|
|
if err := batch.add(res[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
res, cursor, err = s.forest.TreeListKeys(srv.Context(), cnr, b.GetTreeId(), cursor, engineBatchSize)
|
|
}
|
|
return batch.commit()
|
|
}
|
|
|
|
type listBatch struct {
|
|
items []ListKeysResponse_Body_Info
|
|
srv TreeService_ListKeysServer
|
|
limit int
|
|
}
|
|
|
|
func newListBatch(srv TreeService_ListKeysServer, count int) *listBatch {
|
|
return &listBatch{
|
|
srv: srv,
|
|
limit: count,
|
|
}
|
|
}
|
|
|
|
func (b *listBatch) empty() bool {
|
|
return len(b.items) == 0
|
|
}
|
|
|
|
func (b *listBatch) add(info pilorama.KeyInfo) error {
|
|
b.items = append(b.items, ListKeysResponse_Body_Info{
|
|
Key: info.Key,
|
|
Meta: metaToProto(info.Meta),
|
|
})
|
|
if len(b.items) < b.limit {
|
|
return nil
|
|
}
|
|
return b.commit()
|
|
}
|
|
|
|
func (b *listBatch) commit() error {
|
|
if len(b.items) == 0 {
|
|
return nil
|
|
}
|
|
|
|
items := b.items
|
|
b.items = b.items[:0]
|
|
return b.srv.Send(&ListKeysResponse{
|
|
Body: &ListKeysResponse_Body{
|
|
Keys: items,
|
|
},
|
|
})
|
|
}
|