diff --git a/api/layer/cors.go b/api/layer/cors.go index a9dcdc0..bdd2c2a 100644 --- a/api/layer/cors.go +++ b/api/layer/cors.go @@ -60,14 +60,16 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error { return fmt.Errorf("put cors object: %w", err) } - objToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) + objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) if err != nil && !objToDeleteNotFound { return err } if !objToDeleteNotFound { - n.deleteCORSObject(ctx, p.BktInfo, objToDelete) + for _, addr := range objsToDelete { + n.deleteCORSObject(ctx, p.BktInfo, addr) + } } n.cache.PutCORS(n.BearerOwner(ctx), p.BktInfo, cors) @@ -101,22 +103,15 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d } func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error { - obj, err := n.treeService.DeleteBucketCORS(ctx, bktInfo) + objs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo) objNotFound := errorsStd.Is(err, ErrNoNodeToRemove) if err != nil && !objNotFound { return err } if !objNotFound { - var prmAuth PrmAuth - corsBkt := bktInfo - if !obj.Container().Equals(bktInfo.CID) && !obj.Container().Equals(cid.ID{}) { - corsBkt = &data.BucketInfo{CID: obj.Container()} - prmAuth.PrivateKey = &n.gateKey.PrivateKey - } - - if err = n.objectDeleteWithAuth(ctx, corsBkt, obj.Object(), prmAuth); err != nil { - return fmt.Errorf("delete cors object: %w", err) + for _, addr := range objs { + n.deleteCORSObject(ctx, bktInfo, addr) } } diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 5b0fa49..daac689 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -127,7 +127,7 @@ func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketI return addr, nil } -func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { +func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) { systemMap, ok := t.system[bktInfo.CID.EncodeToString()] if !ok { systemMap = make(map[string]*data.BaseNodeVersion) @@ -139,10 +139,10 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI t.system[bktInfo.CID.EncodeToString()] = systemMap - return oid.Address{}, ErrNoNodeToRemove + return nil, ErrNoNodeToRemove } -func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) (oid.Address, error) { +func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.Address, error) { panic("implement me") } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index dec1985..9f31051 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -25,13 +25,13 @@ type TreeService interface { // PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) // DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error diff --git a/internal/logs/logs.go b/internal/logs/logs.go index a298663..d875eb7 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -142,12 +142,15 @@ const ( CouldntCacheSubject = "couldn't cache subject info" UserGroupsListIsEmpty = "user groups list is empty, subject not found" CouldntCacheUserKey = "couldn't cache user key" - FoundSeveralBucketCorsNodes = "found several bucket cors nodes, latest be used" - FoundSeveralObjectTaggingNodes = "found several object tagging nodes, latest be used" - FoundSeveralBucketTaggingNodes = "found several bucket tagging nodes, latest be used" - FoundSeveralBucketSettingsNodes = "found several bucket settings nodes, latest be used" + ObjectTaggingNodeHasMultipleIDs = "object tagging node has multiple ids" + BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids" + BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids" + BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids" + SystemNodeHasMultipleIDs = "system node has multiple ids" + FailedToRemoveOldSystemNode = "failed to remove old system node" + FailedToParseAddressInTreeNode = "failed to parse object addr in tree node" UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts" - FoundSeveralSystemNodes = "found several system nodes, latest be used" + FoundSeveralSystemNodes = "found several system nodes" FailedToParsePartInfo = "failed to parse part info" CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info" CloseCredsObjectPayload = "close creds object payload" diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 33835aa..350f235 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -54,6 +54,11 @@ type ( Meta map[string]string } + multiSystemNode struct { + // the first element is latest + nodes []*treeNode + } + GetNodesParams struct { BktInfo *data.BucketInfo TreeID string @@ -270,6 +275,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree return version, nil } +func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) { + var ( + err error + index int + maxTimestamp uint64 + ) + + if len(nodes) == 0 { + return nil, errors.New("multi node must have at least one node") + } + + treeNodes := make([]*treeNode, len(nodes)) + + for i, node := range nodes { + if treeNodes[i], err = newTreeNode(node); err != nil { + return nil, fmt.Errorf("parse system node response: %w", err) + } + + if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp { + index = i + maxTimestamp = timestamp + } + } + + treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0] + + return &multiSystemNode{ + nodes: treeNodes, + }, nil +} + +func (m *multiSystemNode) Latest() *treeNode { + return m.nodes[0] +} + +func (m *multiSystemNode) Old() []*treeNode { + return m.nodes[1:] +} + func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) { uploadID, _ := treeNode.Get(uploadIDKV) if uploadID == "" { @@ -396,11 +440,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { } func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) + multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName) if err != nil { return nil, fmt.Errorf("couldn't get node: %w", err) } + node := multiNode.Latest() + settings := &data.BucketSettings{Versioning: data.VersioningUnversioned} if versioningValue, ok := node.Get(versioningKV); ok { settings.Versioning = versioningValue @@ -424,7 +470,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (* } func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { - node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}) + multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -437,28 +483,35 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se return err } - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes) + latest := multiNode.Latest() + ind := latest.GetLatestNodeIndex() + if latest.IsSplit() { + c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID)) } - return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil { + return fmt.Errorf("move settings node: %w", err) + } + + c.cleanOldNodes(ctx, multiNode.Old(), bktInfo) + + return nil } func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) + node, err := c.getSystemNode(ctx, bktInfo, corsFilename) if err != nil { return oid.Address{}, err } - return getCORSAddress(node) + return getCORSAddress(node.Latest()) } -func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) +func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) { + multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { - return oid.Address{}, fmt.Errorf("couldn't get node: %w", err) + return nil, fmt.Errorf("couldn't get node: %w", err) } meta := make(map[string]string) @@ -468,45 +521,49 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr if isErrNotFound { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil { - return oid.Address{}, err + return nil, err } - return oid.Address{}, layer.ErrNoNodeToRemove + return nil, layer.ErrNoNodeToRemove } - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) + latest := multiNode.Latest() + ind := latest.GetLatestNodeIndex() + if latest.IsSplit() { + c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs) } - prevAddr, err := getCORSAddress(node) + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil { + return nil, fmt.Errorf("move cors node: %w", err) + } + + objToDelete := make([]oid.Address, 1, len(multiNode.nodes)) + objToDelete[0], err = getCORSAddress(latest) if err != nil { - return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) + return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err) } - return prevAddr, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta) + objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...) + + return objToDelete, nil } -func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}) - if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { - return oid.Address{}, err +func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) { + multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename) + isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) + if err != nil && !isErrNotFound { + return nil, err } - if node != nil { - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes) - } - - addr, err := getCORSAddress(node) - if err != nil { - return oid.Address{}, fmt.Errorf("couldn't get cors object addr: %w", err) - } - - return addr, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]) + if isErrNotFound { + return nil, layer.ErrNoNodeToRemove } - return oid.Address{}, layer.ErrNoNodeToRemove + objToDelete := c.cleanOldNodes(ctx, multiNode.nodes, bktInfo) + if len(objToDelete) != len(multiNode.nodes) { + return nil, fmt.Errorf("clean old cors nodes: %w", err) + } + + return objToDelete, nil } func getCORSAddress(node *treeNode) (oid.Address, error) { @@ -524,6 +581,29 @@ func getCORSAddress(node *treeNode) (oid.Address, error) { return addr, nil } +func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address { + res := make([]oid.Address, 0, len(nodes)) + + for _, node := range nodes { + ind := node.GetLatestNodeIndex() + if node.IsSplit() { + c.reqLogger(ctx).Error(logs.SystemNodeHasMultipleIDs, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64s("ids", node.ID)) + } + if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil { + c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind])) + } else { + addr, err := getCORSAddress(node) + if err != nil { + c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind])) + continue + } + res = append(res, addr) + } + } + + return res +} + func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) { tagNode, err := c.getTreeNode(ctx, bktInfo, objVersion.ID, isTagKV) if err != nil { @@ -569,7 +649,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o ind := tagNode.GetLatestNodeIndex() if tagNode.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes) + c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs) } return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet) @@ -580,14 +660,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo } func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { - node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) + multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename) if err != nil { return nil, err } tags := make(map[string]string) - for key, val := range node.Meta { + for key, val := range multiNode.Latest().Meta { if strings.HasPrefix(key, userDefinedTagPrefix) { tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val } @@ -597,7 +677,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) ( } func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error { - node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}) + multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -615,12 +695,19 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t return err } - ind := node.GetLatestNodeIndex() - if node.IsSplit() { - c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes) + latest := multiNode.Latest() + ind := latest.GetLatestNodeIndex() + if latest.IsSplit() { + c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID)) } - return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet) + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil { + return fmt.Errorf("move bucket tagging node: %w", err) + } + + c.cleanOldNodes(ctx, multiNode.Old(), bktInfo) + + return nil } func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error { @@ -643,6 +730,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI return nil, err } + // consider using map[string][]*treeNode + // to be able to remove unused node, that can be added during split treeNodes := make(map[string]*treeNode, len(keys)) for _, s := range subtree { @@ -717,26 +806,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) { return nodes[targetIndexNode], nil } -func getLatestNode(nodes []NodeResponse) NodeResponse { - if len(nodes) == 0 { - return nil - } - - var ( - index int - maxTimestamp uint64 - ) - - for i, node := range nodes { - if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp { - index = i - maxTimestamp = timestamp - } - } - - return nodes[index] -} - func getMaxTimestamp(node NodeResponse) uint64 { var maxTimestamp uint64 @@ -1586,11 +1655,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str return info.Meta } -func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) { +func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) { p := &GetNodesParams{ BktInfo: bktInfo, TreeID: systemTree, - Path: path, + Path: []string{name}, LatestOnly: false, AllAttrs: true, } @@ -1605,10 +1674,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path return nil, layer.ErrNodeNotFound } if len(nodes) != 1 { - c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/"))) + c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name)) } - return newTreeNode(getLatestNode(nodes)) + return newMultiNode(nodes) } func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {