xk6-frostfs/internal/s3local/treeService.go

212 lines
6.0 KiB
Go

package s3local
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
)
// treeServiceEngineWrapper implements the basic functioning of tree service using
// only the local storage engine instance. The node position and count is fixed
// beforehand in order to coordinate multiple runs on different nodes of the same
// cluster.
//
// The implementation mostly emulates the following
//
// - https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/services/tree/service.go
// - https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/internal/frostfs/services/tree_client_grpc.go
//
// but skips details which are irrelevant for local storage engine-backed clients.
type treeServiceEngineWrapper struct {
ng *engine.StorageEngine
pos int
size int
}
type kv struct {
k string
v []byte
}
func (kv kv) GetKey() string { return kv.k }
func (kv kv) GetValue() []byte { return kv.v }
type nodeResponse struct {
meta []tree.Meta
nodeID uint64
parentID uint64
ts uint64
}
func (r nodeResponse) GetMeta() []tree.Meta { return r.meta }
func (r nodeResponse) GetNodeID() uint64 { return r.nodeID }
func (r nodeResponse) GetParentID() uint64 { return r.parentID }
func (r nodeResponse) GetTimestamp() uint64 { return r.ts }
func (s treeServiceEngineWrapper) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
nodeIDs, err := s.ng.TreeGetByPath(ctx, p.BktInfo.CID, p.TreeID, pilorama.AttributeFilename, p.Path, p.LatestOnly)
if err != nil {
if errors.Is(err, pilorama.ErrTreeNotFound) {
// This is needed in order for the tree implementation to create the tree/node
// if it doesn't exist already.
// See: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/internal/frostfs/services/tree_client_grpc.go#L306
return nil, tree.ErrNodeNotFound
}
return nil, err
}
resps := make([]tree.NodeResponse, 0, len(nodeIDs))
for _, nodeID := range nodeIDs {
m, parentID, err := s.ng.TreeGetMeta(ctx, p.BktInfo.CID, p.TreeID, nodeID)
if err != nil {
return nil, err
}
resp := nodeResponse{
parentID: parentID,
nodeID: nodeID,
ts: m.Time,
}
if p.AllAttrs {
resp.meta = kvToTreeMeta(m.Items)
} else {
for _, it := range m.Items {
for _, attr := range p.Meta {
if it.Key == attr {
resp.meta = append(resp.meta, kv{it.Key, it.Value})
}
break
}
}
}
resps = append(resps, resp)
}
return resps, nil
}
func (s treeServiceEngineWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
var resps []tree.NodeResponse
var traverse func(nodeID uint64, curDepth uint32) error
traverse = func(nodeID uint64, curDepth uint32) error {
m, parentID, err := s.ng.TreeGetMeta(ctx, bktInfo.CID, treeID, nodeID)
if err != nil {
return fmt.Errorf("getting meta: %v", err)
}
resps = append(resps, nodeResponse{
nodeID: nodeID,
parentID: parentID,
ts: m.Time,
meta: kvToTreeMeta(m.Items),
})
if curDepth >= depth {
return nil
}
children, err := s.ng.TreeGetChildren(ctx, bktInfo.CID, treeID, nodeID)
if err != nil {
return fmt.Errorf("getting children: %v", err)
}
for _, child := range children {
if err := traverse(child.ID, curDepth+1); err != nil {
return err
}
}
return nil
}
if err := traverse(rootID, 0); err != nil {
return nil, fmt.Errorf("traversing: %v", err)
}
return resps, nil
}
func (s treeServiceEngineWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parentID uint64, meta map[string]string) (uint64, error) {
desc := pilorama.CIDDescriptor{
CID: bktInfo.CID,
Position: s.pos,
Size: s.size,
}
mv, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: parentID,
Child: pilorama.RootID,
Meta: pilorama.Meta{Items: mapToKV(meta)},
})
return mv.Child, err
}
func (s treeServiceEngineWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
desc := pilorama.CIDDescriptor{
CID: bktInfo.CID,
Position: s.pos,
Size: s.size,
}
mvs, err := s.ng.TreeAddByPath(ctx, desc, treeID, pilorama.AttributeFilename, path, mapToKV(meta))
if err != nil {
return pilorama.TrashID, err
}
return mvs[len(mvs)-1].Child, nil
}
func (s treeServiceEngineWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
if nodeID == pilorama.RootID {
return fmt.Errorf("node with ID %d is the root and can't be moved", nodeID)
}
desc := pilorama.CIDDescriptor{
CID: bktInfo.CID,
Position: s.pos,
Size: s.size,
}
_, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: parentID,
Child: nodeID,
Meta: pilorama.Meta{
Items: mapToKV(meta),
},
})
return err
}
func (s treeServiceEngineWrapper) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
if nodeID == pilorama.RootID {
return fmt.Errorf("node with ID %d is the root and can't be removed", nodeID)
}
desc := pilorama.CIDDescriptor{
CID: bktInfo.CID,
Position: s.pos,
Size: s.size,
}
_, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: pilorama.TrashID,
Child: nodeID,
})
return err
}
func mapToKV(m map[string]string) []pilorama.KeyValue {
var kvs []pilorama.KeyValue
for k, v := range m {
kvs = append(kvs, pilorama.KeyValue{
Key: k,
Value: []byte(v),
})
}
return kvs
}
func kvToTreeMeta(x []pilorama.KeyValue) []tree.Meta {
ret := make([]tree.Meta, 0, len(x))
for _, x := range x {
ret = append(ret, kv{x.Key, x.Value})
}
return ret
}