[#437] tree: Support removing old split system nodes
All checks were successful
/ DCO (pull_request) Successful in 1m47s
/ Builds (1.21) (pull_request) Successful in 2m20s
/ Builds (1.22) (pull_request) Successful in 2m22s
/ Vulncheck (pull_request) Successful in 2m6s
/ Lint (pull_request) Successful in 4m43s
/ Tests (1.21) (pull_request) Successful in 2m30s
/ Tests (1.22) (pull_request) Successful in 2m25s
All checks were successful
/ DCO (pull_request) Successful in 1m47s
/ Builds (1.21) (pull_request) Successful in 2m20s
/ Builds (1.22) (pull_request) Successful in 2m22s
/ Vulncheck (pull_request) Successful in 2m6s
/ Lint (pull_request) Successful in 4m43s
/ Tests (1.21) (pull_request) Successful in 2m30s
/ Tests (1.22) (pull_request) Successful in 2m25s
It's need to fit user expectation on deleting CORs for example. Previously after removing cors (that was uploaded in split manner) we can still get some data (from other node) because deletion worked only for latest node version. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
a031777a1b
commit
689f7ee818
5 changed files with 174 additions and 77 deletions
|
@ -49,17 +49,19 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||||
return fmt.Errorf("put system object: %w", err)
|
return fmt.Errorf("put system object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objIDToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID)
|
objIDsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID)
|
||||||
objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
||||||
if err != nil && !objIDToDeleteNotFound {
|
if err != nil && !objIDToDeleteNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !objIDToDeleteNotFound {
|
if !objIDToDeleteNotFound {
|
||||||
if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
|
for _, id := range objIDsToDelete {
|
||||||
|
if err = n.objectDelete(ctx, p.BktInfo, id); err != nil {
|
||||||
n.reqLogger(ctx).Error(logs.CouldntDeleteCorsObject, zap.Error(err),
|
n.reqLogger(ctx).Error(logs.CouldntDeleteCorsObject, zap.Error(err),
|
||||||
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
||||||
zap.String("objID", objIDToDelete.EncodeToString()))
|
zap.String("objID", id.EncodeToString()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,16 +83,18 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||||
objID, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
|
objIDs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
|
||||||
objIDNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
objIDNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
||||||
if err != nil && !objIDNotFound {
|
if err != nil && !objIDNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !objIDNotFound {
|
if !objIDNotFound {
|
||||||
if err = n.objectDelete(ctx, bktInfo, objID); err != nil {
|
for _, id := range objIDs {
|
||||||
|
if err = n.objectDelete(ctx, bktInfo, id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
n.cache.DeleteCORS(bktInfo)
|
n.cache.DeleteCORS(bktInfo)
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketI
|
||||||
return node.OID, nil
|
return node.OID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
|
func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error) {
|
||||||
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
|
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
systemMap = make(map[string]*data.BaseNodeVersion)
|
systemMap = make(map[string]*data.BaseNodeVersion)
|
||||||
|
@ -136,10 +136,10 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI
|
||||||
|
|
||||||
t.system[bktInfo.CID.EncodeToString()] = systemMap
|
t.system[bktInfo.CID.EncodeToString()] = systemMap
|
||||||
|
|
||||||
return oid.ID{}, ErrNoNodeToRemove
|
return nil, ErrNoNodeToRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) (oid.ID, error) {
|
func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.ID, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,13 +25,13 @@ type TreeService interface {
|
||||||
|
|
||||||
// PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS.
|
// PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS.
|
||||||
//
|
//
|
||||||
// If object id to remove is not found returns ErrNoNodeToRemove error.
|
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||||
PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error)
|
PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error)
|
||||||
|
|
||||||
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS.
|
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS.
|
||||||
//
|
//
|
||||||
// If object id to remove is not found returns ErrNoNodeToRemove error.
|
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||||
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
|
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.ID, error)
|
||||||
|
|
||||||
GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error)
|
GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error)
|
||||||
PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error
|
PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error
|
||||||
|
|
|
@ -142,11 +142,15 @@ const (
|
||||||
CouldntCacheSubject = "couldn't cache subject info"
|
CouldntCacheSubject = "couldn't cache subject info"
|
||||||
UserGroupsListIsEmpty = "user groups list is empty, subject not found"
|
UserGroupsListIsEmpty = "user groups list is empty, subject not found"
|
||||||
CouldntCacheUserKey = "couldn't cache user key"
|
CouldntCacheUserKey = "couldn't cache user key"
|
||||||
FoundSeveralBucketCorsNodes = "found several bucket cors nodes, latest be used"
|
ObjectTaggingNodeHasMultipleIDs = "object tagging node has multiple ids"
|
||||||
FoundSeveralObjectTaggingNodes = "found several object tagging nodes, latest be used"
|
BucketTaggingNodeHasMultipleIDs = "bucket tagging node has multiple ids"
|
||||||
FoundSeveralBucketTaggingNodes = "found several bucket tagging nodes, latest be used"
|
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
|
||||||
FoundSeveralBucketSettingsNodes = "found several bucket settings nodes, latest be used"
|
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
||||||
|
FailedToRemoveOldBucketSettingsNode = "failed to remove old bucket settings node"
|
||||||
|
FailedToRemoveOldBucketTaggingNode = "failed to remove old bucket tagging node"
|
||||||
|
FailedToRemoveOldBucketCORSNode = "failed to remove old bucket cors node"
|
||||||
|
FailedToRemoveOldPartNode = "failed to remove old part node"
|
||||||
UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
|
UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
|
||||||
FoundSeveralSystemNodes = "found several system nodes, latest be used"
|
FoundSeveralSystemNodes = "found several system nodes"
|
||||||
FailedToParsePartInfo = "failed to parse part info"
|
FailedToParsePartInfo = "failed to parse part info"
|
||||||
)
|
)
|
||||||
|
|
|
@ -53,6 +53,11 @@ type (
|
||||||
Meta map[string]string
|
Meta map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
multiSystemNode struct {
|
||||||
|
// the first element is latest
|
||||||
|
nodes []*treeNode
|
||||||
|
}
|
||||||
|
|
||||||
GetNodesParams struct {
|
GetNodesParams struct {
|
||||||
BktInfo *data.BucketInfo
|
BktInfo *data.BucketInfo
|
||||||
TreeID string
|
TreeID string
|
||||||
|
@ -268,6 +273,45 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
|
||||||
return version, nil
|
return version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
index int
|
||||||
|
maxTimestamp uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
return nil, errors.New("multi node must have at least one node")
|
||||||
|
}
|
||||||
|
|
||||||
|
treeNodes := make([]*treeNode, len(nodes))
|
||||||
|
|
||||||
|
for i, node := range nodes {
|
||||||
|
if treeNodes[i], err = newTreeNode(node); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse system node response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
|
||||||
|
index = i
|
||||||
|
maxTimestamp = timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
|
||||||
|
|
||||||
|
return &multiSystemNode{
|
||||||
|
nodes: treeNodes,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *multiSystemNode) Latest() *treeNode {
|
||||||
|
return m.nodes[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *multiSystemNode) Old() []*treeNode {
|
||||||
|
return m.nodes[1:]
|
||||||
|
}
|
||||||
|
|
||||||
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
||||||
uploadID, _ := treeNode.Get(uploadIDKV)
|
uploadID, _ := treeNode.Get(uploadIDKV)
|
||||||
if uploadID == "" {
|
if uploadID == "" {
|
||||||
|
@ -394,11 +438,13 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
|
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't get node: %w", err)
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node := multiNode.Latest()
|
||||||
|
|
||||||
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
|
settings := &data.BucketSettings{Versioning: data.VersioningUnversioned}
|
||||||
if versioningValue, ok := node.Get(versioningKV); ok {
|
if versioningValue, ok := node.Get(versioningKV); ok {
|
||||||
settings.Versioning = versioningValue
|
settings.Versioning = versioningValue
|
||||||
|
@ -422,7 +468,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
|
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
|
multiNode, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
||||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||||
if err != nil && !isErrNotFound {
|
if err != nil && !isErrNotFound {
|
||||||
return fmt.Errorf("couldn't get node: %w", err)
|
return fmt.Errorf("couldn't get node: %w", err)
|
||||||
|
@ -435,28 +481,43 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ind := node.GetLatestNodeIndex()
|
latest := multiNode.Latest()
|
||||||
if node.IsSplit() {
|
ind := latest.GetLatestNodeIndex()
|
||||||
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketSettingsNodes)
|
if latest.IsSplit() {
|
||||||
|
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
|
||||||
|
return fmt.Errorf("move settings node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range multiNode.Old() {
|
||||||
|
ind = node.GetLatestNodeIndex()
|
||||||
|
if node.IsSplit() {
|
||||||
|
c.reqLogger(ctx).Error(logs.BucketSettingsNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
|
||||||
|
}
|
||||||
|
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
|
||||||
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldBucketSettingsNode, zap.Uint64("id", node.ID[ind]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
|
node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oid.ID{}, err
|
return oid.ID{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return node.ObjID, nil
|
return node.Latest().ObjID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
|
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error) {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
|
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
||||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||||
if err != nil && !isErrNotFound {
|
if err != nil && !isErrNotFound {
|
||||||
return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
|
return nil, fmt.Errorf("couldn't get node: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := make(map[string]string)
|
meta := make(map[string]string)
|
||||||
|
@ -465,35 +526,66 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objI
|
||||||
|
|
||||||
if isErrNotFound {
|
if isErrNotFound {
|
||||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
return nil, layer.ErrNoNodeToRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
ind := node.GetLatestNodeIndex()
|
latest := multiNode.Latest()
|
||||||
|
ind := latest.GetLatestNodeIndex()
|
||||||
|
if latest.IsSplit() {
|
||||||
|
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, meta); err != nil {
|
||||||
|
return nil, fmt.Errorf("move cors node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objToDelete := make([]oid.ID, 1, len(multiNode.nodes))
|
||||||
|
objToDelete[0] = latest.ObjID
|
||||||
|
|
||||||
|
for _, node := range multiNode.Old() {
|
||||||
|
ind = node.GetLatestNodeIndex()
|
||||||
if node.IsSplit() {
|
if node.IsSplit() {
|
||||||
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
|
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
|
||||||
|
}
|
||||||
|
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
|
||||||
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldBucketCORSNode, zap.Uint64("id", node.ID[ind]))
|
||||||
|
} else {
|
||||||
|
objToDelete = append(objToDelete, node.ObjID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
|
return objToDelete, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.ID, error) {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
|
multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
|
||||||
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||||
return oid.ID{}, err
|
if err != nil && !isErrNotFound {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if node != nil {
|
if isErrNotFound {
|
||||||
|
return nil, layer.ErrNoNodeToRemove
|
||||||
|
}
|
||||||
|
|
||||||
|
objToDelete := make([]oid.ID, len(multiNode.nodes))
|
||||||
|
|
||||||
|
for i, node := range multiNode.nodes {
|
||||||
ind := node.GetLatestNodeIndex()
|
ind := node.GetLatestNodeIndex()
|
||||||
if node.IsSplit() {
|
if node.IsSplit() {
|
||||||
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketCorsNodes)
|
c.reqLogger(ctx).Error(logs.BucketCORSNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
return node.ObjID, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind])
|
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
|
||||||
|
return nil, fmt.Errorf("delete cors node '%d': %w", node.ID[ind], err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
objToDelete[i] = node.ObjID
|
||||||
|
}
|
||||||
|
|
||||||
|
return objToDelete, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
|
func (c *Tree) GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error) {
|
||||||
|
@ -541,7 +633,7 @@ func (c *Tree) PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, o
|
||||||
|
|
||||||
ind := tagNode.GetLatestNodeIndex()
|
ind := tagNode.GetLatestNodeIndex()
|
||||||
if tagNode.IsSplit() {
|
if tagNode.IsSplit() {
|
||||||
c.reqLogger(ctx).Warn(logs.FoundSeveralObjectTaggingNodes)
|
c.reqLogger(ctx).Error(logs.ObjectTaggingNodeHasMultipleIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
|
return c.service.MoveNode(ctx, bktInfo, versionTree, tagNode.ID[ind], objVersion.ID, treeTagSet)
|
||||||
|
@ -552,14 +644,14 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
|
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := make(map[string]string)
|
tags := make(map[string]string)
|
||||||
|
|
||||||
for key, val := range node.Meta {
|
for key, val := range multiNode.Latest().Meta {
|
||||||
if strings.HasPrefix(key, userDefinedTagPrefix) {
|
if strings.HasPrefix(key, userDefinedTagPrefix) {
|
||||||
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
|
tags[strings.TrimPrefix(key, userDefinedTagPrefix)] = val
|
||||||
}
|
}
|
||||||
|
@ -569,7 +661,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
||||||
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
|
multiNode, err := c.getSystemNode(ctx, bktInfo, bucketTaggingFilename)
|
||||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||||
if err != nil && !isErrNotFound {
|
if err != nil && !isErrNotFound {
|
||||||
return fmt.Errorf("couldn't get node: %w", err)
|
return fmt.Errorf("couldn't get node: %w", err)
|
||||||
|
@ -587,12 +679,27 @@ func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, t
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ind := node.GetLatestNodeIndex()
|
latest := multiNode.Latest()
|
||||||
if node.IsSplit() {
|
ind := latest.GetLatestNodeIndex()
|
||||||
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketTaggingNodes)
|
if latest.IsSplit() {
|
||||||
|
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", latest.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, treeTagSet)
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latest.ID[ind], 0, treeTagSet); err != nil {
|
||||||
|
return fmt.Errorf("move bucket tagging node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range multiNode.Old() {
|
||||||
|
ind = node.GetLatestNodeIndex()
|
||||||
|
if node.IsSplit() {
|
||||||
|
c.reqLogger(ctx).Error(logs.BucketTaggingNodeHasMultipleIDs, zap.Uint64s("ids", node.ID))
|
||||||
|
}
|
||||||
|
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
|
||||||
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldBucketTaggingNode, zap.Uint64("id", node.ID[ind]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
func (c *Tree) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||||
|
@ -615,6 +722,8 @@ func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeI
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// consider using map[string][]*treeNode
|
||||||
|
// to be able to remove unused node, that can be added during split
|
||||||
treeNodes := make(map[string]*treeNode, len(keys))
|
treeNodes := make(map[string]*treeNode, len(keys))
|
||||||
|
|
||||||
for _, s := range subtree {
|
for _, s := range subtree {
|
||||||
|
@ -689,26 +798,6 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||||
return nodes[targetIndexNode], nil
|
return nodes[targetIndexNode], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLatestNode(nodes []NodeResponse) NodeResponse {
|
|
||||||
if len(nodes) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
index int
|
|
||||||
maxTimestamp uint64
|
|
||||||
)
|
|
||||||
|
|
||||||
for i, node := range nodes {
|
|
||||||
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
|
|
||||||
index = i
|
|
||||||
maxTimestamp = timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes[index]
|
|
||||||
}
|
|
||||||
|
|
||||||
func getMaxTimestamp(node NodeResponse) uint64 {
|
func getMaxTimestamp(node NodeResponse) uint64 {
|
||||||
var maxTimestamp uint64
|
var maxTimestamp uint64
|
||||||
|
|
||||||
|
@ -1558,11 +1647,11 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
|
||||||
return info.Meta
|
return info.Meta
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
|
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
TreeID: systemTree,
|
TreeID: systemTree,
|
||||||
Path: path,
|
Path: []string{name},
|
||||||
LatestOnly: false,
|
LatestOnly: false,
|
||||||
AllAttrs: true,
|
AllAttrs: true,
|
||||||
}
|
}
|
||||||
|
@ -1577,10 +1666,10 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path
|
||||||
return nil, layer.ErrNodeNotFound
|
return nil, layer.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
if len(nodes) != 1 {
|
if len(nodes) != 1 {
|
||||||
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("path", strings.Join(path, "/")))
|
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemNodes, zap.String("name", name))
|
||||||
}
|
}
|
||||||
|
|
||||||
return newTreeNode(getLatestNode(nodes))
|
return newMultiNode(nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
|
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
|
||||||
|
|
Loading…
Reference in a new issue