|
|
|
@ -54,6 +54,11 @@ type (
|
|
|
|
|
Meta map[string]string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
multiSystemNode struct {
|
|
|
|
|
// the first element is latest
|
|
|
|
|
nodes []*treeNode
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
GetNodesParams struct {
|
|
|
|
|
BktInfo *data.BucketInfo
|
|
|
|
|
TreeID string
|
|
|
|
@ -270,6 +275,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
|
|
|
|
|
return version, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
|
|
|
|
|
var (
|
|
|
|
|
err error
|
|
|
|
|
index int
|
|
|
|
|
maxTimestamp uint64
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if len(nodes) == 0 {
|
|
|
|
|
return nil, errors.New("multi node must have at least one node")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
treeNodes := make([]*treeNode, len(nodes))
|
|
|
|
|
|
|
|
|
|
for i, node := range nodes {
|
|
|
|
|
if treeNodes[i], err = newTreeNode(node); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("parse system node response: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
|
|
|
|
|
index = i
|
|
|
|
|
maxTimestamp = timestamp
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
|
|
|
|
|
|
|
|
|
|
return &multiSystemNode{
|
|
|
|
|
nodes: treeNodes,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *multiSystemNode) Latest() *treeNode {
|
|
|
|
|
return m.nodes[0]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *multiSystemNode) Old() []*treeNode {
|
|
|
|
|
return m.nodes[1:]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
|
|
|
|
uploadID, _ := treeNode.Get(uploadIDKV)
|
|
|
|
|
if uploadID == "" {
|
|
|
|
@ -396,11 +440,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
|
|
|
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
node := multiNode.Latest()
|
|
|
|
|
|
|
|
|
|
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
|
|
|
|
|
if versioningValue, ok := node.Get(versioningKV); ok {
|
|
|
|
|
settings.Versioning = versioningValue
|
|
|
|
@ -424,7 +470,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
|
|
|
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
|
|
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
|
|
|
if err != nil && !isErrNotFound {
|
|
|
|
|
return fmt.Errorf("couldn't get node: %w", err)
|
|
|
|
@ -437,28 +483,35 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ind := node.GetLatestNodeIndex()
|
|
|
|
|
if node.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes)
|
|
|
|
|
latest := multiNode.Latest()
|
|
|
|
|
ind := latest.GetLatestNodeIndex()
|
|
|
|
|
if latest.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
|
|
|
|
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
|
|
|
|
|
return fmt.Errorf("move settings node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return oid.Address{}, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return getCORSAddress(node)
|
|
|
|
|
return getTreeNodeAddress(node.Latest())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
|
|
|
|
|
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
|
|
|
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
|
|
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
|
|
|
if err != nil && !isErrNotFound {
|
|
|
|
|
return oid.Address{}, fmt.Errorf("couldn't get node: %w", err)
|
|
|
|
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
meta := make(map[string]string)
|
|
|
|
@ -468,48 +521,52 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr
|
|
|
|
|
|
|
|
|
|
if isErrNotFound {
|
|
|
|
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
|
|
|
|
|
return oid.Address{}, err
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return oid.Address{}, layer.ErrNoNodeToRemove
|
|
|
|
|
return nil, layer.ErrNoNodeToRemove
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ind := node.GetLatestNodeIndex()
|
|
|
|
|
if node.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
|
|
|
|
|
latest := multiNode.Latest()
|
|
|
|
|
ind := latest.GetLatestNodeIndex()
|
|
|
|
|
if latest.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
prevAddr, err := getCORSAddress(node)
|
|
|
|
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("move cors node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
|
|
|
|
|
objToDelete[0], err = getTreeNodeAddress(latest)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err)
|
|
|
|
|
return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return prevAddr, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
|
|
|
|
|
objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)
|
|
|
|
|
|
|
|
|
|
return objToDelete, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
|
|
|
|
|
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
|
|
|
|
|
return oid.Address{}, err
|
|
|
|
|
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
|
|
|
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
|
|
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
|
|
|
if err != nil && !isErrNotFound {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if node != nil {
|
|
|
|
|
ind := node.GetLatestNodeIndex()
|
|
|
|
|
if node.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
|
|
|
|
|
if isErrNotFound {
|
|
|
|
|
return nil, layer.ErrNoNodeToRemove
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
addr, err := getCORSAddress(node)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err)
|
|
|
|
|
objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo)
|
|
|
|
|
if len(objToDelete) != len(multiNode.nodes) {
|
|
|
|
|
return nil, fmt.Errorf("clean old cors nodes: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return addr, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind])
|
|
|
|
|
return objToDelete, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return oid.Address{}, layer.ErrNoNodeToRemove
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getCORSAddress(node *treeNode) (oid.Address, error) {
|
|
|
|
|
func getTreeNodeAddress(node *treeNode) (oid.Address, error) {
|
|
|
|
|
var addr oid.Address
|
|
|
|
|
addr.SetObject(node.ObjID)
|
|
|
|
|
|
|
|
|
@ -524,6 +581,29 @@ func getCORSAddress(node *treeNode) (oid.Address, error) {
|
|
|
|
|
return addr, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
|
|
|
|
|
res := make([]oid.Address, 0, len(nodes))
|
|
|
|
|
|
|
|
|
|
for _, node := range nodes {
|
|
|
|
|
ind := node.GetLatestNodeIndex()
|
|
|
|
|
if node.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID))
|
|
|
|
|
}
|
|
|
|
|
if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
|
|
|
|
|
} else {
|
|
|
|
|
addr, err := getTreeNodeAddress(node)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
res = append(res, addr)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return res
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
|
|
|
|
|
tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -569,7 +649,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
|
|
|
|
|
|
|
|
|
|
ind := tagNode.GetLatestNodeIndex()
|
|
|
|
|
if tagNode.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes)
|
|
|
|
|
c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
|
|
|
|
@ -580,14 +660,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
|
|
|
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tags := make(map[string]string)
|
|
|
|
|
|
|
|
|
|
for key, val := range node.Meta {
|
|
|
|
|
for key, val := range multiNode.Latest().Meta {
|
|
|
|
|
if strings.HasPrefix(key, userDefinedTagPrefix) {
|
|
|
|
|
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
|
|
|
|
|
}
|
|
|
|
@ -597,7 +677,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
|
|
|
|
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
|
|
|
|
|
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
|
|
|
|
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
|
|
|
|
if err != nil && !isErrNotFound {
|
|
|
|
|
return fmt.Errorf("couldn't get node: %w", err)
|
|
|
|
@ -615,12 +695,19 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ind := node.GetLatestNodeIndex()
|
|
|
|
|
if node.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes)
|
|
|
|
|
latest := multiNode.Latest()
|
|
|
|
|
ind := latest.GetLatestNodeIndex()
|
|
|
|
|
if latest.IsSplit() {
|
|
|
|
|
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet)
|
|
|
|
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
|
|
|
|
|
return fmt.Errorf("move bucket tagging node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
|
|
|
@ -643,6 +730,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// consider using map[string][]*treeNode
|
|
|
|
|
// to be able to remove unused node, that can be added during split
|
|
|
|
|
treeNodes := make(map[string]*treeNode, len(keys))
|
|
|
|
|
|
|
|
|
|
for _, s := range subtree {
|
|
|
|
@ -717,26 +806,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
|
|
|
|
return nodes[targetIndexNode], nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getLatestNode(nodes []NodeResponse) NodeResponse {
|
|
|
|
|
if len(nodes) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
index int
|
|
|
|
|
maxTimestamp uint64
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for i, node := range nodes {
|
|
|
|
|
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
|
|
|
|
|
index = i
|
|
|
|
|
maxTimestamp = timestamp
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nodes[index]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getMaxTimestamp(node NodeResponse) uint64 {
|
|
|
|
|
var maxTimestamp uint64
|
|
|
|
|
|
|
|
|
@ -1586,11 +1655,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
|
|
|
|
|
return info.Meta
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
|
|
|
|
|
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
|
|
|
|
|
p := &GetNodesParams{
|
|
|
|
|
BktInfo: bktInfo,
|
|
|
|
|
TreeID: systemTree,
|
|
|
|
|
Path: path,
|
|
|
|
|
Path: []string{name},
|
|
|
|
|
LatestOnly: false,
|
|
|
|
|
AllAttrs: true,
|
|
|
|
|
}
|
|
|
|
@ -1605,10 +1674,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path
|
|
|
|
|
return nil, layer.ErrNodeNotFound
|
|
|
|
|
}
|
|
|
|
|
if len(nodes) != 1 {
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/")))
|
|
|
|
|
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return newTreeNode(getLatestNode(nodes))
|
|
|
|
|
return newMultiNode(nodes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
|
|
|
|
|