package meta_test import ( "context" "errors" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) const currEpoch = 1000 func TestDB_Exists(t *testing.T) { db := newDB(t, meta.WithEpochState(epochState{currEpoch})) defer func() { require.NoError(t, db.Close(context.Background())) }() t.Run("no object", func(t *testing.T) { nonExist := testutil.GenerateObject() exists, err := metaExists(db, object.AddressOf(nonExist)) require.NoError(t, err) require.False(t, exists) }) t.Run("regular object", func(t *testing.T) { regular := testutil.GenerateObject() err := putBig(db, regular) require.NoError(t, err) exists, err := metaExists(db, object.AddressOf(regular)) require.NoError(t, err) require.True(t, exists) t.Run("removed object", func(t *testing.T) { err := metaInhume(db, object.AddressOf(regular), oidtest.ID()) require.NoError(t, err) exists, err := metaExists(db, object.AddressOf(regular)) require.True(t, client.IsErrObjectAlreadyRemoved(err)) require.False(t, exists) }) }) t.Run("tombstone object", func(t *testing.T) { ts := testutil.GenerateObject() ts.SetType(objectSDK.TypeTombstone) err := putBig(db, ts) require.NoError(t, err) exists, err := metaExists(db, object.AddressOf(ts)) require.NoError(t, err) require.True(t, exists) }) t.Run("lock object", func(t *testing.T) { lock := testutil.GenerateObject() lock.SetType(objectSDK.TypeLock) err := putBig(db, lock) require.NoError(t, err) exists, err := metaExists(db, object.AddressOf(lock)) require.NoError(t, err) require.True(t, exists) }) t.Run("virtual object", func(t *testing.T) { cnr := cidtest.ID() parent := testutil.GenerateObjectWithCID(cnr) child := testutil.GenerateObjectWithCID(cnr) child.SetParent(parent) idParent, _ := parent.ID() child.SetParentID(idParent) err := putBig(db, child) require.NoError(t, err) _, err = metaExists(db, object.AddressOf(parent)) var expectedErr *objectSDK.SplitInfoError require.True(t, errors.As(err, &expectedErr)) }) t.Run("merge split info", func(t *testing.T) { cnr := cidtest.ID() splitID := objectSDK.NewSplitID() parent := testutil.GenerateObjectWithCID(cnr) testutil.AddAttribute(parent, "foo", "bar") child := testutil.GenerateObjectWithCID(cnr) child.SetParent(parent) idParent, _ := parent.ID() child.SetParentID(idParent) child.SetSplitID(splitID) link := testutil.GenerateObjectWithCID(cnr) link.SetParent(parent) link.SetParentID(idParent) idChild, _ := child.ID() link.SetChildren(idChild) link.SetSplitID(splitID) t.Run("direct order", func(t *testing.T) { err := putBig(db, child) require.NoError(t, err) err = putBig(db, link) require.NoError(t, err) _, err = metaExists(db, object.AddressOf(parent)) require.Error(t, err) var si *objectSDK.SplitInfoError require.ErrorAs(t, err, &si) require.Equal(t, splitID, si.SplitInfo().SplitID()) id1, _ := child.ID() id2, _ := si.SplitInfo().LastPart() require.Equal(t, id1, id2) id1, _ = link.ID() id2, _ = si.SplitInfo().Link() require.Equal(t, id1, id2) }) t.Run("reverse order", func(t *testing.T) { err := metaPut(db, link, nil) require.NoError(t, err) err = putBig(db, child) require.NoError(t, err) _, err = metaExists(db, object.AddressOf(parent)) require.Error(t, err) var si *objectSDK.SplitInfoError require.ErrorAs(t, err, &si) require.Equal(t, splitID, si.SplitInfo().SplitID()) id1, _ := child.ID() id2, _ := si.SplitInfo().LastPart() require.Equal(t, id1, id2) id1, _ = link.ID() id2, _ = si.SplitInfo().Link() require.Equal(t, id1, id2) }) }) t.Run("random object", func(t *testing.T) { addr := oidtest.Address() exists, err := metaExists(db, addr) require.NoError(t, err) require.False(t, exists) }) t.Run("expired object", func(t *testing.T) { checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) { gotObj, err := metaExists(db, object.AddressOf(exp)) require.False(t, gotObj) require.ErrorIs(t, err, meta.ErrObjectIsExpired) gotObj, err = metaExists(db, object.AddressOf(nonExp)) require.NoError(t, err) require.True(t, gotObj) }) }) }