forked from TrueCloudLab/frostfs-s3-gw
Denis Kirillov
f5326b9f04
This is needed to meet user expectations when deleting CORS, for example. Previously, after removing a CORS configuration that had been uploaded in a split manner, we could still get some data (from another node) because deletion worked only for the latest node version. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com> Signed-off-by: Alex Vanin <a.vanin@yadro.com>
1762 lines
47 KiB
Go
1762 lines
47 KiB
Go
package tree
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/exp/maps"
|
|
)
|
|
|
|
type (
|
|
Tree struct {
|
|
service ServiceClient
|
|
log *zap.Logger
|
|
}
|
|
|
|
// ServiceClient is a client to interact with tree service.
|
|
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
|
|
ServiceClient interface {
|
|
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
|
|
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error)
|
|
GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
|
|
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
|
|
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
|
|
MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
|
|
RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
|
|
}
|
|
|
|
SubTreeStream interface {
|
|
Next() (NodeResponse, error)
|
|
}
|
|
|
|
treeNode struct {
|
|
ID []uint64
|
|
ParentID []uint64
|
|
ObjID oid.ID
|
|
TimeStamp []uint64
|
|
Size uint64
|
|
Meta map[string]string
|
|
}
|
|
|
|
multiSystemNode struct {
|
|
// the first element is latest
|
|
nodes []*treeNode
|
|
}
|
|
|
|
GetNodesParams struct {
|
|
BktInfo *data.BucketInfo
|
|
TreeID string
|
|
Path []string
|
|
Meta []string
|
|
LatestOnly bool
|
|
AllAttrs bool
|
|
}
|
|
)
|
|
|
|
// FileNameKey is the node attribute key that stores the file name.
const FileNameKey = "FileName"
|
|
|
|
var (
|
|
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
|
|
ErrNodeNotFound = layer.ErrNodeNotFound
|
|
|
|
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
|
|
ErrNodeAccessDenied = layer.ErrNodeAccessDenied
|
|
|
|
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
|
|
ErrGatewayTimeout = layer.ErrGatewayTimeout
|
|
)
|
|
|
|
// General node attribute keys.
const (
	versioningKV        = "Versioning"
	cannedACLKV         = "cannedACL"
	ownerKeyKV          = "ownerKey"
	lockConfigurationKV = "LockConfiguration"
	oidKV               = "OID"
	cidKV               = "CID"
)

// Attribute keys of version, tag, multipart and part nodes.
const (
	isCombinedKV    = "IsCombined"
	isUnversionedKV = "IsUnversioned"
	isTagKV         = "IsTag"
	uploadIDKV      = "UploadId"
	partNumberKV    = "Number"
	sizeKV          = "Size"
	etagKV          = "ETag"
	md5KV           = "MD5"
	finishedKV      = "Finished"
)

// Keys for lock.
const (
	isLockKV       = "IsLock"
	legalHoldOIDKV = "LegalHoldOID"
	retentionOIDKV = "RetentionOID"
	untilDateKV    = "UntilDate"
	isComplianceKV = "IsCompliance"
)

// Keys for delete marker nodes.
const (
	isDeleteMarkerKV = "IsDeleteMarker"
	ownerKV          = "Owner"
	createdKV        = "Created"
)

// File names of the system nodes.
const (
	settingsFileName      = "bucket-settings"
	corsFilename          = "bucket-cors"
	bucketTaggingFilename = "bucket-tagging"
)

const (
	// versionTree -- ID of a tree with object versions.
	versionTree = "version"

	// systemTree -- ID of a tree with system objects
	// i.e. bucket settings with versioning and lock configuration, cors.
	systemTree = "system"
)

const (
	// separator splits an object name into path components.
	separator = "/"
	// userDefinedTagPrefix is prepended to user tag keys stored in node attributes.
	userDefinedTagPrefix = "User-Tag-"

	maxGetSubTreeDepth = 0 // means all subTree
)
|
|
|
|
// NewTree creates instance of Tree using provided address and create grpc connection.
|
|
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
|
|
return &Tree{
|
|
service: service,
|
|
log: log,
|
|
}
|
|
}
|
|
|
|
type (
	// Meta is a single key/value attribute of a tree node.
	Meta interface {
		GetKey() string
		GetValue() []byte
	}

	// NodeResponse is a tree node as returned by the tree service.
	// The id, parent id and timestamp are slices because one logical node
	// may be represented by several tree-service nodes (a "split" node).
	NodeResponse interface {
		GetMeta() []Meta
		GetNodeID() []uint64
		GetParentID() []uint64
		GetTimestamp() []uint64
	}
)
|
|
|
|
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
|
tNode := &treeNode{
|
|
ID: nodeInfo.GetNodeID(),
|
|
ParentID: nodeInfo.GetParentID(),
|
|
TimeStamp: nodeInfo.GetTimestamp(),
|
|
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
|
}
|
|
|
|
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
|
|
return nil, errors.New("invalid tree node: missing id")
|
|
}
|
|
|
|
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
|
|
return nil, errors.New("invalid tree node: length multiple ids mismatch")
|
|
}
|
|
|
|
for _, kv := range nodeInfo.GetMeta() {
|
|
switch kv.GetKey() {
|
|
case oidKV:
|
|
if err := tNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
|
|
return nil, err
|
|
}
|
|
case sizeKV:
|
|
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
|
|
var err error
|
|
if tNode.Size, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("invalid size value '%s': %w", sizeStr, err)
|
|
}
|
|
}
|
|
default:
|
|
tNode.Meta[kv.GetKey()] = string(kv.GetValue())
|
|
}
|
|
}
|
|
|
|
return tNode, nil
|
|
}
|
|
|
|
func (n *treeNode) Get(key string) (string, bool) {
|
|
value, ok := n.Meta[key]
|
|
return value, ok
|
|
}
|
|
|
|
func (n *treeNode) FileName() (string, bool) {
|
|
value, ok := n.Meta[FileNameKey]
|
|
return value, ok
|
|
}
|
|
|
|
func (n *treeNode) IsSplit() bool {
|
|
return len(n.ID) != 1 || len(n.ParentID) != 1 || len(n.TimeStamp) != 1
|
|
}
|
|
|
|
func (n *treeNode) GetLatestNodeIndex() int {
|
|
var (
|
|
maxTimestamp uint64
|
|
index int
|
|
)
|
|
|
|
for i, timestamp := range n.TimeStamp {
|
|
if timestamp > maxTimestamp {
|
|
maxTimestamp = timestamp
|
|
index = i
|
|
}
|
|
}
|
|
|
|
return index
|
|
}
|
|
|
|
func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) {
|
|
tNode, err := newTreeNode(node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid tree node: %w", err)
|
|
}
|
|
|
|
return newNodeVersionFromTreeNode(log, filePath, tNode)
|
|
}
|
|
|
|
func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.NodeVersion, error) {
|
|
_, isUnversioned := treeNode.Get(isUnversionedKV)
|
|
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
|
_, isCombined := treeNode.Get(isCombinedKV)
|
|
eTag, _ := treeNode.Get(etagKV)
|
|
md5, _ := treeNode.Get(md5KV)
|
|
|
|
if treeNode.IsSplit() {
|
|
return nil, errors.New("invalid version tree node: this is split node")
|
|
}
|
|
|
|
version := &data.NodeVersion{
|
|
BaseNodeVersion: data.BaseNodeVersion{
|
|
ID: treeNode.ID[0],
|
|
ParenID: treeNode.ParentID[0],
|
|
OID: treeNode.ObjID,
|
|
Timestamp: treeNode.TimeStamp[0],
|
|
ETag: eTag,
|
|
MD5: md5,
|
|
Size: treeNode.Size,
|
|
FilePath: filePath,
|
|
IsDeleteMarker: isDeleteMarker,
|
|
},
|
|
IsUnversioned: isUnversioned,
|
|
IsCombined: isCombined,
|
|
}
|
|
|
|
if createdStr, ok := treeNode.Get(createdKV); ok {
|
|
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err))
|
|
} else {
|
|
created := time.UnixMilli(utcMilli)
|
|
version.Created = &created
|
|
}
|
|
}
|
|
|
|
if ownerStr, ok := treeNode.Get(ownerKV); ok {
|
|
var owner user.ID
|
|
if err := owner.DecodeString(ownerStr); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err))
|
|
} else {
|
|
version.Owner = &owner
|
|
}
|
|
}
|
|
|
|
return version, nil
|
|
}
|
|
|
|
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
|
|
var (
|
|
err error
|
|
index int
|
|
maxTimestamp uint64
|
|
)
|
|
|
|
if len(nodes) == 0 {
|
|
return nil, errors.New("multi node must have at least one node")
|
|
}
|
|
|
|
treeNodes := make([]*treeNode, len(nodes))
|
|
|
|
for i, node := range nodes {
|
|
if treeNodes[i], err = newTreeNode(node); err != nil {
|
|
return nil, fmt.Errorf("parse system node response: %w", err)
|
|
}
|
|
|
|
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
|
|
index = i
|
|
maxTimestamp = timestamp
|
|
}
|
|
}
|
|
|
|
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
|
|
|
|
return &multiSystemNode{
|
|
nodes: treeNodes,
|
|
}, nil
|
|
}
|
|
|
|
func (m *multiSystemNode) Latest() *treeNode {
|
|
return m.nodes[0]
|
|
}
|
|
|
|
func (m *multiSystemNode) Old() []*treeNode {
|
|
return m.nodes[1:]
|
|
}
|
|
|
|
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
|
uploadID, _ := treeNode.Get(uploadIDKV)
|
|
if uploadID == "" {
|
|
return nil, fmt.Errorf("it's not a multipart node: missing UploadId")
|
|
}
|
|
|
|
if treeNode.IsSplit() {
|
|
return nil, fmt.Errorf("invalid multipart node '%s': tree node is split", filePath)
|
|
}
|
|
|
|
multipartInfo := &data.MultipartInfo{
|
|
ID: treeNode.ID[0],
|
|
Key: filePath,
|
|
UploadID: uploadID,
|
|
Meta: treeNode.Meta,
|
|
}
|
|
|
|
if ownerID, ok := treeNode.Get(ownerKV); ok {
|
|
if err := multipartInfo.Owner.DecodeString(ownerID); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if created, ok := treeNode.Get(createdKV); ok {
|
|
if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err))
|
|
} else {
|
|
multipartInfo.Created = time.UnixMilli(utcMilli)
|
|
}
|
|
}
|
|
|
|
if finished, ok := treeNode.Get(finishedKV); ok {
|
|
if flag, err := strconv.ParseBool(finished); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err))
|
|
} else {
|
|
multipartInfo.Finished = flag
|
|
}
|
|
}
|
|
|
|
return multipartInfo, nil
|
|
}
|
|
|
|
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
|
if len(node.GetNodeID()) != 1 {
|
|
return nil, errors.New("invalid multipart node: this is split node")
|
|
}
|
|
|
|
multipartInfo := &data.MultipartInfo{
|
|
ID: node.GetNodeID()[0],
|
|
Meta: make(map[string]string, len(node.GetMeta())),
|
|
}
|
|
|
|
for _, kv := range node.GetMeta() {
|
|
switch kv.GetKey() {
|
|
case uploadIDKV:
|
|
multipartInfo.UploadID = string(kv.GetValue())
|
|
case FileNameKey:
|
|
multipartInfo.Key = string(kv.GetValue())
|
|
case createdKV:
|
|
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err))
|
|
} else {
|
|
multipartInfo.Created = time.UnixMilli(utcMilli)
|
|
}
|
|
case ownerKV:
|
|
if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err))
|
|
}
|
|
case finishedKV:
|
|
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil {
|
|
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err))
|
|
} else {
|
|
multipartInfo.Finished = isFinished
|
|
}
|
|
default:
|
|
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
|
}
|
|
}
|
|
|
|
if multipartInfo.UploadID == "" {
|
|
return nil, fmt.Errorf("it's not a multipart node")
|
|
}
|
|
|
|
return multipartInfo, nil
|
|
}
|
|
|
|
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
|
var err error
|
|
partInfo := &data.PartInfo{}
|
|
|
|
for _, kv := range node.GetMeta() {
|
|
value := string(kv.GetValue())
|
|
switch kv.GetKey() {
|
|
case partNumberKV:
|
|
if partInfo.Number, err = strconv.Atoi(value); err != nil {
|
|
return nil, fmt.Errorf("invalid part number: %w", err)
|
|
}
|
|
case oidKV:
|
|
if err = partInfo.OID.DecodeString(value); err != nil {
|
|
return nil, fmt.Errorf("invalid oid: %w", err)
|
|
}
|
|
case etagKV:
|
|
partInfo.ETag = value
|
|
case sizeKV:
|
|
if partInfo.Size, err = strconv.ParseUint(value, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("invalid part size: %w", err)
|
|
}
|
|
case createdKV:
|
|
var utcMilli int64
|
|
if utcMilli, err = strconv.ParseInt(value, 10, 64); err != nil {
|
|
return nil, fmt.Errorf("invalid created timestamp: %w", err)
|
|
}
|
|
partInfo.Created = time.UnixMilli(utcMilli)
|
|
case md5KV:
|
|
partInfo.MD5 = value
|
|
}
|
|
}
|
|
|
|
if partInfo.Number <= 0 {
|
|
return nil, fmt.Errorf("it's not a part node")
|
|
}
|
|
|
|
return partInfo, nil
|
|
}
|
|
|
|
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
|
}
|
|
|
|
node := multiNode.Latest()
|
|
|
|
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
|
|
if versioningValue, ok := node.Get(versioningKV); ok {
|
|
settings.Versioning = versioningValue
|
|
}
|
|
|
|
if lockConfigurationValue, ok := node.Get(lockConfigurationKV); ok {
|
|
if settings.LockConfiguration, err = parseLockConfiguration(lockConfigurationValue); err != nil {
|
|
return nil, fmt.Errorf("settings node: invalid lock configuration: %w", err)
|
|
}
|
|
}
|
|
|
|
settings.CannedACL, _ = node.Get(cannedACLKV)
|
|
|
|
if ownerKeyHex, ok := node.Get(ownerKeyKV); ok {
|
|
if settings.OwnerKey, err = keys.NewPublicKeyFromString(ownerKeyHex); err != nil {
|
|
c.reqLogger(ctx).Error(logs.SettingsNodeInvalidOwnerKey, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
return settings, nil
|
|
}
|
|
|
|
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
if err != nil && !isErrNotFound {
|
|
return fmt.Errorf("couldn't get node: %w", err)
|
|
}
|
|
|
|
meta := metaFromSettings(settings)
|
|
|
|
if isErrNotFound {
|
|
_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta)
|
|
return err
|
|
}
|
|
|
|
latest := multiNode.Latest()
|
|
ind := latest.GetLatestNodeIndex()
|
|
if latest.IsSplit() {
|
|
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
|
|
}
|
|
|
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
|
|
return fmt.Errorf("move settings node: %w", err)
|
|
}
|
|
|
|
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
|
|
node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
|
if err != nil {
|
|
return oid.Address{}, err
|
|
}
|
|
|
|
return getCORSAddress(node.Latest())
|
|
}
|
|
|
|
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
if err != nil && !isErrNotFound {
|
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
|
}
|
|
|
|
meta := make(map[string]string)
|
|
meta[FileNameKey] = corsFilename
|
|
meta[oidKV] = addr.Object().EncodeToString()
|
|
meta[cidKV] = addr.Container().EncodeToString()
|
|
|
|
if isErrNotFound {
|
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, layer.ErrNoNodeToRemove
|
|
}
|
|
|
|
latest := multiNode.Latest()
|
|
ind := latest.GetLatestNodeIndex()
|
|
if latest.IsSplit() {
|
|
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
|
|
}
|
|
|
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
|
|
return nil, fmt.Errorf("move cors node: %w", err)
|
|
}
|
|
|
|
objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
|
|
objToDelete[0], err = getCORSAddress(latest)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
|
|
}
|
|
|
|
objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
|
|
|
|
return objToDelete, nil
|
|
}
|
|
|
|
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
if err != nil && !isErrNotFound {
|
|
return nil, err
|
|
}
|
|
|
|
if isErrNotFound {
|
|
return nil, layer.ErrNoNodeToRemove
|
|
}
|
|
|
|
objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
|
|
if len(objToDelete) != len(multiNode.nodes) {
|
|
return nil, fmt.Errorf("clean old cors nodes: %w", err)
|
|
}
|
|
|
|
return objToDelete, nil
|
|
}
|
|
|
|
func getCORSAddress(node *treeNode) (oid.Address, error) {
|
|
var addr oid.Address
|
|
addr.SetObject(node.ObjID)
|
|
|
|
if cidStr, ok := node.Get(cidKV); ok {
|
|
var cnrID cid.ID
|
|
if err := cnrID.DecodeString(cidStr); err != nil {
|
|
return oid.Address{}, fmt.Errorf("couldn't decode cid: %w", err)
|
|
}
|
|
addr.SetContainer(cnrID)
|
|
}
|
|
|
|
return addr, nil
|
|
}
|
|
|
|
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
|
|
res := make([]oid.Address, 0, len(nodes))
|
|
|
|
for _, node := range nodes {
|
|
ind := node.GetLatestNodeIndex()
|
|
if node.IsSplit() {
|
|
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
|
|
}
|
|
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
|
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
|
|
} else {
|
|
addr, err := getCORSAddress(node)
|
|
if err != nil {
|
|
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
|
|
continue
|
|
}
|
|
res = append(res, addr)
|
|
}
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
|
|
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return getObjectTagging(tagNode), nil
|
|
}
|
|
|
|
func getObjectTagging(tagNode *treeNode) map[string]string {
|
|
if tagNode == nil {
|
|
return nil
|
|
}
|
|
|
|
meta := make(map[string]string)
|
|
|
|
for key, val := range tagNode.Meta {
|
|
if strings.HasPrefix(key, userDefinedTagPrefix) {
|
|
meta[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
|
|
}
|
|
}
|
|
|
|
return meta
|
|
}
|
|
|
|
func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error {
|
|
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
treeTagSet := make(map[string]string)
|
|
treeTagSet[isTagKV] = "true"
|
|
|
|
for key, val := range tagSet {
|
|
treeTagSet[userDefinedTagPrefix+key] = val
|
|
}
|
|
|
|
if tagNode == nil {
|
|
_, err = c.service.AddNode(ctx, bktInfo, versionTree, objVersion.ID, treeTagSet)
|
|
return err
|
|
}
|
|
|
|
ind := tagNode.GetLatestNodeIndex()
|
|
if tagNode.IsSplit() {
|
|
c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
|
|
}
|
|
|
|
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
|
|
}
|
|
|
|
func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) error {
|
|
return c.PutObjectTagging(ctx, bktInfo, objVersion, nil)
|
|
}
|
|
|
|
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tags := make(map[string]string)
|
|
|
|
for key, val := range multiNode.Latest().Meta {
|
|
if strings.HasPrefix(key, userDefinedTagPrefix) {
|
|
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
|
|
}
|
|
}
|
|
|
|
return tags, nil
|
|
}
|
|
|
|
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
if err != nil && !isErrNotFound {
|
|
return fmt.Errorf("couldn't get node: %w", err)
|
|
}
|
|
|
|
treeTagSet := make(map[string]string)
|
|
treeTagSet[FileNameKey] = bucketTaggingFilename
|
|
|
|
for key, val := range tagSet {
|
|
treeTagSet[userDefinedTagPrefix+key] = val
|
|
}
|
|
|
|
if isErrNotFound {
|
|
_, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, treeTagSet)
|
|
return err
|
|
}
|
|
|
|
latest := multiNode.Latest()
|
|
ind := latest.GetLatestNodeIndex()
|
|
if latest.IsSplit() {
|
|
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
|
|
}
|
|
|
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
|
|
return fmt.Errorf("move bucket tagging node: %w", err)
|
|
}
|
|
|
|
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
|
return c.PutBucketTagging(ctx, bktInfo, nil)
|
|
}
|
|
|
|
func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, key string) (*treeNode, error) {
|
|
nodes, err := c.getTreeNodes(ctx, bktInfo, nodeID, key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// if there will be many allocations, consider having separate
|
|
// implementations of 'getTreeNode' and 'getTreeNodes'
|
|
return nodes[key], nil
|
|
}
|
|
|
|
func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
|
|
subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// consider using map[string][]*treeNode
|
|
// to be able to remove unused node, that can be added during split
|
|
treeNodes := make(map[string]*treeNode, len(keys))
|
|
|
|
for _, s := range subtree {
|
|
node, err := newTreeNode(s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, key := range keys {
|
|
if _, ok := node.Get(key); ok {
|
|
treeNodes[key] = node
|
|
break
|
|
}
|
|
}
|
|
if len(treeNodes) == len(keys) {
|
|
break
|
|
}
|
|
}
|
|
|
|
return treeNodes, nil
|
|
}
|
|
|
|
func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepath string) ([]*data.NodeVersion, error) {
|
|
return c.getVersions(ctx, bktInfo, versionTree, filepath, false)
|
|
}
|
|
|
|
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
|
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
|
path := pathFromName(objectName)
|
|
|
|
p := &GetNodesParams{
|
|
BktInfo: bktInfo,
|
|
TreeID: versionTree,
|
|
Path: path,
|
|
Meta: meta,
|
|
LatestOnly: false,
|
|
AllAttrs: false,
|
|
}
|
|
nodes, err := c.service.GetNodes(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
latestNode, err := getLatestVersionNode(nodes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newNodeVersion(c.reqLogger(ctx), objectName, latestNode)
|
|
}
|
|
|
|
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
|
var (
|
|
maxCreationTime uint64
|
|
targetIndexNode = -1
|
|
)
|
|
|
|
for i, node := range nodes {
|
|
if !checkExistOID(node.GetMeta()) {
|
|
continue
|
|
}
|
|
|
|
if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime {
|
|
targetIndexNode = i
|
|
maxCreationTime = currentCreationTime
|
|
}
|
|
}
|
|
|
|
if targetIndexNode == -1 {
|
|
return nil, layer.ErrNodeNotFound
|
|
}
|
|
|
|
return nodes[targetIndexNode], nil
|
|
}
|
|
|
|
func getMaxTimestamp(node NodeResponse) uint64 {
|
|
var maxTimestamp uint64
|
|
|
|
for _, timestamp := range node.GetTimestamp() {
|
|
if timestamp > maxTimestamp {
|
|
maxTimestamp = timestamp
|
|
}
|
|
}
|
|
|
|
return maxTimestamp
|
|
}
|
|
|
|
func checkExistOID(meta []Meta) bool {
|
|
for _, kv := range meta {
|
|
if kv.GetKey() == "OID" {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// pathFromName splits name by '/'.
|
|
func pathFromName(objectName string) []string {
|
|
return strings.Split(objectName, separator)
|
|
}
|
|
|
|
type DummySubTreeStream struct {
|
|
data NodeResponse
|
|
read bool
|
|
}
|
|
|
|
func (s *DummySubTreeStream) Next() (NodeResponse, error) {
|
|
if s.read {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
s.read = true
|
|
return s.data, nil
|
|
}
|
|
|
|
// MultiID is a list of tree node ids that together identify one logical node.
type MultiID []uint64

// Equal reports whether every element of id is also present in m.
//
// Despite the name this is a containment check, not strict equality: order
// and duplicates are ignored, m may hold extra ids, and an empty id matches
// any m.
func (m MultiID) Equal(id MultiID) bool {
	known := make(map[uint64]struct{}, len(m))
	for _, own := range m {
		known[own] = struct{}{}
	}

	for _, other := range id {
		if _, present := known[other]; !present {
			return false
		}
	}

	return true
}
|
|
|
|
type VersionsByPrefixStreamImpl struct {
|
|
ctx context.Context
|
|
rootID MultiID
|
|
intermediateRootID MultiID
|
|
service ServiceClient
|
|
bktInfo *data.BucketInfo
|
|
mainStream SubTreeStream
|
|
innerStream SubTreeStream
|
|
headPrefix string
|
|
tailPrefix string
|
|
namesMap map[uint64]string
|
|
ended bool
|
|
latestOnly bool
|
|
currentLatest *data.NodeVersion
|
|
log *zap.Logger
|
|
}
|
|
|
|
func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
|
|
if s.ended {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
for {
|
|
if s.innerStream == nil {
|
|
node, err := s.getNodeFromMainStream()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
s.ended = true
|
|
if s.currentLatest != nil {
|
|
return s.currentLatest, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("get node from main stream: %w", err)
|
|
}
|
|
|
|
if err = s.initInnerStream(node); err != nil {
|
|
return nil, fmt.Errorf("init inner stream: %w", err)
|
|
}
|
|
}
|
|
|
|
nodeVersion, err := s.getNodeVersionFromInnerStream()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
s.innerStream = nil
|
|
maps.Clear(s.namesMap)
|
|
if s.currentLatest != nil && !s.intermediateRootID.Equal([]uint64{s.currentLatest.ID}) {
|
|
return s.currentLatest, nil
|
|
}
|
|
continue
|
|
}
|
|
return nil, fmt.Errorf("inner stream: %w", err)
|
|
}
|
|
return nodeVersion, nil
|
|
}
|
|
}
|
|
|
|
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
|
|
for {
|
|
node, err := s.mainStream.Next()
|
|
if err != nil {
|
|
if errors.Is(err, ErrNodeNotFound) {
|
|
return nil, io.EOF
|
|
}
|
|
return nil, fmt.Errorf("main stream next: %w", err)
|
|
}
|
|
|
|
if !s.rootID.Equal(node.GetNodeID()) && strings.HasPrefix(getFilename(node), s.tailPrefix) {
|
|
return node, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
|
|
if s.rootID.Equal(node.GetParentID()) {
|
|
s.intermediateRootID = node.GetNodeID()
|
|
}
|
|
|
|
if isIntermediate(node) {
|
|
s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
|
|
if err != nil {
|
|
return fmt.Errorf("get sub tree node from main stream: %w", err)
|
|
}
|
|
} else {
|
|
s.innerStream = &DummySubTreeStream{data: node}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
|
|
for {
|
|
node, err := s.innerStream.Next()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("inner stream: %w", err)
|
|
}
|
|
|
|
nodeVersion, skip, err := s.parseNodeResponse(node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if skip {
|
|
continue
|
|
}
|
|
|
|
if s.latestOnly {
|
|
if s.currentLatest == nil {
|
|
s.currentLatest = nodeVersion
|
|
continue
|
|
}
|
|
|
|
if s.currentLatest.FilePath != nodeVersion.FilePath {
|
|
res := s.currentLatest
|
|
s.currentLatest = nodeVersion
|
|
return res, nil
|
|
}
|
|
|
|
if s.currentLatest.Timestamp < nodeVersion.Timestamp {
|
|
s.currentLatest = nodeVersion
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
return nodeVersion, nil
|
|
}
|
|
}
|
|
|
|
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
|
|
trNode, fileName, err := parseTreeNode(node)
|
|
if err != nil {
|
|
s.log.Debug(logs.ParseTreeNode, zap.Error(err))
|
|
return nil, true, nil
|
|
}
|
|
|
|
var parentPrefix string
|
|
if s.headPrefix != "" { // The root of subTree can also have a parent
|
|
parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
|
|
}
|
|
|
|
var filepath string
|
|
if !s.intermediateRootID.Equal(trNode.ID) {
|
|
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
|
|
return nil, false, fmt.Errorf("invalid node order: %w", err)
|
|
}
|
|
} else {
|
|
filepath = parentPrefix + fileName
|
|
for _, id := range trNode.ID {
|
|
s.namesMap[id] = filepath
|
|
}
|
|
}
|
|
|
|
if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
|
|
return nil, true, nil
|
|
}
|
|
|
|
nodeVersion, err := newNodeVersionFromTreeNode(s.log, filepath, trNode)
|
|
return nodeVersion, false, err
|
|
}
|
|
|
|
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
|
|
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return &VersionsByPrefixStreamImpl{ended: true}, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return &VersionsByPrefixStreamImpl{
|
|
ctx: ctx,
|
|
namesMap: map[uint64]string{},
|
|
rootID: rootID,
|
|
service: c.service,
|
|
bktInfo: bktInfo,
|
|
mainStream: mainStream,
|
|
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
|
|
tailPrefix: tailPrefix,
|
|
latestOnly: latestOnly,
|
|
log: c.reqLogger(ctx),
|
|
}, nil
|
|
}
|
|
|
|
func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, []uint64, error) {
|
|
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
|
|
if err != nil {
|
|
if errors.Is(err, layer.ErrNodeNotFound) {
|
|
return nil, "", nil, io.EOF
|
|
}
|
|
return nil, "", nil, err
|
|
}
|
|
|
|
subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
|
|
if err != nil {
|
|
if errors.Is(err, layer.ErrNodeNotFound) {
|
|
return nil, "", nil, io.EOF
|
|
}
|
|
return nil, "", nil, err
|
|
}
|
|
|
|
return subTree, tailPrefix, rootID, nil
|
|
}
|
|
|
|
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
|
|
rootID := []uint64{0}
|
|
path := strings.Split(prefix, separator)
|
|
tailPrefix := path[len(path)-1]
|
|
|
|
if len(path) > 1 {
|
|
var err error
|
|
rootID, err = c.getPrefixNodeID(ctx, bktInfo, treeID, path[:len(path)-1])
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
}
|
|
|
|
return rootID, tailPrefix, nil
|
|
}
|
|
|
|
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
|
|
p := &GetNodesParams{
|
|
BktInfo: bktInfo,
|
|
TreeID: treeID,
|
|
Path: prefixPath,
|
|
LatestOnly: false,
|
|
AllAttrs: true,
|
|
}
|
|
nodes, err := c.service.GetNodes(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var intermediateNodes []uint64
|
|
for _, node := range nodes {
|
|
if isIntermediate(node) {
|
|
intermediateNodes = append(intermediateNodes, node.GetNodeID()...)
|
|
}
|
|
}
|
|
|
|
if len(intermediateNodes) == 0 {
|
|
return nil, layer.ErrNodeNotFound
|
|
}
|
|
|
|
return intermediateNodes, nil
|
|
}
|
|
|
|
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
|
|
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
|
|
if err != nil {
|
|
if errors.Is(err, layer.ErrNodeNotFound) {
|
|
return nil, "", nil
|
|
}
|
|
return nil, "", err
|
|
}
|
|
|
|
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2)
|
|
if err != nil {
|
|
if errors.Is(err, layer.ErrNodeNotFound) {
|
|
return nil, "", nil
|
|
}
|
|
return nil, "", err
|
|
}
|
|
|
|
nodesMap := make(map[string][]NodeResponse, len(subTree))
|
|
for _, node := range subTree {
|
|
if MultiID(rootID).Equal(node.GetNodeID()) {
|
|
continue
|
|
}
|
|
|
|
fileName := getFilename(node)
|
|
if !strings.HasPrefix(fileName, tailPrefix) {
|
|
continue
|
|
}
|
|
|
|
nodes := nodesMap[fileName]
|
|
|
|
// Add all nodes if flag latestOnly is false.
|
|
// Add all intermediate nodes
|
|
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
|
|
if len(nodes) == 0 {
|
|
nodes = []NodeResponse{node}
|
|
} else if !latestOnly || isIntermediate(node) {
|
|
nodes = append(nodes, node)
|
|
} else if isIntermediate(nodes[0]) {
|
|
nodes = append([]NodeResponse{node}, nodes...)
|
|
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
|
|
nodes[0] = node
|
|
}
|
|
|
|
nodesMap[fileName] = nodes
|
|
}
|
|
|
|
result := make([]NodeResponse, 0, len(subTree))
|
|
for _, nodes := range nodesMap {
|
|
result = append(result, nodes...)
|
|
}
|
|
|
|
return result, strings.TrimSuffix(prefix, tailPrefix), nil
|
|
}
|
|
|
|
func getFilename(node NodeResponse) string {
|
|
for _, kv := range node.GetMeta() {
|
|
if kv.GetKey() == FileNameKey {
|
|
return string(kv.GetValue())
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func isIntermediate(node NodeResponse) bool {
|
|
if len(node.GetMeta()) != 1 {
|
|
return false
|
|
}
|
|
|
|
return node.GetMeta()[0].GetKey() == FileNameKey
|
|
}
|
|
|
|
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
|
|
var filepath string
|
|
|
|
for i, id := range node.GetParentID() {
|
|
parentPath, ok := namesMap[id]
|
|
if !ok {
|
|
return "", fmt.Errorf("couldn't get parent path")
|
|
}
|
|
|
|
filepath = parentPath + separator + fileName
|
|
namesMap[node.GetNodeID()[i]] = filepath
|
|
}
|
|
|
|
return filepath, nil
|
|
}
|
|
|
|
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
|
|
tNode, err := newTreeNode(node)
|
|
if err != nil { // invalid OID attribute
|
|
return nil, "", err
|
|
}
|
|
|
|
fileName, ok := tNode.FileName()
|
|
if !ok {
|
|
return nil, "", fmt.Errorf("doesn't contain FileName")
|
|
}
|
|
|
|
return tNode, fileName, nil
|
|
}
|
|
|
|
// formLatestNodeKey builds a map key of the form "<parentID>.<fileName>",
// with parentID rendered in base 10.
func formLatestNodeKey(parentID uint64, fileName string) string {
	key := strconv.AppendUint(nil, parentID, 10)
	key = append(key, '.')
	key = append(key, fileName...)

	return string(key)
}
|
|
|
|
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
|
|
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
|
}
|
|
|
|
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
|
|
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(nodes) == 0 {
|
|
return nil, layer.ErrNodeNotFound
|
|
}
|
|
|
|
if len(nodes) > 1 {
|
|
c.reqLogger(ctx).Debug(logs.FoundMoreThanOneUnversionedNode,
|
|
zap.String("treeID", treeID), zap.String("filepath", filepath))
|
|
}
|
|
|
|
sort.Slice(nodes, func(i, j int) bool {
|
|
return nodes[i].Timestamp > nodes[j].Timestamp
|
|
})
|
|
|
|
return nodes[0], nil
|
|
}
|
|
|
|
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
|
return c.addVersion(ctx, bktInfo, versionTree, version)
|
|
}
|
|
|
|
func (c *Tree) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, id uint64) error {
|
|
return c.service.RemoveNode(ctx, bktInfo, versionTree, id)
|
|
}
|
|
|
|
func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
|
|
path := pathFromName(info.Key)
|
|
meta := metaFromMultipart(info, path[len(path)-1])
|
|
_, err := c.service.AddNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta)
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
|
|
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result []*data.MultipartInfo
|
|
for _, node := range subTreeNodes {
|
|
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, multipartUploads...)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
|
|
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var parentPrefix string
|
|
if parentFilePath != "" { // The root of subTree can also have a parent
|
|
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
|
|
}
|
|
|
|
var filepath string
|
|
namesMap := make(map[uint64]string, len(subTree))
|
|
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
|
|
|
|
for i, node := range subTree {
|
|
tNode, fileName, err := parseTreeNode(node)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if i != 0 {
|
|
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
|
|
return nil, fmt.Errorf("invalid node order: %w", err)
|
|
}
|
|
} else {
|
|
filepath = parentPrefix + fileName
|
|
for _, id := range tNode.ID {
|
|
namesMap[id] = filepath
|
|
}
|
|
}
|
|
|
|
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
|
|
if err != nil || multipartInfo.Finished {
|
|
continue
|
|
}
|
|
|
|
for _, id := range node.GetParentID() {
|
|
key := formLatestNodeKey(id, fileName)
|
|
multipartInfos, ok := multiparts[key]
|
|
if !ok {
|
|
multipartInfos = []*data.MultipartInfo{multipartInfo}
|
|
} else {
|
|
multipartInfos = append(multipartInfos, multipartInfo)
|
|
}
|
|
|
|
multiparts[key] = multipartInfos
|
|
}
|
|
}
|
|
|
|
result := make([]*data.MultipartInfo, 0, len(multiparts))
|
|
for _, multipartInfo := range multiparts {
|
|
result = append(result, multipartInfo...)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
|
|
path := pathFromName(objectName)
|
|
p := &GetNodesParams{
|
|
BktInfo: bktInfo,
|
|
TreeID: systemTree,
|
|
Path: path,
|
|
AllAttrs: true,
|
|
}
|
|
|
|
nodes, err := c.service.GetNodes(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log := c.reqLogger(ctx)
|
|
for _, node := range nodes {
|
|
info, err := newMultipartInfo(log, node)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if info.UploadID == uploadID {
|
|
if info.Finished {
|
|
break
|
|
}
|
|
return info, nil
|
|
}
|
|
}
|
|
|
|
return nil, layer.ErrNodeNotFound
|
|
}
|
|
|
|
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
|
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
|
|
if err != nil {
|
|
return oid.ID{}, err
|
|
}
|
|
|
|
meta := map[string]string{
|
|
partNumberKV: strconv.Itoa(info.Number),
|
|
oidKV: info.OID.EncodeToString(),
|
|
sizeKV: strconv.FormatUint(info.Size, 10),
|
|
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
|
|
etagKV: info.ETag,
|
|
md5KV: info.MD5,
|
|
}
|
|
|
|
for _, part := range parts {
|
|
if len(part.GetNodeID()) != 1 {
|
|
// multipart parts nodeID shouldn't have multiple values
|
|
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
|
|
zap.String("key", info.Key),
|
|
zap.String("upload id", info.UploadID),
|
|
zap.Uint64("multipart node id ", multipartNodeID),
|
|
zap.Uint64s("node ids", part.GetNodeID()))
|
|
continue
|
|
}
|
|
nodeID := part.GetNodeID()[0]
|
|
if nodeID == multipartNodeID {
|
|
continue
|
|
}
|
|
partInfo, err := newPartInfo(part)
|
|
if err != nil {
|
|
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
|
zap.String("key", info.Key),
|
|
zap.String("upload id", info.UploadID),
|
|
zap.Uint64("multipart node id ", multipartNodeID),
|
|
zap.Error(err))
|
|
continue
|
|
}
|
|
if partInfo.Number == info.Number {
|
|
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
|
|
}
|
|
}
|
|
|
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
|
return oid.ID{}, err
|
|
}
|
|
|
|
return oid.ID{}, layer.ErrNoNodeToRemove
|
|
}
|
|
|
|
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
|
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make([]*data.PartInfo, 0, len(parts))
|
|
for _, part := range parts {
|
|
if len(part.GetNodeID()) != 1 {
|
|
// multipart parts nodeID shouldn't have multiple values
|
|
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
|
|
zap.Uint64("multipart node id ", multipartNodeID),
|
|
zap.Uint64s("node ids", part.GetNodeID()))
|
|
continue
|
|
}
|
|
if part.GetNodeID()[0] == multipartNodeID {
|
|
continue
|
|
}
|
|
partInfo, err := newPartInfo(part)
|
|
if err != nil {
|
|
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
|
zap.Uint64("multipart node id ", multipartNodeID),
|
|
zap.Uint64s("node ids", part.GetNodeID()),
|
|
zap.Error(err))
|
|
continue
|
|
}
|
|
result = append(result, partInfo)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
|
|
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
multipartInfo.Finished = true
|
|
|
|
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
|
|
}
|
|
|
|
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
|
|
meta := map[string]string{isLockKV: "true"}
|
|
|
|
if lock.IsLegalHoldSet() {
|
|
meta[legalHoldOIDKV] = lock.LegalHold().EncodeToString()
|
|
}
|
|
if lock.IsRetentionSet() {
|
|
meta[retentionOIDKV] = lock.Retention().EncodeToString()
|
|
meta[untilDateKV] = lock.UntilDate()
|
|
if lock.IsCompliance() {
|
|
meta[isComplianceKV] = "true"
|
|
}
|
|
}
|
|
|
|
if lock.ID() == 0 {
|
|
_, err := c.service.AddNode(ctx, bktInfo, versionTree, nodeID, meta)
|
|
return err
|
|
}
|
|
|
|
return c.service.MoveNode(ctx, bktInfo, versionTree, lock.ID(), nodeID, meta)
|
|
}
|
|
|
|
func (c *Tree) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) {
|
|
lockNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isLockKV)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return getLock(lockNode)
|
|
}
|
|
|
|
func getLock(lockNode *treeNode) (*data.LockInfo, error) {
|
|
if lockNode == nil {
|
|
return &data.LockInfo{}, nil
|
|
}
|
|
if lockNode.IsSplit() {
|
|
return nil, errors.New("invalid lock node: this is split node")
|
|
}
|
|
lockInfo := data.NewLockInfo(lockNode.ID[0])
|
|
|
|
if legalHold, ok := lockNode.Get(legalHoldOIDKV); ok {
|
|
var legalHoldOID oid.ID
|
|
if err := legalHoldOID.DecodeString(legalHold); err != nil {
|
|
return nil, fmt.Errorf("invalid legal hold object id: %w", err)
|
|
}
|
|
lockInfo.SetLegalHold(legalHoldOID)
|
|
}
|
|
|
|
if retention, ok := lockNode.Get(retentionOIDKV); ok {
|
|
var retentionOID oid.ID
|
|
if err := retentionOID.DecodeString(retention); err != nil {
|
|
return nil, fmt.Errorf("invalid retention object id: %w", err)
|
|
}
|
|
_, isCompliance := lockNode.Get(isComplianceKV)
|
|
untilDate, _ := lockNode.Get(untilDateKV)
|
|
lockInfo.SetRetention(retentionOID, untilDate, isCompliance)
|
|
}
|
|
|
|
return lockInfo, nil
|
|
}
|
|
|
|
func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
|
|
nodes, err := c.getTreeNodes(ctx, bktInfo, objVersion.ID, isTagKV, isLockKV)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
lockInfo, err := getLock(nodes[isLockKV])
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
|
}
|
|
|
|
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
|
|
path := pathFromName(version.FilePath)
|
|
meta := map[string]string{
|
|
oidKV: version.OID.EncodeToString(),
|
|
FileNameKey: path[len(path)-1],
|
|
ownerKV: version.Owner.EncodeToString(),
|
|
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
|
|
}
|
|
|
|
if version.Size > 0 {
|
|
meta[sizeKV] = strconv.FormatUint(version.Size, 10)
|
|
}
|
|
if len(version.ETag) > 0 {
|
|
meta[etagKV] = version.ETag
|
|
}
|
|
if len(version.MD5) > 0 {
|
|
meta[md5KV] = version.MD5
|
|
}
|
|
|
|
if version.IsDeleteMarker {
|
|
meta[isDeleteMarkerKV] = "true"
|
|
}
|
|
|
|
if version.IsCombined {
|
|
meta[isCombinedKV] = "true"
|
|
}
|
|
|
|
if version.IsUnversioned {
|
|
meta[isUnversionedKV] = "true"
|
|
|
|
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
|
if err == nil {
|
|
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
|
|
}
|
|
|
|
if !errors.Is(err, layer.ErrNodeNotFound) {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
|
}
|
|
|
|
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
|
taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if taggingNode != nil {
|
|
return c.service.RemoveNode(ctx, bktInfo, treeID, taggingNode.ID[0])
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
|
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
|
path := pathFromName(filepath)
|
|
p := &GetNodesParams{
|
|
BktInfo: bktInfo,
|
|
TreeID: treeID,
|
|
Path: path,
|
|
Meta: keysToReturn,
|
|
LatestOnly: false,
|
|
AllAttrs: false,
|
|
}
|
|
nodes, err := c.service.GetNodes(ctx, p)
|
|
if err != nil {
|
|
if errors.Is(err, layer.ErrNodeNotFound) {
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
log := c.reqLogger(ctx)
|
|
result := make([]*data.NodeVersion, 0, len(nodes))
|
|
for _, node := range nodes {
|
|
nodeVersion, err := newNodeVersion(log, filepath, node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if onlyUnversioned && !nodeVersion.IsUnversioned {
|
|
continue
|
|
}
|
|
|
|
result = append(result, nodeVersion)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func metaFromSettings(settings *data.BucketSettings) map[string]string {
|
|
results := make(map[string]string, 3)
|
|
|
|
results[FileNameKey] = settingsFileName
|
|
results[versioningKV] = settings.Versioning
|
|
results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration)
|
|
results[cannedACLKV] = settings.CannedACL
|
|
if settings.OwnerKey != nil {
|
|
results[ownerKeyKV] = hex.EncodeToString(settings.OwnerKey.Bytes())
|
|
}
|
|
|
|
return results
|
|
}
|
|
|
|
func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string {
|
|
info.Meta[FileNameKey] = fileName
|
|
info.Meta[uploadIDKV] = info.UploadID
|
|
info.Meta[ownerKV] = info.Owner.EncodeToString()
|
|
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
|
|
if info.Finished {
|
|
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
|
|
}
|
|
|
|
return info.Meta
|
|
}
|
|
|
|
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
|
|
p := &GetNodesParams{
|
|
BktInfo: bktInfo,
|
|
TreeID: systemTree,
|
|
Path: []string{name},
|
|
LatestOnly: false,
|
|
AllAttrs: true,
|
|
}
|
|
nodes, err := c.service.GetNodes(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nodes = filterMultipartNodes(nodes)
|
|
|
|
if len(nodes) == 0 {
|
|
return nil, layer.ErrNodeNotFound
|
|
}
|
|
if len(nodes) != 1 {
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
|
|
}
|
|
|
|
return newMultiNode(nodes)
|
|
}
|
|
|
|
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
|
|
res := make([]NodeResponse, 0, len(nodes))
|
|
|
|
LOOP:
|
|
for _, node := range nodes {
|
|
for _, meta := range node.GetMeta() {
|
|
if meta.GetKey() == uploadIDKV {
|
|
continue LOOP
|
|
}
|
|
}
|
|
|
|
res = append(res, node)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
|
|
reqLogger := middleware.GetReqLog(ctx)
|
|
if reqLogger != nil {
|
|
return reqLogger
|
|
}
|
|
return c.log
|
|
}
|
|
|
|
func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
|
|
result := &data.ObjectLockConfiguration{}
|
|
if len(value) == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
lockValues := strings.Split(value, ",")
|
|
result.ObjectLockEnabled = lockValues[0]
|
|
|
|
if len(lockValues) == 1 {
|
|
return result, nil
|
|
}
|
|
|
|
if len(lockValues) != 4 {
|
|
return nil, fmt.Errorf("invalid lock configuration: %s", value)
|
|
}
|
|
|
|
var err error
|
|
var days, years int64
|
|
|
|
if len(lockValues[1]) > 0 {
|
|
if days, err = strconv.ParseInt(lockValues[1], 10, 64); err != nil {
|
|
return nil, fmt.Errorf("invalid lock configuration: %s", value)
|
|
}
|
|
}
|
|
|
|
if len(lockValues[3]) > 0 {
|
|
if years, err = strconv.ParseInt(lockValues[3], 10, 64); err != nil {
|
|
return nil, fmt.Errorf("invalid lock configuration: %s", value)
|
|
}
|
|
}
|
|
|
|
result.Rule = &data.ObjectLockRule{
|
|
DefaultRetention: &data.DefaultRetention{
|
|
Days: days,
|
|
Mode: lockValues[2],
|
|
Years: years,
|
|
},
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func encodeLockConfiguration(conf *data.ObjectLockConfiguration) string {
|
|
if conf == nil {
|
|
return ""
|
|
}
|
|
|
|
if conf.Rule == nil || conf.Rule.DefaultRetention == nil {
|
|
return conf.ObjectLockEnabled
|
|
}
|
|
|
|
defaults := conf.Rule.DefaultRetention
|
|
return fmt.Sprintf("%s,%d,%s,%d", conf.ObjectLockEnabled, defaults.Days, defaults.Mode, defaults.Years)
|
|
}
|