frostfs-s3-gw/pkg/service/tree/tree.go
Nikita Zinkevich cea8d4b324
[#469] List multipart uploads streaming
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-11-21 17:04:46 +03:00

1929 lines
51 KiB
Go

package tree
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"sort"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
type (
	// Tree bundles a tree service client with a logger; the methods in this
	// file use it to read and modify metadata nodes through that client.
	Tree struct {
		service ServiceClient
		log *zap.Logger
	}
	// ServiceClient is a client to interact with tree service.
	// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
	ServiceClient interface {
		GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
		GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
		GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
		AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
		AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
		MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
		RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
	}
	// SubTreeStream yields sub tree nodes one at a time.
	// Consumers in this file treat io.EOF returned by Next as the end of the stream.
	SubTreeStream interface {
		Next() (NodeResponse, error)
	}
	// treeNode is the parsed form of a NodeResponse (see newTreeNode).
	treeNode struct {
		// ID, ParentID and TimeStamp are taken from the response as is. A node
		// that does not have exactly one entry in each of them is reported as
		// split by IsSplit.
		ID []uint64
		ParentID []uint64
		// ObjID is decoded from the OID meta value.
		ObjID oid.ID
		TimeStamp []uint64
		// Size is parsed from the Size meta value.
		Size uint64
		// Meta holds every other meta key/value pair as strings.
		Meta map[string]string
	}
	// multiSystemNode groups the parsed nodes returned for one system entry.
	multiSystemNode struct {
		// the first element is latest
		nodes []*treeNode
	}
	// GetNodesParams carries the arguments of ServiceClient.GetNodes.
	GetNodesParams struct {
		BktInfo *data.BucketInfo
		TreeID string
		Path []string
		Meta []string
		LatestOnly bool
		AllAttrs bool
	}
)
const (
	// FileNameKey is the meta key under which a node's file name is stored.
	FileNameKey = "FileName"
)
var (
	// ErrNodeNotFound is returned from ServiceClient in case of not found error.
	ErrNodeNotFound = tree.ErrNodeNotFound
	// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
	ErrNodeAccessDenied = tree.ErrNodeAccessDenied
	// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
	ErrGatewayTimeout = frostfs.ErrGatewayTimeout
	// errNodeDoesntContainFileName is returned by parseTreeNode when the node
	// has no FileName meta key.
	errNodeDoesntContainFileName = errors.New("node doesn't contain FileName")
	// errParentPathNotFound is returned by formFilePath when the path of a
	// parent node has not been recorded yet.
	errParentPathNotFound = errors.New("couldn't get parent path")
)
// Meta keys, system file names and tree identifiers used by this package.
const (
	// keys of node meta.
	versioningKV = "Versioning"
	cannedACLKV = "cannedACL"
	ownerKeyKV = "ownerKey"
	lockConfigurationKV = "LockConfiguration"
	oidKV = "OID"
	cidKV = "CID"
	isCombinedKV = "IsCombined"
	isUnversionedKV = "IsUnversioned"
	isTagKV = "IsTag"
	uploadIDKV = "UploadId"
	partNumberKV = "Number"
	sizeKV = "Size"
	etagKV = "ETag"
	md5KV = "MD5"
	finishedKV = "Finished"
	creationEpochKV = "CreationEpoch"
	// keys for lock.
	isLockKV = "IsLock"
	legalHoldOIDKV = "LegalHoldOID"
	retentionOIDKV = "RetentionOID"
	untilDateKV = "UntilDate"
	isComplianceKV = "IsCompliance"
	// keys for delete marker nodes.
	isDeleteMarkerKV = "IsDeleteMarker"
	ownerKV = "Owner"
	createdKV = "Created"
	// file names of the nodes kept in the system tree.
	settingsFileName = "bucket-settings"
	corsFilename = "bucket-cors"
	bucketTaggingFilename = "bucket-tagging"
	bucketLifecycleFilename = "bucket-lifecycle"
	// versionTree -- ID of a tree with object versions.
	versionTree = "version"
	// systemTree -- ID of a tree with system objects
	// i.e. bucket settings with versioning and lock configuration, cors.
	systemTree = "system"
	// separator splits object names and prefixes into path components.
	separator = "/"
	// userDefinedTagPrefix is prepended to user tag keys stored in node meta.
	userDefinedTagPrefix = "User-Tag-"
	maxGetSubTreeDepth = 0 // means all subTree
)
// NewTree returns a Tree that performs its operations through the given
// tree service client and logs through the given logger. It only stores
// both arguments; no connection is opened here.
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
	t := new(Tree)
	t.service = service
	t.log = log
	return t
}
// Meta is a single key/value attribute attached to a tree node.
type Meta interface {
	GetKey() string
	GetValue() []byte
}
// NodeResponse is a tree node as returned by ServiceClient.
// The ID, parent ID and timestamp getters return slices; validateNodeResponse
// requires them to be non-empty and of equal length.
type NodeResponse interface {
	GetMeta() []Meta
	GetNodeID() []uint64
	GetParentID() []uint64
	GetTimestamp() []uint64
}
// newTreeNode converts a raw node response into a treeNode.
//
// The response is validated first. The OID attribute is decoded into ObjID,
// a non-empty Size attribute is parsed as a base-10 unsigned integer, and
// every other attribute is copied into Meta as a string. An error is returned
// when validation fails or when OID or Size cannot be parsed.
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
	if err := validateNodeResponse(nodeInfo); err != nil {
		return nil, err
	}

	attrs := nodeInfo.GetMeta()
	result := &treeNode{
		ID:        nodeInfo.GetNodeID(),
		ParentID:  nodeInfo.GetParentID(),
		TimeStamp: nodeInfo.GetTimestamp(),
		Meta:      make(map[string]string, len(attrs)),
	}

	for _, attr := range attrs {
		key, raw := attr.GetKey(), string(attr.GetValue())

		if key == oidKV {
			if err := result.ObjID.DecodeString(raw); err != nil {
				return nil, err
			}
			continue
		}

		if key == sizeKV {
			// An empty size is tolerated and leaves Size untouched.
			if raw == "" {
				continue
			}
			size, err := strconv.ParseUint(raw, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid size value '%s': %w", raw, err)
			}
			result.Size = size
			continue
		}

		result.Meta[key] = raw
	}

	return result, nil
}
// Get looks key up in the node's Meta and reports whether it is present.
func (n *treeNode) Get(key string) (value string, ok bool) {
	value, ok = n.Meta[key]
	return value, ok
}
// FileName returns the value stored under FileNameKey in the node's Meta and
// reports whether that key is present.
func (n *treeNode) FileName() (name string, found bool) {
	name, found = n.Meta[FileNameKey]
	return name, found
}
// IsSplit reports whether the node carries anything other than exactly one
// ID, one parent ID and one timestamp.
func (n *treeNode) IsSplit() bool {
	single := len(n.ID) == 1 && len(n.ParentID) == 1 && len(n.TimeStamp) == 1
	return !single
}
// GetLatestNodeIndex returns the position of the largest value in TimeStamp.
// The first occurrence wins on ties, and 0 is returned when TimeStamp is
// empty or holds only zeros.
func (n *treeNode) GetLatestNodeIndex() int {
	latest := 0
	var newest uint64
	for i := 0; i < len(n.TimeStamp); i++ {
		if ts := n.TimeStamp[i]; ts > newest {
			newest, latest = ts, i
		}
	}
	return latest
}
// newNodeVersion parses node and builds the object version stored under
// filePath from it. Parse failures are reported as "invalid tree node".
func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) {
	parsed, err := newTreeNode(node)
	if err == nil {
		return newNodeVersionFromTreeNode(log, filePath, parsed)
	}
	return nil, fmt.Errorf("invalid tree node: %w", err)
}
// newNodeVersionFromTreeNode builds a data.NodeVersion for filePath from an
// already parsed tree node.
//
// Split nodes are rejected. The IsUnversioned, IsDeleteMarker and IsCombined
// flags are true when the corresponding meta key is present, whatever its
// value. Created, Owner and CreationEpoch are optional: a value that cannot
// be parsed is logged as a warning and the field is left unset.
func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.NodeVersion, error) {
	if treeNode.IsSplit() {
		return nil, errors.New("invalid version tree node: this is split node")
	}

	base := data.BaseNodeVersion{
		ID:        treeNode.ID[0],
		ParenID:   treeNode.ParentID[0],
		OID:       treeNode.ObjID,
		Timestamp: treeNode.TimeStamp[0],
		Size:      treeNode.Size,
		FilePath:  filePath,
	}
	base.ETag, _ = treeNode.Get(etagKV)
	base.MD5, _ = treeNode.Get(md5KV)
	_, base.IsDeleteMarker = treeNode.Get(isDeleteMarkerKV)

	version := &data.NodeVersion{BaseNodeVersion: base}
	_, version.IsUnversioned = treeNode.Get(isUnversionedKV)
	_, version.IsCombined = treeNode.Get(isCombinedKV)

	if raw, ok := treeNode.Get(createdKV); ok {
		utcMilli, err := strconv.ParseInt(raw, 10, 64)
		if err == nil {
			created := time.UnixMilli(utcMilli)
			version.Created = &created
		} else {
			log.Warn(logs.InvalidTreeKV, zap.String(createdKV, raw), zap.Error(err))
		}
	}

	if raw, ok := treeNode.Get(ownerKV); ok {
		var owner user.ID
		err := owner.DecodeString(raw)
		if err == nil {
			version.Owner = &owner
		} else {
			log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, raw), zap.Error(err))
		}
	}

	if raw, ok := treeNode.Get(creationEpochKV); ok {
		epoch, err := strconv.ParseUint(raw, 10, 64)
		if err == nil {
			version.CreationEpoch = epoch
		} else {
			log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, raw), zap.Error(err))
		}
	}

	return version, nil
}
// newMultiNode parses all given responses and returns them as one
// multiSystemNode whose first element is the node with the greatest
// timestamp. At least one response is required; any parse failure aborts
// the whole conversion.
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
	if len(nodes) == 0 {
		return nil, errors.New("multi node must have at least one node")
	}

	parsed := make([]*treeNode, 0, len(nodes))
	latestPos := 0
	var latestTS uint64

	for pos, raw := range nodes {
		tn, err := newTreeNode(raw)
		if err != nil {
			return nil, fmt.Errorf("parse system node response: %w", err)
		}
		parsed = append(parsed, tn)

		if ts := getMaxTimestamp(raw); ts > latestTS {
			latestPos, latestTS = pos, ts
		}
	}

	// Swap the newest node to the front: Latest and Old rely on index 0.
	parsed[0], parsed[latestPos] = parsed[latestPos], parsed[0]

	return &multiSystemNode{nodes: parsed}, nil
}
// Latest returns the first stored node, which newMultiNode arranges to be
// the one with the greatest timestamp.
func (m *multiSystemNode) Latest() *treeNode {
	const latestPos = 0
	return m.nodes[latestPos]
}
// Old returns every stored node except the first (latest) one.
func (m *multiSystemNode) Old() []*treeNode {
	const firstOld = 1
	return m.nodes[firstOld:]
}
// newMultipartInfoFromTreeNode builds a data.MultipartInfo for filePath from
// an already parsed tree node.
//
// The node must carry a non-empty UploadId and must not be split. Owner,
// Created, Finished and CreationEpoch are optional: a value that cannot be
// parsed is logged as a warning and otherwise ignored. The node's Meta map is
// attached to the result as is.
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
	uploadID, _ := treeNode.Get(uploadIDKV)
	switch {
	case uploadID == "":
		return nil, fmt.Errorf("it's not a multipart node: missing UploadId")
	case treeNode.IsSplit():
		return nil, fmt.Errorf("invalid multipart node '%s': tree node is split", filePath)
	}

	info := &data.MultipartInfo{
		ID:       treeNode.ID[0],
		Key:      filePath,
		UploadID: uploadID,
		Meta:     treeNode.Meta,
	}

	if raw, ok := treeNode.Get(ownerKV); ok {
		if err := info.Owner.DecodeString(raw); err != nil {
			log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, raw), zap.Error(err))
		}
	}

	if raw, ok := treeNode.Get(createdKV); ok {
		utcMilli, err := strconv.ParseInt(raw, 10, 64)
		if err == nil {
			info.Created = time.UnixMilli(utcMilli)
		} else {
			log.Warn(logs.InvalidTreeKV, zap.String(createdKV, raw), zap.Error(err))
		}
	}

	if raw, ok := treeNode.Get(finishedKV); ok {
		flag, err := strconv.ParseBool(raw)
		if err == nil {
			info.Finished = flag
		} else {
			log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, raw), zap.Error(err))
		}
	}

	if raw, ok := treeNode.Get(creationEpochKV); ok {
		epoch, err := strconv.ParseUint(raw, 10, 64)
		if err == nil {
			info.CreationEpoch = epoch
		} else {
			log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, raw), zap.Error(err))
		}
	}

	return info, nil
}
// newMultipartInfo builds a data.MultipartInfo directly from a node response.
//
// The response must be valid and must carry exactly one node ID. UploadId,
// FileName, Created, Owner, Finished and CreationEpoch are mapped onto the
// matching fields; values that cannot be parsed are logged as warnings and
// skipped. Every other attribute is copied into Meta. A node without an
// UploadId is rejected.
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
	if err := validateNodeResponse(node); err != nil {
		return nil, err
	}

	nodeIDs := node.GetNodeID()
	if len(nodeIDs) != 1 {
		return nil, errors.New("invalid multipart node: this is split node")
	}

	attrs := node.GetMeta()
	info := &data.MultipartInfo{
		ID:   nodeIDs[0],
		Meta: make(map[string]string, len(attrs)),
	}

	for _, attr := range attrs {
		key, value := attr.GetKey(), string(attr.GetValue())

		switch key {
		case uploadIDKV:
			info.UploadID = value
		case FileNameKey:
			info.Key = value
		case createdKV:
			utcMilli, err := strconv.ParseInt(value, 10, 64)
			if err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(createdKV, value), zap.Error(err))
				continue
			}
			info.Created = time.UnixMilli(utcMilli)
		case ownerKV:
			if err := info.Owner.DecodeString(value); err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, value), zap.Error(err))
			}
		case finishedKV:
			isFinished, err := strconv.ParseBool(value)
			if err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, value), zap.Error(err))
				continue
			}
			info.Finished = isFinished
		case creationEpochKV:
			epoch, err := strconv.ParseUint(value, 10, 64)
			if err != nil {
				log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, value), zap.Error(err))
				continue
			}
			info.CreationEpoch = epoch
		default:
			info.Meta[key] = value
		}
	}

	if info.UploadID == "" {
		return nil, fmt.Errorf("it's not a multipart node")
	}

	return info, nil
}
// validateNodeResponse checks that the node carries at least one ID, parent
// ID and timestamp, and that all three lists have the same length.
func validateNodeResponse(node NodeResponse) error {
	idCount := len(node.GetNodeID())
	parentCount := len(node.GetParentID())
	tsCount := len(node.GetTimestamp())

	switch {
	case idCount == 0, parentCount == 0, tsCount == 0:
		return errors.New("invalid node response: missing ids")
	case idCount != parentCount, parentCount != tsCount:
		return errors.New("invalid node response: multiple ids length mismatch")
	default:
		return nil
	}
}
// newPartInfo builds a data.PartInfoExtended from a node response.
//
// The response must be valid and must carry exactly one node ID. Unlike the
// multipart info parsers, any attribute that fails to parse (Number, OID,
// Size, Created) aborts the conversion. Unknown attributes are ignored. A
// node whose part number is missing or not positive is rejected.
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
	if err := validateNodeResponse(node); err != nil {
		return nil, err
	}
	if len(node.GetNodeID()) != 1 {
		return nil, errors.New("invalid part node: this is split node")
	}

	part := &data.PartInfoExtended{
		Timestamp: node.GetTimestamp()[0],
	}

	for _, attr := range node.GetMeta() {
		raw := string(attr.GetValue())

		switch attr.GetKey() {
		case etagKV:
			part.ETag = raw
		case md5KV:
			part.MD5 = raw
		case partNumberKV:
			number, err := strconv.Atoi(raw)
			if err != nil {
				return nil, fmt.Errorf("invalid part number: %w", err)
			}
			part.Number = number
		case oidKV:
			if err := part.OID.DecodeString(raw); err != nil {
				return nil, fmt.Errorf("invalid oid: %w", err)
			}
		case sizeKV:
			size, err := strconv.ParseUint(raw, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid part size: %w", err)
			}
			part.Size = size
		case createdKV:
			utcMilli, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid created timestamp: %w", err)
			}
			part.Created = time.UnixMilli(utcMilli)
		}
	}

	if part.Number <= 0 {
		return nil, fmt.Errorf("it's not a part node")
	}

	return part, nil
}
// GetSettingsNode reads the bucket settings from the latest settings node in
// the system tree.
//
// Versioning defaults to data.VersioningUnversioned when the node does not
// specify it. An invalid lock configuration is returned as an error, while an
// invalid owner key is only logged.
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
	if err != nil {
		return nil, fmt.Errorf("couldn't get node: %w", err)
	}
	latest := multiNode.Latest()

	settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
	if versioning, ok := latest.Get(versioningKV); ok {
		settings.Versioning = versioning
	}
	settings.CannedACL, _ = latest.Get(cannedACLKV)

	if rawLock, ok := latest.Get(lockConfigurationKV); ok {
		lockCfg, lockErr := parseLockConfiguration(rawLock)
		if lockErr != nil {
			return nil, fmt.Errorf("settings node: invalid lock configuration: %w", lockErr)
		}
		settings.LockConfiguration = lockCfg
	}

	if ownerKeyHex, ok := latest.Get(ownerKeyKV); ok {
		ownerKey, keyErr := keys.NewPublicKeyFromString(ownerKeyHex)
		// The parse result is stored even on failure; the failure is only logged.
		settings.OwnerKey = ownerKey
		if keyErr != nil {
			c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(keyErr))
		}
	}

	return settings, nil
}
// PutSettingsNode stores the bucket settings in the system tree.
//
// When no settings node exists yet a new one is added under the tree root.
// Otherwise the latest node is rewritten in place with the new meta and the
// older duplicates are removed on a best-effort basis.
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
	multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
	notFound := errors.Is(err, tree.ErrNodeNotFound)
	if !notFound && err != nil {
		return fmt.Errorf("couldn't get node: %w", err)
	}

	meta := metaFromSettings(settings)
	if notFound {
		_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta)
		return err
	}

	latest := multiNode.Latest()
	target := latest.ID[latest.GetLatestNodeIndex()]
	if latest.IsSplit() {
		c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
	}

	err = c.service.MoveNode(ctx, bktInfo, systemTree, target, 0, meta)
	if err != nil {
		return fmt.Errorf("move settings node: %w", err)
	}

	c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
	return nil
}
// GetBucketCORS returns the object address recorded in the latest CORS node
// of the system tree.
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
	if err == nil {
		return getTreeNodeAddress(multiNode.Latest())
	}
	return oid.Address{}, err
}
// PutBucketCORS records addr as the bucket CORS object in the system tree
// and returns the addresses of the objects that are no longer referenced.
//
// When no CORS node exists yet a new one is added and tree.ErrNoNodeToRemove
// is returned to signal that there is nothing to delete. Otherwise the latest
// node is rewritten in place, and the result holds the address previously
// stored in it followed by the addresses of the older nodes that were
// successfully removed.
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
	isErrNotFound := errors.Is(err, tree.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
		return nil, fmt.Errorf("couldn't get node: %w", err)
	}

	meta := make(map[string]string)
	meta[FileNameKey] = corsFilename
	meta[oidKV] = addr.Object().EncodeToString()
	meta[cidKV] = addr.Container().EncodeToString()

	if isErrNotFound {
		if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
			return nil, err
		}
		return nil, tree.ErrNoNodeToRemove
	}

	latest := multiNode.Latest()
	ind := latest.GetLatestNodeIndex()
	if latest.IsSplit() {
		// Report the ids as the other system node writers in this file do,
		// so the split node can be identified from the log.
		c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
	}

	if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
		return nil, fmt.Errorf("move cors node: %w", err)
	}

	// latest still holds the meta read before the move, i.e. the old address.
	objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
	objToDelete[0], err = getTreeNodeAddress(latest)
	if err != nil {
		return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
	}

	objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
	return objToDelete, nil
}
// DeleteBucketCORS removes every CORS node from the system tree and returns
// the addresses of the objects they referenced.
//
// tree.ErrNoNodeToRemove is returned when there is no CORS node. An error is
// also returned when fewer addresses were collected than there were nodes.
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
	switch {
	case errors.Is(err, tree.ErrNodeNotFound):
		return nil, tree.ErrNoNodeToRemove
	case err != nil:
		return nil, err
	}

	removed := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
	if len(removed) == len(multiNode.nodes) {
		return removed, nil
	}
	return nil, fmt.Errorf("failed to clean all old cors nodes")
}
// getTreeNodeAddress builds an object address from the node's ObjID and, when
// present, its CID meta value. A CID that cannot be decoded is an error; a
// missing CID leaves the container part of the address unset.
func getTreeNodeAddress(node *treeNode) (oid.Address, error) {
	var addr oid.Address
	addr.SetObject(node.ObjID)

	cidStr, ok := node.Get(cidKV)
	if !ok {
		return addr, nil
	}

	var cnrID cid.ID
	if err := cnrID.DecodeString(cidStr); err != nil {
		return oid.Address{}, fmt.Errorf("couldn't decode cid: %w", err)
	}
	addr.SetContainer(cnrID)

	return addr, nil
}
// cleanOldNodes removes the given nodes from the system tree on a best-effort
// basis and returns the object addresses of the nodes that were removed.
//
// For every node the ID with the greatest timestamp is removed; split nodes
// are reported in the log. A node that cannot be removed, or whose address
// cannot be parsed after removal, is logged with the underlying error and
// left out of the result, so the result may be shorter than nodes.
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
	// Use the request logger for every message so all of them carry the
	// same request context.
	reqLog := c.reqLogger(ctx)

	res := make([]oid.Address, 0, len(nodes))
	for _, node := range nodes {
		ind := node.GetLatestNodeIndex()
		if node.IsSplit() {
			reqLog.Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
		}

		if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
			reqLog.Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]),
				zap.Uint64("id", node.ID[ind]), zap.Error(err))
			continue
		}

		addr, err := getTreeNodeAddress(node)
		if err != nil {
			reqLog.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]),
				zap.Uint64("id", node.ID[ind]), zap.Error(err))
			continue
		}
		res = append(res, addr)
	}

	return res
}
// GetObjectTagging returns the user tags stored in the tag node attached to
// the given object version. The result is nil when the version has no tag
// node.
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
	tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
	if err == nil {
		return getObjectTagging(tagNode), nil
	}
	return nil, err
}
// getObjectTagging extracts the user tags from a tag node: every meta key
// starting with userDefinedTagPrefix is returned with the prefix stripped.
// A nil node yields a nil map; a node without tags yields an empty map.
func getObjectTagging(tagNode *treeNode) map[string]string {
	if tagNode == nil {
		return nil
	}

	tags := map[string]string{}
	for key, val := range tagNode.Meta {
		if !strings.HasPrefix(key, userDefinedTagPrefix) {
			continue
		}
		tags[key[len(userDefinedTagPrefix):]] = val
	}

	return tags
}
// PutObjectTagging replaces the user tags of the given object version.
//
// The tags are stored in a child node of the version node, marked with the
// IsTag key; each tag key is stored with userDefinedTagPrefix prepended.
// When the version has no tag node yet one is added, otherwise the existing
// node is rewritten in place with the new tag set.
func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error {
	tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
	if err != nil {
		return err
	}

	treeTagSet := make(map[string]string)
	treeTagSet[isTagKV] = "true"
	for key, val := range tagSet {
		treeTagSet[userDefinedTagPrefix+key] = val
	}

	if tagNode == nil {
		_, err = c.service.AddNode(ctx, bktInfo, versionTree, objVersion.ID, treeTagSet)
		return err
	}

	ind := tagNode.GetLatestNodeIndex()
	if tagNode.IsSplit() {
		// Report the ids as the system node writers in this file do, so the
		// split node can be identified from the log.
		c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs, zap.Uint64s("ids", tagNode.ID))
	}

	return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
}
// DeleteObjectTagging clears the user tags of the given object version by
// storing an empty tag set for it.
func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error {
	var noTags map[string]string
	return c.PutObjectTagging(ctx, bktInfo, objVersion, noTags)
}
// GetBucketTagging returns the user tags stored in the latest bucket tagging
// node of the system tree, with userDefinedTagPrefix stripped from the keys.
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
	if err != nil {
		return nil, err
	}

	tags := map[string]string{}
	for key, val := range multiNode.Latest().Meta {
		if !strings.HasPrefix(key, userDefinedTagPrefix) {
			continue
		}
		tags[key[len(userDefinedTagPrefix):]] = val
	}

	return tags, nil
}
// PutBucketTagging replaces the bucket's user tags in the system tree.
//
// Each tag key is stored with userDefinedTagPrefix prepended. When no tagging
// node exists yet a new one is added under the tree root. Otherwise the
// latest node is rewritten in place and the older duplicates are removed on a
// best-effort basis.
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
	multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
	notFound := errors.Is(err, tree.ErrNodeNotFound)
	if !notFound && err != nil {
		return fmt.Errorf("couldn't get node: %w", err)
	}

	meta := make(map[string]string, len(tagSet)+1)
	meta[FileNameKey] = bucketTaggingFilename
	for tagKey, tagVal := range tagSet {
		meta[userDefinedTagPrefix+tagKey] = tagVal
	}

	if notFound {
		_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta)
		return err
	}

	latest := multiNode.Latest()
	target := latest.ID[latest.GetLatestNodeIndex()]
	if latest.IsSplit() {
		c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
	}

	err = c.service.MoveNode(ctx, bktInfo, systemTree, target, 0, meta)
	if err != nil {
		return fmt.Errorf("move bucket tagging node: %w", err)
	}

	c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
	return nil
}
// DeleteBucketTagging clears the bucket's user tags by storing an empty tag
// set for it.
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
	var noTags map[string]string
	return c.PutBucketTagging(ctx, bktInfo, noTags)
}
// getTreeNode returns the node below nodeID in the version tree that carries
// the given meta key, or nil when there is none.
func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, key string) (*treeNode, error) {
	found, err := c.getTreeNodes(ctx, bktInfo, nodeID, key)
	if err != nil {
		return nil, err
	}

	// if there will be many allocations, consider having separate
	// implementations of 'getTreeNode' and 'getTreeNodes'
	node := found[key]
	return node, nil
}
// getTreeNodes fetches the depth-2 sub tree rooted at nodeID from the version
// tree and returns, per requested key, a node that carries that key in its
// meta.
//
// Each node is matched against the keys in order and counts for at most one
// key. Scanning stops as soon as every key has a node. Keys without a match
// are absent from the result.
func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
	subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2, false)
	if err != nil {
		return nil, err
	}

	// consider using map[string][]*treeNode
	// to be able to remove unused node, that can be added during split
	found := make(map[string]*treeNode, len(keys))

	for idx := range subtree {
		node, err := newTreeNode(subtree[idx])
		if err != nil {
			return nil, err
		}

		for k := 0; k < len(keys); k++ {
			if _, present := node.Get(keys[k]); !present {
				continue
			}
			found[keys[k]] = node
			break
		}

		if len(found) == len(keys) {
			break
		}
	}

	return found, nil
}
// GetVersions returns the versions stored under filepath in the version tree.
func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) {
	versions, err := c.getVersions(ctx, bktInfo, versionTree, filepath, false)
	return versions, err
}
// GetLatestVersion returns the newest version of objectName from the version
// tree.
//
// Only the attributes needed to build a NodeVersion are requested. Among the
// returned nodes the one with the greatest timestamp that carries an OID is
// chosen; tree.ErrNodeNotFound is returned when there is no such node.
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
	// LatestOnly and AllAttrs are deliberately left at their zero value (false).
	params := &GetNodesParams{
		BktInfo: bktInfo,
		TreeID:  versionTree,
		Path:    pathFromName(objectName),
		Meta: []string{
			oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV,
			etagKV, sizeKV, md5KV, creationEpochKV,
		},
	}

	nodes, err := c.service.GetNodes(ctx, params)
	if err != nil {
		return nil, err
	}

	latest, err := getLatestVersionNode(nodes)
	if err != nil {
		return nil, err
	}

	return newNodeVersion(c.reqLogger(ctx), objectName, latest)
}
// getLatestVersionNode picks, among the nodes that carry an OID attribute,
// the one with the greatest timestamp. The first such node wins on ties.
// tree.ErrNodeNotFound is returned when no node qualifies, which includes
// the case where every candidate has a zero timestamp.
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
	var (
		best   NodeResponse
		bestTS uint64
		found  bool
	)

	for _, candidate := range nodes {
		if !checkExistOID(candidate.GetMeta()) {
			continue
		}
		if ts := getMaxTimestamp(candidate); ts > bestTS {
			best, bestTS, found = candidate, ts, true
		}
	}

	if !found {
		return nil, tree.ErrNodeNotFound
	}
	return best, nil
}
// getMaxTimestamp returns the greatest of the node's timestamps, or 0 when
// the node has none.
func getMaxTimestamp(node NodeResponse) uint64 {
	stamps := node.GetTimestamp()

	var newest uint64
	for i := range stamps {
		if stamps[i] > newest {
			newest = stamps[i]
		}
	}
	return newest
}
// checkExistOID reports whether the attribute list contains an "OID" key.
// Only the key is inspected; its value may be anything.
func checkExistOID(meta []Meta) bool {
	for i := range meta {
		if meta[i].GetKey() != "OID" {
			continue
		}
		return true
	}
	return false
}
// pathFromName breaks objectName into its separator-delimited components.
// The result always has at least one element; an empty name yields a single
// empty component.
func pathFromName(objectName string) []string {
	parts := strings.Split(objectName, separator)
	return parts
}
// DummySubTreeStream is a SubTreeStream over a single, already known node.
type DummySubTreeStream struct {
	data NodeResponse
	read bool
}

// Next returns the wrapped node on the first call and io.EOF on every call
// after that.
func (s *DummySubTreeStream) Next() (NodeResponse, error) {
	if !s.read {
		s.read = true
		return s.data, nil
	}
	return nil, io.EOF
}
// MultiID is a list of node IDs that together identify one logical node.
type MultiID []uint64

// Equal reports whether every element of id is also present in m.
//
// Despite its name this is a containment check rather than strict equality:
// order and duplicates are ignored, m may hold extra elements, and an empty
// id always matches.
func (m MultiID) Equal(id MultiID) bool {
	known := make(map[uint64]struct{}, len(m))
	for _, v := range m {
		known[v] = struct{}{}
	}

	for _, v := range id {
		if _, ok := known[v]; ok {
			continue
		}
		return false
	}
	return true
}
// VersionsByPrefixStreamImpl iterates over the object versions found below a
// prefix node of the version tree. It is created by
// Tree.InitVersionsByPrefixStream.
type VersionsByPrefixStreamImpl struct {
	// ctx is the context passed to the tree service when inner streams are opened.
	ctx context.Context
	// rootID identifies the prefix node whose sub tree is listed.
	rootID MultiID
	// intermediateRootID is the ID of the most recent main stream node that is
	// a direct child of rootID (set in initInnerStream).
	intermediateRootID MultiID
	service ServiceClient
	bktInfo *data.BucketInfo
	// mainStream yields the nodes below rootID; it is opened with depth 2.
	mainStream SubTreeStream
	// innerStream yields the nodes of the main stream node currently being
	// processed; nil means the next main stream node has to be fetched.
	innerStream SubTreeStream
	// headPrefix is the requested prefix with tailPrefix trimmed from its end.
	headPrefix string
	// tailPrefix is the last separator-delimited component of the requested
	// prefix; main stream nodes are filtered by it.
	tailPrefix string
	// namesMap maps node IDs to the file path built for them; it is cleared
	// whenever an inner stream ends.
	namesMap map[uint64]string
	// ended is set once the main stream is exhausted; Next then returns io.EOF.
	ended bool
	// latestOnly makes the stream return one version per file path.
	latestOnly bool
	// currentLatest is the version buffered while latestOnly is set.
	currentLatest *data.NodeVersion
	log *zap.Logger
}
// Next returns the next object version of the stream.
//
// The context argument is ignored; the context stored in the stream is used
// for tree service calls. Once the main stream is exhausted the stream is
// marked as ended: the version still buffered in currentLatest, if any, is
// returned first, and otherwise an error wrapping io.EOF is returned. Every
// later call returns plain io.EOF.
func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
	if s.ended {
		return nil, io.EOF
	}
	for {
		// No inner stream means the previous main stream node is fully
		// processed (or nothing has been read yet): fetch the next one.
		if s.innerStream == nil {
			node, err := s.getNodeFromMainStream()
			if err != nil {
				if errors.Is(err, io.EOF) {
					s.ended = true
					// Flush the version buffered in latestOnly mode.
					if s.currentLatest != nil {
						return s.currentLatest, nil
					}
				}
				return nil, fmt.Errorf("get node from main stream: %w", err)
			}
			if err = s.initInnerStream(node); err != nil {
				return nil, fmt.Errorf("init inner stream: %w", err)
			}
		}
		nodeVersion, err := s.getNodeVersionFromInnerStream()
		if err != nil {
			if errors.Is(err, io.EOF) {
				// The inner stream is exhausted: drop it together with the
				// paths collected for it and move on to the next main
				// stream node.
				s.innerStream = nil
				maps.Clear(s.namesMap)
				// The buffered version is returned here only when its ID is
				// not contained in intermediateRootID.
				if s.currentLatest != nil && !s.intermediateRootID.Equal([]uint64{s.currentLatest.ID}) {
					return s.currentLatest, nil
				}
				continue
			}
			return nil, fmt.Errorf("inner stream: %w", err)
		}
		return nodeVersion, nil
	}
}
// getNodeFromMainStream reads the main stream until it finds a node that is
// not the root itself and whose file name starts with tailPrefix.
//
// tree.ErrNodeNotFound from the stream is reported as plain io.EOF; any other
// stream error is wrapped and returned.
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
	for {
		node, err := s.mainStream.Next()
		switch {
		case err == nil:
			// fall out of the switch and inspect the node
		case errors.Is(err, tree.ErrNodeNotFound):
			return nil, io.EOF
		default:
			return nil, fmt.Errorf("main stream next: %w", err)
		}

		if s.rootID.Equal(node.GetNodeID()) {
			continue
		}
		if strings.HasPrefix(getFilename(node), s.tailPrefix) {
			return node, nil
		}
	}
}
// initInnerStream prepares innerStream for the given main stream node.
//
// A node that is a direct child of the root becomes the new
// intermediateRootID. For an intermediate node the whole sub tree below it is
// opened as a stream; any other node is wrapped into a single-element stream.
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
	nodeID := node.GetNodeID()
	if s.rootID.Equal(node.GetParentID()) {
		s.intermediateRootID = nodeID
	}

	if !isIntermediate(node) {
		s.innerStream = &DummySubTreeStream{data: node}
		return nil
	}

	s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, nodeID, maxGetSubTreeDepth)
	if err != nil {
		return fmt.Errorf("get sub tree node from main stream: %w", err)
	}
	return nil
}
// getNodeVersionFromInnerStream reads the inner stream until it has a version
// to return.
//
// Any stream error, including io.EOF, is returned wrapped. Nodes that
// parseNodeResponse marks as skipped are ignored. Without latestOnly every
// parsed version is returned as is. With latestOnly versions are buffered in
// currentLatest: consecutive versions with the same FilePath collapse into
// the one with the greatest Timestamp, and the buffered version is returned
// once a version with a different FilePath arrives.
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
	for {
		node, err := s.innerStream.Next()
		if err != nil {
			return nil, fmt.Errorf("inner stream: %w", err)
		}
		nodeVersion, skip, err := s.parseNodeResponse(node)
		if err != nil {
			return nil, err
		}
		if skip {
			continue
		}
		if s.latestOnly {
			// First version seen: buffer it and keep reading.
			if s.currentLatest == nil {
				s.currentLatest = nodeVersion
				continue
			}
			// A new file path starts: emit the buffered version and start
			// buffering the new one.
			if s.currentLatest.FilePath != nodeVersion.FilePath {
				res := s.currentLatest
				s.currentLatest = nodeVersion
				return res, nil
			}
			// Same file path: keep the newer of the two.
			if s.currentLatest.Timestamp < nodeVersion.Timestamp {
				s.currentLatest = nodeVersion
			}
			continue
		}
		return nodeVersion, nil
	}
}
// parseNodeResponse turns an inner stream node into a version and records
// the node's file path in namesMap.
//
// skip is true when the node must be ignored: it could not be parsed (logged
// at debug level unless it merely lacks a file name) or it has no object ID.
// An error is returned when the path of the node's parent is not known yet,
// or when the version cannot be built from the node.
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
	trNode, fileName, err := parseTreeNode(node)
	if err != nil {
		if !errors.Is(err, errNodeDoesntContainFileName) {
			s.log.Debug(logs.ParseTreeNode, zap.Error(err))
		}
		return nil, true, nil
	}
	var parentPrefix string
	if s.headPrefix != "" { // The root of subTree can also have a parent
		parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
	}
	var filepath string
	if !s.intermediateRootID.Equal(trNode.ID) {
		// Not the intermediate root: derive the path from the parent's
		// recorded path (formFilePath also records the result in namesMap).
		if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil {
			return nil, false, fmt.Errorf("invalid node order: %w", err)
		}
	} else {
		// The intermediate root: its path is the head prefix plus its own name.
		filepath = parentPrefix + fileName
		for _, id := range trNode.ID {
			s.namesMap[id] = filepath
		}
	}
	if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
		return nil, true, nil
	}
	nodeVersion, err := newNodeVersionFromTreeNode(s.log, filepath, trNode)
	return nodeVersion, false, err
}
// InitVersionsByPrefixStream opens a stream over the object versions whose
// path starts with prefix.
//
// When the prefix node does not exist an already ended stream is returned
// instead of an error. With latestOnly the stream yields one version per
// file path.
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
	mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
	switch {
	case err == nil:
		// the stream is built below
	case errors.Is(err, io.EOF):
		return &VersionsByPrefixStreamImpl{ended: true}, nil
	default:
		return nil, err
	}

	stream := &VersionsByPrefixStreamImpl{
		ctx:        ctx,
		log:        c.reqLogger(ctx),
		service:    c.service,
		bktInfo:    bktInfo,
		rootID:     rootID,
		mainStream: mainStream,
		namesMap:   map[uint64]string{},
		headPrefix: strings.TrimSuffix(prefix, tailPrefix),
		tailPrefix: tailPrefix,
		latestOnly: latestOnly,
	}
	return stream, nil
}
// getSubTreeByPrefixMainStream resolves the node for prefix and opens a
// depth-2 stream over its sub tree.
//
// It returns the stream, the last component of the prefix and the IDs of the
// prefix node. tree.ErrNodeNotFound from either step is reported as io.EOF.
func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, []uint64, error) {
	// A missing node means there is nothing to list, which is reported as EOF.
	asEOF := func(err error) error {
		if errors.Is(err, tree.ErrNodeNotFound) {
			return io.EOF
		}
		return err
	}

	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
	if err != nil {
		return nil, "", nil, asEOF(err)
	}

	subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
	if err != nil {
		return nil, "", nil, asEOF(err)
	}

	return subTree, tailPrefix, rootID, nil
}
// determinePrefixNode splits prefix by separator and returns the IDs of the
// node addressed by all components but the last, together with that last
// component. A prefix without a separator resolves to the tree root (ID 0).
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
	parts := strings.Split(prefix, separator)
	last := len(parts) - 1
	tailPrefix := parts[last]

	if last == 0 {
		return []uint64{0}, tailPrefix, nil
	}

	rootID, err := c.getPrefixNodeID(ctx, bktInfo, treeID, parts[:last])
	if err != nil {
		return nil, "", err
	}
	return rootID, tailPrefix, nil
}
// getPrefixNodeID returns the IDs of all intermediate nodes found at
// prefixPath, concatenated in the order the service returned them.
// tree.ErrNodeNotFound is returned when none of the nodes is intermediate.
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
	// All attributes are requested because isIntermediate depends on the
	// number of attributes a node carries. LatestOnly stays false.
	params := &GetNodesParams{
		BktInfo:  bktInfo,
		TreeID:   treeID,
		Path:     prefixPath,
		AllAttrs: true,
	}

	nodes, err := c.service.GetNodes(ctx, params)
	if err != nil {
		return nil, err
	}

	var ids []uint64
	for i := range nodes {
		if !isIntermediate(nodes[i]) {
			continue
		}
		ids = append(ids, nodes[i].GetNodeID()...)
	}

	if len(ids) == 0 {
		return nil, tree.ErrNodeNotFound
	}
	return ids, nil
}
// getSubTreeByPrefixStream resolves the node for prefix and opens a depth-2
// stream over its sub tree, returning the stream and the node IDs.
//
// When the prefix node does not exist all three results are nil, so callers
// have to check the stream for nil. An error from opening the stream is
// returned together with the resolved IDs.
func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) {
	rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
	switch {
	case err == nil:
		// the stream is opened below
	case errors.Is(err, tree.ErrNodeNotFound):
		return nil, nil, nil
	default:
		return nil, nil, err
	}

	stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
	return stream, rootID, err
}
// getFilename returns the value of the node's first FileNameKey attribute,
// or an empty string when the node has none.
func getFilename(node NodeResponse) string {
	attrs := node.GetMeta()
	for i := range attrs {
		if attrs[i].GetKey() != FileNameKey {
			continue
		}
		return string(attrs[i].GetValue())
	}
	return ""
}
// isIntermediate reports whether the node carries exactly one attribute and
// that attribute is FileNameKey.
func isIntermediate(node NodeResponse) bool {
	attrs := node.GetMeta()
	return len(attrs) == 1 && attrs[0].GetKey() == FileNameKey
}
// formFilePath builds the path of node from the recorded path of its parent
// and fileName, and records it in namesMap under the node's own ID.
//
// For a node with several parent IDs this is done pairwise for every
// (parent ID, node ID) pair, and the path built for the last pair is
// returned. errParentPathNotFound is returned as soon as a parent ID has no
// recorded path; entries recorded for earlier pairs are kept.
func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) {
	result := ""
	for i := 0; i < len(node.ParentID); i++ {
		parentPath, known := namesMap[node.ParentID[i]]
		if !known {
			return "", errParentPathNotFound
		}

		result = strings.Join([]string{parentPath, fileName}, separator)
		namesMap[node.ID[i]] = result
	}
	return result, nil
}
// parseTreeNode parses the response into a treeNode and extracts its file
// name. Parse errors from newTreeNode are returned as is;
// errNodeDoesntContainFileName is returned when the node has no FileName
// attribute.
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
	parsed, err := newTreeNode(node)
	if err != nil {
		return nil, "", err
	}

	if name, ok := parsed.FileName(); ok {
		return parsed, name, nil
	}
	return nil, "", errNodeDoesntContainFileName
}
// GetUnversioned returns the unversioned node stored under filepath in the
// version tree.
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
	version, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath)
	return version, err
}
// getUnversioned returns the node with the greatest timestamp among those
// getVersions yields for filepath in the given tree.
//
// tree.ErrNodeNotFound is returned when there is none. Finding more than one
// is tolerated but logged at debug level.
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
	versions, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
	if err != nil {
		return nil, err
	}

	switch count := len(versions); {
	case count == 0:
		return nil, tree.ErrNodeNotFound
	case count > 1:
		c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode,
			zap.String("treeID", treeID), zap.String("filepath", filepath))
	}

	// Newest first, so the wanted node ends up at index 0.
	sort.Slice(versions, func(a, b int) bool {
		return versions[b].Timestamp < versions[a].Timestamp
	})

	return versions[0], nil
}
// AddVersion stores version in the version tree and returns the ID reported
// by addVersion.
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
	nodeID, err := c.addVersion(ctx, bktInfo, versionTree, version)
	return nodeID, err
}
// RemoveVersion removes the node with the given id from the version tree.
func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
	if err := c.service.RemoveNode(ctx, bktInfo, versionTree, id); err != nil {
		return err
	}
	return nil
}
// CreateMultipartUpload adds a multipart upload node to the system tree.
//
// The object key is split by pathFromName: the last element becomes the file
// name stored in the node meta (see metaFromMultipart), the preceding elements
// form the path the node is added by.
func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
	segments := pathFromName(info.Key)
	last := len(segments) - 1
	dirs, fileName := segments[:last], segments[last]

	_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, dirs, metaFromMultipart(info, fileName))
	return err
}
// multipartInfoStream lazily walks the nodes located under a prefix node of
// the system tree and yields the multipart uploads that pass the listing
// filters (prefix, key marker, upload ID marker).
type multipartInfoStream struct {
	log *zap.Logger

	// treeService and bktInfo are used to open subtree streams on demand.
	treeService ServiceClient
	bktInfo     *data.BucketInfo

	// rootID is the ID of the prefix node; this node is skipped when it
	// shows up in childStream.
	rootID MultiID
	// childStream stream of children nodes of prefix node.
	childStream SubTreeStream
	// currentStream stream of children's nodes with max depth.
	currentStream SubTreeStream
	// nodePaths maps a node ID to the file path built for that node so far.
	nodePaths map[uint64]string

	// prefix is the requested key prefix every returned path must start with.
	prefix string
	// headPrefix is prepended to every path built from the tree nodes.
	headPrefix string
	// keyMarker and uploadID are the listing markers: a path is returned only
	// if it is greater than keyMarker, or equal to it while uploadID is set
	// and the node's upload ID is greater than uploadID.
	keyMarker string
	uploadID  string
}
// Next returns the next multipart upload accepted by checkTreeNode.
//
// Nodes are read from currentStream; once it reports io.EOF the next subtree
// stream is opened via openNewStream and reading continues. Any other error,
// as well as an error of openNewStream, is returned to the caller.
func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) {
	var (
		node *treeNode
		err  error
	)

	if m.currentStream == nil {
		if m.currentStream, err = m.openNewStream(ctx); err != nil {
			return nil, err
		}
	}

	for {
		node, err = getTreeNodeFromStream(m.currentStream)
		switch {
		case err == nil:
			if path, matched := m.checkTreeNode(node); matched {
				return newMultipartInfoFromTreeNode(m.log, path, node)
			}
		case errors.Is(err, io.EOF):
			// current subtree is exhausted, move on to the next child
			if m.currentStream, err = m.openNewStream(ctx); err != nil {
				return nil, err
			}
		default:
			return nil, err
		}
	}
}
// openNewStream reads the next node from childStream and opens a subtree
// stream rooted at it with maxGetSubTreeDepth depth.
//
// Nodes whose ID equals rootID (the prefix node itself) are skipped. Errors of
// childStream and of the tree service are returned as is. If childStream is
// nil (the stream was created without a children stream) io.EOF is returned
// right away instead of calling a method on a nil interface.
func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) {
	if m.childStream == nil {
		return nil, io.EOF
	}

	for {
		node, err := getTreeNodeFromStream(m.childStream)
		if err != nil {
			return nil, err
		}
		if m.rootID.Equal(node.ID) {
			// skip root node
			continue
		}

		stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth)
		if err != nil {
			return nil, err
		}
		return stream, nil
	}
}
// getTreeNodeFromStream reads one node response from stream and converts it
// into a treeNode. Errors of stream.Next and newTreeNode are returned
// unchanged.
func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) {
	raw, err := stream.Next()
	if err != nil {
		return nil, err
	}

	parsed, err := newTreeNode(raw)
	if err == nil {
		return parsed, nil
	}
	return nil, err
}
// checkTreeNode decides whether tNode is a multipart upload that should be
// returned by the stream and, if so, returns its full path.
//
// A node is rejected when it is split, has no file name, its path does not
// start with prefix, it is marked as finished, it has no upload ID, or it does
// not come after the (keyMarker, uploadID) markers. The node path is recorded
// in nodePaths as a side effect, even for rejected nodes.
func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) {
	if tNode.IsSplit() {
		return "", false
	}

	name, hasName := tNode.FileName()
	if !hasName {
		return "", false
	}

	relPath, err := formFilePath(tNode, name, m.nodePaths)
	if err != nil {
		// the parent path is not recorded in nodePaths: use the bare file
		// name as the path and remember it for descendants
		relPath = name
		m.nodePaths[tNode.ID[0]] = relPath
	}

	fullPath := m.headPrefix + relPath
	if !strings.HasPrefix(fullPath, m.prefix) {
		return "", false
	}
	if _, finished := tNode.Meta[finishedKV]; finished {
		return "", false
	}

	id, hasID := tNode.Meta[uploadIDKV]
	if !hasID {
		return "", false
	}

	switch {
	case fullPath > m.keyMarker:
		return fullPath, true
	case fullPath == m.keyMarker && m.uploadID != "" && id > m.uploadID:
		return fullPath, true
	}
	return "", false
}
// GetMultipartUploadsByPrefix creates a stream of multipart uploads of the
// system tree whose keys start with params.Prefix and come after the
// params.KeyMarker / params.UploadIDMarker markers.
//
// A nil stream with a nil error is returned when getSubTreeByPrefixStream
// fails with tree.ErrNodeNotFound.
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
	children, prefixNodeID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
	switch {
	case err == nil:
		// stream is ready
	case errors.Is(err, tree.ErrNodeNotFound):
		return nil, nil
	default:
		return nil, err
	}

	// headPrefix is the prefix cut after its last '/'.
	notSlash := func(r rune) bool { return r != '/' }

	result := &multipartInfoStream{
		log:         c.reqLogger(ctx),
		treeService: c.service,
		bktInfo:     bktInfo,
		rootID:      prefixNodeID,
		childStream: children,
		nodePaths:   make(map[uint64]string),
		prefix:      params.Prefix,
		headPrefix:  strings.TrimRightFunc(params.Prefix, notSlash),
		keyMarker:   params.KeyMarker,
		uploadID:    params.UploadIDMarker,
	}
	return result, nil
}
// GetMultipartUpload finds the multipart upload of objectName with the given
// uploadID in the system tree.
//
// Nodes that cannot be parsed by newMultipartInfo are ignored.
// tree.ErrNodeNotFound is returned when no node has the requested upload ID
// or when the first matching one is already finished.
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
	nodes, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:  bktInfo,
		TreeID:   systemTree,
		Path:     pathFromName(objectName),
		AllAttrs: true,
	})
	if err != nil {
		return nil, err
	}

	logger := c.reqLogger(ctx)
	for i := range nodes {
		info, parseErr := newMultipartInfo(logger, nodes[i])
		if parseErr != nil || info.UploadID != uploadID {
			continue
		}
		if info.Finished {
			return nil, tree.ErrNodeNotFound
		}
		return info, nil
	}

	return nil, tree.ErrNodeNotFound
}
// AddPart stores info as a part node under the multipart upload node
// multipartNodeID in the system tree.
//
// If the upload already has part nodes with the same part number, the one
// with the greatest timestamp is overwritten in place with the new meta, the
// remaining duplicates are removed on a best-effort basis (failures are only
// logged) and the object IDs of all replaced parts are returned. Otherwise a
// new child node is added and tree.ErrNoNodeToRemove is returned together
// with a nil slice. Part nodes that cannot be parsed are logged and skipped.
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
	parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
	if err != nil {
		return nil, err
	}

	meta := map[string]string{
		partNumberKV: strconv.Itoa(info.Number),
		oidKV:        info.OID.EncodeToString(),
		sizeKV:       strconv.FormatUint(info.Size, 10),
		createdKV:    strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
		etagKV:       info.ETag,
		md5KV:        info.MD5,
	}

	objToDelete := make([]oid.ID, 0, 1)
	partsToDelete := make([]uint64, 0, 1)
	var (
		latestPartID uint64
		maxTimestamp uint64
	)

	multiNodeID := MultiID{multipartNodeID}
	for _, part := range parts {
		if multiNodeID.Equal(part.GetNodeID()) {
			// the multipart node itself, not a part
			continue
		}

		partInfo, err := newPartInfo(part)
		if err != nil {
			c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
				zap.String("key", info.Key),
				zap.String("upload id", info.UploadID),
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("id", part.GetNodeID()),
				zap.Error(err))
			continue
		}

		if partInfo.Number != info.Number {
			continue
		}

		nodeID := part.GetNodeID()[0]
		objToDelete = append(objToDelete, partInfo.OID)
		partsToDelete = append(partsToDelete, nodeID)

		// The first match is always taken, so latestPartID refers to a real
		// part node even when every matching part has a zero timestamp.
		if len(partsToDelete) == 1 || partInfo.Timestamp > maxTimestamp {
			maxTimestamp = partInfo.Timestamp
			latestPartID = nodeID
		}
	}

	if len(objToDelete) != 0 {
		// reuse the latest node for the new part and drop the other duplicates
		if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
			return nil, fmt.Errorf("move part node: %w", err)
		}

		for _, nodeID := range partsToDelete {
			if nodeID == latestPartID {
				continue
			}
			if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
				c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
					zap.String("key", info.Key),
					zap.String("upload id", info.UploadID),
					zap.Uint64("id", nodeID),
					zap.Error(err))
			}
		}

		return objToDelete, nil
	}

	if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
		return nil, err
	}

	return nil, tree.ErrNoNodeToRemove
}
// GetParts returns the parts stored under the multipart upload node
// multipartNodeID in the system tree.
//
// The multipart node itself is skipped. Nodes with an unexpected number of
// IDs and nodes that cannot be parsed by newPartInfo are logged and skipped.
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
	nodes, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
	if err != nil {
		return nil, err
	}

	parts := make([]*data.PartInfoExtended, 0, len(nodes))
	for _, node := range nodes {
		ids := node.GetNodeID()
		switch {
		case len(ids) != 1:
			// multipart parts nodeID shouldn't have multiple values
			c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
				zap.Uint64("multipart node id ", multipartNodeID),
				zap.Uint64s("node ids", node.GetNodeID()))
		case ids[0] == multipartNodeID:
			// the multipart node itself, not a part
		default:
			parsed, err := newPartInfo(node)
			if err != nil {
				c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
					zap.Uint64("multipart node id ", multipartNodeID),
					zap.Uint64s("node ids", node.GetNodeID()),
					zap.Error(err))
				continue
			}
			parts = append(parts, parsed)
		}
	}

	return parts, nil
}
// PutBucketLifecycleConfiguration saves addr as the bucket lifecycle
// configuration object in the system tree.
//
// When no lifecycle node exists a new one is added and tree.ErrNoNodeToRemove
// is returned. Otherwise the latest node is overwritten with the new meta and
// the addresses of the previously referenced objects are returned: first the
// one of the latest node, then those reported by cleanOldNodes for the old
// nodes.
func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
	notFound := errors.Is(err, tree.ErrNodeNotFound)
	if err != nil && !notFound {
		return nil, fmt.Errorf("couldn't get node: %w", err)
	}

	meta := map[string]string{
		FileNameKey: bucketLifecycleFilename,
		oidKV:       addr.Object().EncodeToString(),
		cidKV:       addr.Container().EncodeToString(),
	}

	if notFound {
		if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
			return nil, err
		}
		return nil, tree.ErrNoNodeToRemove
	}

	latest := multiNode.Latest()
	latestIdx := latest.GetLatestNodeIndex()
	if latest.IsSplit() {
		c.reqLogger(ctx).Error(logs.BucketLifecycleNodeHasMultipleIDs)
	}

	err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[latestIdx], 0, meta)
	if err != nil {
		return nil, fmt.Errorf("move lifecycle node: %w", err)
	}

	objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
	if objToDelete[0], err = getTreeNodeAddress(latest); err != nil {
		return nil, fmt.Errorf("parse object addr of latest lifecycle node in tree: %w", err)
	}

	staleAddrs := c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
	return append(objToDelete, staleAddrs...), nil
}
// GetBucketLifecycleConfiguration returns the object address stored in the
// latest bucket lifecycle node of the system tree.
func (c *Tree) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
	var none oid.Address

	multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
	if err != nil {
		return none, fmt.Errorf("get lifecycle node: %w", err)
	}

	latest := multiNode.Latest()
	return getTreeNodeAddress(latest)
}
// DeleteBucketLifecycleConfiguration cleans all bucket lifecycle nodes of the
// system tree via cleanOldNodes and returns the addresses it reports.
//
// tree.ErrNoNodeToRemove is returned when there is no lifecycle node. An
// error is returned if the number of reported addresses differs from the
// number of lifecycle nodes.
func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, bucketLifecycleFilename)
	if err != nil {
		if errors.Is(err, tree.ErrNodeNotFound) {
			return nil, tree.ErrNoNodeToRemove
		}
		return nil, err
	}

	removed := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
	if len(removed) == len(multiNode.nodes) {
		return removed, nil
	}
	return nil, fmt.Errorf("failed to clean all old lifecycle nodes")
}
// DeleteMultipartUpload removes the multipart upload node from the system
// tree and re-creates it marked as finished.
//
// multipartInfo.Finished is set to true as a side effect once the node has
// been removed.
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
	if err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID); err != nil {
		return err
	}

	multipartInfo.Finished = true

	return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}
// PutLock saves lock as a child node of nodeID in the version tree.
//
// A new node is added when lock has no ID yet (lock.ID() is zero); otherwise
// the existing lock node is overwritten with the new meta.
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
	meta := make(map[string]string, 5)
	meta[isLockKV] = "true"

	if lock.IsLegalHoldSet() {
		meta[legalHoldOIDKV] = lock.LegalHold().EncodeToString()
	}

	if lock.IsRetentionSet() {
		meta[retentionOIDKV] = lock.Retention().EncodeToString()
		meta[untilDateKV] = lock.UntilDate()
		if lock.IsCompliance() {
			meta[isComplianceKV] = "true"
		}
	}

	if lockID := lock.ID(); lockID != 0 {
		return c.service.MoveNode(ctx, bktInfo, versionTree, lockID, nodeID, meta)
	}

	_, err := c.service.AddNode(ctx, bktInfo, versionTree, nodeID, meta)
	return err
}
// GetLock fetches the lock node (the one selected by isLockKV) of nodeID and
// converts it into data.LockInfo with getLock.
func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
	node, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
	if err == nil {
		return getLock(node)
	}
	return nil, err
}
// getLock builds data.LockInfo from a lock tree node.
//
// A nil node yields an empty LockInfo. A split node, as well as an
// undecodable legal hold or retention object ID, results in an error.
func getLock(lockNode *treeNode) (*data.LockInfo, error) {
	switch {
	case lockNode == nil:
		return &data.LockInfo{}, nil
	case lockNode.IsSplit():
		return nil, errors.New("invalid lock node: this is split node")
	}

	info := data.NewLockInfo(lockNode.ID[0])

	if raw, found := lockNode.Get(legalHoldOIDKV); found {
		var id oid.ID
		if err := id.DecodeString(raw); err != nil {
			return nil, fmt.Errorf("invalid legal hold object id: %w", err)
		}
		info.SetLegalHold(id)
	}

	if raw, found := lockNode.Get(retentionOIDKV); found {
		var id oid.ID
		if err := id.DecodeString(raw); err != nil {
			return nil, fmt.Errorf("invalid retention object id: %w", err)
		}
		// compliance mode is flagged by the mere presence of the key
		_, compliance := lockNode.Get(isComplianceKV)
		until, _ := lockNode.Get(untilDateKV)
		info.SetRetention(id, until, compliance)
	}

	return info, nil
}
// GetObjectTaggingAndLock fetches the tagging and lock nodes of objVersion in
// a single getTreeNodes call and returns the parsed tag set and lock info.
func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
	found, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
	if err != nil {
		return nil, nil, err
	}

	lock, err := getLock(found[isLockKV])
	if err != nil {
		return nil, nil, err
	}

	tags := getObjectTagging(found[isTagKV])
	return tags, lock, nil
}
// addVersion stores version in the tree treeID and returns the ID of the node
// that holds it.
//
// For an unversioned version an existing unversioned node of the same path is
// overwritten in place (and its outdated tagging node is cleared); in every
// other case a new node is added by path.
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
	path := pathFromName(version.FilePath)
	dirs, fileName := path[:len(path)-1], path[len(path)-1]

	meta := make(map[string]string, 10)
	meta[oidKV] = version.OID.EncodeToString()
	meta[FileNameKey] = fileName
	meta[ownerKV] = version.Owner.EncodeToString()
	meta[createdKV] = strconv.FormatInt(version.Created.UTC().UnixMilli(), 10)
	meta[creationEpochKV] = strconv.FormatUint(version.CreationEpoch, 10)

	// optional attributes are stored only when they carry a value
	if version.Size > 0 {
		meta[sizeKV] = strconv.FormatUint(version.Size, 10)
	}
	if version.ETag != "" {
		meta[etagKV] = version.ETag
	}
	if version.MD5 != "" {
		meta[md5KV] = version.MD5
	}

	setFlag := func(key string, on bool) {
		if on {
			meta[key] = "true"
		}
	}
	setFlag(isDeleteMarkerKV, version.IsDeleteMarker)
	setFlag(isCombinedKV, version.IsCombined)
	setFlag(isUnversionedKV, version.IsUnversioned)

	if !version.IsUnversioned {
		return c.service.AddNodeByPath(ctx, bktInfo, treeID, dirs, meta)
	}

	existing, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
	switch {
	case err == nil:
		if err = c.service.MoveNode(ctx, bktInfo, treeID, existing.ID, existing.ParenID, meta); err != nil {
			return 0, err
		}
		return existing.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, existing.ID)
	case errors.Is(err, tree.ErrNodeNotFound):
		return c.service.AddNodeByPath(ctx, bktInfo, treeID, dirs, meta)
	default:
		return 0, err
	}
}
// clearOutdatedVersionInfo removes the tagging node (the one selected by
// isTagKV) of nodeID from the tree treeID, if there is one.
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
	tagging, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
	switch {
	case err != nil:
		return err
	case tagging == nil:
		return nil
	}
	return c.service.RemoveNode(ctx, bktInfo, treeID, tagging.ID[0])
}
// getVersions returns the versions stored for filepath in the tree treeID.
//
// With onlyUnversioned set, only versions flagged as unversioned are
// returned. A tree.ErrNodeNotFound error of the tree service results in a nil
// slice and a nil error; a node that cannot be parsed by newNodeVersion fails
// the whole call.
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
	nodes, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     treeID,
		Path:       pathFromName(filepath),
		Meta:       []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV},
		LatestOnly: false,
		AllAttrs:   false,
	})
	if err != nil {
		if errors.Is(err, tree.ErrNodeNotFound) {
			return nil, nil
		}
		return nil, err
	}

	logger := c.reqLogger(ctx)
	versions := make([]*data.NodeVersion, 0, len(nodes))
	for i := range nodes {
		parsed, err := newNodeVersion(logger, filepath, nodes[i])
		if err != nil {
			return nil, err
		}
		if !onlyUnversioned || parsed.IsUnversioned {
			versions = append(versions, parsed)
		}
	}

	return versions, nil
}
// metaFromSettings converts bucket settings into tree node meta.
//
// The owner key is stored hex-encoded and only when it is set.
func metaFromSettings(settings *data.BucketSettings) map[string]string {
	meta := map[string]string{
		FileNameKey:         settingsFileName,
		versioningKV:        settings.Versioning,
		lockConfigurationKV: encodeLockConfiguration(settings.LockConfiguration),
		cannedACLKV:         settings.CannedACL,
	}

	if key := settings.OwnerKey; key != nil {
		meta[ownerKeyKV] = hex.EncodeToString(key.Bytes())
	}

	return meta
}
// metaFromMultipart fills info.Meta with the system attributes of a multipart
// upload node (file name, upload ID, owner, creation time and epoch, and the
// finished flag when set) and returns that same map.
//
// The map is modified in place, so the caller's info.Meta is extended too. A
// nil info.Meta is allocated first because writing to a nil map panics.
func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string {
	if info.Meta == nil {
		info.Meta = make(map[string]string, 6)
	}

	info.Meta[FileNameKey] = fileName
	info.Meta[uploadIDKV] = info.UploadID
	info.Meta[ownerKV] = info.Owner.EncodeToString()
	info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
	if info.Finished {
		info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
	}
	info.Meta[creationEpochKV] = strconv.FormatUint(info.CreationEpoch, 10)

	return info.Meta
}
// getSystemNode fetches the nodes named name from the system tree and wraps
// them into a multiSystemNode.
//
// Nodes carrying an upload ID are filtered out first. tree.ErrNodeNotFound is
// returned when nothing is left; a warning is logged when more than one node
// remains.
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
	found, err := c.service.GetNodes(ctx, &GetNodesParams{
		BktInfo:    bktInfo,
		TreeID:     systemTree,
		Path:       []string{name},
		LatestOnly: false,
		AllAttrs:   true,
	})
	if err != nil {
		return nil, err
	}

	found = filterMultipartNodes(found)
	switch len(found) {
	case 0:
		return nil, tree.ErrNodeNotFound
	case 1:
		// the expected case
	default:
		c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
	}

	return newMultiNode(found)
}
// filterMultipartNodes returns a new slice with the nodes that have no
// uploadIDKV meta entry, preserving their order.
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
	hasUploadID := func(node NodeResponse) bool {
		for _, kv := range node.GetMeta() {
			if kv.GetKey() == uploadIDKV {
				return true
			}
		}
		return false
	}

	kept := make([]NodeResponse, 0, len(nodes))
	for _, node := range nodes {
		if !hasUploadID(node) {
			kept = append(kept, node)
		}
	}
	return kept
}
// reqLogger returns the request logger stored in ctx, falling back to the
// logger of the Tree when ctx has none.
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
	if fromCtx := middleware.GetReqLog(ctx); fromCtx != nil {
		return fromCtx
	}
	return c.log
}
// parseLockConfiguration decodes a lock configuration from its comma
// separated form.
//
// An empty value yields an empty configuration. A single field sets only
// ObjectLockEnabled. Otherwise exactly four fields are expected, in the order
// enabled, days, mode, years; empty days or years are read as zero. Any other
// field count or a non-integer days/years value is an error.
func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
	conf := &data.ObjectLockConfiguration{}
	if value == "" {
		return conf, nil
	}

	fields := strings.Split(value, ",")
	conf.ObjectLockEnabled = fields[0]

	switch len(fields) {
	case 1:
		return conf, nil
	case 4:
		// full form, parsed below
	default:
		return nil, fmt.Errorf("invalid lock configuration: %s", value)
	}

	// parseOptional reads an optional integer field: empty means zero.
	parseOptional := func(field string) (int64, error) {
		if field == "" {
			return 0, nil
		}
		number, err := strconv.ParseInt(field, 10, 64)
		if err != nil {
			return 0, fmt.Errorf("invalid lock configuration: %s", value)
		}
		return number, nil
	}

	days, err := parseOptional(fields[1])
	if err != nil {
		return nil, err
	}
	years, err := parseOptional(fields[3])
	if err != nil {
		return nil, err
	}

	conf.Rule = &data.ObjectLockRule{
		DefaultRetention: &data.DefaultRetention{
			Days:  days,
			Mode:  fields[2],
			Years: years,
		},
	}

	return conf, nil
}
// encodeLockConfiguration encodes a lock configuration into its comma
// separated form.
//
// A nil configuration is encoded as an empty string. A configuration without
// a default retention rule is encoded as ObjectLockEnabled alone. Otherwise
// the result is "enabled,days,mode,years".
func encodeLockConfiguration(conf *data.ObjectLockConfiguration) string {
	switch {
	case conf == nil:
		return ""
	case conf.Rule == nil || conf.Rule.DefaultRetention == nil:
		return conf.ObjectLockEnabled
	}

	retention := conf.Rule.DefaultRetention
	return fmt.Sprintf("%s,%d,%s,%d", conf.ObjectLockEnabled, retention.Days, retention.Mode, retention.Years)
}