package meta_test

import (
	"context"
	"testing"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

const objCount = 10

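// TestCounters checks that Put, Delete and Inhume keep the shard-wide
// (ObjectCounters) and per-container (ContainerCounters) Phy, Logic and
// User object counters consistent.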
func TestCounters(t *testing.T) {
	t.Parallel()

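	// A fresh metabase must report all-zero object and container counters.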
t.Run("defaults", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
c, err := db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
require.Zero(t, c.Phy)
|
|
require.Zero(t, c.Logic)
|
|
require.Zero(t, c.User)
|
|
|
|
cc, err := db.ContainerCounters(context.Background())
|
|
require.NoError(t, err)
|
|
require.Zero(t, len(cc.Counts))
|
|
})
|
|
|
|
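	// Each Put of a regular object must bump Phy, Logic and User by one,
	// both shard-wide and for the object's container.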
t.Run("put", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
oo := make([]*objectSDK.Object, 0, objCount)
|
|
for i := 0; i < objCount; i++ {
|
|
oo = append(oo, testutil.GenerateObject())
|
|
}
|
|
|
|
var prm meta.PutPrm
|
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
|
|
|
for i := 0; i < objCount; i++ {
|
|
prm.SetObject(oo[i])
|
|
cnrID, _ := oo[i].ContainerID()
|
|
c := meta.ObjectCounters{}
|
|
exp[cnrID] = meta.ObjectCounters{
|
|
Logic: 1,
|
|
Phy: 1,
|
|
User: 1,
|
|
}
|
|
|
|
_, err := db.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
c, err = db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, uint64(i+1), c.Phy)
|
|
require.Equal(t, uint64(i+1), c.Logic)
|
|
|
|
cc, err := db.ContainerCounters(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
|
}
|
|
})
|
|
|
|
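	// Each Delete must decrement all three counters until everything is removed.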
t.Run("delete", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
oo := putObjs(t, db, objCount, false)
|
|
|
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
|
for _, obj := range oo {
|
|
cnrID, _ := obj.ContainerID()
|
|
exp[cnrID] = meta.ObjectCounters{
|
|
Logic: 1,
|
|
Phy: 1,
|
|
User: 1,
|
|
}
|
|
}
|
|
|
|
var prm meta.DeletePrm
|
|
for i := objCount - 1; i >= 0; i-- {
|
|
prm.SetAddresses(objectcore.AddressOf(oo[i]))
|
|
|
|
res, err := db.Delete(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(1), res.AvailableObjectsRemoved())
|
|
|
|
c, err := db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, uint64(i), c.Phy)
|
|
require.Equal(t, uint64(i), c.Logic)
|
|
require.Equal(t, uint64(i), c.User)
|
|
|
|
cnrID, _ := oo[i].ContainerID()
|
|
if v, ok := exp[cnrID]; ok {
|
|
v.Phy--
|
|
v.Logic--
|
|
v.User--
|
|
if v.IsZero() {
|
|
delete(exp, cnrID)
|
|
} else {
|
|
exp[cnrID] = v
|
|
}
|
|
}
|
|
|
|
cc, err := db.ContainerCounters(context.Background())
|
|
require.NoError(t, err)
|
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
|
}
|
|
})
|
|
|
|
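	// Inhume buries objects under a tombstone: Logic and User drop,
	// while Phy stays untouched.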
t.Run("inhume", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
oo := putObjs(t, db, objCount, false)
|
|
|
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
|
for _, obj := range oo {
|
|
cnrID, _ := obj.ContainerID()
|
|
exp[cnrID] = meta.ObjectCounters{
|
|
Logic: 1,
|
|
Phy: 1,
|
|
User: 1,
|
|
}
|
|
}
|
|
|
|
inhumedObjs := make([]oid.Address, objCount/2)
|
|
|
|
for i, o := range oo {
|
|
if i == len(inhumedObjs) {
|
|
break
|
|
}
|
|
|
|
inhumedObjs[i] = objectcore.AddressOf(o)
|
|
}
|
|
|
|
for _, addr := range inhumedObjs {
|
|
if v, ok := exp[addr.Container()]; ok {
|
|
v.Logic--
|
|
v.User--
|
|
if v.IsZero() {
|
|
delete(exp, addr.Container())
|
|
} else {
|
|
exp[addr.Container()] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
var prm meta.InhumePrm
|
|
prm.SetTombstoneAddress(oidtest.Address())
|
|
prm.SetAddresses(inhumedObjs...)
|
|
|
|
res, err := db.Inhume(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
|
|
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
|
|
|
|
c, err := db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, uint64(objCount), c.Phy)
|
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
|
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)
|
|
|
|
cc, err := db.ContainerCounters(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
|
})
|
|
|
|
t.Run("put_split", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
parObj := testutil.GenerateObject()
|
|
|
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
|
|
|
// put objects and check that parent info
|
|
// does not affect the counter
|
|
for i := 0; i < objCount; i++ {
|
|
o := testutil.GenerateObject()
|
|
if i < objCount/2 { // half of the objs will have the parent
|
|
o.SetParent(parObj)
|
|
o.SetSplitID(objectSDK.NewSplitID())
|
|
}
|
|
|
|
cnrID, _ := o.ContainerID()
|
|
exp[cnrID] = meta.ObjectCounters{
|
|
Logic: 1,
|
|
Phy: 1,
|
|
User: 1,
|
|
}
|
|
|
|
require.NoError(t, putBig(db, o))
|
|
|
|
c, err := db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(i+1), c.Phy)
|
|
require.Equal(t, uint64(i+1), c.Logic)
|
|
require.Equal(t, uint64(i+1), c.User)
|
|
|
|
cc, err := db.ContainerCounters(context.Background())
|
|
require.NoError(t, err)
|
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
|
}
|
|
})
|
|
|
|
t.Run("delete_split", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
oo := putObjs(t, db, objCount, true)
|
|
|
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
|
for _, obj := range oo {
|
|
cnrID, _ := obj.ContainerID()
|
|
exp[cnrID] = meta.ObjectCounters{
|
|
Logic: 1,
|
|
Phy: 1,
|
|
User: 1,
|
|
}
|
|
}
|
|
|
|
// delete objects that have parent info
|
|
// and check that it does not affect
|
|
// the counter
|
|
for i, o := range oo {
|
|
addr := objectcore.AddressOf(o)
|
|
require.NoError(t, metaDelete(db, addr))
|
|
|
|
c, err := db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(objCount-i-1), c.Phy)
|
|
require.Equal(t, uint64(objCount-i-1), c.Logic)
|
|
require.Equal(t, uint64(objCount-i-1), c.User)
|
|
|
|
if v, ok := exp[addr.Container()]; ok {
|
|
v.Logic--
|
|
v.Phy--
|
|
v.User--
|
|
if v.IsZero() {
|
|
delete(exp, addr.Container())
|
|
} else {
|
|
exp[addr.Container()] = v
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
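	// Inhuming members of a split chain must behave like inhuming
	// regular objects.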
t.Run("inhume_split", func(t *testing.T) {
|
|
t.Parallel()
|
|
db := newDB(t)
|
|
oo := putObjs(t, db, objCount, true)
|
|
|
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
|
for _, obj := range oo {
|
|
cnrID, _ := obj.ContainerID()
|
|
exp[cnrID] = meta.ObjectCounters{
|
|
Logic: 1,
|
|
Phy: 1,
|
|
User: 1,
|
|
}
|
|
}
|
|
|
|
inhumedObjs := make([]oid.Address, objCount/2)
|
|
|
|
for i, o := range oo {
|
|
if i == len(inhumedObjs) {
|
|
break
|
|
}
|
|
|
|
inhumedObjs[i] = objectcore.AddressOf(o)
|
|
}
|
|
|
|
for _, addr := range inhumedObjs {
|
|
if v, ok := exp[addr.Container()]; ok {
|
|
v.Logic--
|
|
v.User--
|
|
if v.IsZero() {
|
|
delete(exp, addr.Container())
|
|
} else {
|
|
exp[addr.Container()] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
var prm meta.InhumePrm
|
|
prm.SetTombstoneAddress(oidtest.Address())
|
|
prm.SetAddresses(inhumedObjs...)
|
|
|
|
_, err := db.Inhume(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
c, err := db.ObjectCounters()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, uint64(objCount), c.Phy)
|
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
|
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)
|
|
|
|
cc, err := db.ContainerCounters(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
|
})
|
|
}
|
|
|
|
func TestCounters_Expired(t *testing.T) {
	// This test covers expired objects that do not yet
	// have a GC mark. Such objects must be treated as
	// logically available: the logic counter is decremented
	// explicitly and only by `Delete` and `Inhume`
	// operations; otherwise it would be impossible to
	// keep the logic counter consistent.

	const epoch = 123

	es := &epochState{epoch}
	db := newDB(t, meta.WithEpochState(es))

	oo := make([]oid.Address, objCount)
	for i := range oo {
		oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
	}

	exp := make(map[cid.ID]meta.ObjectCounters)
	for _, addr := range oo {
		exp[addr.Container()] = meta.ObjectCounters{
			Logic: 1,
			Phy:   1,
			User:  1,
		}
	}

	// 1. Objects are available and the counters are correct.

	c, err := db.ObjectCounters()
	require.NoError(t, err)
	require.Equal(t, uint64(objCount), c.Phy)
	require.Equal(t, uint64(objCount), c.Logic)
	require.Equal(t, uint64(objCount), c.User)

	cc, err := db.ContainerCounters(context.Background())
	require.NoError(t, err)

	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)

	for _, o := range oo {
		_, err := metaGet(db, o, true)
		require.NoError(t, err)
	}

	// 2. Objects are expired and no longer available,
	// but the logic counter stays the same.

	es.e = epoch + 2

	c, err = db.ObjectCounters()
	require.NoError(t, err)
	require.Equal(t, uint64(objCount), c.Phy)
	require.Equal(t, uint64(objCount), c.Logic)
	require.Equal(t, uint64(objCount), c.User)

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)

	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)

	for _, o := range oo {
		_, err := metaGet(db, o, true)
		require.ErrorIs(t, err, meta.ErrObjectIsExpired)
	}

	// 3. Inhuming an expired object with a GC mark (as the
	// GC would do) should decrease the logic counter despite
	// the expiration.

	var inhumePrm meta.InhumePrm
	inhumePrm.SetGCMark()
	inhumePrm.SetAddresses(oo[0])

	inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)
	require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
	require.Equal(t, uint64(1), inhumeRes.UserInhumed())

	c, err = db.ObjectCounters()
	require.NoError(t, err)

	require.Equal(t, uint64(len(oo)), c.Phy)
	require.Equal(t, uint64(len(oo)-1), c.Logic)
	require.Equal(t, uint64(len(oo)-1), c.User)

	if v, ok := exp[oo[0].Container()]; ok {
		v.Logic--
		v.User--
		if v.IsZero() {
			delete(exp, oo[0].Container())
		} else {
			exp[oo[0].Container()] = v
		}
	}

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)

	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)

	// 4. `Delete` of an object with a GC mark should decrease
	// the phy counter but must not affect the logic counter
	// (after this step they should be equal).

	var deletePrm meta.DeletePrm
	deletePrm.SetAddresses(oo[0])

	deleteRes, err := db.Delete(context.Background(), deletePrm)
	require.NoError(t, err)
	require.Zero(t, deleteRes.AvailableObjectsRemoved())
	require.Zero(t, deleteRes.UserObjectsRemoved())

	if v, ok := exp[oo[0].Container()]; ok {
		v.Phy--
		if v.IsZero() {
			delete(exp, oo[0].Container())
		} else {
			exp[oo[0].Container()] = v
		}
	}

	oo = oo[1:]

	c, err = db.ObjectCounters()
	require.NoError(t, err)
	require.Equal(t, uint64(len(oo)), c.Phy)
	require.Equal(t, uint64(len(oo)), c.Logic)
	require.Equal(t, uint64(len(oo)), c.User)

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)

	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)

	// 5. `Delete` of an expired object (as the control
	// service would do) should decrease both counters despite
	// the expiration.

	deletePrm.SetAddresses(oo[0])

	deleteRes, err = db.Delete(context.Background(), deletePrm)
	require.NoError(t, err)
	require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
	require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())

	if v, ok := exp[oo[0].Container()]; ok {
		v.Phy--
		v.Logic--
		v.User--
		if v.IsZero() {
			delete(exp, oo[0].Container())
		} else {
			exp[oo[0].Container()] = v
		}
	}

	oo = oo[1:]

	c, err = db.ObjectCounters()
	require.NoError(t, err)
	require.Equal(t, uint64(len(oo)), c.Phy)
	require.Equal(t, uint64(len(oo)), c.Logic)
	require.Equal(t, uint64(len(oo)), c.User)

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)

	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
}

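// putObjs puts count generated objects into db, optionally attaching
// split (parent) info to each of them, and verifies the shard-wide
// counters after every put.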
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
	var prm meta.PutPrm
	var err error
	parent := testutil.GenerateObject()

	oo := make([]*objectSDK.Object, 0, count)
	for i := 0; i < count; i++ {
		o := testutil.GenerateObject()
		if withParent {
			o.SetParent(parent)
			o.SetSplitID(objectSDK.NewSplitID())
		}

		oo = append(oo, o)

		prm.SetObject(o)
		_, err = db.Put(context.Background(), prm)
		require.NoError(t, err)

		c, err := db.ObjectCounters()
		require.NoError(t, err)

		require.Equal(t, uint64(i+1), c.Phy)
		require.Equal(t, uint64(i+1), c.Logic)
	}

	return oo
}