frostfs-http-gw/tree/tree.go

351 lines
7.8 KiB
Go
Raw Normal View History

package tree
import (
"context"
"errors"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type (
// Tree provides lookups over the object version tree. Every request is
// delegated to the wrapped ServiceClient.
Tree struct {
service ServiceClient
}
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
// GetNodes returns the tree nodes selected by p.
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
// GetSubTree returns nodes of tree treeID starting from rootID.
// NOTE(review): depth presumably limits how many levels are traversed and
// sort presumably requests ordered output -- confirm with the implementation.
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
}
// treeNode is the decoded form of a NodeResponse: the value stored under the
// OID meta key is parsed into ObjID and all remaining meta entries are kept
// as strings in Meta.
treeNode struct {
ObjID oid.ID
Meta map[string]string
}
// GetNodesParams groups the arguments of ServiceClient.GetNodes.
GetNodesParams struct {
// CnrID is the container to query.
CnrID cid.ID
// BktInfo is optional bucket information; GetVersions leaves it unset while
// getPrefixNodeID fills it in.
BktInfo *data.BucketInfo
// TreeID names the tree to query (this package always uses versionTree).
TreeID string
// Path is the node path split into segments.
Path []string
// Meta lists meta keys of interest.
// NOTE(review): presumably the keys the service should return -- confirm.
Meta []string
// NOTE(review): semantics of LatestOnly and AllAttrs are defined by the
// ServiceClient implementation; verify there before relying on them.
LatestOnly bool
AllAttrs bool
}
)
// Sentinel errors re-exported from the layer package, so the values below are
// the very same error values as layer.ErrNodeNotFound / layer.ErrNodeAccessDenied.
var (
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
ErrNodeNotFound = layer.ErrNodeNotFound
// ErrNodeAccessDenied is returned from ServiceClient in case of access denied error.
ErrNodeAccessDenied = layer.ErrNodeAccessDenied
)
const (
// FileNameKey is the meta key under which a tree node stores its file name.
FileNameKey = "FileName"
)
const (
// oidKV is the meta key whose value is the string-encoded object ID.
oidKV = "OID"
// isDeleteMarkerKV is the meta key whose mere presence marks a node as a
// delete marker (its value is not inspected in this file).
isDeleteMarkerKV = "IsDeleteMarker"
// sizeKV is the meta key holding the object size; a node with a missing or
// empty value is treated as a prefix node.
sizeKV = "Size"
// versionTree -- ID of a tree with object versions.
versionTree = "version"
// separator splits object names and prefixes into path segments.
separator = "/"
)
// NewTree returns a Tree that performs all of its lookups through the given
// ServiceClient. No connection is opened here; the client is stored as is.
func NewTree(service ServiceClient) *Tree {
	tree := new(Tree)
	tree.service = service
	return tree
}
// Meta is a single key/value attribute attached to a tree node.
type Meta interface {
// GetKey returns the attribute name.
GetKey() string
// GetValue returns the raw attribute value; this package converts it to a
// string wherever it is read.
GetValue() []byte
}
// NodeResponse is a tree node as returned by ServiceClient.
type NodeResponse interface {
// GetMeta returns the attributes attached to the node.
GetMeta() []Meta
// GetTimestamp returns the node timestamps; this package treats the largest
// element as the node's time (see getMaxTimestamp).
GetTimestamp() []uint64
// GetNodeID returns the identifiers of the node.
// NOTE(review): a node can carry several IDs -- presumably one per tree
// replica/split; confirm with the service implementation.
GetNodeID() []uint64
}
// newTreeNode decodes a raw tree service node into a treeNode.
//
// The value stored under the OID key is parsed into ObjID; every other meta
// entry is copied into Meta as a string. If the OID value cannot be decoded,
// the decoding error is returned and no node is produced.
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
	entries := nodeInfo.GetMeta()
	result := &treeNode{Meta: make(map[string]string, len(entries))}

	for _, entry := range entries {
		key, value := entry.GetKey(), string(entry.GetValue())
		if key != oidKV {
			result.Meta[key] = value
			continue
		}

		if err := result.ObjID.DecodeString(value); err != nil {
			return nil, err
		}
	}

	return result, nil
}
// Get looks up the meta value stored under key. The boolean reports whether
// the key is present; an absent key yields the empty string.
func (n *treeNode) Get(key string) (string, bool) {
	if value, found := n.Meta[key]; found {
		return value, true
	}
	return "", false
}
// FileName returns the value stored under FileNameKey and whether the node
// has such an entry at all.
func (n *treeNode) FileName() (string, bool) {
	name, found := n.Meta[FileNameKey]
	if !found {
		return "", false
	}
	return name, true
}
// newNodeVersion converts a raw tree service node into an api.NodeVersion.
// A node that cannot be decoded is reported as an "invalid tree node" error
// wrapping the underlying cause.
func newNodeVersion(node NodeResponse) (*api.NodeVersion, error) {
	decoded, decodeErr := newTreeNode(node)
	if decodeErr != nil {
		return nil, fmt.Errorf("invalid tree node: %w", decodeErr)
	}

	return newNodeVersionFromTreeNode(decoded), nil
}
// newNodeVersionFromTreeNode builds an api.NodeVersion from a decoded node.
//
// DeleteMarker is set when the IsDeleteMarker key is present, whatever its
// value. IsPrefixNode is set when the Size entry is missing or empty.
func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion {
	size, _ := treeNode.Get(sizeKV)
	_, hasDeleteMarker := treeNode.Get(isDeleteMarkerKV)

	return &api.NodeVersion{
		BaseNodeVersion: api.BaseNodeVersion{OID: treeNode.ObjID},
		DeleteMarker:    hasDeleteMarker,
		IsPrefixNode:    size == "",
	}
}
// GetLatestVersion fetches every version node of objectName in container
// cnrID, picks the most recent one that carries an object ID and returns it
// as an api.NodeVersion.
//
// Errors from the tree service are returned unchanged; layer.ErrNodeNotFound
// is returned when none of the nodes qualifies.
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) {
	versions, err := c.GetVersions(ctx, cnrID, objectName)
	if err != nil {
		return nil, err
	}

	newest, err := getLatestVersionNode(versions)
	if err != nil {
		return nil, err
	}

	return newNodeVersion(newest)
}
// GetVersions returns all nodes of the version tree located at the path
// obtained by splitting objectName on '/'. The OID, IsDeleteMarker and Size
// meta keys are requested; BktInfo is left unset.
//
// cnrID must not be nil: it is dereferenced unconditionally.
func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]NodeResponse, error) {
	return c.service.GetNodes(ctx, &GetNodesParams{
		CnrID:      *cnrID,
		TreeID:     versionTree,
		Path:       pathFromName(objectName),
		Meta:       []string{oidKV, isDeleteMarkerKV, sizeKV},
		LatestOnly: false,
		AllAttrs:   false,
	})
}
// getLatestVersionNode returns the node with the greatest timestamp among the
// nodes that carry an OID meta entry. Nodes without an OID are ignored.
//
// When several candidates share the greatest timestamp, the first one in
// nodes wins. layer.ErrNodeNotFound is returned when no node carries an OID.
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
	var (
		maxCreationTime uint64
		targetIndexNode = -1
	)

	for i, node := range nodes {
		if !checkExistOID(node.GetMeta()) {
			continue
		}

		// The first candidate is always accepted. Comparing it against the
		// zero-initialised maximum would drop a valid node whose timestamp is 0
		// (or missing) and wrongly report the object as not found.
		currentCreationTime := getMaxTimestamp(node)
		if targetIndexNode == -1 || currentCreationTime > maxCreationTime {
			targetIndexNode = i
			maxCreationTime = currentCreationTime
		}
	}

	if targetIndexNode == -1 {
		return nil, layer.ErrNodeNotFound
	}

	return nodes[targetIndexNode], nil
}
// checkExistOID reports whether meta contains an entry whose key is "OID".
// Only the key is examined; the value may be anything.
func checkExistOID(meta []Meta) bool {
	for i := range meta {
		if meta[i].GetKey() == "OID" {
			return true
		}
	}

	return false
}
// pathFromName breaks an object name into its '/'-separated segments.
// Empty segments are preserved, so an empty name yields a single empty
// segment and a trailing '/' yields a trailing empty segment.
func pathFromName(objectName string) []string {
	segments := strings.Split(objectName, separator)
	return segments
}
// GetSubTreeByPrefix lists the version-tree nodes whose file name starts with
// the last segment of prefix, searched below the node addressed by the
// preceding segments.
//
// The sub tree is requested with depth 2 and without sorting. Nodes whose IDs
// all belong to the root are skipped, as are nodes whose file name does not
// start with the tail segment. The remaining nodes are grouped by file name:
//   - with latestOnly == false every node is kept;
//   - with latestOnly == true every intermediate node is kept, but only the
//     leaf (object) node with the greatest timestamp per file name survives.
//
// The groups are flattened by iterating a map, so the order of the returned
// nodes is unspecified. The second result is prefix with the tail segment
// removed. When the prefix node or the sub tree is not found, the result is
// empty and the error is nil.
func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, versionTree, prefix)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, rootID, 2, false)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	root := MultiID(rootID)
	byName := make(map[string][]NodeResponse, len(subTree))

	for _, node := range subTree {
		if root.Equal(node.GetNodeID()) {
			continue
		}

		name := getFilename(node)
		if !strings.HasPrefix(name, tailPrefix) {
			continue
		}

		group := byName[name]

		// Invariant for latestOnly: if the group holds a leaf (object) node,
		// it sits at index 0 and is the newest leaf seen so far.
		switch {
		case len(group) == 0:
			group = []NodeResponse{node}
		case !latestOnly || isIntermediate(node):
			group = append(group, node)
		case isIntermediate(group[0]):
			// First leaf for this name: move it to the front.
			group = append([]NodeResponse{node}, group...)
		case getMaxTimestamp(node) > getMaxTimestamp(group[0]):
			// Newer leaf replaces the one remembered so far.
			group[0] = node
		}

		byName[name] = group
	}

	flattened := make([]NodeResponse, 0, len(subTree))
	for _, group := range byName {
		flattened = append(flattened, group...)
	}

	return flattened, strings.TrimSuffix(prefix, tailPrefix), nil
}
// determinePrefixNode splits prefix on '/' and resolves everything but the
// last segment to node IDs in tree treeID.
//
// It returns the IDs of the node to search under together with the last
// segment (the part still to be matched against file names). A prefix
// without any '/' is searched directly under the root, whose ID is 0, and
// needs no service call.
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
	segments := strings.Split(prefix, separator)
	last := len(segments) - 1
	tail := segments[last]

	if last == 0 {
		return []uint64{0}, tail, nil
	}

	ids, err := c.getPrefixNodeID(ctx, bktInfo, treeID, segments[:last])
	if err != nil {
		return nil, "", err
	}

	return ids, tail, nil
}
// getPrefixNodeID returns the concatenated IDs of all intermediate nodes
// found at prefixPath in tree treeID.
//
// Errors from the tree service are returned unchanged. layer.ErrNodeNotFound
// is returned when no intermediate node contributes an ID.
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
	found, err := c.service.GetNodes(ctx, &GetNodesParams{
		CnrID:      bktInfo.CID,
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       prefixPath,
		LatestOnly: false,
		AllAttrs:   true,
	})
	if err != nil {
		return nil, err
	}

	var ids []uint64
	for i := range found {
		if !isIntermediate(found[i]) {
			continue
		}
		ids = append(ids, found[i].GetNodeID()...)
	}

	if len(ids) == 0 {
		return nil, layer.ErrNodeNotFound
	}

	return ids, nil
}
// getFilename returns the value of the first FileNameKey entry in the node's
// meta, or the empty string when the node has no such entry.
func getFilename(node NodeResponse) string {
	entries := node.GetMeta()
	for i := range entries {
		if entries[i].GetKey() != FileNameKey {
			continue
		}
		return string(entries[i].GetValue())
	}

	return ""
}
// isIntermediate reports whether the node carries exactly one meta entry and
// that entry is the file name, i.e. it has no object attributes such as OID.
func isIntermediate(node NodeResponse) bool {
	meta := node.GetMeta()
	return len(meta) == 1 && meta[0].GetKey() == FileNameKey
}
// getMaxTimestamp returns the largest of the node's timestamps, or 0 when the
// node reports none.
func getMaxTimestamp(node NodeResponse) uint64 {
	timestamps := node.GetTimestamp()

	var newest uint64
	for i := range timestamps {
		if newest < timestamps[i] {
			newest = timestamps[i]
		}
	}

	return newest
}
// MultiID is a set of node identifiers that together address one tree node.
type MultiID []uint64

// Equal reports whether every identifier in id also occurs in m.
//
// Despite its name this is a containment check, not strict equality: order,
// duplicates and extra elements of m are ignored, and an empty id matches any
// m. GetSubTreeByPrefix relies on this to recognise a node whose IDs are a
// part of the combined root IDs.
func (m MultiID) Equal(id MultiID) bool {
	known := make(map[uint64]struct{}, len(m))
	for _, own := range m {
		known[own] = struct{}{}
	}

	for _, other := range id {
		if _, found := known[other]; !found {
			return false
		}
	}

	return true
}