frostfs-s3-gw/pkg/service/tree/tree.go
Roman Loginov 6f9ee3da76
All checks were successful
/ Vulncheck (pull_request) Successful in 1m13s
/ DCO (pull_request) Successful in 1m43s
/ Builds (1.20) (pull_request) Successful in 2m49s
/ Builds (1.21) (pull_request) Successful in 1m36s
/ Lint (pull_request) Successful in 3m33s
/ Tests (1.20) (pull_request) Successful in 2m34s
/ Tests (1.21) (pull_request) Successful in 2m29s
[#275] Change logic delete multipart upload
To avoid accidentally picking up outdated
information about downloaded parts from other
nodes, the root node of a multipart upload now
remains in the tree, marked with the finish flag,
after the upload is aborted or completed.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2023-12-27 13:06:45 +03:00

1364 lines
37 KiB
Go

package tree
import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
type (
// Tree bundles a tree service client with a logger; its methods in this
// file operate on the "version" and "system" trees of a bucket.
Tree struct {
service ServiceClient
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
}
// treeNode is the parsed form of a NodeResponse (see newTreeNode): the OID
// and Size meta values are decoded into ObjID and Size, every other meta
// pair is kept as a string in Meta.
treeNode struct {
ID uint64
ParentID uint64
ObjID oid.ID
TimeStamp uint64
Size uint64
Meta map[string]string
}
// GetNodesParams groups the arguments of ServiceClient.GetNodes.
// NOTE(review): Meta appears to be the list of meta keys to return and
// AllAttrs a request for all of them; the exact semantics of LatestOnly and
// AllAttrs are defined by the ServiceClient implementation — confirm there.
GetNodesParams struct {
BktInfo *data.BucketInfo
TreeID string
Path []string
Meta []string
LatestOnly bool
AllAttrs bool
}
)
const (
// FileNameKey is the meta key under which a node's file name is stored.
FileNameKey = "FileName"
)
var (
// ErrNodeNotFound is returned from ServiceClient in case of a not found error.
ErrNodeNotFound = layer.ErrNodeNotFound
// ErrNodeAccessDenied is returned from ServiceClient in case of an access denied error.
ErrNodeAccessDenied = layer.ErrNodeAccessDenied
// ErrGatewayTimeout is returned from ServiceClient in case of a timeout error.
ErrGatewayTimeout = layer.ErrGatewayTimeout
)
// Meta keys, well-known node file names and tree identifiers used by Tree.
const (
// keys of the bucket settings node.
versioningKV = "Versioning"
lockConfigurationKV = "LockConfiguration"
// keys of object version nodes.
oidKV = "OID"
isCombinedKV = "IsCombined"
isUnversionedKV = "IsUnversioned"
// isTagKV marks a node that stores a tag set.
isTagKV = "IsTag"
// keys of multipart upload and part nodes.
uploadIDKV = "UploadId"
partNumberKV = "Number"
sizeKV = "Size"
etagKV = "ETag"
md5KV = "MD5"
finishedKV = "Finished"
// keys for lock.
isLockKV = "IsLock"
legalHoldOIDKV = "LegalHoldOID"
retentionOIDKV = "RetentionOID"
untilDateKV = "UntilDate"
isComplianceKV = "IsCompliance"
// keys for delete marker nodes.
isDeleteMarkerKV = "IsDeleteMarker"
ownerKV = "Owner"
createdKV = "Created"
// file names of the nodes kept in the system tree.
settingsFileName = "bucket-settings"
notifConfFileName = "bucket-notifications"
corsFilename = "bucket-cors"
bucketTaggingFilename = "bucket-tagging"
// versionTree -- ID of a tree with object versions.
versionTree = "version"
// systemTree -- ID of a tree with system objects
// i.e. bucket settings with versioning and lock configuration, cors, notifications.
systemTree = "system"
// separator splits object names and prefixes into path segments.
separator = "/"
// userDefinedTagPrefix is prepended to user tag keys when they are stored as node meta.
userDefinedTagPrefix = "User-Tag-"
maxGetSubTreeDepth = 0 // means all subTree
)
// NewTree returns a Tree that uses the given tree service client and logger.
// It only stores its arguments; no connection is established here.
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
	t := new(Tree)
	t.service = service
	t.log = log
	return t
}
// Meta is a single key/value meta pair attached to a tree node.
type Meta interface {
GetKey() string
GetValue() []byte
}
// NodeResponse is a tree node as returned by ServiceClient: its meta pairs,
// its own ID, the ID of its parent and its timestamp.
type NodeResponse interface {
GetMeta() []Meta
GetNodeID() uint64
GetParentID() uint64
GetTimestamp() uint64
}
// newTreeNode converts a raw tree service node into a treeNode.
//
// The OID meta value is decoded into ObjID and a non-empty Size meta value is
// parsed into Size; all other meta pairs are copied into Meta as strings.
// An undecodable OID or an unparsable size yields an error.
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
	attrs := nodeInfo.GetMeta()
	parsed := &treeNode{
		ID:        nodeInfo.GetNodeID(),
		ParentID:  nodeInfo.GetParentID(),
		TimeStamp: nodeInfo.GetTimestamp(),
		Meta:      make(map[string]string, len(attrs)),
	}

	for _, attr := range attrs {
		key, value := attr.GetKey(), string(attr.GetValue())

		if key == oidKV {
			if err := parsed.ObjID.DecodeString(value); err != nil {
				return nil, err
			}
			continue
		}

		if key == sizeKV {
			// An empty size is tolerated and leaves Size at zero.
			if value == "" {
				continue
			}
			size, err := strconv.ParseUint(value, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid size value '%s': %w", value, err)
			}
			parsed.Size = size
			continue
		}

		parsed.Meta[key] = value
	}

	return parsed, nil
}
// Get looks key up in the node's meta and reports whether it is present.
func (n *treeNode) Get(key string) (string, bool) {
	if v, found := n.Meta[key]; found {
		return v, true
	}
	return "", false
}
// FileName returns the value stored under FileNameKey and whether it is set.
func (n *treeNode) FileName() (string, bool) {
	name, found := n.Meta[FileNameKey]
	if !found {
		return "", false
	}
	return name, true
}
// newNodeVersion parses node and builds a data.NodeVersion for filePath.
// A node that cannot be parsed is reported as an "invalid tree node" error.
func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, error) {
	parsed, err := newTreeNode(node)
	if err == nil {
		return newNodeVersionFromTreeNode(filePath, parsed), nil
	}
	return nil, fmt.Errorf("invalid tree node: %w", err)
}
// newNodeVersionFromTreeNode builds a data.NodeVersion for filePath from an
// already parsed node.
//
// The IsUnversioned, IsCombined and IsDeleteMarker flags are derived from the
// mere presence of the corresponding meta keys. For delete markers the
// Created and Owner meta values are decoded on a best-effort basis: values
// that are missing or malformed leave the zero value in place.
func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeVersion {
	has := func(key string) bool {
		_, ok := treeNode.Get(key)
		return ok
	}
	etag, _ := treeNode.Get(etagKV)
	md5Sum, _ := treeNode.Get(md5KV)

	result := &data.NodeVersion{
		BaseNodeVersion: data.BaseNodeVersion{
			ID:        treeNode.ID,
			ParenID:   treeNode.ParentID,
			OID:       treeNode.ObjID,
			Timestamp: treeNode.TimeStamp,
			ETag:      etag,
			MD5:       md5Sum,
			Size:      treeNode.Size,
			FilePath:  filePath,
		},
		IsUnversioned: has(isUnversionedKV),
		IsCombined:    has(isCombinedKV),
	}

	if !has(isDeleteMarkerKV) {
		return result
	}

	marker := &data.DeleteMarkerInfo{}
	if raw, ok := treeNode.Get(createdKV); ok {
		if ms, err := strconv.ParseInt(raw, 10, 64); err == nil {
			marker.Created = time.UnixMilli(ms)
		}
	}
	if raw, ok := treeNode.Get(ownerKV); ok {
		// Best effort: an undecodable owner is deliberately ignored.
		_ = marker.Owner.DecodeString(raw)
	}
	result.DeleteMarker = marker

	return result
}
// newMultipartInfoFromTreeNode builds a data.MultipartInfo for filePath from
// an already parsed node. A node without a non-empty UploadId is rejected.
//
// Owner, Created and Finished are decoded on a best-effort basis; malformed
// or missing values leave the corresponding field at its zero value. The
// returned info shares the node's Meta map.
func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
	uploadID, _ := treeNode.Get(uploadIDKV)
	if len(uploadID) == 0 {
		return nil, fmt.Errorf("it's not a multipart node")
	}

	info := &data.MultipartInfo{
		ID:       treeNode.ID,
		Key:      filePath,
		UploadID: uploadID,
		Meta:     treeNode.Meta,
	}

	ownerStr, _ := treeNode.Get(ownerKV)
	_ = info.Owner.DecodeString(ownerStr) // best effort

	createdStr, _ := treeNode.Get(createdKV)
	if ms, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
		info.Created = time.UnixMilli(ms)
	}

	finishedStr, _ := treeNode.Get(finishedKV)
	if done, err := strconv.ParseBool(finishedStr); err == nil {
		info.Finished = done
	}

	return info, nil
}
// newMultipartInfo builds a data.MultipartInfo straight from a raw node.
//
// UploadId, FileName, Created, Owner and Finished are mapped to dedicated
// fields (Created, Owner and Finished on a best-effort basis); every other
// meta pair is collected into Meta. A node without a non-empty UploadId is
// rejected.
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
	attrs := node.GetMeta()
	info := &data.MultipartInfo{
		ID:   node.GetNodeID(),
		Meta: make(map[string]string, len(attrs)),
	}

	for _, attr := range attrs {
		key, value := attr.GetKey(), string(attr.GetValue())
		switch key {
		case uploadIDKV:
			info.UploadID = value
		case FileNameKey:
			info.Key = value
		case createdKV:
			if ms, err := strconv.ParseInt(value, 10, 64); err == nil {
				info.Created = time.UnixMilli(ms)
			}
		case ownerKV:
			_ = info.Owner.DecodeString(value) // best effort
		case finishedKV:
			if done, err := strconv.ParseBool(value); err == nil {
				info.Finished = done
			}
		default:
			info.Meta[key] = value
		}
	}

	if info.UploadID == "" {
		return nil, fmt.Errorf("it's not a multipart node")
	}

	return info, nil
}
// newPartInfo builds a data.PartInfo from a raw node.
//
// Unlike the multipart parsers, every recognised numeric/OID value is strict:
// a malformed part number, OID, size or creation timestamp is an error.
// Unknown meta keys are ignored. A node whose part number ends up
// non-positive is not considered a part node.
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
	part := new(data.PartInfo)

	for _, attr := range node.GetMeta() {
		raw := string(attr.GetValue())

		switch attr.GetKey() {
		case partNumberKV:
			number, err := strconv.Atoi(raw)
			if err != nil {
				return nil, fmt.Errorf("invalid part number: %w", err)
			}
			part.Number = number
		case oidKV:
			if err := part.OID.DecodeString(raw); err != nil {
				return nil, fmt.Errorf("invalid oid: %w", err)
			}
		case etagKV:
			part.ETag = raw
		case sizeKV:
			size, err := strconv.ParseUint(raw, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid part size: %w", err)
			}
			part.Size = size
		case createdKV:
			ms, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid created timestamp: %w", err)
			}
			part.Created = time.UnixMilli(ms)
		case md5KV:
			part.MD5 = raw
		}
	}

	if part.Number < 1 {
		return nil, fmt.Errorf("it's not a part node")
	}

	return part, nil
}
// GetSettingsNode reads the bucket settings node from the system tree.
//
// Versioning defaults to data.VersioningUnversioned when the node carries no
// Versioning value. A present but unparsable lock configuration is an error.
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}, []string{versioningKV, lockConfigurationKV})
	if err != nil {
		return nil, fmt.Errorf("couldn't get node: %w", err)
	}

	versioning, ok := node.Get(versioningKV)
	if !ok {
		versioning = data.VersioningUnversioned
	}
	settings := &data.BucketSettings{Versioning: versioning}

	rawLockCfg, ok := node.Get(lockConfigurationKV)
	if !ok {
		return settings, nil
	}

	lockCfg, err := parseLockConfiguration(rawLockCfg)
	if err != nil {
		return nil, fmt.Errorf("settings node: invalid lock configuration: %w", err)
	}
	settings.LockConfiguration = lockCfg

	return settings, nil
}
// PutSettingsNode stores settings in the bucket settings node of the system
// tree: an existing node is updated in place via MoveNode, otherwise a new
// node is added under the tree root.
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
	node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}, []string{})

	switch {
	case err == nil:
		return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, metaFromSettings(settings))
	case errors.Is(err, layer.ErrNodeNotFound):
		_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, metaFromSettings(settings))
		return err
	default:
		return fmt.Errorf("couldn't get node: %w", err)
	}
}
// GetNotificationConfigurationNode returns the object ID stored in the
// bucket notifications node of the system tree.
func (c *Tree) GetNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
	if err == nil {
		return node.ObjID, nil
	}
	return oid.ID{}, err
}
// PutNotificationConfigurationNode points the bucket notifications node at
// objID.
//
// When a node already exists it is updated and the previously stored object
// ID is returned together with the MoveNode result. When no node exists a new
// one is added and layer.ErrNoNodeToRemove is returned to signal that there is
// no old object.
func (c *Tree) PutNotificationConfigurationNode(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
	var none oid.ID

	node, err := c.getSystemNode(ctx, bktInfo, []string{notifConfFileName}, []string{oidKV})
	nodeMissing := errors.Is(err, layer.ErrNodeNotFound)
	if !nodeMissing && err != nil {
		return none, fmt.Errorf("couldn't get node: %w", err)
	}

	meta := map[string]string{
		FileNameKey: notifConfFileName,
		oidKV:       objID.EncodeToString(),
	}

	if !nodeMissing {
		return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
	}

	if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
		return none, err
	}
	return none, layer.ErrNoNodeToRemove
}
// GetBucketCORS returns the object ID stored in the bucket CORS node of the
// system tree.
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV})
	if err == nil {
		return node.ObjID, nil
	}
	return oid.ID{}, err
}
// PutBucketCORS points the bucket CORS node at objID.
//
// When a node already exists it is updated and the previously stored object
// ID is returned together with the MoveNode result. When no node exists a new
// one is added and layer.ErrNoNodeToRemove is returned to signal that there is
// no old object.
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
	var none oid.ID

	node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV})
	nodeMissing := errors.Is(err, layer.ErrNodeNotFound)
	if !nodeMissing && err != nil {
		return none, fmt.Errorf("couldn't get node: %w", err)
	}

	meta := map[string]string{
		FileNameKey: corsFilename,
		oidKV:       objID.EncodeToString(),
	}

	if !nodeMissing {
		return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
	}

	if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
		return none, err
	}
	return none, layer.ErrNoNodeToRemove
}
// DeleteBucketCORS removes the bucket CORS node from the system tree and
// returns the object ID it referenced. When there is no such node,
// layer.ErrNoNodeToRemove is returned.
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
	node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV})
	if !(err == nil || errors.Is(err, layer.ErrNodeNotFound)) {
		return oid.ID{}, err
	}

	if node == nil {
		return oid.ID{}, layer.ErrNoNodeToRemove
	}

	return node.ObjID, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID)
}
// GetObjectTagging returns the user tags attached to objVersion, or nil when
// the version has no tag node.
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
	tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
	if err == nil {
		return getObjectTagging(tagNode), nil
	}
	return nil, err
}
// getObjectTagging extracts user tags from a tag node: every meta key that
// starts with userDefinedTagPrefix is returned with the prefix stripped.
// A nil node yields a nil map; a node without tags yields an empty map.
func getObjectTagging(tagNode *treeNode) map[string]string {
	if tagNode == nil {
		return nil
	}

	tags := make(map[string]string)
	for metaKey, metaVal := range tagNode.Meta {
		if !strings.HasPrefix(metaKey, userDefinedTagPrefix) {
			continue
		}
		tags[metaKey[len(userDefinedTagPrefix):]] = metaVal
	}

	return tags
}
// PutObjectTagging replaces the tag set of objVersion with tagSet.
//
// Tags are stored as meta of a child node of the version, each key prefixed
// with userDefinedTagPrefix, plus the IsTag marker. An existing tag node is
// overwritten via MoveNode; otherwise a new child node is added.
func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error {
	tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
	if err != nil {
		return err
	}

	meta := make(map[string]string, len(tagSet)+1)
	meta[isTagKV] = "true"
	for tagKey, tagVal := range tagSet {
		meta[userDefinedTagPrefix+tagKey] = tagVal
	}

	if tagNode != nil {
		return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID, objVersion.ID, meta)
	}

	_, err = c.service.AddNode(ctx, bktInfo, versionTree, objVersion.ID, meta)
	return err
}
// DeleteObjectTagging clears the tags of objVersion by storing an empty tag
// set. The tag node itself is not removed: it is written with only the IsTag
// marker (and created if it did not exist).
func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error {
	var noTags map[string]string
	return c.PutObjectTagging(ctx, bktInfo, objVersion, noTags)
}
// GetBucketTagging returns the user tags stored in the bucket tagging node of
// the system tree, with userDefinedTagPrefix stripped from the keys.
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
	node, err := c.getSystemNodeWithAllAttributes(ctx, bktInfo, []string{bucketTaggingFilename})
	if err != nil {
		return nil, err
	}

	tags := make(map[string]string)
	for metaKey, metaVal := range node.Meta {
		if !strings.HasPrefix(metaKey, userDefinedTagPrefix) {
			continue
		}
		tags[metaKey[len(userDefinedTagPrefix):]] = metaVal
	}

	return tags, nil
}
// PutBucketTagging replaces the bucket tag set with tagSet.
//
// Tags are stored as meta of the bucket tagging node in the system tree, each
// key prefixed with userDefinedTagPrefix. An existing node is overwritten via
// MoveNode; otherwise a new node is added under the tree root.
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
	node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}, []string{})
	nodeMissing := errors.Is(err, layer.ErrNodeNotFound)
	if !nodeMissing && err != nil {
		return fmt.Errorf("couldn't get node: %w", err)
	}

	meta := make(map[string]string, len(tagSet)+1)
	meta[FileNameKey] = bucketTaggingFilename
	for tagKey, tagVal := range tagSet {
		meta[userDefinedTagPrefix+tagKey] = tagVal
	}

	if nodeMissing {
		_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta)
		return err
	}

	return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID, 0, meta)
}
// DeleteBucketTagging clears the bucket tags by storing an empty tag set.
// The bucket tagging node itself is kept (or created) with only its file name.
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
	var noTags map[string]string
	return c.PutBucketTagging(ctx, bktInfo, noTags)
}
// getTreeNode returns the node in the version-tree subtree of nodeID that
// carries the given meta key, or nil (with a nil error) when there is none.
func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, key string) (*treeNode, error) {
	// if there will be many allocations, consider having separate
	// implementations of 'getTreeNode' and 'getTreeNodes'
	found, err := c.getTreeNodes(ctx, bktInfo, nodeID, key)
	if err != nil {
		return nil, err
	}

	return found[key], nil
}
// getTreeNodes fetches the version-tree subtree of nodeID (depth 2) and, for
// each requested meta key, picks a node that carries it.
//
// Each node is matched against keys in order and is recorded under the first
// key it has; a later node with the same key overwrites an earlier one. The
// scan stops once the result holds as many entries as keys were requested.
// Keys for which no node was found are absent from the result.
func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
	subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, nodeID, 2)
	if err != nil {
		return nil, err
	}

	found := make(map[string]*treeNode, len(keys))
	for _, raw := range subtree {
		node, err := newTreeNode(raw)
		if err != nil {
			return nil, err
		}

		for i := 0; i < len(keys); i++ {
			if _, present := node.Meta[keys[i]]; present {
				found[keys[i]] = node
				break
			}
		}

		if len(found) == len(keys) {
			break
		}
	}

	return found, nil
}
// GetVersions returns the versions stored in the version tree for filepath,
// without restricting the result to unversioned nodes.
func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) {
	const onlyUnversioned = false
	return c.getVersions(ctx, bktInfo, versionTree, filepath, onlyUnversioned)
}
// GetLatestVersion returns the most recent version of objectName in the
// version tree, i.e. the OID-bearing node with the greatest timestamp among
// the nodes found at the object's path.
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
	nodes, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     versionTree,
		Path:       pathFromName(objectName),
		Meta:       []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV},
		LatestOnly: false,
		AllAttrs:   false,
	})
	if err != nil {
		return nil, err
	}

	latest, err := getLatestNode(nodes)
	if err != nil {
		return nil, err
	}

	return newNodeVersion(objectName, latest)
}
// getLatestNode returns the node with the greatest timestamp among the nodes
// that carry an OID meta key. Nodes without an OID (e.g. intermediate path
// nodes) are ignored. When several candidates share the greatest timestamp,
// the first one in nodes wins.
//
// layer.ErrNodeNotFound is returned when no node carries an OID.
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
	var (
		maxCreationTime uint64
		targetIndexNode = -1
	)

	for i, node := range nodes {
		if !checkExistOID(node.GetMeta()) {
			continue
		}

		// Accept the first candidate unconditionally: comparing against the
		// zero initial maximum alone would wrongly discard a valid node whose
		// timestamp is 0.
		if ts := node.GetTimestamp(); targetIndexNode == -1 || ts > maxCreationTime {
			maxCreationTime = ts
			targetIndexNode = i
		}
	}

	if targetIndexNode == -1 {
		return nil, layer.ErrNodeNotFound
	}

	return nodes[targetIndexNode], nil
}
// checkExistOID reports whether meta contains a pair with the "OID" key.
func checkExistOID(meta []Meta) bool {
	for i := range meta {
		if meta[i].GetKey() != "OID" {
			continue
		}
		return true
	}

	return false
}
// pathFromName splits an object name into its path segments using separator.
// The result always has at least one element; empty segments are preserved.
func pathFromName(objectName string) []string {
	segments := strings.Split(objectName, separator)
	return segments
}
// GetLatestVersionsByPrefix returns, for every object in the version tree
// whose name starts with prefix, only its latest version.
func (c *Tree) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
	const latestOnly = true
	return c.getVersionsByPrefix(ctx, bktInfo, prefix, latestOnly)
}
// determinePrefixNode splits prefix at separator into a directory part and a
// trailing name part.
//
// It returns the ID of the intermediate node that corresponds to the
// directory part (0, the tree root, when prefix contains no separator) and
// the trailing part that child names still have to be matched against.
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (uint64, string, error) {
	segments := strings.Split(prefix, separator)
	last := len(segments) - 1

	if last == 0 {
		return 0, segments[0], nil
	}

	rootID, err := c.getPrefixNodeID(ctx, bktInfo, treeID, segments[:last])
	if err != nil {
		return 0, "", err
	}

	return rootID, segments[last], nil
}
// getPrefixNodeID resolves prefixPath to the ID of the single intermediate
// node (a node whose only meta is its file name) located at that path.
//
// layer.ErrNodeNotFound is returned when there is no such node; finding more
// than one is reported as an error.
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) (uint64, error) {
	nodes, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       prefixPath,
		LatestOnly: false,
		AllAttrs:   true,
	})
	if err != nil {
		return 0, err
	}

	var (
		matches int
		firstID uint64
	)
	for _, node := range nodes {
		if !isIntermediate(node) {
			continue
		}
		if matches == 0 {
			firstID = node.GetNodeID()
		}
		matches++
	}

	switch matches {
	case 0:
		return 0, layer.ErrNodeNotFound
	case 1:
		return firstID, nil
	default:
		return 0, fmt.Errorf("found more than one intermediate nodes")
	}
}
// getSubTreeByPrefix returns the direct children of the prefix's directory
// node whose file name starts with the trailing part of prefix.
//
// The prefix is resolved with determinePrefixNode into a root node ID and a
// tail prefix. If either that resolution or the subtree request reports
// layer.ErrNodeNotFound, the result is empty and no error is returned.
//
// Matching nodes are grouped by file name. With latestOnly set, each group
// keeps every intermediate node but only the leaf node with the greatest
// timestamp; otherwise all nodes are kept. The groups are concatenated in map
// iteration order, so the order of the result is unspecified.
//
// The second return value is prefix with the tail prefix trimmed from its end.
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
nodesMap := make(map[string][]NodeResponse, len(subTree))
for _, node := range subTree {
// The subtree response includes the root itself; only its children are of interest.
if node.GetNodeID() == rootID {
continue
}
fileName := getFilename(node)
if !strings.HasPrefix(fileName, tailPrefix) {
continue
}
nodes := nodesMap[fileName]
// Add all nodes if flag latestOnly is false.
// Add all intermediate nodes (actually should be exactly one intermediate node with the same name)
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
if len(nodes) == 0 {
nodes = []NodeResponse{node}
} else if !latestOnly || isIntermediate(node) {
nodes = append(nodes, node)
} else if isIntermediate(nodes[0]) {
// First leaf seen for this name: put it in front of the intermediate node(s).
nodes = append([]NodeResponse{node}, nodes...)
} else if node.GetTimestamp() > nodes[0].GetTimestamp() {
// A newer leaf replaces the one remembered so far.
nodes[0] = node
}
nodesMap[fileName] = nodes
}
result := make([]NodeResponse, 0, len(subTree))
for _, nodes := range nodesMap {
result = append(result, nodes...)
}
return result, strings.TrimSuffix(prefix, tailPrefix), nil
}
// getFilename returns the value of the first FileNameKey meta pair of node,
// or an empty string when the node has none.
func getFilename(node NodeResponse) string {
	attrs := node.GetMeta()
	for i := range attrs {
		if attrs[i].GetKey() != FileNameKey {
			continue
		}
		return string(attrs[i].GetValue())
	}

	return ""
}
// isIntermediate reports whether node is an intermediate (path) node, i.e.
// its meta consists of exactly one pair and that pair is the file name.
func isIntermediate(node NodeResponse) bool {
	meta := node.GetMeta()
	return len(meta) == 1 && meta[0].GetKey() == FileNameKey
}
// getSubTreeVersions collects object versions from the whole version-tree
// subtree rooted at nodeID, reconstructing the full file path of every node.
//
// The first node of the response is treated as the subtree root and gets the
// path parentFilePath (normalised to end with one separator) + its file name.
// Every later node derives its path from its parent's path, so a node whose
// parent has not been seen yet is reported as an "invalid node order" error.
// Nodes that cannot be parsed or have no file name are skipped.
//
// Versions are grouped by (parent ID, file name). With latestOnly set each
// group keeps only the version with the greatest timestamp (a later node wins
// ties), and groups whose kept version is a delete marker are dropped.
// Groups are concatenated in map iteration order, so the result is unordered.
func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) {
subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, nodeID, maxGetSubTreeDepth)
if err != nil {
return nil, err
}
var parentPrefix string
if parentFilePath != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
}
var emptyOID oid.ID
var filepath string
// namesMap maps a node ID to the full path reconstructed for that node.
namesMap := make(map[uint64]string, len(subTree))
versions := make(map[string][]*data.NodeVersion, len(subTree))
for i, node := range subTree {
treeNode, fileName, err := parseTreeNode(node)
if err != nil {
continue
}
if i != 0 {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
namesMap[treeNode.ID] = filepath
}
if treeNode.ObjID.Equals(emptyOID) { // The node can be intermediate but we still want to update namesMap
continue
}
key := formLatestNodeKey(node.GetParentID(), fileName)
versionNodes, ok := versions[key]
if !ok {
versionNodes = []*data.NodeVersion{newNodeVersionFromTreeNode(filepath, treeNode)}
} else if !latestOnly {
versionNodes = append(versionNodes, newNodeVersionFromTreeNode(filepath, treeNode))
} else if versionNodes[0].Timestamp <= treeNode.TimeStamp {
versionNodes[0] = newNodeVersionFromTreeNode(filepath, treeNode)
}
versions[key] = versionNodes
}
result := make([]*data.NodeVersion, 0, len(versions)) // consider use len(subTree)
for _, version := range versions {
// When only latest versions are requested, an object whose latest version is a delete marker is omitted.
if latestOnly && version[0].IsDeleteMarker() {
continue
}
result = append(result, version...)
}
return result, nil
}
// formFilePath builds the full path of node as "<parent path>/<fileName>",
// looking the parent path up in namesMap by the node's parent ID, and records
// the result in namesMap under the node's own ID.
//
// An error is returned when the parent's path is not in namesMap yet.
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
	parentPath, known := namesMap[node.GetParentID()]
	if known {
		fullPath := parentPath + separator + fileName
		namesMap[node.GetNodeID()] = fullPath
		return fullPath, nil
	}

	return "", fmt.Errorf("couldn't get parent path")
}
// parseTreeNode parses node and returns it together with its file name.
// It fails when the node cannot be parsed (e.g. invalid OID attribute) or
// when it carries no FileName meta.
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
	parsed, err := newTreeNode(node)
	if err != nil { // invalid OID attribute
		return nil, "", err
	}

	if name, ok := parsed.FileName(); ok {
		return parsed, name, nil
	}

	return nil, "", fmt.Errorf("doesn't contain FileName")
}
// formLatestNodeKey builds the grouping key "<parentID>.<fileName>" that
// identifies all nodes with the same name under the same parent.
func formLatestNodeKey(parentID uint64, fileName string) string {
	// 20 digits is the longest decimal uint64; one more byte for the dot.
	key := make([]byte, 0, 21+len(fileName))
	key = strconv.AppendUint(key, parentID, 10)
	key = append(key, '.')
	key = append(key, fileName...)
	return string(key)
}
// GetAllVersionsByPrefix returns every version of every object in the version
// tree whose name starts with prefix.
func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
	const latestOnly = false
	return c.getVersionsByPrefix(ctx, bktInfo, prefix, latestOnly)
}
// getVersionsByPrefix finds the nodes matching prefix in the version tree and
// gathers the versions from the subtree of each of them. The result is nil
// when nothing matches.
func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) {
	roots, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly)
	if err != nil {
		return nil, err
	}

	var all []*data.NodeVersion
	for i := range roots {
		sub, err := c.getSubTreeVersions(ctx, bktInfo, roots[i].GetNodeID(), headPrefix, latestOnly)
		if err != nil {
			return nil, err
		}
		all = append(all, sub...)
	}

	return all, nil
}
// GetUnversioned returns the unversioned node of filepath from the version
// tree, or layer.ErrNodeNotFound when there is none.
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
	node, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath)
	return node, err
}
// getUnversioned returns the unversioned node of filepath in the given tree.
//
// When several unversioned nodes exist (which is logged at debug level), the
// one with the greatest timestamp is returned. layer.ErrNodeNotFound is
// returned when there is none.
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
	candidates, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)

	switch {
	case err != nil:
		return nil, err
	case len(candidates) == 0:
		return nil, layer.ErrNodeNotFound
	case len(candidates) > 1:
		c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode,
			zap.String("treeID", treeID), zap.String("filepath", filepath))
	}

	// Newest first.
	sort.Slice(candidates, func(a, b int) bool {
		return candidates[b].Timestamp < candidates[a].Timestamp
	})

	return candidates[0], nil
}
// AddVersion stores version in the version tree and returns the ID of the
// node that now holds it.
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
	nodeID, err := c.addVersion(ctx, bktInfo, versionTree, version)
	return nodeID, err
}
// RemoveVersion removes the node with the given ID from the version tree.
func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
	err := c.service.RemoveNode(ctx, bktInfo, versionTree, id)
	return err
}
// CreateMultipartUpload adds a multipart upload node to the system tree.
// The node is placed at the path formed by the directory part of info.Key;
// the last path segment is passed to metaFromMultipart as the file name.
func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
	segments := pathFromName(info.Key)
	last := len(segments) - 1
	dir, name := segments[:last], segments[last]

	_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, dir, metaFromMultipart(info, name))
	return err
}
// GetMultipartUploadsByPrefix returns the unfinished multipart uploads in the
// system tree whose key starts with prefix. The result is nil when nothing
// matches.
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
	roots, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
	if err != nil {
		return nil, err
	}

	var uploads []*data.MultipartInfo
	for i := range roots {
		sub, err := c.getSubTreeMultipartUploads(ctx, bktInfo, roots[i].GetNodeID(), headPrefix)
		if err != nil {
			return nil, err
		}
		uploads = append(uploads, sub...)
	}

	return uploads, nil
}
// getSubTreeMultipartUploads collects the unfinished multipart uploads from
// the whole system-tree subtree rooted at nodeID, reconstructing the full key
// of every node.
//
// The first node of the response is treated as the subtree root and gets the
// path parentFilePath (normalised to end with one separator) + its file name.
// Every later node derives its path from its parent's path, so a node whose
// parent has not been seen yet is reported as an "invalid node order" error.
// Nodes that cannot be parsed, are not multipart nodes, or are marked as
// finished are skipped. The order of the result is unspecified.
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
	subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
	if err != nil {
		return nil, err
	}

	parentPrefix := ""
	if parentFilePath != "" { // The root of subTree can also have a parent
		parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
	}

	namesMap := make(map[uint64]string, len(subTree))
	grouped := make(map[string][]*data.MultipartInfo, len(subTree))

	for idx, raw := range subTree {
		parsed, name, err := parseTreeNode(raw)
		if err != nil {
			continue
		}

		var fullPath string
		if idx == 0 {
			fullPath = parentPrefix + name
			namesMap[parsed.ID] = fullPath
		} else if fullPath, err = formFilePath(raw, name, namesMap); err != nil {
			return nil, fmt.Errorf("invalid node order: %w", err)
		}

		info, err := newMultipartInfoFromTreeNode(fullPath, parsed)
		if err != nil || info.Finished {
			continue
		}

		key := formLatestNodeKey(raw.GetParentID(), name)
		grouped[key] = append(grouped[key], info)
	}

	result := make([]*data.MultipartInfo, 0, len(grouped))
	for _, infos := range grouped {
		result = append(result, infos...)
	}

	return result, nil
}
// GetMultipartUpload looks up the multipart upload of objectName with the
// given uploadID in the system tree.
//
// layer.ErrNodeNotFound is returned when no node matches, and also when the
// first matching node is marked as finished.
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
	nodes, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:  bktInfo,
		TreeID:   systemTree,
		Path:     pathFromName(objectName),
		AllAttrs: true,
	})
	if err != nil {
		return nil, err
	}

	for _, node := range nodes {
		info, err := newMultipartInfo(node)
		if err != nil || info.UploadID != uploadID {
			continue
		}

		if !info.Finished {
			return info, nil
		}
		// A finished upload is treated as absent; stop searching.
		break
	}

	return nil, layer.ErrNodeNotFound
}
// AddPart records part info under the multipart upload node multipartNodeID.
//
// If a child with the same part number already exists, that node is
// overwritten via MoveNode and the object ID it previously referenced is
// returned (together with the MoveNode result) so the caller can delete the
// old object. Otherwise a new child node is added and
// layer.ErrNoNodeToRemove is returned to signal that there is no old object.
// Children that cannot be parsed as parts are ignored.
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
	children, err := c.service.GetSubTree(ctx, bktInfo, systemTree, multipartNodeID, 2)
	if err != nil {
		return oid.ID{}, err
	}

	meta := map[string]string{
		partNumberKV: strconv.Itoa(info.Number),
		oidKV:        info.OID.EncodeToString(),
		sizeKV:       strconv.FormatUint(info.Size, 10),
		createdKV:    strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
		etagKV:       info.ETag,
		md5KV:        info.MD5,
	}

	for _, child := range children {
		childID := child.GetNodeID()
		// The subtree response includes the multipart node itself.
		if childID == multipartNodeID {
			continue
		}

		existing, parseErr := newPartInfo(child)
		if parseErr != nil || existing.Number != info.Number {
			continue
		}

		return existing.OID, c.service.MoveNode(ctx, bktInfo, systemTree, childID, multipartNodeID, meta)
	}

	if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
		return oid.ID{}, err
	}

	return oid.ID{}, layer.ErrNoNodeToRemove
}
// GetParts returns the parts stored as children of the multipart upload node
// multipartNodeID. Children that cannot be parsed as parts are skipped.
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
	children, err := c.service.GetSubTree(ctx, bktInfo, systemTree, multipartNodeID, 2)
	if err != nil {
		return nil, err
	}

	parts := make([]*data.PartInfo, 0, len(children))
	for i := range children {
		// The subtree response includes the multipart node itself.
		if children[i].GetNodeID() == multipartNodeID {
			continue
		}

		if part, err := newPartInfo(children[i]); err == nil {
			parts = append(parts, part)
		}
	}

	return parts, nil
}
// DeleteMultipartUpload removes the multipart upload node from the system
// tree and then re-creates it with the Finished flag set, so that a marker of
// the finished upload remains in the tree.
//
// Note that multipartInfo.Finished is set to true on the caller's value once
// the removal has succeeded.
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
	if err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID); err != nil {
		return err
	}

	multipartInfo.Finished = true

	return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}
// PutLock stores lock as a child node of the version node nodeID in the
// version tree.
//
// The node always carries the IsLock marker; legal hold and retention data
// are added only when set on lock. A lock with a zero ID is added as a new
// node, otherwise the existing lock node is overwritten via MoveNode.
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
	meta := make(map[string]string, 5)
	meta[isLockKV] = "true"

	if lock.IsLegalHoldSet() {
		meta[legalHoldOIDKV] = lock.LegalHold().EncodeToString()
	}

	if lock.IsRetentionSet() {
		meta[retentionOIDKV] = lock.Retention().EncodeToString()
		meta[untilDateKV] = lock.UntilDate()
		if lock.IsCompliance() {
			meta[isComplianceKV] = "true"
		}
	}

	if lock.ID() != 0 {
		return c.service.MoveNode(ctx, bktInfo, versionTree, lock.ID(), nodeID, meta)
	}

	_, err := c.service.AddNode(ctx, bktInfo, versionTree, nodeID, meta)
	return err
}
// GetLock returns the lock info attached to the version node nodeID.
// When the version has no lock node, an empty data.LockInfo is returned.
func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
	lockNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
	if err == nil {
		return getLock(lockNode)
	}
	return nil, err
}
// getLock builds a data.LockInfo from a lock node.
//
// A nil node yields an empty data.LockInfo. Legal hold and retention are set
// only when the corresponding OID meta is present; an undecodable OID is an
// error. The compliance flag is derived from the mere presence of its key.
func getLock(lockNode *treeNode) (*data.LockInfo, error) {
	if lockNode == nil {
		return &data.LockInfo{}, nil
	}

	info := data.NewLockInfo(lockNode.ID)

	legalHold, hasLegalHold := lockNode.Get(legalHoldOIDKV)
	if hasLegalHold {
		var id oid.ID
		if err := id.DecodeString(legalHold); err != nil {
			return nil, fmt.Errorf("invalid legal hold object id: %w", err)
		}
		info.SetLegalHold(id)
	}

	retention, hasRetention := lockNode.Get(retentionOIDKV)
	if !hasRetention {
		return info, nil
	}

	var id oid.ID
	if err := id.DecodeString(retention); err != nil {
		return nil, fmt.Errorf("invalid retention object id: %w", err)
	}
	_, compliance := lockNode.Get(isComplianceKV)
	until, _ := lockNode.Get(untilDateKV)
	info.SetRetention(id, until, compliance)

	return info, nil
}
// GetObjectTaggingAndLock fetches both the tags and the lock info of
// objVersion with a single subtree request. Tags are nil when the version has
// no tag node; the lock info is empty when it has no lock node.
func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
	found, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
	if err != nil {
		return nil, nil, err
	}

	lockInfo, err := getLock(found[isLockKV])
	if err != nil {
		return nil, nil, err
	}

	tags := getObjectTagging(found[isTagKV])
	return tags, lockInfo, nil
}
// addVersion stores version in the tree treeID and returns the ID of the node
// that now holds it.
//
// For an unversioned object an existing unversioned node (if any) is
// overwritten in place via MoveNode and its outdated tagging is cleared;
// in every other case a new node is added by path.
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
	segments := pathFromName(version.FilePath)
	last := len(segments) - 1

	meta := map[string]string{
		oidKV:       version.OID.EncodeToString(),
		FileNameKey: segments[last],
	}

	// Optional attributes are stored only when they carry a value.
	if version.Size > 0 {
		meta[sizeKV] = strconv.FormatUint(version.Size, 10)
	}
	if version.ETag != "" {
		meta[etagKV] = version.ETag
	}
	if version.MD5 != "" {
		meta[md5KV] = version.MD5
	}

	if version.IsDeleteMarker() {
		marker := version.DeleteMarker
		meta[isDeleteMarkerKV] = "true"
		meta[ownerKV] = marker.Owner.EncodeToString()
		meta[createdKV] = strconv.FormatInt(marker.Created.UTC().UnixMilli(), 10)
	}

	if version.IsCombined {
		meta[isCombinedKV] = "true"
	}

	if version.IsUnversioned {
		meta[isUnversionedKV] = "true"

		existing, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
		switch {
		case err == nil:
			// Reuse the existing unversioned node: same parent, new meta.
			if moveErr := c.service.MoveNode(ctx, bktInfo, treeID, existing.ID, existing.ParenID, meta); moveErr != nil {
				return 0, moveErr
			}
			return existing.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, existing.ID)
		case !errors.Is(err, layer.ErrNodeNotFound):
			return 0, err
		}
		// Not found: fall through and create a fresh node below.
	}

	return c.service.AddNodeByPath(ctx, bktInfo, treeID, segments[:last], meta)
}
// clearOutdatedVersionInfo looks up the tagging node related to nodeID
// (getTreeNode with isTagKV) and, when one is found, removes it from treeID.
// It is a no-op when no tagging node exists.
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
	tagNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
	if err != nil || tagNode == nil {
		// Either the lookup failed (err is returned) or there is nothing to remove (err is nil).
		return err
	}

	return c.service.RemoveNode(ctx, bktInfo, treeID, tagNode.ID)
}
// getVersions returns the versions stored in treeID under filepath.
//
// When onlyUnversioned is set, only versions flagged as unversioned are kept.
// A layer.ErrNodeNotFound from the tree service is reported as (nil, nil);
// any other service or conversion error is returned as is.
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
	nodes, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       pathFromName(filepath),
		Meta:       []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV},
		LatestOnly: false,
		AllAttrs:   false,
	})
	if errors.Is(err, layer.ErrNodeNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	versions := make([]*data.NodeVersion, 0, len(nodes))
	for i := range nodes {
		v, convErr := newNodeVersion(filepath, nodes[i])
		if convErr != nil {
			return nil, convErr
		}

		if !onlyUnversioned || v.IsUnversioned {
			versions = append(versions, v)
		}
	}

	return versions, nil
}
// metaFromSettings converts bucket settings into the node meta map:
// the settings file name, the versioning value and the encoded lock configuration.
func metaFromSettings(settings *data.BucketSettings) map[string]string {
	return map[string]string{
		FileNameKey:         settingsFileName,
		versioningKV:        settings.Versioning,
		lockConfigurationKV: encodeLockConfiguration(settings.LockConfiguration),
	}
}
// metaFromMultipart fills info.Meta with the multipart upload attributes
// (file name, upload ID, owner, creation time in Unix milliseconds and, when
// the upload is finished, the finished flag) and returns that same map.
//
// The map is modified in place, so the caller's info.Meta and the returned
// value refer to the same map. If info.Meta is nil it is allocated first;
// writing to a nil map would otherwise panic.
func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string {
	if info.Meta == nil {
		// Room for the five system keys written below.
		info.Meta = make(map[string]string, 5)
	}

	info.Meta[FileNameKey] = fileName
	info.Meta[uploadIDKV] = info.UploadID
	info.Meta[ownerKV] = info.Owner.EncodeToString()
	info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
	if info.Finished {
		info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
	}

	return info.Meta
}
// getSystemNode fetches the single node located at path in the system tree,
// requesting only the meta keys listed in meta. See getNode for error cases.
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path, meta []string) (*treeNode, error) {
	const allAttrs = false
	return c.getNode(ctx, bktInfo, systemTree, path, meta, allAttrs)
}
// getSystemNodeWithAllAttributes fetches the single node located at path in
// the system tree, passing an empty meta key list and the all-attributes flag
// to getNode.
func (c *Tree) getSystemNodeWithAllAttributes(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
	const allAttrs = true
	noMetaKeys := []string{}
	return c.getNode(ctx, bktInfo, systemTree, path, noMetaKeys, allAttrs)
}
// getNode requests the nodes at path in treeID and expects exactly one.
//
// It returns layer.ErrNodeNotFound when the service returns no nodes and a
// generic error when more than one node is returned. Errors from the service
// itself are passed through unchanged.
func (c *Tree) getNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path, meta []string, allAttrs bool) (*treeNode, error) {
	found, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       path,
		Meta:       meta,
		LatestOnly: false,
		AllAttrs:   allAttrs,
	})
	if err != nil {
		return nil, err
	}

	switch len(found) {
	case 0:
		return nil, layer.ErrNodeNotFound
	case 1:
		return newTreeNode(found[0])
	default:
		return nil, errors.New("found more than one node")
	}
}
// reqLogger returns the logger attached to ctx by the middleware, falling
// back to the Tree's own logger when ctx carries none.
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
	if fromCtx := middleware.GetReqLog(ctx); fromCtx != nil {
		return fromCtx
	}
	return c.log
}
// parseLockConfiguration decodes the comma-separated representation produced
// by encodeLockConfiguration.
//
// Accepted forms:
//   - ""                         -> empty configuration
//   - "<enabled>"                -> only ObjectLockEnabled is set
//   - "<enabled>,<days>,<mode>,<years>" -> ObjectLockEnabled plus default retention rule;
//     empty <days>/<years> are treated as zero.
//
// Any other number of fields, or a non-integer <days>/<years>, results in an
// error. Integer parse failures are wrapped so the cause stays available via
// errors.Is / errors.As.
func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
	result := &data.ObjectLockConfiguration{}
	if len(value) == 0 {
		return result, nil
	}

	lockValues := strings.Split(value, ",")
	result.ObjectLockEnabled = lockValues[0]
	if len(lockValues) == 1 {
		return result, nil
	}

	if len(lockValues) != 4 {
		return nil, fmt.Errorf("invalid lock configuration: %s: expected 1 or 4 fields, got %d", value, len(lockValues))
	}

	var err error
	var days, years int64

	if len(lockValues[1]) > 0 {
		if days, err = strconv.ParseInt(lockValues[1], 10, 64); err != nil {
			return nil, fmt.Errorf("invalid lock configuration: %s: invalid days: %w", value, err)
		}
	}

	if len(lockValues[3]) > 0 {
		if years, err = strconv.ParseInt(lockValues[3], 10, 64); err != nil {
			return nil, fmt.Errorf("invalid lock configuration: %s: invalid years: %w", value, err)
		}
	}

	result.Rule = &data.ObjectLockRule{
		DefaultRetention: &data.DefaultRetention{
			Days:  days,
			Mode:  lockValues[2],
			Years: years,
		},
	}

	return result, nil
}
// encodeLockConfiguration serializes conf into a comma-separated string.
//
// A nil conf encodes to "". Without a default retention rule only
// ObjectLockEnabled is emitted; otherwise the result has the form
// "<enabled>,<days>,<mode>,<years>".
func encodeLockConfiguration(conf *data.ObjectLockConfiguration) string {
	switch {
	case conf == nil:
		return ""
	case conf.Rule == nil || conf.Rule.DefaultRetention == nil:
		return conf.ObjectLockEnabled
	}

	retention := conf.Rule.DefaultRetention
	days := strconv.FormatInt(retention.Days, 10)
	years := strconv.FormatInt(retention.Years, 10)

	return conf.ObjectLockEnabled + "," + days + "," + retention.Mode + "," + years
}