WIP: Change metabase engine to pebble #1221
17 changed files with 84 additions and 194 deletions
|
@ -83,6 +83,7 @@ func TestInitializationFailure(t *testing.T) {
|
||||||
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
|
||||||
})
|
})
|
||||||
t.Run("metabase", func(t *testing.T) {
|
t.Run("metabase", func(t *testing.T) {
|
||||||
|
t.Skip("will be implemented correctly later")
|
||||||
var openFileMetabaseSucceed atomic.Bool
|
var openFileMetabaseSucceed atomic.Bool
|
||||||
openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
|
||||||
if openFileMetabaseSucceed.Load() {
|
if openFileMetabaseSucceed.Load() {
|
||||||
|
|
|
@ -59,6 +59,12 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for v := it.First(); v; v = it.Next() {
|
for v := it.First(); v; v = it.Next() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, errors.Join(ctx.Err(), it.Close())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
if parseContainerIDWithIgnore(&cnr, it.Key(), unique) {
|
if parseContainerIDWithIgnore(&cnr, it.Key(), unique) {
|
||||||
result = append(result, cnr)
|
result = append(result, cnr)
|
||||||
unique[string(it.Key()[1:containerSizeKeySize])] = struct{}{}
|
unique[string(it.Key()[1:containerSizeKeySize])] = struct{}{}
|
||||||
|
@ -79,6 +85,9 @@ func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]stru
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err error) {
|
func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err error) {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerSize")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
|
|
@ -151,7 +151,7 @@ func TestDB_ContainerSize(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for cnr, volume := range cids {
|
for cnr, volume := range cids {
|
||||||
n, err := db.ContainerSize(cnr)
|
n, err := db.ContainerSize(context.Background(), cnr)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, volume, int(n))
|
require.Equal(t, volume, int(n))
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ func TestDB_ContainerSize(t *testing.T) {
|
||||||
|
|
||||||
volume -= int(obj.PayloadSize())
|
volume -= int(obj.PayloadSize())
|
||||||
|
|
||||||
n, err := db.ContainerSize(cnr)
|
n, err := db.ContainerSize(context.Background(), cnr)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, volume, int(n))
|
require.Equal(t, volume, int(n))
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,9 +47,10 @@ func (db *DB) openDB(mode mode.Mode) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) pebbleOptions(readOnly bool) *pebble.Options {
|
func (db *DB) pebbleOptions(readOnly bool) *pebble.Options {
|
||||||
return &pebble.Options{
|
opts := &pebble.Options{
|
||||||
ReadOnly: readOnly,
|
ReadOnly: readOnly,
|
||||||
}
|
}
|
||||||
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) openDatabase(readOnly bool) error {
|
func (db *DB) openDatabase(readOnly bool) error {
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestReset(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
defer func() { require.NoError(t, db.Close()) }()
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
|
||||||
err := db.Reset()
|
err := db.Reset(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
obj := testutil.GenerateObject()
|
obj := testutil.GenerateObject()
|
||||||
|
@ -47,7 +47,7 @@ func TestReset(t *testing.T) {
|
||||||
assertExists(addr, true, nil)
|
assertExists(addr, true, nil)
|
||||||
assertExists(addrToInhume, false, client.IsErrObjectAlreadyRemoved)
|
assertExists(addrToInhume, false, client.IsErrObjectAlreadyRemoved)
|
||||||
|
|
||||||
err = db.Reset()
|
err = db.Reset(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assertExists(addr, false, nil)
|
assertExists(addr, false, nil)
|
||||||
|
|
|
@ -23,7 +23,7 @@ func TestCounters(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
defer func() { require.NoError(t, db.Close()) }()
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, c.Phy)
|
require.Zero(t, c.Phy)
|
||||||
require.Zero(t, c.Logic)
|
require.Zero(t, c.Logic)
|
||||||
|
@ -59,7 +59,7 @@ func TestCounters(t *testing.T) {
|
||||||
_, err := db.Put(context.Background(), prm)
|
_, err := db.Put(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(i+1), c.Phy)
|
require.Equal(t, uint64(i+1), c.Phy)
|
||||||
|
@ -90,13 +90,13 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
var prm meta.DeletePrm
|
var prm meta.DeletePrm
|
||||||
for i := objCount - 1; i >= 0; i-- {
|
for i := objCount - 1; i >= 0; i-- {
|
||||||
prm.SetAddresses(objectcore.AddressOf(oo[i]))
|
prm.Address = objectcore.AddressOf(oo[i])
|
||||||
|
|
||||||
res, err := db.Delete(context.Background(), prm)
|
res, err := db.Delete(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), res.LogicCount())
|
require.Equal(t, uint64(1), res.LogicCount)
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(i), c.Phy)
|
require.Equal(t, uint64(i), c.Phy)
|
||||||
|
@ -164,7 +164,7 @@ func TestCounters(t *testing.T) {
|
||||||
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
|
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
|
||||||
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
|
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(objCount), c.Phy)
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
|
@ -203,7 +203,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, putBig(db, o))
|
require.NoError(t, putBig(db, o))
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(i+1), c.Phy)
|
require.Equal(t, uint64(i+1), c.Phy)
|
||||||
require.Equal(t, uint64(i+1), c.Logic)
|
require.Equal(t, uint64(i+1), c.Logic)
|
||||||
|
@ -238,7 +238,7 @@ func TestCounters(t *testing.T) {
|
||||||
addr := objectcore.AddressOf(o)
|
addr := objectcore.AddressOf(o)
|
||||||
require.NoError(t, metaDelete(db, addr))
|
require.NoError(t, metaDelete(db, addr))
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount-i-1), c.Phy)
|
require.Equal(t, uint64(objCount-i-1), c.Phy)
|
||||||
require.Equal(t, uint64(objCount-i-1), c.Logic)
|
require.Equal(t, uint64(objCount-i-1), c.Logic)
|
||||||
|
@ -302,7 +302,7 @@ func TestCounters(t *testing.T) {
|
||||||
_, err := db.Inhume(context.Background(), prm)
|
_, err := db.Inhume(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(objCount), c.Phy)
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
|
@ -336,7 +336,7 @@ func TestDoublePut(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, pr.Inserted)
|
require.True(t, pr.Inserted)
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(1), c.Phy)
|
require.Equal(t, uint64(1), c.Phy)
|
||||||
|
@ -352,7 +352,7 @@ func TestDoublePut(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, pr.Inserted)
|
require.False(t, pr.Inserted)
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(1), c.Phy)
|
require.Equal(t, uint64(1), c.Phy)
|
||||||
|
@ -395,7 +395,7 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
// 1. objects are available and counters are correct
|
// 1. objects are available and counters are correct
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount), c.Phy)
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
require.Equal(t, uint64(objCount), c.Logic)
|
require.Equal(t, uint64(objCount), c.Logic)
|
||||||
|
@ -416,7 +416,7 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
es.e = epoch + 2
|
es.e = epoch + 2
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount), c.Phy)
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
require.Equal(t, uint64(objCount), c.Logic)
|
require.Equal(t, uint64(objCount), c.Logic)
|
||||||
|
@ -445,7 +445,7 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
|
require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
|
||||||
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
|
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy)
|
require.Equal(t, uint64(len(oo)), c.Phy)
|
||||||
|
@ -472,12 +472,12 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
// that step they should be equal)
|
// that step they should be equal)
|
||||||
|
|
||||||
var deletePrm meta.DeletePrm
|
var deletePrm meta.DeletePrm
|
||||||
deletePrm.SetAddresses(oo[0])
|
deletePrm.Address = oo[0]
|
||||||
|
|
||||||
deleteRes, err := db.Delete(context.Background(), deletePrm)
|
deleteRes, err := db.Delete(context.Background(), deletePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, deleteRes.LogicCount())
|
require.Zero(t, deleteRes.LogicCount)
|
||||||
require.Zero(t, deleteRes.UserCount())
|
require.Zero(t, deleteRes.UserCount)
|
||||||
|
|
||||||
if v, ok := exp[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
v.Phy--
|
v.Phy--
|
||||||
|
@ -486,7 +486,7 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
oo = oo[1:]
|
oo = oo[1:]
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy)
|
require.Equal(t, uint64(len(oo)), c.Phy)
|
||||||
require.Equal(t, uint64(len(oo)), c.Logic)
|
require.Equal(t, uint64(len(oo)), c.Logic)
|
||||||
|
@ -501,12 +501,12 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
// service do) should decrease both counters despite the
|
// service do) should decrease both counters despite the
|
||||||
// expiration fact
|
// expiration fact
|
||||||
|
|
||||||
deletePrm.SetAddresses(oo[0])
|
deletePrm.Address = oo[0]
|
||||||
|
|
||||||
deleteRes, err = db.Delete(context.Background(), deletePrm)
|
deleteRes, err = db.Delete(context.Background(), deletePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), deleteRes.LogicCount())
|
require.Equal(t, uint64(1), deleteRes.LogicCount)
|
||||||
require.Equal(t, uint64(1), deleteRes.UserCount())
|
require.Equal(t, uint64(1), deleteRes.UserCount)
|
||||||
|
|
||||||
if v, ok := exp[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
v.Phy--
|
v.Phy--
|
||||||
|
@ -517,7 +517,7 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
oo = oo[1:]
|
oo = oo[1:]
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy)
|
require.Equal(t, uint64(len(oo)), c.Phy)
|
||||||
require.Equal(t, uint64(len(oo)), c.Logic)
|
require.Equal(t, uint64(len(oo)), c.Logic)
|
||||||
|
@ -548,7 +548,7 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK
|
||||||
_, err = db.Put(context.Background(), prm)
|
_, err = db.Put(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(i+1), c.Phy)
|
require.Equal(t, uint64(i+1), c.Phy)
|
||||||
|
|
|
@ -43,8 +43,6 @@ type DB struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
dbOptions *pebble.Options // optional
|
|
||||||
|
|
||||||
info Info
|
info Info
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
@ -60,10 +58,9 @@ func defaultCfg() *cfg {
|
||||||
info: Info{
|
info: Info{
|
||||||
Permission: os.ModePerm, // 0777
|
Permission: os.ModePerm, // 0777
|
||||||
},
|
},
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
metrics: &noopMetrics{},
|
metrics: &noopMetrics{},
|
||||||
dbOptions: &pebble.Options{},
|
guard: newConcurrency(),
|
||||||
guard: newConcurrency(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,37 +183,6 @@ func unknownMatcher(_ string, _ []byte, _ string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// bucketKeyHelper returns byte representation of val that is used as a key
|
|
||||||
// in boltDB. Useful for getting filter values from unique and list indexes.
|
|
||||||
func bucketKeyHelper(hdr string, val string) []byte {
|
|
||||||
switch hdr {
|
|
||||||
case v2object.FilterHeaderParent, v2object.FilterHeaderECParent:
|
|
||||||
v, err := base58.Decode(val)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return v
|
|
||||||
case v2object.FilterHeaderPayloadHash:
|
|
||||||
v, err := hex.DecodeString(val)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return v
|
|
||||||
case v2object.FilterHeaderSplitID:
|
|
||||||
s := objectSDK.NewSplitID()
|
|
||||||
|
|
||||||
err := s.Parse(val)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.ToV2()
|
|
||||||
default:
|
|
||||||
return []byte(val)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
||||||
func (db *DB) SetLogger(l *logger.Logger) {
|
func (db *DB) SetLogger(l *logger.Logger) {
|
||||||
db.log = l
|
db.log = l
|
||||||
|
|
|
@ -155,9 +155,11 @@ func TestDelete(t *testing.T) {
|
||||||
require.NoError(t, db.IterateOverGarbage(context.Background(), iprm))
|
require.NoError(t, db.IterateOverGarbage(context.Background(), iprm))
|
||||||
require.Equal(t, 10, len(addrs))
|
require.Equal(t, 10, len(addrs))
|
||||||
var deletePrm meta.DeletePrm
|
var deletePrm meta.DeletePrm
|
||||||
deletePrm.SetAddresses(addrs...)
|
for _, addr := range addrs {
|
||||||
_, err := db.Delete(context.Background(), deletePrm)
|
deletePrm.Address = addr
|
||||||
require.NoError(t, err)
|
_, err := db.Delete(context.Background(), deletePrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
addrs = nil
|
addrs = nil
|
||||||
iprm.SetHandler(func(o meta.GarbageObject) error {
|
iprm.SetHandler(func(o meta.GarbageObject) error {
|
||||||
|
@ -190,7 +192,7 @@ func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
|
||||||
require.Equal(t, 1, garbageCount)
|
require.Equal(t, 1, garbageCount)
|
||||||
|
|
||||||
var delPrm meta.DeletePrm
|
var delPrm meta.DeletePrm
|
||||||
delPrm.SetAddresses(addr)
|
delPrm.Address = addr
|
||||||
_, err = db.Delete(context.Background(), delPrm)
|
_, err = db.Delete(context.Background(), delPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -201,8 +203,13 @@ func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
|
||||||
|
|
||||||
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
|
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
|
||||||
var deletePrm meta.DeletePrm
|
var deletePrm meta.DeletePrm
|
||||||
deletePrm.SetAddresses(addrs...)
|
for _, addr := range addrs {
|
||||||
|
deletePrm.Address = addr
|
||||||
|
|
||||||
_, err := db.Delete(context.Background(), deletePrm)
|
_, err := db.Delete(context.Background(), deletePrm)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
@ -204,19 +203,6 @@ func inGraveyardWithKey(r pebble.Reader, addr oid.Address) (uint8, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// inBucket checks if key <key> is present in bucket <name>.
|
|
||||||
func inBucket(tx *bbolt.Tx, name, key []byte) bool {
|
|
||||||
bkt := tx.Bucket(name)
|
|
||||||
if bkt == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// using `get` as `exists`: https://github.com/boltdb/bolt/issues/321
|
|
||||||
val := bkt.Get(key)
|
|
||||||
|
|
||||||
return len(val) != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// getSplitInfo returns SplitInfo structure from root index. Returns error
|
// getSplitInfo returns SplitInfo structure from root index. Returns error
|
||||||
// if there is no `key` record in root index.
|
// if there is no `key` record in root index.
|
||||||
func getSplitInfo(r pebble.Reader, addr oid.Address) (*objectSDK.SplitInfo, error) {
|
func getSplitInfo(r pebble.Reader, addr oid.Address) (*objectSDK.SplitInfo, error) {
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
@ -216,10 +215,7 @@ func BenchmarkGet(b *testing.B) {
|
||||||
|
|
||||||
func benchmarkGet(b *testing.B, numOfObj int) {
|
func benchmarkGet(b *testing.B, numOfObj int) {
|
||||||
prepareDb := func(batchSize int) (*meta.DB, []oid.Address) {
|
prepareDb := func(batchSize int) (*meta.DB, []oid.Address) {
|
||||||
db := newDB(b,
|
db := newDB(b)
|
||||||
meta.WithMaxBatchSize(batchSize),
|
|
||||||
meta.WithMaxBatchDelay(10*time.Millisecond),
|
|
||||||
)
|
|
||||||
defer func() { require.NoError(b, db.Close()) }()
|
defer func() { require.NoError(b, db.Close()) }()
|
||||||
addrs := make([]oid.Address, 0, numOfObj)
|
addrs := make([]oid.Address, 0, numOfObj)
|
||||||
|
|
||||||
|
|
|
@ -1,65 +0,0 @@
|
||||||
package meta
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/rand"
|
|
||||||
"math"
|
|
||||||
mrand "math/rand"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_getVarUint(t *testing.T) {
|
|
||||||
data := make([]byte, 10)
|
|
||||||
for _, val := range []uint64{0, 0xfc, 0xfd, 0xfffe, 0xffff, 0xfffffffe, 0xffffffff, math.MaxUint64} {
|
|
||||||
expSize := io.PutVarUint(data, val)
|
|
||||||
actual, actSize, err := getVarUint(data)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, val, actual)
|
|
||||||
require.Equal(t, expSize, actSize, "value: %x", val)
|
|
||||||
|
|
||||||
_, _, err = getVarUint(data[:expSize-1])
|
|
||||||
require.Error(t, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_decodeList(t *testing.T) {
|
|
||||||
t.Run("empty", func(t *testing.T) {
|
|
||||||
lst, err := decodeList(nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, len(lst) == 0)
|
|
||||||
})
|
|
||||||
t.Run("empty, 0 len", func(t *testing.T) {
|
|
||||||
lst, err := decodeList([]byte{0})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, len(lst) == 0)
|
|
||||||
})
|
|
||||||
t.Run("bad len", func(t *testing.T) {
|
|
||||||
_, err := decodeList([]byte{0xfe})
|
|
||||||
require.Error(t, err)
|
|
||||||
})
|
|
||||||
t.Run("random", func(t *testing.T) {
|
|
||||||
r := mrand.New(mrand.NewSource(time.Now().Unix()))
|
|
||||||
expected := make([][]byte, 20)
|
|
||||||
for i := range expected {
|
|
||||||
expected[i] = make([]byte, r.Uint32()%10)
|
|
||||||
rand.Read(expected[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := encodeList(expected)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
actual, err := decodeList(data)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, expected, actual)
|
|
||||||
|
|
||||||
t.Run("unexpected EOF", func(t *testing.T) {
|
|
||||||
for i := 1; i < len(data)-1; i++ {
|
|
||||||
_, err := decodeList(data[:i])
|
|
||||||
require.Error(t, err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -143,8 +143,6 @@ func (p *InhumePrm) SetForceGCMark() {
|
||||||
p.forceRemoval = true
|
p.forceRemoval = true
|
||||||
}
|
}
|
||||||
|
|
||||||
var errBreakBucketForEach = errors.New("bucket ForEach break")
|
|
||||||
|
|
||||||
// ErrLockObjectRemoval is returned when inhume operation is being
|
// ErrLockObjectRemoval is returned when inhume operation is being
|
||||||
// performed on lock object, and it is not a forced object removal.
|
// performed on lock object, and it is not a forced object removal.
|
||||||
var ErrLockObjectRemoval = logicerr.New("lock object removal")
|
var ErrLockObjectRemoval = logicerr.New("lock object removal")
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkListWithCursor(b *testing.B) {
|
func BenchmarkListWithCursor(b *testing.B) {
|
||||||
|
@ -29,9 +28,7 @@ func BenchmarkListWithCursor(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func listWithCursorPrepareDB(b *testing.B) *meta.DB {
|
func listWithCursorPrepareDB(b *testing.B) *meta.DB {
|
||||||
db := newDB(b, meta.WithMaxBatchSize(1), meta.WithBoltDBOptions(&bbolt.Options{
|
db := newDB(b) // faster single-thread generation
|
||||||
NoSync: true,
|
|
||||||
})) // faster single-thread generation
|
|
||||||
defer func() { require.NoError(b, db.Close()) }()
|
defer func() { require.NoError(b, db.Close()) }()
|
||||||
|
|
||||||
obj := testutil.GenerateObject()
|
obj := testutil.GenerateObject()
|
||||||
|
|
|
@ -117,7 +117,7 @@ func TestDB_Lock(t *testing.T) {
|
||||||
require.Len(t, res.DeletedLockObjects(), 1)
|
require.Len(t, res.DeletedLockObjects(), 1)
|
||||||
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])
|
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])
|
||||||
|
|
||||||
_, err = db.FreeLockedBy([]oid.Address{lockAddr})
|
_, err = db.FreeLockedBy(context.Background(), []oid.Address{lockAddr})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
inhumePrm.SetAddresses(objAddr)
|
inhumePrm.SetAddresses(objAddr)
|
||||||
|
@ -148,7 +148,7 @@ func TestDB_Lock(t *testing.T) {
|
||||||
|
|
||||||
// unlock just objects that were locked by
|
// unlock just objects that were locked by
|
||||||
// just removed locker
|
// just removed locker
|
||||||
_, err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
|
_, err = db.FreeLockedBy(context.Background(), []oid.Address{res.DeletedLockObjects()[0]})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// removing objects after unlock
|
// removing objects after unlock
|
||||||
|
|
|
@ -2,11 +2,9 @@ package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"runtime"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
@ -43,9 +41,7 @@ func prepareObjects(n int) []*objectSDK.Object {
|
||||||
|
|
||||||
func BenchmarkPut(b *testing.B) {
|
func BenchmarkPut(b *testing.B) {
|
||||||
b.Run("parallel", func(b *testing.B) {
|
b.Run("parallel", func(b *testing.B) {
|
||||||
db := newDB(b,
|
db := newDB(b)
|
||||||
meta.WithMaxBatchDelay(time.Millisecond*10),
|
|
||||||
meta.WithMaxBatchSize(runtime.NumCPU()))
|
|
||||||
defer func() { require.NoError(b, db.Close()) }()
|
defer func() { require.NoError(b, db.Close()) }()
|
||||||
// Ensure the benchmark is bound by CPU and not waiting batch-delay time.
|
// Ensure the benchmark is bound by CPU and not waiting batch-delay time.
|
||||||
b.SetParallelism(1)
|
b.SetParallelism(1)
|
||||||
|
@ -65,9 +61,7 @@ func BenchmarkPut(b *testing.B) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
b.Run("sequential", func(b *testing.B) {
|
b.Run("sequential", func(b *testing.B) {
|
||||||
db := newDB(b,
|
db := newDB(b)
|
||||||
meta.WithMaxBatchDelay(time.Millisecond*10),
|
|
||||||
meta.WithMaxBatchSize(1))
|
|
||||||
defer func() { require.NoError(b, db.Close()) }()
|
defer func() { require.NoError(b, db.Close()) }()
|
||||||
var index atomic.Int64
|
var index atomic.Int64
|
||||||
index.Store(-1)
|
index.Store(-1)
|
||||||
|
|
|
@ -8,8 +8,8 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"github.com/cockroachdb/pebble"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type epochState struct{ e uint64 }
|
type epochState struct{ e uint64 }
|
||||||
|
@ -42,16 +42,19 @@ func TestResetDropsContainerBuckets(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, db.Reset())
|
require.NoError(t, db.Reset(context.Background()))
|
||||||
|
|
||||||
var bucketCount int
|
var cnt int
|
||||||
require.NoError(t, db.database.Update(func(tx *bbolt.Tx) error {
|
err := db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
it, err := s.NewIter(nil)
|
||||||
_, exists := mStaticBuckets[string(name)]
|
if err != nil {
|
||||||
require.True(t, exists, "unexpected bucket:"+string(name))
|
return err
|
||||||
bucketCount++
|
}
|
||||||
return nil
|
for v := it.First(); v; v = it.Next() {
|
||||||
})
|
cnt++
|
||||||
}))
|
}
|
||||||
require.Equal(t, len(mStaticBuckets), bucketCount)
|
return it.Close()
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, cnt)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"github.com/cockroachdb/pebble"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type epochStateImpl struct{}
|
type epochStateImpl struct{}
|
||||||
|
@ -27,12 +27,9 @@ func TestVersion(t *testing.T) {
|
||||||
WithPermissions(0o600), WithEpochState(epochStateImpl{}))
|
WithPermissions(0o600), WithEpochState(epochStateImpl{}))
|
||||||
}
|
}
|
||||||
check := func(t *testing.T, db *DB) {
|
check := func(t *testing.T, db *DB) {
|
||||||
require.NoError(t, db.database.View(func(tx *bbolt.Tx) error {
|
require.NoError(t, db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
data, err := valueSafe(s, shardInfoKey(versionKey))
|
||||||
if b == nil {
|
require.NoError(t, err)
|
||||||
return errors.New("shard info bucket not found")
|
|
||||||
}
|
|
||||||
data := b.Get(versionKey)
|
|
||||||
if len(data) != 8 {
|
if len(data) != 8 {
|
||||||
return errors.New("invalid version data")
|
return errors.New("invalid version data")
|
||||||
}
|
}
|
||||||
|
@ -68,8 +65,8 @@ func TestVersion(t *testing.T) {
|
||||||
t.Run("invalid version", func(t *testing.T) {
|
t.Run("invalid version", func(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, db.database.Update(func(tx *bbolt.Tx) error {
|
require.NoError(t, db.batch(func(b *pebble.Batch) error {
|
||||||
return updateVersion(tx, version+1)
|
return updateVersion(b, version+1)
|
||||||
}))
|
}))
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
@ -79,7 +76,7 @@ func TestVersion(t *testing.T) {
|
||||||
|
|
||||||
t.Run("reset", func(t *testing.T) {
|
t.Run("reset", func(t *testing.T) {
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, db.Reset())
|
require.NoError(t, db.Reset(context.Background()))
|
||||||
check(t, db)
|
check(t, db)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue