frostfs-node/pkg/local_object_storage/metabase/select_test.go
Pavel Karpy 4ff98a7e2b [#760] metabase: Support COMMON_PREFIX matchtype
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
2021-08-27 15:03:15 +03:00

683 lines
18 KiB
Go

package meta_test
import (
"encoding/hex"
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg"
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/stretchr/testify/require"
)
func TestDB_SelectUserAttributes(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
raw1 := generateRawObjectWithCID(t, cid)
addAttribute(raw1, "foo", "bar")
addAttribute(raw1, "x", "y")
err := putBig(db, raw1.Object())
require.NoError(t, err)
raw2 := generateRawObjectWithCID(t, cid)
addAttribute(raw2, "foo", "bar")
addAttribute(raw2, "x", "z")
err = putBig(db, raw2.Object())
require.NoError(t, err)
raw3 := generateRawObjectWithCID(t, cid)
addAttribute(raw3, "a", "b")
err = putBig(db, raw3.Object())
require.NoError(t, err)
raw4 := generateRawObjectWithCID(t, cid)
addAttribute(raw4, "path", "test/1/2")
err = putBig(db, raw4.Object())
require.NoError(t, err)
raw5 := generateRawObjectWithCID(t, cid)
addAttribute(raw5, "path", "test/1/3")
err = putBig(db, raw5.Object())
require.NoError(t, err)
raw6 := generateRawObjectWithCID(t, cid)
addAttribute(raw6, "path", "test/2/3")
err = putBig(db, raw6.Object())
require.NoError(t, err)
fs := objectSDK.SearchFilters{}
fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter("x", "y", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter("x", "y", objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs, raw2.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter("a", "b", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, raw3.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter("c", "d", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs)
fs = objectSDK.SearchFilters{}
fs.AddFilter("foo", "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs,
raw3.Object().Address(),
raw4.Object().Address(),
raw5.Object().Address(),
raw6.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter("a", "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
raw4.Object().Address(),
raw5.Object().Address(),
raw6.Object().Address(),
)
fs = objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
raw3.Object().Address(),
raw4.Object().Address(),
raw5.Object().Address(),
raw6.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter("key", "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
raw3.Object().Address(),
raw4.Object().Address(),
raw5.Object().Address(),
raw6.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter("path", "test", objectSDK.MatchCommonPrefix)
testSelect(t, db, cid, fs,
raw4.Object().Address(),
raw5.Object().Address(),
raw6.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter("path", "test/1", objectSDK.MatchCommonPrefix)
testSelect(t, db, cid, fs,
raw4.Object().Address(),
raw5.Object().Address(),
)
}
func TestDB_SelectRootPhyParent(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
// prepare
small := generateRawObjectWithCID(t, cid)
err := putBig(db, small.Object())
require.NoError(t, err)
ts := generateRawObjectWithCID(t, cid)
ts.SetType(objectSDK.TypeTombstone)
err = putBig(db, ts.Object())
require.NoError(t, err)
sg := generateRawObjectWithCID(t, cid)
sg.SetType(objectSDK.TypeStorageGroup)
err = putBig(db, sg.Object())
require.NoError(t, err)
leftChild := generateRawObjectWithCID(t, cid)
leftChild.InitRelations()
err = putBig(db, leftChild.Object())
require.NoError(t, err)
parent := generateRawObjectWithCID(t, cid)
rightChild := generateRawObjectWithCID(t, cid)
rightChild.SetParent(parent.Object().SDK())
rightChild.SetParentID(parent.ID())
err = putBig(db, rightChild.Object())
require.NoError(t, err)
link := generateRawObjectWithCID(t, cid)
link.SetParent(parent.Object().SDK())
link.SetParentID(parent.ID())
link.SetChildren(leftChild.ID(), rightChild.ID())
err = putBig(db, link.Object())
require.NoError(t, err)
t.Run("root objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddRootFilter()
testSelect(t, db, cid, fs,
small.Object().Address(),
parent.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterPropertyRoot, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("phy objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddPhyFilter()
testSelect(t, db, cid, fs,
small.Object().Address(),
ts.Object().Address(),
sg.Object().Address(),
leftChild.Object().Address(),
rightChild.Object().Address(),
link.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterPropertyPhy, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("regular objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeRegular.String(), objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs,
small.Object().Address(),
leftChild.Object().Address(),
rightChild.Object().Address(),
link.Object().Address(),
parent.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeRegular.String(), objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs,
ts.Object().Address(),
sg.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("tombstone objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeTombstone.String(), objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, ts.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeTombstone.String(), objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs,
small.Object().Address(),
leftChild.Object().Address(),
rightChild.Object().Address(),
link.Object().Address(),
parent.Object().Address(),
sg.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("storage group objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeStorageGroup.String(), objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, sg.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeStorageGroup.String(), objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs,
small.Object().Address(),
leftChild.Object().Address(),
rightChild.Object().Address(),
link.Object().Address(),
parent.Object().Address(),
ts.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("objects with parent", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderParent,
parent.ID().String(),
objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs,
rightChild.Object().Address(),
link.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderParent, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("all objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
small.Object().Address(),
ts.Object().Address(),
sg.Object().Address(),
leftChild.Object().Address(),
rightChild.Object().Address(),
link.Object().Address(),
parent.Object().Address(),
)
})
}
func TestDB_SelectInhume(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
raw1 := generateRawObjectWithCID(t, cid)
err := putBig(db, raw1.Object())
require.NoError(t, err)
raw2 := generateRawObjectWithCID(t, cid)
err = putBig(db, raw2.Object())
require.NoError(t, err)
fs := objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
)
tombstone := objectSDK.NewAddress()
tombstone.SetContainerID(cid)
tombstone.SetObjectID(testOID())
err = meta.Inhume(db, raw2.Object().Address(), tombstone)
require.NoError(t, err)
fs = objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
raw1.Object().Address(),
)
}
func TestDB_SelectPayloadHash(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
raw1 := generateRawObjectWithCID(t, cid)
err := putBig(db, raw1.Object())
require.NoError(t, err)
raw2 := generateRawObjectWithCID(t, cid)
err = putBig(db, raw2.Object())
require.NoError(t, err)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadHash,
hex.EncodeToString(raw1.PayloadChecksum().Sum()),
objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadHash,
hex.EncodeToString(raw1.PayloadChecksum().Sum()),
objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs, raw2.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadHash,
"",
objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
}
func TestDB_SelectWithSlowFilters(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
v20 := new(pkg.Version)
v20.SetMajor(2)
v21 := new(pkg.Version)
v21.SetMajor(2)
v21.SetMinor(1)
raw1 := generateRawObjectWithCID(t, cid)
raw1.SetPayloadSize(10)
raw1.SetCreationEpoch(11)
raw1.SetVersion(v20)
err := putBig(db, raw1.Object())
require.NoError(t, err)
raw2 := generateRawObjectWithCID(t, cid)
raw2.SetPayloadSize(20)
raw2.SetCreationEpoch(21)
raw2.SetVersion(v21)
err = putBig(db, raw2.Object())
require.NoError(t, err)
t.Run("object with TZHash", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderHomomorphicHash,
hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()),
objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderHomomorphicHash,
hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()),
objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs, raw2.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderHomomorphicHash,
"",
objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("object with payload length", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, raw2.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadLength, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("object with creation epoch", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringNotEqual)
testSelect(t, db, cid, fs, raw2.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderCreationEpoch, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("object with version", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectVersionFilter(objectSDK.MatchStringEqual, v21)
testSelect(t, db, cid, fs, raw2.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddObjectVersionFilter(objectSDK.MatchStringNotEqual, v21)
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddObjectVersionFilter(objectSDK.MatchNotPresent, nil)
testSelect(t, db, cid, fs)
})
}
func TestDB_SelectObjectID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
// prepare
parent := generateRawObjectWithCID(t, cid)
regular := generateRawObjectWithCID(t, cid)
regular.SetParentID(parent.ID())
regular.SetParent(parent.Object().SDK())
err := putBig(db, regular.Object())
require.NoError(t, err)
ts := generateRawObjectWithCID(t, cid)
ts.SetType(objectSDK.TypeTombstone)
err = putBig(db, ts.Object())
require.NoError(t, err)
sg := generateRawObjectWithCID(t, cid)
sg.SetType(objectSDK.TypeStorageGroup)
err = putBig(db, sg.Object())
require.NoError(t, err)
t.Run("not present", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchNotPresent, nil)
testSelect(t, db, cid, fs)
})
t.Run("not found objects", func(t *testing.T) {
raw := generateRawObjectWithCID(t, cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, raw.ID())
testSelect(t, db, cid, fs)
fs = objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, raw.ID())
testSelect(t, db, cid, fs,
regular.Object().Address(),
parent.Object().Address(),
sg.Object().Address(),
ts.Object().Address(),
)
})
t.Run("regular objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, regular.ID())
testSelect(t, db, cid, fs, regular.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, regular.ID())
testSelect(t, db, cid, fs,
parent.Object().Address(),
sg.Object().Address(),
ts.Object().Address(),
)
})
t.Run("tombstone objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, ts.ID())
testSelect(t, db, cid, fs, ts.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, ts.ID())
testSelect(t, db, cid, fs,
regular.Object().Address(),
parent.Object().Address(),
sg.Object().Address(),
)
})
t.Run("storage group objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, sg.ID())
testSelect(t, db, cid, fs, sg.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, sg.ID())
testSelect(t, db, cid, fs,
regular.Object().Address(),
parent.Object().Address(),
ts.Object().Address(),
)
})
t.Run("parent objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, parent.ID())
testSelect(t, db, cid, fs, parent.Object().Address())
fs = objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, parent.ID())
testSelect(t, db, cid, fs,
regular.Object().Address(),
sg.Object().Address(),
ts.Object().Address(),
)
})
}
func TestDB_SelectSplitID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
child1 := generateRawObjectWithCID(t, cid)
child2 := generateRawObjectWithCID(t, cid)
child3 := generateRawObjectWithCID(t, cid)
split1 := objectSDK.NewSplitID()
split2 := objectSDK.NewSplitID()
child1.SetSplitID(split1)
child2.SetSplitID(split1)
child3.SetSplitID(split2)
require.NoError(t, putBig(db, child1.Object()))
require.NoError(t, putBig(db, child2.Object()))
require.NoError(t, putBig(db, child3.Object()))
t.Run("not present", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchNotPresent)
testSelect(t, db, cid, fs)
})
t.Run("split id", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, split1.String(), objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs,
child1.Object().Address(),
child2.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, split2.String(), objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs, child3.Object().Address())
})
t.Run("empty split", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs)
})
t.Run("unknown split id", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID,
objectSDK.NewSplitID().String(),
objectSDK.MatchStringEqual)
testSelect(t, db, cid, fs)
})
}
func TestDB_SelectContainerID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := cidtest.Generate()
obj1 := generateRawObjectWithCID(t, cid)
err := putBig(db, obj1.Object())
require.NoError(t, err)
obj2 := generateRawObjectWithCID(t, cid)
err = putBig(db, obj2.Object())
require.NoError(t, err)
t.Run("same cid", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid)
testSelect(t, db, cid, fs,
obj1.Object().Address(),
obj2.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchStringNotEqual, cid)
testSelect(t, db, cid, fs,
obj1.Object().Address(),
obj2.Object().Address(),
)
fs = objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchNotPresent, cid)
testSelect(t, db, cid, fs)
})
t.Run("not same cid", func(t *testing.T) {
newCID := cidtest.Generate()
fs := objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, newCID)
testSelect(t, db, cid, fs)
})
}