frostfs-s3-gw/pkg/service/tree/tree.go
Denis Kirillov 056f168d77 [#448] multipart: Support removing duplicated parts
Previously after tree split we can have duplicated parts
(several objects and tree node referred to the same part number).
Some of them couldn't be deleted after abort or compete action.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-09-03 13:20:38 +00:00

1918 lines
52 KiB
Go

package tree
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"sort"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
type (
Tree struct {
service ServiceClient
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
}
SubTreeStream interface {
Next() (NodeResponse, error)
}
treeNode struct {
ID []uint64
ParentID []uint64
ObjID oid.ID
TimeStamp []uint64
Size uint64
Meta map[string]string
}
multiSystemNode struct {
// the first element is latest
nodes []*treeNode
}
GetNodesParams struct {
BktInfo *data.BucketInfo
TreeID string
Path []string
Meta []string
LatestOnly bool
AllAttrs bool
}
)
const (
FileNameKey = "FileName"
)
var (
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
ErrNodeNotFound = layer.ErrNodeNotFound
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
ErrNodeAccessDenied = layer.ErrNodeAccessDenied
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = layer.ErrGatewayTimeout
errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName")
)
const (
versioningKV = "Versioning"
cannedACLKV = "cannedACL"
ownerKeyKV = "ownerKey"
lockConfigurationKV = "LockConfiguration"
oidKV = "OID"
cidKV = "CID"
isCombinedKV = "IsCombined"
isUnversionedKV = "IsUnversioned"
isTagKV = "IsTag"
uploadIDKV = "UploadId"
partNumberKV = "Number"
sizeKV = "Size"
etagKV = "ETag"
md5KV = "MD5"
finishedKV = "Finished"
creationEpochKV = "CreationEpoch"
// keys for lock.
isLockKV = "IsLock"
legalHoldOIDKV = "LegalHoldOID"
retentionOIDKV = "RetentionOID"
untilDateKV = "UntilDate"
isComplianceKV = "IsCompliance"
// keys for delete marker nodes.
isDeleteMarkerKV = "IsDeleteMarker"
ownerKV = "Owner"
createdKV = "Created"
settingsFileName = "bucket-settings"
corsFilename = "bucket-cors"
bucketTaggingFilename = "bucket-tagging"
bucketLifecycleFilename = "bucket-lifecycle"
// versionTree -- ID of a tree with object versions.
versionTree = "version"
// systemTree -- ID of a tree with system objects
// i.e. bucket settings with versioning and lock configuration, cors.
systemTree = "system"
separator = "/"
userDefinedTagPrefix = "User-Tag-"
maxGetSubTreeDepth = 0 // means all subTree
)
// NewTree creates instance of Tree using provided address and create grpc connection.
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{
service: service,
log: log,
}
}
type Meta interface {
GetKey() string
GetValue() []byte
}
type NodeResponse interface {
GetMeta() []Meta
GetNodeID() []uint64
GetParentID() []uint64
GetTimestamp() []uint64
}
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
if err := validateNodeResponse(nodeInfo); err != nil {
return nil, err
}
tNode := &treeNode{
ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(),
TimeStamp: nodeInfo.GetTimestamp(),
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
}
for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() {
case oidKV:
if err := tNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
return nil, err
}
case sizeKV:
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
var err error
if tNode.Size, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
return nil, fmt.Errorf("invalid size value '%s': %w", sizeStr, err)
}
}
default:
tNode.Meta[kv.GetKey()] = string(kv.GetValue())
}
}
return tNode, nil
}
func (n *treeNode) Get(key string) (string, bool) {
value, ok := n.Meta[key]
return value, ok
}
func (n *treeNode) FileName() (string, bool) {
value, ok := n.Meta[FileNameKey]
return value, ok
}
func (n *treeNode) IsSplit() bool {
return len(n.ID) != 1 || len(n.ParentID) != 1 || len(n.TimeStamp) != 1
}
func (n *treeNode) GetLatestNodeIndex() int {
var (
maxTimestamp uint64
index int
)
for i, timestamp := range n.TimeStamp {
if timestamp > maxTimestamp {
maxTimestamp = timestamp
index = i
}
}
return index
}
func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) {
tNode, err := newTreeNode(node)
if err != nil {
return nil, fmt.Errorf("invalid tree node: %w", err)
}
return newNodeVersionFromTreeNode(log, filePath, tNode)
}
func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.NodeVersion, error) {
_, isUnversioned := treeNode.Get(isUnversionedKV)
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
_, isCombined := treeNode.Get(isCombinedKV)
eTag, _ := treeNode.Get(etagKV)
md5, _ := treeNode.Get(md5KV)
if treeNode.IsSplit() {
return nil, errors.New("invalid version tree node: this is split node")
}
version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
ID: treeNode.ID[0],
ParenID: treeNode.ParentID[0],
OID: treeNode.ObjID,
Timestamp: treeNode.TimeStamp[0],
ETag: eTag,
MD5: md5,
Size: treeNode.Size,
FilePath: filePath,
IsDeleteMarker: isDeleteMarker,
},
IsUnversioned: isUnversioned,
IsCombined: isCombined,
}
if createdStr, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err))
} else {
created := time.UnixMilli(utcMilli)
version.Created = &created
}
}
if ownerStr, ok := treeNode.Get(ownerKV); ok {
var owner user.ID
if err := owner.DecodeString(ownerStr); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err))
} else {
version.Owner = &owner
}
}
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err))
} else {
version.CreationEpoch = epoch
}
}
return version, nil
}
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
var (
err error
index int
maxTimestamp uint64
)
if len(nodes) == 0 {
return nil, errors.New("multi node must have at least one node")
}
treeNodes := make([]*treeNode, len(nodes))
for i, node := range nodes {
if treeNodes[i], err = newTreeNode(node); err != nil {
return nil, fmt.Errorf("parse system node response: %w", err)
}
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
index = i
maxTimestamp = timestamp
}
}
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
return &multiSystemNode{
nodes: treeNodes,
}, nil
}
func (m *multiSystemNode) Latest() *treeNode {
return m.nodes[0]
}
func (m *multiSystemNode) Old() []*treeNode {
return m.nodes[1:]
}
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
uploadID, _ := treeNode.Get(uploadIDKV)
if uploadID == "" {
return nil, fmt.Errorf("it's not a multipart node: missing UploadId")
}
if treeNode.IsSplit() {
return nil, fmt.Errorf("invalid multipart node '%s': tree node is split", filePath)
}
multipartInfo := &data.MultipartInfo{
ID: treeNode.ID[0],
Key: filePath,
UploadID: uploadID,
Meta: treeNode.Meta,
}
if ownerID, ok := treeNode.Get(ownerKV); ok {
if err := multipartInfo.Owner.DecodeString(ownerID); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err))
}
}
if created, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err))
} else {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
}
if finished, ok := treeNode.Get(finishedKV); ok {
if flag, err := strconv.ParseBool(finished); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err))
} else {
multipartInfo.Finished = flag
}
}
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err))
} else {
multipartInfo.CreationEpoch = epoch
}
}
return multipartInfo, nil
}
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid multipart node: this is split node")
}
multipartInfo := &data.MultipartInfo{
ID: node.GetNodeID()[0],
Meta: make(map[string]string, len(node.GetMeta())),
}
for _, kv := range node.GetMeta() {
switch kv.GetKey() {
case uploadIDKV:
multipartInfo.UploadID = string(kv.GetValue())
case FileNameKey:
multipartInfo.Key = string(kv.GetValue())
case createdKV:
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err))
} else {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
case ownerKV:
if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err))
}
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err))
} else {
multipartInfo.Finished = isFinished
}
case creationEpochKV:
if epoch, err := strconv.ParseUint(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, string(kv.GetValue())), zap.Error(err))
} else {
multipartInfo.CreationEpoch = epoch
}
default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
}
}
if multipartInfo.UploadID == "" {
return nil, fmt.Errorf("it's not a multipart node")
}
return multipartInfo, nil
}
func validateNodeResponse(node NodeResponse) error {
ids := node.GetNodeID()
parentIDs := node.GetParentID()
timestamps := node.GetTimestamp()
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
return errors.New("invalid node response: missing ids")
}
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
return errors.New("invalid node response: multiple ids length mismatch")
}
return nil
}
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid part node: this is split node")
}
partInfo := &data.PartInfoExtended{
Timestamp: node.GetTimestamp()[0],
}
var err error
for _, kv := range node.GetMeta() {
value := string(kv.GetValue())
switch kv.GetKey() {
case partNumberKV:
if partInfo.Number, err = strconv.Atoi(value); err != nil {
return nil, fmt.Errorf("invalid part number: %w", err)
}
case oidKV:
if err = partInfo.OID.DecodeString(value); err != nil {
return nil, fmt.Errorf("invalid oid: %w", err)
}
case etagKV:
partInfo.ETag = value
case sizeKV:
if partInfo.Size, err = strconv.ParseUint(value, 10, 64); err != nil {
return nil, fmt.Errorf("invalid part size: %w", err)
}
case createdKV:
var utcMilli int64
if utcMilli, err = strconv.ParseInt(value, 10, 64); err != nil {
return nil, fmt.Errorf("invalid created timestamp: %w", err)
}
partInfo.Created = time.UnixMilli(utcMilli)
case md5KV:
partInfo.MD5 = value
}
}
if partInfo.Number <= 0 {
return nil, fmt.Errorf("it's not a part node")
}
return partInfo, nil
}
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err)
}
node := multiNode.Latest()
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
if versioningValue, ok := node.Get(versioningKV); ok {
settings.Versioning = versioningValue
}
if lockConfigurationValue, ok := node.Get(lockConfigurationKV); ok {
if settings.LockConfiguration, err = parseLockConfiguration(lockConfigurationValue); err != nil {
return nil, fmt.Errorf("settings node: invalid lock configuration: %w", err)
}
}
settings.CannedACL, _ = node.Get(cannedACLKV)
if ownerKeyHex, ok := node.Get(ownerKeyKV); ok {
if settings.OwnerKey, err = keys.NewPublicKeyFromString(ownerKeyHex); err != nil {
c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err))
}
}
return settings, nil
}
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err)
}
meta := metaFromSettings(settings)
if isErrNotFound {
_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta)
return err
}
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return fmt.Errorf("move settings node: %w", err)
}
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
return nil
}
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
if err != nil {
return oid.Address{}, err
}
return getTreeNodeAddress(node.Latest())
}
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return nil, fmt.Errorf("couldn't get node: %w", err)
}
meta := make(map[string]string)
meta[FileNameKey] = corsFilename
meta[oidKV] = addr.Object().EncodeToString()
meta[cidKV] = addr.Container().EncodeToString()
if isErrNotFound {
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
return nil, err
}
return nil, layer.ErrNoNodeToRemove
}
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return nil, fmt.Errorf("move cors node: %w", err)
}
objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
objToDelete[0], err = getTreeNodeAddress(latest)
if err != nil {
return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
}
objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
return objToDelete, nil
}
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return nil, err
}
if isErrNotFound {
return nil, layer.ErrNoNodeToRemove
}
objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
if len(objToDelete) != len(multiNode.nodes) {
return nil, fmt.Errorf("failed to clean all old cors nodes")
}
return objToDelete, nil
}
func getTreeNodeAddress(node *treeNode) (oid.Address, error) {
var addr oid.Address
addr.SetObject(node.ObjID)
if cidStr, ok := node.Get(cidKV); ok {
var cnrID cid.ID
if err := cnrID.DecodeString(cidStr); err != nil {
return oid.Address{}, fmt.Errorf("couldn't decode cid: %w", err)
}
addr.SetContainer(cnrID)
}
return addr, nil
}
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
res := make([]oid.Address, 0, len(nodes))
for _, node := range nodes {
ind := node.GetLatestNodeIndex()
if node.IsSplit() {
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
}
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
} else {
addr, err := getTreeNodeAddress(node)
if err != nil {
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
continue
}
res = append(res, addr)
}
}
return res
}
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil {
return nil, err
}
return getObjectTagging(tagNode), nil
}
func getObjectTagging(tagNode *treeNode) map[string]string {
if tagNode == nil {
return nil
}
meta := make(map[string]string)
for key, val := range tagNode.Meta {
if strings.HasPrefix(key, userDefinedTagPrefix) {
meta[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
}
}
return meta
}
func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error {
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
if err != nil {
return err
}
treeTagSet := make(map[string]string)
treeTagSet[isTagKV] = "true"
for key, val := range tagSet {
treeTagSet[userDefinedTagPrefix+key] = val
}
if tagNode == nil {
_, err = c.service.AddNode(ctx, bktInfo, versionTree, objVersion.ID, treeTagSet)
return err
}
ind := tagNode.GetLatestNodeIndex()
if tagNode.IsSplit() {
c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
}
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
}
func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error {
return c.PutObjectTagging(ctx, bktInfo, objVersion, nil)
}
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
if err != nil {
return nil, err
}
tags := make(map[string]string)
for key, val := range multiNode.Latest().Meta {
if strings.HasPrefix(key, userDefinedTagPrefix) {
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
}
}
return tags, nil
}
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err)
}
treeTagSet := make(map[string]string)
treeTagSet[FileNameKey] = bucketTaggingFilename
for key, val := range tagSet {
treeTagSet[userDefinedTagPrefix+key] = val
}
if isErrNotFound {
_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, treeTagSet)
return err
}
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
return fmt.Errorf("move bucket tagging node: %w", err)
}
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
return nil
}
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
return c.PutBucketTagging(ctx, bktInfo, nil)
}
func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, key string) (*treeNode, error) {
nodes, err := c.getTreeNodes(ctx, bktInfo, nodeID, key)
if err != nil {
return nil, err
}
// if there will be many allocations, consider having separate
// implementations of 'getTreeNode' and 'getTreeNodes'
return nodes[key], nil
}
func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2, false)
if err != nil {
return nil, err
}
// consider using map[string][]*treeNode
// to be able to remove unused node, that can be added during split
treeNodes := make(map[string]*treeNode, len(keys))
for _, s := range subtree {
node, err := newTreeNode(s)
if err != nil {
return nil, err
}
for _, key := range keys {
if _, ok := node.Get(key); ok {
treeNodes[key] = node
break
}
}
if len(treeNodes) == len(keys) {
break
}
}
return treeNodes, nil
}
func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) {
return c.getVersions(ctx, bktInfo, versionTree, filepath, false)
}
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
path := pathFromName(objectName)
p := &GetNodesParams{
BktInfo: bktInfo,
TreeID: versionTree,
Path: path,
Meta: meta,
LatestOnly: false,
AllAttrs: false,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
latestNode, err := getLatestVersionNode(nodes)
if err != nil {
return nil, err
}
return newNodeVersion(c.reqLogger(ctx), objectName, latestNode)
}
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
var (
maxCreationTime uint64
targetIndexNode = -1
)
for i, node := range nodes {
if !checkExistOID(node.GetMeta()) {
continue
}
if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime {
targetIndexNode = i
maxCreationTime = currentCreationTime
}
}
if targetIndexNode == -1 {
return nil, layer.ErrNodeNotFound
}
return nodes[targetIndexNode], nil
}
func getMaxTimestamp(node NodeResponse) uint64 {
var maxTimestamp uint64
for _, timestamp := range node.GetTimestamp() {
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
return maxTimestamp
}
func checkExistOID(meta []Meta) bool {
for _, kv := range meta {
if kv.GetKey() == "OID" {
return true
}
}
return false
}
// pathFromName splits name by '/'.
func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
}
type DummySubTreeStream struct {
data NodeResponse
read bool
}
func (s *DummySubTreeStream) Next() (NodeResponse, error) {
if s.read {
return nil, io.EOF
}
s.read = true
return s.data, nil
}
type MultiID []uint64
func (m MultiID) Equal(id MultiID) bool {
seen := make(map[uint64]struct{}, len(m))
for i := range m {
seen[m[i]] = struct{}{}
}
for i := range id {
if _, ok := seen[id[i]]; !ok {
return false
}
}
return true
}
type VersionsByPrefixStreamImpl struct {
ctx context.Context
rootID MultiID
intermediateRootID MultiID
service ServiceClient
bktInfo *data.BucketInfo
mainStream SubTreeStream
innerStream SubTreeStream
headPrefix string
tailPrefix string
namesMap map[uint64]string
ended bool
latestOnly bool
currentLatest *data.NodeVersion
log *zap.Logger
}
func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
if s.ended {
return nil, io.EOF
}
for {
if s.innerStream == nil {
node, err := s.getNodeFromMainStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.ended = true
if s.currentLatest != nil {
return s.currentLatest, nil
}
}
return nil, fmt.Errorf("get node from main stream: %w", err)
}
if err = s.initInnerStream(node); err != nil {
return nil, fmt.Errorf("init inner stream: %w", err)
}
}
nodeVersion, err := s.getNodeVersionFromInnerStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.innerStream = nil
maps.Clear(s.namesMap)
if s.currentLatest != nil && !s.intermediateRootID.Equal([]uint64{s.currentLatest.ID}) {
return s.currentLatest, nil
}
continue
}
return nil, fmt.Errorf("inner stream: %w", err)
}
return nodeVersion, nil
}
}
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
for {
node, err := s.mainStream.Next()
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, io.EOF
}
return nil, fmt.Errorf("main stream next: %w", err)
}
if !s.rootID.Equal(node.GetNodeID()) && strings.HasPrefix(getFilename(node), s.tailPrefix) {
return node, nil
}
}
}
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
if s.rootID.Equal(node.GetParentID()) {
s.intermediateRootID = node.GetNodeID()
}
if isIntermediate(node) {
s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
if err != nil {
return fmt.Errorf("get sub tree node from main stream: %w", err)
}
} else {
s.innerStream = &DummySubTreeStream{data: node}
}
return nil
}
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
for {
node, err := s.innerStream.Next()
if err != nil {
return nil, fmt.Errorf("inner stream: %w", err)
}
nodeVersion, skip, err := s.parseNodeResponse(node)
if err != nil {
return nil, err
}
if skip {
continue
}
if s.latestOnly {
if s.currentLatest == nil {
s.currentLatest = nodeVersion
continue
}
if s.currentLatest.FilePath != nodeVersion.FilePath {
res := s.currentLatest
s.currentLatest = nodeVersion
return res, nil
}
if s.currentLatest.Timestamp < nodeVersion.Timestamp {
s.currentLatest = nodeVersion
}
continue
}
return nodeVersion, nil
}
}
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
trNode, fileName, err := parseTreeNode(node)
if err != nil {
if !errors.Is(err, errNodeDoesntContainFileName) {
s.log.Debug(logs.ParseTreeNode, zap.Error(err))
}
return nil, true, nil
}
var parentPrefix string
if s.headPrefix != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
}
var filepath string
if !s.intermediateRootID.Equal(trNode.ID) {
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
return nil, false, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
for _, id := range trNode.ID {
s.namesMap[id] = filepath
}
}
if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
return nil, true, nil
}
nodeVersion, err := newNodeVersionFromTreeNode(s.log, filepath, trNode)
return nodeVersion, false, err
}
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
if err != nil {
if errors.Is(err, io.EOF) {
return &VersionsByPrefixStreamImpl{ended: true}, nil
}
return nil, err
}
return &VersionsByPrefixStreamImpl{
ctx: ctx,
namesMap: map[uint64]string{},
rootID: rootID,
service: c.service,
bktInfo: bktInfo,
mainStream: mainStream,
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
tailPrefix: tailPrefix,
latestOnly: latestOnly,
log: c.reqLogger(ctx),
}, nil
}
func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, []uint64, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil, io.EOF
}
return nil, "", nil, err
}
subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil, io.EOF
}
return nil, "", nil, err
}
return subTree, tailPrefix, rootID, nil
}
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
rootID := []uint64{0}
path := strings.Split(prefix, separator)
tailPrefix := path[len(path)-1]
if len(path) > 1 {
var err error
rootID, err = c.getPrefixNodeID(ctx, bktInfo, treeID, path[:len(path)-1])
if err != nil {
return nil, "", err
}
}
return rootID, tailPrefix, nil
}
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
p := &GetNodesParams{
BktInfo: bktInfo,
TreeID: treeID,
Path: prefixPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
var intermediateNodes []uint64
for _, node := range nodes {
if isIntermediate(node) {
intermediateNodes = append(intermediateNodes, node.GetNodeID()...)
}
}
if len(intermediateNodes) == 0 {
return nil, layer.ErrNodeNotFound
}
return intermediateNodes, nil
}
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
nodesMap := make(map[string][]NodeResponse, len(subTree))
for _, node := range subTree {
if MultiID(rootID).Equal(node.GetNodeID()) {
continue
}
fileName := getFilename(node)
if !strings.HasPrefix(fileName, tailPrefix) {
continue
}
nodes := nodesMap[fileName]
// Add all nodes if flag latestOnly is false.
// Add all intermediate nodes
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
if len(nodes) == 0 {
nodes = []NodeResponse{node}
} else if !latestOnly || isIntermediate(node) {
nodes = append(nodes, node)
} else if isIntermediate(nodes[0]) {
nodes = append([]NodeResponse{node}, nodes...)
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
nodes[0] = node
}
nodesMap[fileName] = nodes
}
result := make([]NodeResponse, 0, len(subTree))
for _, nodes := range nodesMap {
result = append(result, nodes...)
}
return result, strings.TrimSuffix(prefix, tailPrefix), nil
}
func getFilename(node NodeResponse) string {
for _, kv := range node.GetMeta() {
if kv.GetKey() == FileNameKey {
return string(kv.GetValue())
}
}
return ""
}
func isIntermediate(node NodeResponse) bool {
if len(node.GetMeta()) != 1 {
return false
}
return node.GetMeta()[0].GetKey() == FileNameKey
}
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
var filepath string
for i, id := range node.GetParentID() {
parentPath, ok := namesMap[id]
if !ok {
return "", fmt.Errorf("couldn't get parent path")
}
filepath = parentPath + separator + fileName
namesMap[node.GetNodeID()[i]] = filepath
}
return filepath, nil
}
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
tNode, err := newTreeNode(node)
if err != nil { // invalid OID attribute
return nil, "", err
}
fileName, ok := tNode.FileName()
if !ok {
return nil, "", errNodeDoesntContainFileName
}
return tNode, fileName, nil
}
func formLatestNodeKey(parentID uint64, fileName string) string {
return strconv.FormatUint(parentID, 10) + "." + fileName
}
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
if err != nil {
return nil, err
}
if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound
}
if len(nodes) > 1 {
c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode,
zap.String("treeID", treeID), zap.String("filepath", filepath))
}
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Timestamp > nodes[j].Timestamp
})
return nodes[0], nil
}
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
return c.addVersion(ctx, bktInfo, versionTree, version)
}
func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
return c.service.RemoveNode(ctx, bktInfo, versionTree, id)
}
func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
path := pathFromName(info.Key)
meta := metaFromMultipart(info, path[len(path)-1])
_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta)
return err
}
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
if err != nil {
return nil, err
}
var result []*data.MultipartInfo
for _, node := range subTreeNodes {
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
if err != nil {
return nil, err
}
result = append(result, multipartUploads...)
}
return result, nil
}
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
// so when we are only interested in multipart nodes, we can set this flag
// (despite we sort multiparts in above layer anyway)
// to skip its children (parts) that don't have FileName
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
if err != nil {
return nil, err
}
var parentPrefix string
if parentFilePath != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
}
var filepath string
namesMap := make(map[uint64]string, len(subTree))
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
for i, node := range subTree {
tNode, fileName, err := parseTreeNode(node)
if err != nil {
continue
}
if i != 0 {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
for _, id := range tNode.ID {
namesMap[id] = filepath
}
}
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
if err != nil || multipartInfo.Finished {
continue
}
for _, id := range node.GetParentID() {
key := formLatestNodeKey(id, fileName)
multipartInfos, ok := multiparts[key]
if !ok {
multipartInfos = []*data.MultipartInfo{multipartInfo}
} else {
multipartInfos = append(multipartInfos, multipartInfo)
}
multiparts[key] = multipartInfos
}
}
result := make([]*data.MultipartInfo, 0, len(multiparts))
for _, multipartInfo := range multiparts {
result = append(result, multipartInfo...)
}
return result, nil
}
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
path := pathFromName(objectName)
p := &GetNodesParams{
BktInfo: bktInfo,
TreeID: systemTree,
Path: path,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
log := c.reqLogger(ctx)
for _, node := range nodes {
info, err := newMultipartInfo(log, node)
if err != nil {
continue
}
if info.UploadID == uploadID {
if info.Finished {
break
}
return info, nil
}
}
return nil, layer.ErrNodeNotFound
}
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return nil, err
}
meta := map[string]string{
partNumberKV: strconv.Itoa(info.Number),
oidKV: info.OID.EncodeToString(),
sizeKV: strconv.FormatUint(info.Size, 10),
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
etagKV: info.ETag,
md5KV: info.MD5,
}
objToDelete := make([]oid.ID, 0, 1)
partsToDelete := make([]uint64, 0, 1)
var (
latestPartID uint64
maxTimestamp uint64
)
multiNodeID := MultiID{multipartNodeID}
for _, part := range parts {
if multiNodeID.Equal(part.GetNodeID()) {
continue
}
partInfo, err := newPartInfo(part)
if err != nil {
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("id", part.GetNodeID()),
zap.Error(err))
continue
}
if partInfo.Number == info.Number {
nodeID := part.GetNodeID()[0]
objToDelete = append(objToDelete, partInfo.OID)
partsToDelete = append(partsToDelete, nodeID)
timestamp := partInfo.Timestamp
if timestamp > maxTimestamp {
maxTimestamp = timestamp
latestPartID = nodeID
}
}
}
if len(objToDelete) != 0 {
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
return nil, fmt.Errorf("move part node: %w", err)
}
for _, nodeID := range partsToDelete {
if nodeID == latestPartID {
continue
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("id", nodeID))
}
}
return objToDelete, nil
}
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
return nil, err
}
return nil, layer.ErrNoNodeToRemove
}
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return nil, err
}
result := make([]*data.PartInfoExtended, 0, len(parts))
for _, part := range parts {
if len(part.GetNodeID()) != 1 {
// multipart parts nodeID shouldn't have multiple values
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()))
continue
}
if part.GetNodeID()[0] == multipartNodeID {
continue
}
partInfo, err := newPartInfo(part)
if err != nil {
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()),
zap.Error(err))
continue
}
result = append(result, partInfo)
}
return result, nil
}
func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return nil, fmt.Errorf("couldn't get node: %w", err)
}
meta := make(map[string]string)
meta[FileNameKey] = bucketLifecycleFilename
meta[oidKV] = addr.Object().EncodeToString()
meta[cidKV] = addr.Container().EncodeToString()
if isErrNotFound {
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
return nil, err
}
return nil, layer.ErrNoNodeToRemove
}
latest := multiNode.Latest()
ind := latest.GetLatestNodeIndex()
if latest.IsSplit() {
c.reqLogger(ctx).Error(logs.BucketLifecycleNodeHasMultipleIDs)
}
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
return nil, fmt.Errorf("move lifecycle node: %w", err)
}
objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
objToDelete[0], err = getTreeNodeAddress(latest)
if err != nil {
return nil, fmt.Errorf("parse object addr of latest lifecycle node in tree: %w", err)
}
objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
return objToDelete, nil
}
func (c *Tree) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
if err != nil {
return oid.Address{}, fmt.Errorf("get lifecycle node: %w", err)
}
return getTreeNodeAddress(node.Latest())
}
func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return nil, err
}
if isErrNotFound {
return nil, layer.ErrNoNodeToRemove
}
objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
if len(objToDelete) != len(multiNode.nodes) {
return nil, fmt.Errorf("failed to clean all old lifecycle nodes")
}
return objToDelete, nil
}
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
if err != nil {
return err
}
multipartInfo.Finished = true
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
meta := map[string]string{isLockKV: "true"}
if lock.IsLegalHoldSet() {
meta[legalHoldOIDKV] = lock.LegalHold().EncodeToString()
}
if lock.IsRetentionSet() {
meta[retentionOIDKV] = lock.Retention().EncodeToString()
meta[untilDateKV] = lock.UntilDate()
if lock.IsCompliance() {
meta[isComplianceKV] = "true"
}
}
if lock.ID() == 0 {
_, err := c.service.AddNode(ctx, bktInfo, versionTree, nodeID, meta)
return err
}
return c.service.MoveNode(ctx, bktInfo, versionTree, lock.ID(), nodeID, meta)
}
func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
lockNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
if err != nil {
return nil, err
}
return getLock(lockNode)
}
func getLock(lockNode *treeNode) (*data.LockInfo, error) {
if lockNode == nil {
return &data.LockInfo{}, nil
}
if lockNode.IsSplit() {
return nil, errors.New("invalid lock node: this is split node")
}
lockInfo := data.NewLockInfo(lockNode.ID[0])
if legalHold, ok := lockNode.Get(legalHoldOIDKV); ok {
var legalHoldOID oid.ID
if err := legalHoldOID.DecodeString(legalHold); err != nil {
return nil, fmt.Errorf("invalid legal hold object id: %w", err)
}
lockInfo.SetLegalHold(legalHoldOID)
}
if retention, ok := lockNode.Get(retentionOIDKV); ok {
var retentionOID oid.ID
if err := retentionOID.DecodeString(retention); err != nil {
return nil, fmt.Errorf("invalid retention object id: %w", err)
}
_, isCompliance := lockNode.Get(isComplianceKV)
untilDate, _ := lockNode.Get(untilDateKV)
lockInfo.SetRetention(retentionOID, untilDate, isCompliance)
}
return lockInfo, nil
}
func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
nodes, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
if err != nil {
return nil, nil, err
}
lockInfo, err := getLock(nodes[isLockKV])
if err != nil {
return nil, nil, err
}
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
}
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
path := pathFromName(version.FilePath)
meta := map[string]string{
oidKV: version.OID.EncodeToString(),
FileNameKey: path[len(path)-1],
ownerKV: version.Owner.EncodeToString(),
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
creationEpochKV: strconv.FormatUint(version.CreationEpoch, 10),
}
if version.Size > 0 {
meta[sizeKV] = strconv.FormatUint(version.Size, 10)
}
if len(version.ETag) > 0 {
meta[etagKV] = version.ETag
}
if len(version.MD5) > 0 {
meta[md5KV] = version.MD5
}
if version.IsDeleteMarker {
meta[isDeleteMarkerKV] = "true"
}
if version.IsCombined {
meta[isCombinedKV] = "true"
}
if version.IsUnversioned {
meta[isUnversionedKV] = "true"
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
if err == nil {
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil {
return 0, err
}
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
}
if !errors.Is(err, layer.ErrNodeNotFound) {
return 0, err
}
}
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
}
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
if err != nil {
return err
}
if taggingNode != nil {
return c.service.RemoveNode(ctx, bktInfo, treeID, taggingNode.ID[0])
}
return nil
}
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
path := pathFromName(filepath)
p := &GetNodesParams{
BktInfo: bktInfo,
TreeID: treeID,
Path: path,
Meta: keysToReturn,
LatestOnly: false,
AllAttrs: false,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
log := c.reqLogger(ctx)
result := make([]*data.NodeVersion, 0, len(nodes))
for _, node := range nodes {
nodeVersion, err := newNodeVersion(log, filepath, node)
if err != nil {
return nil, err
}
if onlyUnversioned && !nodeVersion.IsUnversioned {
continue
}
result = append(result, nodeVersion)
}
return result, nil
}
func metaFromSettings(settings *data.BucketSettings) map[string]string {
results := make(map[string]string, 3)
results[FileNameKey] = settingsFileName
results[versioningKV] = settings.Versioning
results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration)
results[cannedACLKV] = settings.CannedACL
if settings.OwnerKey != nil {
results[ownerKeyKV] = hex.EncodeToString(settings.OwnerKey.Bytes())
}
return results
}
func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string {
info.Meta[FileNameKey] = fileName
info.Meta[uploadIDKV] = info.UploadID
info.Meta[ownerKV] = info.Owner.EncodeToString()
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
}
info.Meta[creationEpochKV] = strconv.FormatUint(info.CreationEpoch, 10)
return info.Meta
}
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
p := &GetNodesParams{
BktInfo: bktInfo,
TreeID: systemTree,
Path: []string{name},
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
nodes = filterMultipartNodes(nodes)
if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound
}
if len(nodes) != 1 {
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
}
return newMultiNode(nodes)
}
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
res := make([]NodeResponse, 0, len(nodes))
LOOP:
for _, node := range nodes {
for _, meta := range node.GetMeta() {
if meta.GetKey() == uploadIDKV {
continue LOOP
}
}
res = append(res, node)
}
return res
}
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
reqLogger := middleware.GetReqLog(ctx)
if reqLogger != nil {
return reqLogger
}
return c.log
}
func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
result := &data.ObjectLockConfiguration{}
if len(value) == 0 {
return result, nil
}
lockValues := strings.Split(value, ",")
result.ObjectLockEnabled = lockValues[0]
if len(lockValues) == 1 {
return result, nil
}
if len(lockValues) != 4 {
return nil, fmt.Errorf("invalid lock configuration: %s", value)
}
var err error
var days, years int64
if len(lockValues[1]) > 0 {
if days, err = strconv.ParseInt(lockValues[1], 10, 64); err != nil {
return nil, fmt.Errorf("invalid lock configuration: %s", value)
}
}
if len(lockValues[3]) > 0 {
if years, err = strconv.ParseInt(lockValues[3], 10, 64); err != nil {
return nil, fmt.Errorf("invalid lock configuration: %s", value)
}
}
result.Rule = &data.ObjectLockRule{
DefaultRetention: &data.DefaultRetention{
Days: days,
Mode: lockValues[2],
Years: years,
},
}
return result, nil
}
func encodeLockConfiguration(conf *data.ObjectLockConfiguration) string {
if conf == nil {
return ""
}
if conf.Rule == nil || conf.Rule.DefaultRetention == nil {
return conf.ObjectLockEnabled
}
defaults := conf.Rule.DefaultRetention
return fmt.Sprintf("%s,%d,%s,%d", conf.ObjectLockEnabled, defaults.Days, defaults.Mode, defaults.Years)
}