Fix user object counter #911

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/user_counter into master 2024-09-04 19:51:05 +00:00
5 changed files with 77 additions and 12 deletions

View file

@ -221,6 +221,7 @@ func TestBlobstorFailback(t *testing.T) {
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close(context.Background()))
fyrchik marked this conversation as resolved Outdated

Why not do it in defer, similar to other changes in the recent refactoring?

Why not do it in defer, similar to other changes in the recent refactoring?

There are two te instances created, first instance is closed by regular function call, so I decided to do the same way with the second instance.

There are two `te` instances created, first instance is closed by regular function call, so I decided to do the same way with the second instance.
}
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {

View file

@ -316,6 +316,55 @@ func TestCounters(t *testing.T) {
})
}
func TestDoublePut(t *testing.T) {
t.Parallel()
db := newDB(t)
defer func() { require.NoError(t, db.Close()) }()
obj := testutil.GenerateObject()
exp := make(map[cid.ID]meta.ObjectCounters)
cnrID, _ := obj.ContainerID()
exp[cnrID] = meta.ObjectCounters{
Logic: 1,
Phy: 1,
User: 1,
}
var prm meta.PutPrm
prm.SetObject(obj)
pr, err := db.Put(context.Background(), prm)
require.NoError(t, err)
require.True(t, pr.Inserted)
c, err := db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(1), c.Phy)
require.Equal(t, uint64(1), c.Logic)
require.Equal(t, uint64(1), c.User)
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
pr, err = db.Put(context.Background(), prm)
require.NoError(t, err)
require.False(t, pr.Inserted)
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(1), c.Phy)
require.Equal(t, uint64(1), c.Logic)
require.Equal(t, uint64(1), c.User)
cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
}
func TestCounters_Expired(t *testing.T) {
// That test is about expired objects without
// GCMark yet. Such objects should be treated as

View file

@ -7,6 +7,7 @@ import (
"fmt"
"time"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -184,6 +185,13 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return db.inhumeTx(tx, currEpoch, prm, &res)
})
success = err == nil
if success {
for _, addr := range prm.target {
storagelog.Write(db.log,
storagelog.AddressField(addr),
storagelog.OpField("metabase INHUME"))
}
}
return res, metaerr.Wrap(err)
}

View file

@ -36,7 +36,9 @@ type PutPrm struct {
}
// PutRes groups the resulting values of Put operation.
type PutRes struct{}
type PutRes struct {
Inserted bool
}
// SetObject is a Put option to set object to save.
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
@ -85,7 +87,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
return db.put(tx, prm.obj, prm.id, nil, currEpoch)
var e error
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)
fyrchik marked this conversation as resolved Outdated

What about a unit test for this?

What about a unit test for this?

Done

Done
return e
})
if err == nil {
success = true
@ -102,10 +106,10 @@ func (db *DB) put(tx *bbolt.Tx,
id []byte,
si *objectSDK.SplitInfo,
currEpoch uint64,
) error {
) (PutRes, error) {
cnr, ok := obj.ContainerID()
if !ok {
return errors.New("missing container in object")
return PutRes{}, errors.New("missing container in object")
}
isParent := si != nil
@ -116,14 +120,14 @@ func (db *DB) put(tx *bbolt.Tx,
if errors.As(err, &splitInfoError) {
exists = true // object exists, however it is virtual
} else if err != nil {
return err // return any error besides SplitInfoError
return PutRes{}, err // return any error besides SplitInfoError
}
if exists {
return db.updateObj(tx, obj, id, si, isParent)
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
}
return db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
}
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
@ -153,7 +157,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
return err
}
err = db.put(tx, par, id, parentSI, currEpoch)
_, err = db.put(tx, par, id, parentSI, currEpoch)
if err != nil {
return err
}

View file

@ -84,15 +84,18 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
var pPrm meta.PutPrm
pPrm.SetObject(prm.obj)
pPrm.SetStorageID(res.StorageID)
if _, err := s.metaBase.Put(ctx, pPrm); err != nil {
res, err := s.metaBase.Put(ctx, pPrm)
if err != nil {
// may we need to handle this case in a special way
// since the object has been successfully written to BlobStor
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
}
s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
if res.Inserted {
s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
}
}
return PutRes{}, nil