frostfs-s3-gw/pkg/service/tree/tree_client_in_memory.go

502 lines
10 KiB
Go
Raw Normal View History

package tree
import (
"context"
"errors"
"fmt"
"io"
"sort"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"golang.org/x/exp/slices"
)
type nodeMeta struct {
key string
value []byte
}
func (m nodeMeta) GetKey() string {
return m.key
}
func (m nodeMeta) GetValue() []byte {
return m.value
}
type nodeResponse struct {
meta []nodeMeta
nodeID uint64
parentID uint64
timestamp uint64
}
func (n nodeResponse) GetNodeID() []uint64 {
return []uint64{n.nodeID}
}
func (n nodeResponse) GetParentID() []uint64 {
return []uint64{n.parentID}
}
func (n nodeResponse) GetTimestamp() []uint64 {
return []uint64{n.timestamp}
}
func (n nodeResponse) GetMeta() []Meta {
res := make([]Meta, len(n.meta))
for i, value := range n.meta {
res[i] = value
}
return res
}
func (n nodeResponse) getValue(key string) string {
for _, value := range n.meta {
if value.key == key {
return string(value.value)
}
}
return ""
}
type ServiceClientMemory struct {
containers map[string]containerInfo
}
type containerInfo struct {
bkt *data.BucketInfo
trees map[string]memoryTree
}
type memoryTree struct {
idCounter uint64
treeData *treeNodeMemory
}
type treeNodeMemory struct {
data nodeResponse
parent *treeNodeMemory
children []*treeNodeMemory
}
func (t *treeNodeMemory) getNode(nodeID uint64) *treeNodeMemory {
if t.data.nodeID == nodeID {
return t
}
for _, child := range t.children {
if node := child.getNode(nodeID); node != nil {
return node
}
}
return nil
}
func (t *memoryTree) getNodesByPath(path []string) []nodeResponse {
if len(path) == 0 {
return nil
}
var res []nodeResponse
for _, child := range t.treeData.children {
res = child.listNodesByPath(res, path)
}
return res
}
func (t *treeNodeMemory) listNodesByPath(res []nodeResponse, path []string) []nodeResponse {
if len(path) == 0 || t.data.getValue(FileNameKey) != path[0] {
return res
}
if len(path) == 1 {
return append(res, t.data)
}
for _, ch := range t.children {
res = ch.listNodesByPath(res, path[1:])
}
return res
}
func (t *memoryTree) createPathIfNotExist(parent *treeNodeMemory, path []string) *treeNodeMemory {
if len(path) == 0 {
return parent
}
var node *treeNodeMemory
for _, child := range parent.children {
if len(child.data.meta) == 1 && child.data.getValue(FileNameKey) == path[0] {
node = child
break
}
}
if node == nil {
node = &treeNodeMemory{
data: nodeResponse{
meta: []nodeMeta{{key: FileNameKey, value: []byte(path[0])}},
nodeID: t.idCounter,
parentID: parent.data.nodeID,
timestamp: uint64(time.Now().UnixMicro()),
},
parent: parent,
}
t.idCounter++
parent.children = append(parent.children, node)
}
return t.createPathIfNotExist(node, path[1:])
}
func (t *treeNodeMemory) removeChild(nodeID uint64) {
ind := -1
for i, ch := range t.children {
if ch.data.nodeID == nodeID {
ind = i
break
}
}
if ind != -1 {
t.children = append(t.children[:ind], t.children[ind+1:]...)
}
}
func (t *treeNodeMemory) listNodes(res []NodeResponse, depth uint32) []NodeResponse {
res = append(res, t.data)
if depth == 0 {
return res
}
for _, ch := range t.children {
res = ch.listNodes(res, depth-1)
}
return res
}
func NewTreeServiceClientMemory() (*ServiceClientMemory, error) {
return &ServiceClientMemory{
containers: make(map[string]containerInfo),
}, nil
}
type nodeResponseWrapper struct {
nodeResponse
allAttr bool
attrs []string
}
func (n nodeResponseWrapper) GetMeta() []Meta {
res := make([]Meta, 0, len(n.meta))
for _, value := range n.meta {
if n.allAttr || slices.Contains(n.attrs, value.key) {
res = append(res, value)
}
}
return res
}
func (c *ServiceClientMemory) GetNodes(_ context.Context, p *GetNodesParams) ([]NodeResponse, error) {
cnr, ok := c.containers[p.BktInfo.CID.EncodeToString()]
if !ok {
return nil, nil
}
tr, ok := cnr.trees[p.TreeID]
if !ok {
return nil, nil
}
res := tr.getNodesByPath(p.Path)
sort.Slice(res, func(i, j int) bool {
return res[i].timestamp < res[j].timestamp
})
if p.LatestOnly && len(res) != 0 {
res = res[len(res)-1:]
}
res2 := make([]NodeResponse, len(res))
for i, n := range res {
res2[i] = nodeResponseWrapper{
nodeResponse: n,
allAttr: p.AllAttrs,
attrs: p.Meta,
}
}
return res2, nil
}
func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return nil, nil
}
tr, ok := cnr.trees[treeID]
if !ok {
return nil, tree.ErrNodeNotFound
}
if len(rootID) != 1 {
return nil, errors.New("invalid rootID")
}
node := tr.treeData.getNode(rootID[0])
if node == nil {
return nil, tree.ErrNodeNotFound
}
if sort {
sortNode(tr.treeData)
}
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil
}
type SubTreeStreamMemoryImpl struct {
res []NodeResponse
offset int
err error
}
func (s *SubTreeStreamMemoryImpl) Next(ctx context.Context) (NodeResponse, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if s.err != nil {
return nil, s.err
}
if s.offset > len(s.res)-1 {
return nil, io.EOF
}
s.offset++
return s.res[s.offset-1], nil
}
func (c *ServiceClientMemory) GetSubTreeStream(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return &SubTreeStreamMemoryImpl{err: tree.ErrNodeNotFound}, nil
}
tr, ok := cnr.trees[treeID]
if !ok {
return nil, tree.ErrNodeNotFound
}
if len(rootID) != 1 {
return nil, errors.New("invalid rootID")
}
node := tr.treeData.getNode(rootID[0])
if node == nil {
return nil, tree.ErrNodeNotFound
}
sortNode(tr.treeData)
return &SubTreeStreamMemoryImpl{
res: node.listNodes(nil, depth-1),
offset: 0,
}, nil
}
func newContainerInfo(bktInfo *data.BucketInfo, treeID string) containerInfo {
return containerInfo{
bkt: bktInfo,
trees: map[string]memoryTree{
treeID: {
idCounter: 1,
treeData: &treeNodeMemory{
data: nodeResponse{
timestamp: uint64(time.Now().UnixMicro()),
},
},
},
},
}
}
func newMemoryTree() memoryTree {
return memoryTree{
idCounter: 1,
treeData: &treeNodeMemory{
data: nodeResponse{
timestamp: uint64(time.Now().UnixMicro()),
},
},
}
}
func (c *ServiceClientMemory) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
return c.AddNodeBase(ctx, bktInfo, treeID, parent, meta, true)
}
func (c *ServiceClientMemory) AddNodeBase(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string, needSort bool) (uint64, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
cnr = newContainerInfo(bktInfo, treeID)
c.containers[bktInfo.CID.EncodeToString()] = cnr
}
tr, ok := cnr.trees[treeID]
if !ok {
tr = newMemoryTree()
cnr.trees[treeID] = tr
}
parentNode := tr.treeData.getNode(parent)
if parentNode == nil {
return 0, tree.ErrNodeNotFound
}
newID := tr.idCounter
tr.idCounter++
tn := &treeNodeMemory{
data: nodeResponse{
meta: metaToNodeMeta(meta),
nodeID: newID,
parentID: parent,
timestamp: uint64(time.Now().UnixMicro()),
},
parent: parentNode,
}
parentNode.children = append(parentNode.children, tn)
if needSort {
sortNodes(parentNode.children)
}
cnr.trees[treeID] = tr
return newID, nil
}
func (c *ServiceClientMemory) AddNodeByPath(_ context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
cnr = newContainerInfo(bktInfo, treeID)
c.containers[bktInfo.CID.EncodeToString()] = cnr
}
tr, ok := cnr.trees[treeID]
if !ok {
tr = newMemoryTree()
cnr.trees[treeID] = tr
}
parentNode := tr.createPathIfNotExist(tr.treeData, path)
if parentNode == nil {
return 0, fmt.Errorf("create path '%s'", path)
}
newID := tr.idCounter
tr.idCounter++
tn := &treeNodeMemory{
data: nodeResponse{
meta: metaToNodeMeta(meta),
nodeID: newID,
parentID: parentNode.data.nodeID,
timestamp: uint64(time.Now().UnixMicro()),
},
parent: parentNode,
}
parentNode.children = append(parentNode.children, tn)
cnr.trees[treeID] = tr
return newID, nil
}
func (c *ServiceClientMemory) MoveNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return tree.ErrNodeNotFound
}
tr, ok := cnr.trees[treeID]
if !ok {
return tree.ErrNodeNotFound
}
node := tr.treeData.getNode(nodeID)
if node == nil {
return tree.ErrNodeNotFound
}
newParent := tr.treeData.getNode(parentID)
if newParent == nil {
return tree.ErrNodeNotFound
}
node.data.meta = metaToNodeMeta(meta)
node.data.parentID = parentID
newParent.children = append(newParent.children, node)
node.parent.removeChild(nodeID)
return nil
}
func sortNode(node *treeNodeMemory) {
if node == nil {
return
}
sortNodes(node.children)
for _, child := range node.children {
sortNode(child)
}
}
func sortNodes(list []*treeNodeMemory) {
sort.Slice(list, func(i, j int) bool {
return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey)
})
}
func (c *ServiceClientMemory) RemoveNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return tree.ErrNodeNotFound
}
tr, ok := cnr.trees[treeID]
if !ok {
return tree.ErrNodeNotFound
}
node := tr.treeData.getNode(nodeID)
if node == nil {
return tree.ErrNodeNotFound
}
node.parent.removeChild(nodeID)
return nil
}
func metaToNodeMeta(m map[string]string) []nodeMeta {
result := make([]nodeMeta, 0, len(m))
for key, value := range m {
result = append(result, nodeMeta{key: key, value: []byte(value)})
}
return result
}