diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cb3bee..c575d92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ This document outlines major changes between releases. - Implement chunk uploading (#106) - Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146) - Add new `frostfs.client_cut` config param (#192) +- Add selection of the node of the latest version of the object (#231) ### Changed - Update prometheus to v1.15.0 (#94) diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 1199cb2..57ee8c8 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -586,7 +586,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, o TreeID: versionTree, Path: path, Meta: meta, - LatestOnly: true, + LatestOnly: false, AllAttrs: false, } nodes, err := c.service.GetNodes(ctx, p) @@ -594,11 +594,43 @@ func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, o return nil, err } - if len(nodes) == 0 { + latestNode, err := getLatestNode(nodes) + if err != nil { + return nil, err + } + + return newNodeVersion(objectName, latestNode) +} + +func getLatestNode(nodes []NodeResponse) (NodeResponse, error) { + var ( + maxCreationTime uint64 + targetIndexNode = -1 + ) + + for i, node := range nodes { + currentCreationTime := node.GetTimestamp() + if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime { + maxCreationTime = currentCreationTime + targetIndexNode = i + } + } + + if targetIndexNode == -1 { return nil, layer.ErrNodeNotFound } - return newNodeVersion(objectName, nodes[0]) + return nodes[targetIndexNode], nil +} + +func checkExistOID(meta []Meta) bool { + for _, kv := range meta { + if kv.GetKey() == "OID" { + return true + } + } + + return false } // pathFromName splits name by '/'. diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go index 2e39230..25160ad 100644 --- a/pkg/service/tree/tree_test.go +++ b/pkg/service/tree/tree_test.go @@ -168,3 +168,127 @@ func TestTreeServiceAddVersion(t *testing.T) { require.Len(t, versions, 1) require.Equal(t, storedNode, versions[0]) } + +func TestGetLatestNode(t *testing.T) { + for _, tc := range []struct { + name string + nodes []NodeResponse + exceptedNodeID uint64 + error bool + }{ + { + name: "empty", + nodes: []NodeResponse{}, + error: true, + }, + { + name: "one node of the object version", + nodes: []NodeResponse{ + nodeResponse{ + nodeID: 1, + parentID: 0, + timestamp: 1, + meta: []nodeMeta{ + { + key: oidKV, + value: []byte(oidtest.ID().String()), + }, + }, + }, + }, + exceptedNodeID: 1, + }, + { + name: "one node of the object version and one node of the secondary object", + nodes: []NodeResponse{ + nodeResponse{ + nodeID: 2, + parentID: 0, + timestamp: 3, + meta: []nodeMeta{}, + }, + nodeResponse{ + nodeID: 1, + parentID: 0, + timestamp: 1, + meta: []nodeMeta{ + { + key: oidKV, + value: []byte(oidtest.ID().String()), + }, + }, + }, + }, + exceptedNodeID: 1, + }, + { + name: "all nodes represent a secondary object", + nodes: []NodeResponse{ + nodeResponse{ + nodeID: 2, + parentID: 0, + timestamp: 3, + meta: []nodeMeta{}, + }, + nodeResponse{ + nodeID: 4, + parentID: 0, + timestamp: 5, + meta: []nodeMeta{}, + }, + }, + error: true, + }, + { + name: "several nodes of different types and with different timestamp", + nodes: []NodeResponse{ + nodeResponse{ + nodeID: 1, + parentID: 0, + timestamp: 1, + meta: []nodeMeta{ + { + key: oidKV, + value: []byte(oidtest.ID().String()), + }, + }, + }, + nodeResponse{ + nodeID: 3, + parentID: 0, + timestamp: 3, + meta: []nodeMeta{}, + }, + nodeResponse{ + nodeID: 4, + parentID: 0, + timestamp: 4, + meta: []nodeMeta{ + { + key: oidKV, + value: []byte(oidtest.ID().String()), + }, + }, + }, + nodeResponse{ + nodeID: 6, + parentID: 0, + timestamp: 6, + meta: []nodeMeta{}, + }, + }, + exceptedNodeID: 4, + }, + } { + t.Run(tc.name, func(t *testing.T) { + actualNode, err := getLatestNode(tc.nodes) + if tc.error { + require.Error(t, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc.exceptedNodeID, actualNode.GetNodeID()) + }) + } +}