frostfs-http-gw/tree/tree.go
Nikita Zinkevich 2e71755d69
All checks were successful
/ DCO (pull_request) Successful in 2m16s
/ Vulncheck (pull_request) Successful in 2m21s
/ Builds (pull_request) Successful in 1m39s
/ Lint (pull_request) Successful in 2m39s
/ Tests (pull_request) Successful in 1m42s
[#166] Change the check of protocol during get object request
Add the tree service's GetBucketSettings so that bucket settings can be used to decide which protocol to use (S3 or native). Also add mock implementations for these methods and GetLatestVersion.

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-12-04 15:11:46 +03:00

535 lines
12 KiB
Go

package tree
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
type (
// Tree reads object version and bucket settings information from the tree
// service through a ServiceClient.
Tree struct {
// service performs the actual tree service requests.
service ServiceClient
// log receives non-fatal problems, e.g. an invalid owner key found in
// the bucket settings node.
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
// GetNodes looks up the nodes described by p.
GetNodes(ctx context.Context, p *GetNodesParams) ([]data.NodeResponse, error)
// GetSubTree returns nodes of the tree treeID located under rootID.
// NOTE(review): the exact meaning of depth and sort is defined by the
// implementation; this file always passes depth 2 and sort false.
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]data.NodeResponse, error)
}
// treeNode is the parsed form of a data.NodeResponse.
// NOTE(review): newTreeNode in this file fills only ObjID and Meta; ID,
// ParentID, TimeStamp and Size keep their zero values unless they are set
// elsewhere -- confirm before relying on GetLatestNodeIndex or IsSplit.
treeNode struct {
ID []uint64
ParentID []uint64
// ObjID is decoded from the "OID" meta value.
ObjID oid.ID
TimeStamp []uint64
Size uint64
// Meta holds every meta pair of the node except "OID".
Meta map[string]string
}
// multiSystemNode groups all nodes returned for a single system tree path.
multiSystemNode struct {
// the first element is latest
nodes []*treeNode
}
// GetNodesParams describes a ServiceClient.GetNodes request.
GetNodesParams struct {
// CnrID is the container to query.
CnrID cid.ID
// BktInfo is the bucket the request belongs to; GetVersions leaves it nil.
BktInfo *data.BucketInfo
// TreeID selects the tree (versionTree or systemTree in this file).
TreeID string
// Path is the list of path segments leading to the requested nodes.
Path []string
// Meta lists meta keys of interest.
// NOTE(review): presumably the attributes to return when AllAttrs is
// false -- confirm against the ServiceClient implementation.
Meta []string
// LatestOnly is always false in the requests built by this file.
LatestOnly bool
// AllAttrs is true for system tree and prefix lookups and false for
// version lookups in this file.
AllAttrs bool
}
)
// Error values re-exported from the layer package so that ServiceClient
// implementations and callers of this package can refer to them directly.
var (
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
ErrNodeNotFound = layer.ErrNodeNotFound
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
ErrNodeAccessDenied = layer.ErrNodeAccessDenied
)
const (
// FileNameKey is the meta key that stores the name of a tree node.
FileNameKey = "FileName"
// Meta keys used by tree nodes. The bucket settings node is read through
// versioningKV, cannedACLKV, ownerKeyKV and lockConfigurationKV; version
// nodes are read through oidKV, isUnversionedKV and sizeKV; nodes carrying
// uploadIDKV are filtered out of system node lookups.
versioningKV = "Versioning"
cannedACLKV = "cannedACL"
ownerKeyKV = "ownerKey"
lockConfigurationKV = "LockConfiguration"
oidKV = "OID"
cidKV = "CID"
isUnversionedKV = "IsUnversioned"
uploadIDKV = "UploadId"
sizeKV = "Size"
// keys for delete marker nodes.
isDeleteMarkerKV = "IsDeleteMarker"
// settingsFileName is the system tree path of the bucket settings node.
settingsFileName = "bucket-settings"
// versionTree -- ID of a tree with object versions.
versionTree = "version"
// systemTree -- ID of a tree with system objects
// i.e. bucket settings with versioning and lock configuration, cors.
systemTree = "system"
// separator splits object names and prefixes into tree path segments.
separator = "/"
)
// NewTree returns a Tree that sends its requests through the given
// ServiceClient and reports non-fatal problems to log.
// No connection is opened here; the client is used as provided.
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
	tree := new(Tree)
	tree.service = service
	tree.log = log
	return tree
}
// newTreeNode converts a tree service node response into a treeNode.
//
// The "OID" meta value is decoded into ObjID; every other meta pair is copied
// into Meta as a string. Only ObjID and Meta are populated here. An error is
// returned when the OID value cannot be decoded.
func newTreeNode(nodeInfo data.NodeResponse) (*treeNode, error) {
	pairs := nodeInfo.GetMeta()
	parsed := &treeNode{Meta: make(map[string]string, len(pairs))}

	for _, pair := range pairs {
		key, value := pair.GetKey(), string(pair.GetValue())
		if key != oidKV {
			parsed.Meta[key] = value
			continue
		}
		if err := parsed.ObjID.DecodeString(value); err != nil {
			return nil, err
		}
	}

	return parsed, nil
}
// Get returns the meta value stored under key and whether the key is present.
func (n *treeNode) Get(key string) (string, bool) {
	if stored, found := n.Meta[key]; found {
		return stored, true
	}
	return "", false
}
// FileName returns the value stored under FileNameKey in the node meta and
// whether that key is present.
func (n *treeNode) FileName() (string, bool) {
	name, found := n.Meta[FileNameKey]
	if !found {
		return "", false
	}
	return name, true
}
// GetLatestNodeIndex returns the position of the largest value in TimeStamp.
// The first position wins on ties, and 0 is returned when TimeStamp is empty
// or contains only zeros.
func (n *treeNode) GetLatestNodeIndex() int {
	latest := 0
	var newest uint64

	for pos := 0; pos < len(n.TimeStamp); pos++ {
		if ts := n.TimeStamp[pos]; ts > newest {
			newest, latest = ts, pos
		}
	}

	return latest
}
// IsSplit reports whether the node does not consist of exactly one ID, one
// parent ID and one timestamp.
func (n *treeNode) IsSplit() bool {
	single := len(n.ID) == 1 && len(n.ParentID) == 1 && len(n.TimeStamp) == 1
	return !single
}
// newNodeVersion parses node and builds the data.NodeVersion describing
// objectName. A node that cannot be parsed yields an "invalid tree node" error
// wrapping the cause.
func newNodeVersion(node data.NodeResponse, objectName string) (*data.NodeVersion, error) {
	parsed, parseErr := newTreeNode(node)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid tree node: %w", parseErr)
	}

	version := newNodeVersionFromTreeNode(objectName, parsed)
	return version, nil
}
// newNodeVersionFromTreeNode builds a data.NodeVersion for filePath from an
// already parsed node.
//
// DeleteMarker and IsUnversioned reflect only the presence of the matching
// meta keys; their values are ignored. A node with a missing or empty "Size"
// value is marked as a prefix node.
func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeVersion {
	_, hasDeleteMarker := treeNode.Get(isDeleteMarkerKV)
	_, hasUnversioned := treeNode.Get(isUnversionedKV)
	sizeValue, _ := treeNode.Get(sizeKV)

	base := data.BaseNodeVersion{
		OID:      treeNode.ObjID,
		FilePath: filePath,
	}

	return &data.NodeVersion{
		BaseNodeVersion: base,
		DeleteMarker:    hasDeleteMarker,
		IsUnversioned:   hasUnversioned,
		IsPrefixNode:    len(sizeValue) == 0,
	}
}
// newMultiNode parses all responses received for one system tree path and
// moves the node with the highest maximum timestamp to position 0.
//
// The first node stays in front on ties or when every timestamp is zero.
// An empty input or a node that fails to parse results in an error.
func newMultiNode(nodes []data.NodeResponse) (*multiSystemNode, error) {
	if len(nodes) == 0 {
		return nil, errors.New("multi node must have at least one node")
	}

	var (
		latestPos int
		latestTS  uint64
	)

	parsed := make([]*treeNode, 0, len(nodes))
	for pos, response := range nodes {
		tNode, err := newTreeNode(response)
		if err != nil {
			return nil, fmt.Errorf("parse system node response: %w", err)
		}
		parsed = append(parsed, tNode)

		if ts := getMaxTimestamp(response); ts > latestTS {
			latestPos, latestTS = pos, ts
		}
	}

	// Swap instead of sorting: only the front element has a defined meaning.
	parsed[0], parsed[latestPos] = parsed[latestPos], parsed[0]

	return &multiSystemNode{nodes: parsed}, nil
}
// Latest returns the first stored node, which newMultiNode arranges to be the
// one with the highest timestamp.
func (m *multiSystemNode) Latest() *treeNode {
	newest := m.nodes[0]
	return newest
}
// Old returns every stored node except the first (latest) one.
// The result shares its backing array with the internal slice.
func (m *multiSystemNode) Old() []*treeNode {
	outdated := m.nodes[1:]
	return outdated
}
// GetLatestVersion returns the most recent version of objectName stored in
// the version tree of container cnrID.
//
// Errors from the tree service are returned as is; layer.ErrNodeNotFound is
// returned when none of the found nodes carries an object ID.
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error) {
	versions, err := c.GetVersions(ctx, cnrID, objectName)
	if err != nil {
		return nil, err
	}

	var latest data.NodeResponse
	if latest, err = getLatestVersionNode(versions); err != nil {
		return nil, err
	}

	return newNodeVersion(latest, objectName)
}
// GetVersions requests every version tree node stored under objectName in
// container cnrID, asking for the OID, IsDeleteMarker and Size meta keys.
// cnrID must not be nil.
func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]data.NodeResponse, error) {
	params := GetNodesParams{
		CnrID:  *cnrID,
		TreeID: versionTree,
		Path:   pathFromName(objectName),
		Meta:   []string{oidKV, isDeleteMarkerKV, sizeKV},
		// LatestOnly and AllAttrs are intentionally left false.
	}

	return c.service.GetNodes(ctx, &params)
}
// getSystemNode loads the system tree nodes stored under name for the bucket
// and returns them as a multiSystemNode with the latest node first.
//
// Nodes carrying an upload ID are ignored. ErrNodeNotFound is returned when
// nothing remains after filtering.
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
	found, err := c.service.GetNodes(ctx, &GetNodesParams{
		CnrID:    bktInfo.CID,
		BktInfo:  bktInfo,
		TreeID:   systemTree,
		Path:     []string{name},
		AllAttrs: true,
	})
	if err != nil {
		return nil, err
	}

	if found = filterMultipartNodes(found); len(found) == 0 {
		return nil, ErrNodeNotFound
	}

	return newMultiNode(found)
}
// filterMultipartNodes returns the nodes that have no "UploadId" meta key,
// preserving their order. The input slice is not modified.
func filterMultipartNodes(nodes []data.NodeResponse) []data.NodeResponse {
	kept := make([]data.NodeResponse, 0, len(nodes))

	for _, candidate := range nodes {
		hasUploadID := false
		for _, kv := range candidate.GetMeta() {
			if kv.GetKey() == uploadIDKV {
				hasUploadID = true
				break
			}
		}

		if !hasUploadID {
			kept = append(kept, candidate)
		}
	}

	return kept
}
// getLatestVersionNode returns the node with the highest maximum timestamp
// among the nodes that carry an "OID" meta key.
//
// Nodes without an OID are skipped. On equal timestamps the node that appears
// first in the slice wins. layer.ErrNodeNotFound is returned only when no
// node carries an OID.
func getLatestVersionNode(nodes []data.NodeResponse) (data.NodeResponse, error) {
	var (
		maxCreationTime uint64
		targetIndexNode = -1
	)

	for i, node := range nodes {
		if !checkExistOID(node.GetMeta()) {
			continue
		}

		// The first node with an OID is always accepted: comparing strictly
		// against the initial zero would otherwise hide an existing object
		// whose timestamps are zero or missing.
		currentCreationTime := getMaxTimestamp(node)
		if targetIndexNode == -1 || currentCreationTime > maxCreationTime {
			targetIndexNode = i
			maxCreationTime = currentCreationTime
		}
	}

	if targetIndexNode == -1 {
		return nil, layer.ErrNodeNotFound
	}

	return nodes[targetIndexNode], nil
}
// checkExistOID reports whether meta contains a pair with the key "OID".
func checkExistOID(meta []data.Meta) bool {
	for idx := range meta {
		if meta[idx].GetKey() != "OID" {
			continue
		}
		return true
	}
	return false
}
// pathFromName turns an object name into tree path segments by splitting it
// on separator. Empty segments are kept, so an empty name yields a single
// empty segment and a leading or trailing separator yields an empty element.
func pathFromName(objectName string) []string {
	segments := strings.Split(objectName, separator)
	return segments
}
// GetSubTreeByPrefix lists version tree nodes whose path starts with prefix.
//
// The prefix is split on separator: everything before the last separator
// selects the directory node(s) to list, the remainder is matched against the
// file names of their children. Besides the matching nodes the function
// returns the prefix with that remainder trimmed off.
//
// When latestOnly is true, for every file name all intermediate nodes are
// kept but only the object node with the highest timestamp is retained, and
// it is stored first within its group. Groups are collected in a map, so the
// order of groups in the result is unspecified.
//
// A not-found error from the sub tree request produces an empty result
// without an error; a not-found error while resolving the directory part of
// the prefix is returned to the caller.
func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]data.NodeResponse, string, error) {
	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, versionTree, prefix)
	if err != nil {
		return nil, "", err
	}

	subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, rootID, 2, false)
	if err != nil {
		if errors.Is(err, layer.ErrNodeNotFound) {
			return nil, "", nil
		}
		return nil, "", err
	}

	grouped := make(map[string][]data.NodeResponse, len(subTree))
	for _, node := range subTree {
		// Skip the directory node(s) the listing started from.
		if MultiID(rootID).Equal(node.GetNodeID()) {
			continue
		}

		name := GetFilename(node)
		if !strings.HasPrefix(name, tailPrefix) {
			continue
		}

		group := grouped[name]
		switch {
		case len(group) == 0:
			group = []data.NodeResponse{node}
		case !latestOnly || isIntermediate(node):
			// Everything is kept without latestOnly; intermediate nodes are
			// always kept.
			group = append(group, node)
		case isIntermediate(group[0]):
			// First object node of the group: put it in front so that
			// group[0] is the slot of the latest object node.
			group = append([]data.NodeResponse{node}, group...)
		case getMaxTimestamp(node) > getMaxTimestamp(group[0]):
			// A newer object node replaces the one remembered so far.
			group[0] = node
		}
		grouped[name] = group
	}

	flattened := make([]data.NodeResponse, 0, len(subTree))
	for _, group := range grouped {
		flattened = append(flattened, group...)
	}

	return flattened, strings.TrimSuffix(prefix, tailPrefix), nil
}
// determinePrefixNode splits prefix on separator and resolves its directory
// part to node IDs of the tree treeID.
//
// It returns the IDs of the node(s) to list and the part of the prefix after
// the last separator. A prefix without a separator is listed from the tree
// root, ID 0, without contacting the tree service.
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
	segments := strings.Split(prefix, separator)
	last := len(segments) - 1
	tailPrefix := segments[last]

	if last == 0 {
		return []uint64{0}, tailPrefix, nil
	}

	rootID, err := c.getPrefixNodeID(ctx, bktInfo, treeID, segments[:last])
	if err != nil {
		return nil, "", err
	}

	return rootID, tailPrefix, nil
}
// getPrefixNodeID returns the IDs of all intermediate nodes found at
// prefixPath in the tree treeID.
//
// IDs of every matching intermediate node are concatenated into one slice.
// layer.ErrNodeNotFound is returned when no ID is collected.
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
	found, err := c.service.GetNodes(ctx, &GetNodesParams{
		CnrID:    bktInfo.CID,
		BktInfo:  bktInfo,
		TreeID:   treeID,
		Path:     prefixPath,
		AllAttrs: true,
	})
	if err != nil {
		return nil, err
	}

	var ids []uint64
	for idx := range found {
		if !isIntermediate(found[idx]) {
			continue
		}
		ids = append(ids, found[idx].GetNodeID()...)
	}

	if len(ids) == 0 {
		return nil, layer.ErrNodeNotFound
	}

	return ids, nil
}
// GetSettingsNode reads the latest bucket settings node from the system tree
// and converts it into data.BucketSettings.
//
// Versioning defaults to data.VersioningUnversioned when the node has no
// versioning value. An unparsable lock configuration is an error. An
// unparsable owner key is only logged and the settings are still returned.
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
	if err != nil {
		return nil, fmt.Errorf("couldn't get node: %w", err)
	}

	latest := multiNode.Latest()
	settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}

	if versioning, found := latest.Get(versioningKV); found {
		settings.Versioning = versioning
	}

	if rawLock, found := latest.Get(lockConfigurationKV); found {
		settings.LockConfiguration, err = parseLockConfiguration(rawLock)
		if err != nil {
			return nil, fmt.Errorf("settings node: invalid lock configuration: %w", err)
		}
	}

	cannedACL, _ := latest.Get(cannedACLKV)
	settings.CannedACL = cannedACL

	if ownerKeyHex, found := latest.Get(ownerKeyKV); found {
		settings.OwnerKey, err = keys.NewPublicKeyFromString(ownerKeyHex)
		if err != nil {
			// Best effort: a broken owner key must not make the settings unreadable.
			c.log.Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err))
		}
	}

	return settings, nil
}
// GetFilename returns the value of the first FileNameKey meta pair of node,
// or an empty string when the node has no such pair.
func GetFilename(node data.NodeResponse) string {
	pairs := node.GetMeta()
	for idx := range pairs {
		if pairs[idx].GetKey() != FileNameKey {
			continue
		}
		return string(pairs[idx].GetValue())
	}
	return ""
}
// isIntermediate reports whether node is an intermediate (path) node, i.e.
// its only meta pair is the file name.
func isIntermediate(node data.NodeResponse) bool {
	pairs := node.GetMeta()
	return len(pairs) == 1 && pairs[0].GetKey() == FileNameKey
}
// getMaxTimestamp returns the largest timestamp reported by node, or 0 when
// the node reports none.
func getMaxTimestamp(node data.NodeResponse) uint64 {
	var newest uint64
	stamps := node.GetTimestamp()
	for idx := range stamps {
		if stamps[idx] > newest {
			newest = stamps[idx]
		}
	}
	return newest
}
// MultiID is a set of tree node IDs that together identify one logical node.
type MultiID []uint64

// Equal reports whether every element of id is also present in m.
//
// Despite its name this is a containment check, not a strict equality: it
// returns true when id is a subset of m, including when id is empty.
// Duplicates and element order are ignored.
func (m MultiID) Equal(id MultiID) bool {
	known := make(map[uint64]struct{}, len(m))
	for _, own := range m {
		known[own] = struct{}{}
	}

	for _, other := range id {
		if _, present := known[other]; !present {
			return false
		}
	}

	return true
}
// parseLockConfiguration decodes the comma separated lock configuration value
// stored in the bucket settings node.
//
// Accepted forms:
//   - an empty string, which yields an empty configuration;
//   - a single field, which sets ObjectLockEnabled only;
//   - four fields "enabled,days,mode,years", which additionally set the
//     default retention rule. Empty days or years fields are read as 0.
//
// Any other number of fields, or a days/years field that is not a base-10
// integer, results in an error. For integer parse failures the returned error
// wraps the strconv error so the offending field can be identified.
func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
	result := &data.ObjectLockConfiguration{}
	if len(value) == 0 {
		return result, nil
	}

	lockValues := strings.Split(value, ",")
	result.ObjectLockEnabled = lockValues[0]

	if len(lockValues) == 1 {
		return result, nil
	}

	if len(lockValues) != 4 {
		return nil, fmt.Errorf("invalid lock configuration: %s", value)
	}

	var err error
	var days, years int64

	if len(lockValues[1]) > 0 {
		if days, err = strconv.ParseInt(lockValues[1], 10, 64); err != nil {
			return nil, fmt.Errorf("invalid lock configuration: %s: %w", value, err)
		}
	}

	if len(lockValues[3]) > 0 {
		if years, err = strconv.ParseInt(lockValues[3], 10, 64); err != nil {
			return nil, fmt.Errorf("invalid lock configuration: %s: %w", value, err)
		}
	}

	result.Rule = &data.ObjectLockRule{
		DefaultRetention: &data.DefaultRetention{
			Days:  days,
			Mode:  lockValues[2],
			Years: years,
		},
	}

	return result, nil
}