From f5326b9f04a4fbf09df5dcc4606a9c9b817f21d5 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 17 Jul 2024 12:44:38 +0300 Subject: [PATCH] [#437] tree: Support removing old split system nodes It's need to fit user expectation on deleting CORS for example. Previously after removing cors (that was uploaded in split manner) we can still get some data (from other node) because deletion worked only for latest node version. Signed-off-by: Denis Kirillov Signed-off-by: Alex Vanin --- api/layer/cors.go | 19 ++-- api/layer/tree_mock.go | 6 +- api/layer/tree_service.go | 8 +- internal/logs/logs.go | 13 ++- pkg/service/tree/tree.go | 205 +++++++++++++++++++++++++------------- 5 files changed, 159 insertions(+), 92 deletions(-) diff --git a/api/layer/cors.go b/api/layer/cors.go index a9dcdc02..bdd2c2a0 100644 --- a/api/layer/cors.go +++ b/api/layer/cors.go @@ -60,14 +60,16 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error { return fmt.Errorf("put cors object: %w", err) } - objToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) + objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) if err != nil && !objToDeleteNotFound { return err } if !objToDeleteNotFound { - n.deleteCORSObject(ctx, p.BktInfo, objToDelete) + for _, addr := range objsToDelete { + n.deleteCORSObject(ctx, p.BktInfo, addr) + } } n.cache.PutCORS(n.BearerOwner(ctx), p.BktInfo, cors) @@ -101,22 +103,15 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d } func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error { - obj, err := n.treeService.DeleteBucketCORS(ctx, bktInfo) + objs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo) objNotFound := errorsStd.Is(err, ErrNoNodeToRemove) if err != nil && !objNotFound { return err } if !objNotFound { - var prmAuth PrmAuth - corsBkt := bktInfo - if !obj.Container().Equals(bktInfo.CID) && !obj.Container().Equals(cid.ID{}) { - corsBkt = &data.BucketInfo{CID: obj.Container()} - prmAuth.PrivateKey = &n.gateKey.PrivateKey - } - - if err = n.objectDeleteWithAuth(ctx, corsBkt, obj.Object(), prmAuth); err != nil { - return fmt.Errorf("delete cors object: %w", err) + for _, addr := range objs { + n.deleteCORSObject(ctx, bktInfo, addr) } } diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 5b0fa49f..daac6893 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -127,7 +127,7 @@ func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketI return addr, nil } -func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { +func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) { systemMap, ok := t.system[bktInfo.CID.EncodeToString()] if !ok { systemMap = make(map[string]*data.BaseNodeVersion) @@ -139,10 +139,10 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI t.system[bktInfo.CID.EncodeToString()] = systemMap - return oid.Address{}, ErrNoNodeToRemove + return nil, ErrNoNodeToRemove } -func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) (oid.Address, error) { +func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.Address, error) { panic("implement me") } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index dec19854..9f310515 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -25,13 +25,13 @@ type TreeService interface { // PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) // DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error diff --git a/internal/logs/logs.go b/internal/logs/logs.go index a298663b..d875eb74 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -142,12 +142,15 @@ const ( CouldntCacheSubject = "couldn't cache subject info" UserGroupsListIsEmpty = "user groups list is empty, subject not found" CouldntCacheUserKey = "couldn't cache user key" - FoundSeveralBucketCorsNodes = "found several bucket cors nodes, latest be used" - FoundSeveralObjectTaggingNodes = "found several object tagging nodes, latest be used" - FoundSeveralBucketTaggingNodes = "found several bucket tagging nodes, latest be used" - FoundSeveralBucketSettingsNodes = "found several bucket settings nodes, latest be used" + ObjectTaggingNodeHasMultipleIDs = "object tagging node has multiple ids" + BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids" + BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids" + BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids" + SystemNodeHasMultipleIDs = "system node has multiple ids" + FailedToRemoveOldSystemNode = "failed to remove old system node" + FailedToParseAddressInTreeNode = "failed to parse object addr in tree node" UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts" - FoundSeveralSystemNodes = "found several system nodes, latest be used" + FoundSeveralSystemNodes = "found several system nodes" FailedToParsePartInfo = "failed to parse part info" CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info" CloseCredsObjectPayload = "close creds object payload" diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 33835aa2..350f235b 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -54,6 +54,11 @@ type ( Meta map[string]string } + multiSystemNode struct { + // the first element is latest + nodes []*treeNode + } + GetNodesParams struct { BktInfo *data.BucketInfo TreeID string @@ -270,6 +275,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree return version, nil } +func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) { + var ( + err error + index int + maxTimestamp uint64 + ) + + if len(nodes) == 0 { + return nil, errors.New("multi node must have at least one node") + } + + treeNodes := make([]*treeNode, len(nodes)) + + for i, node := range nodes { + if treeNodes[i], err = newTreeNode(node); err != nil { + return nil, fmt.Errorf("parse system node response: %w", err) + } + + if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp { + index = i + maxTimestamp = timestamp + } + } + + treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0] + + return &multiSystemNode{ + nodes: treeNodes, + }, nil +} + +func (m *multiSystemNode) Latest() *treeNode { + return m.nodes[0] +} + +func (m *multiSystemNode) Old() []*treeNode { + return m.nodes[1:] +} + func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) { uploadID, _ := treeNode.Get(uploadIDKV) if uploadID == "" { @@ -396,11 +440,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { } func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) + multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName) if err != nil { return nil, fmt.Errorf("couldn't get node: %w", err) } + node := multiNode.Latest() + settings := &data.BucketSettings{Versioning: data.VersioningUnversioned} if versioningValue, ok := node.Get(versioningKV); ok { settings.Versioning = versioningValue @@ -424,7 +470,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (* } func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { - node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) + multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -437,28 +483,35 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se return err } - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes) + latest := multiNode.Latest() + ind := latest.GetLatestNodeIndex() + if latest.IsSplit() { + c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID)) } - return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil { + return fmt.Errorf("move settings node: %w", err) + } + + c.cleanOldNodes(ctx, multiNode.Old(), bktInfo) + + return nil } func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) + node, err := c.getSystemNode(ctx, bktInfo, corsFilename) if err != nil { return oid.Address{}, err } - return getCORSAddress(node) + return getCORSAddress(node.Latest()) } -func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) +func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) { + multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { - return oid.Address{}, fmt.Errorf("couldn't get node: %w", err) + return nil, fmt.Errorf("couldn't get node: %w", err) } meta := make(map[string]string) @@ -468,45 +521,49 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr if isErrNotFound { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil { - return oid.Address{}, err + return nil, err } - return oid.Address{}, layer.ErrNoNodeToRemove + return nil, layer.ErrNoNodeToRemove } - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) + latest := multiNode.Latest() + ind := latest.GetLatestNodeIndex() + if latest.IsSplit() { + c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs) } - prevAddr, err := getCORSAddress(node) + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil { + return nil, fmt.Errorf("move cors node: %w", err) + } + + objToDelete := make([]oid.Address, 1, len(multiNode.nodes)) + objToDelete[0], err = getCORSAddress(latest) if err != nil { - return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) + return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err) } - return prevAddr, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) + objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...) + + return objToDelete, nil } -func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) - if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { - return oid.Address{}, err +func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) { + multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename) + isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) + if err != nil && !isErrNotFound { + return nil, err } - if node != nil { - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) - } - - addr, err := getCORSAddress(node) - if err != nil { - return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) - } - - return addr, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]) + if isErrNotFound { + return nil, layer.ErrNoNodeToRemove } - return oid.Address{}, layer.ErrNoNodeToRemove + objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo) + if len(objToDelete) != len(multiNode.nodes) { + return nil, fmt.Errorf("clean old cors nodes: %w", err) + } + + return objToDelete, nil } func getCORSAddress(node *treeNode) (oid.Address, error) { @@ -524,6 +581,29 @@ func getCORSAddress(node *treeNode) (oid.Address, error) { return addr, nil } +func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address { + res := make([]oid.Address, 0, len(nodes)) + + for _, node := range nodes { + ind := node.GetLatestNodeIndex() + if node.IsSplit() { + c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID)) + } + if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil { + c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind])) + } else { + addr, err := getCORSAddress(node) + if err != nil { + c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind])) + continue + } + res = append(res, addr) + } + } + + return res +} + func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) { tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) if err != nil { @@ -569,7 +649,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o ind := tagNode.GetLatestNodeIndex() if tagNode.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes) + c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs) } return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet) @@ -580,14 +660,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo } func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) + multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename) if err != nil { return nil, err } tags := make(map[string]string) - for key, val := range node.Meta { + for key, val := range multiNode.Latest().Meta { if strings.HasPrefix(key, userDefinedTagPrefix) { tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val } @@ -597,7 +677,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) ( } func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error { - node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) + multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -615,12 +695,19 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t return err } - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes) + latest := multiNode.Latest() + ind := latest.GetLatestNodeIndex() + if latest.IsSplit() { + c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID)) } - return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet) + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil { + return fmt.Errorf("move bucket tagging node: %w", err) + } + + c.cleanOldNodes(ctx, multiNode.Old(), bktInfo) + + return nil } func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error { @@ -643,6 +730,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI return nil, err } + // consider using map[string][]*treeNode + // to be able to remove unused node, that can be added during split treeNodes := make(map[string]*treeNode, len(keys)) for _, s := range subtree { @@ -717,26 +806,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) { return nodes[targetIndexNode], nil } -func getLatestNode(nodes []NodeResponse) NodeResponse { - if len(nodes) == 0 { - return nil - } - - var ( - index int - maxTimestamp uint64 - ) - - for i, node := range nodes { - if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp { - index = i - maxTimestamp = timestamp - } - } - - return nodes[index] -} - func getMaxTimestamp(node NodeResponse) uint64 { var maxTimestamp uint64 @@ -1586,11 +1655,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str return info.Meta } -func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) { +func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) { p := &GetNodesParams{ BktInfo: bktInfo, TreeID: systemTree, - Path: path, + Path: []string{name}, LatestOnly: false, AllAttrs: true, } @@ -1605,10 +1674,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path return nil, layer.ErrNodeNotFound } if len(nodes) != 1 { - c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/"))) + c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name)) } - return newTreeNode(getLatestNode(nodes)) + return newMultiNode(nodes) } func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {