Fix user object counter #911
5 changed files with 77 additions and 12 deletions
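In short: a repeated Put of an object that is already stored must no longer bump the object counters. The metabase Put now reports whether a new record was actually inserted, and the shard applies its accounting only in that case. A minimal, illustrative sketch of the idea (not code from this PR; metaBase, prm and incObjectCounter are stand-ins for the real fields and helpers shown in the diffs below):

	res, err := metaBase.Put(ctx, prm)
	if err != nil {
		return err
	}
	if res.Inserted { // false when the object was already present (update path)
		incObjectCounter() // counters and size accounting move only on the first insert
	}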
@@ -221,6 +221,7 @@ func TestBlobstorFailback(t *testing.T) {
 	checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
 	checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
+	require.NoError(t, te.ng.Close(context.Background()))

fyrchik marked this conversation as resolved (outdated)
Why not do it in defer, similar to other changes in the recent refactoring?
There are two te instances created; the first one is closed by a regular function call, so I decided to do the same with the second one.

 }

 func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
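On the review question above: the same cleanup could also be registered up front with defer or t.Cleanup, right after the second engine instance is created. A hedged sketch, reusing the test's te wrapper from the hunk above:

	// Alternative raised in the review thread: register the close as soon as the
	// engine exists instead of calling it explicitly at the end of the test.
	t.Cleanup(func() {
		require.NoError(t, te.ng.Close(context.Background()))
	})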
@@ -316,6 +316,55 @@ func TestCounters(t *testing.T) {
 	})
 }

+func TestDoublePut(t *testing.T) {
+	t.Parallel()
+	db := newDB(t)
+	defer func() { require.NoError(t, db.Close()) }()
+	obj := testutil.GenerateObject()
+
+	exp := make(map[cid.ID]meta.ObjectCounters)
+	cnrID, _ := obj.ContainerID()
+	exp[cnrID] = meta.ObjectCounters{
+		Logic: 1,
+		Phy:   1,
+		User:  1,
+	}
+
+	var prm meta.PutPrm
+	prm.SetObject(obj)
+	pr, err := db.Put(context.Background(), prm)
+	require.NoError(t, err)
+	require.True(t, pr.Inserted)
+
+	c, err := db.ObjectCounters()
+	require.NoError(t, err)
+
+	require.Equal(t, uint64(1), c.Phy)
+	require.Equal(t, uint64(1), c.Logic)
+	require.Equal(t, uint64(1), c.User)
+
+	cc, err := db.ContainerCounters(context.Background())
+	require.NoError(t, err)
+
+	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
+
+	pr, err = db.Put(context.Background(), prm)
+	require.NoError(t, err)
+	require.False(t, pr.Inserted)
+
+	c, err = db.ObjectCounters()
+	require.NoError(t, err)
+
+	require.Equal(t, uint64(1), c.Phy)
+	require.Equal(t, uint64(1), c.Logic)
+	require.Equal(t, uint64(1), c.User)
+
+	cc, err = db.ContainerCounters(context.Background())
+	require.NoError(t, err)
+
+	require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
+}
+
 func TestCounters_Expired(t *testing.T) {
 	// That test is about expired objects without
 	// GCMark yet. Such objects should be treated as
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"time"

+	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@@ -184,6 +185,13 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 		return db.inhumeTx(tx, currEpoch, prm, &res)
 	})
 	success = err == nil
+	if success {
+		for _, addr := range prm.target {
+			storagelog.Write(db.log,
+				storagelog.AddressField(addr),
+				storagelog.OpField("metabase INHUME"))
+		}
+	}
 	return res, metaerr.Wrap(err)
 }

@@ -36,7 +36,9 @@ type PutPrm struct {
 }

 // PutRes groups the resulting values of Put operation.
-type PutRes struct{}
+type PutRes struct {
+	Inserted bool
+}

 // SetObject is a Put option to set object to save.
 func (p *PutPrm) SetObject(obj *objectSDK.Object) {
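Because PutRes previously had no fields, extending it is non-breaking: call sites that discard the result keep compiling, and only callers interested in the insert/update distinction need to change. Illustrative sketch (not from the PR):

	// Old-style call sites stay valid:
	//     if _, err := db.Put(ctx, prm); err != nil { ... }
	// New-style call sites can branch on the flag:
	res, err := db.Put(ctx, prm)
	if err != nil {
		return err
	}
	if res.Inserted {
		// the object was stored for the first time
	}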
@@ -85,7 +87,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
 	currEpoch := db.epochState.CurrentEpoch()

 	err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		return db.put(tx, prm.obj, prm.id, nil, currEpoch)
+		var e error
+		res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)

fyrchik marked this conversation as resolved (outdated)
fyrchik: What about a unit test for this?
dstepanov-yadro: Done

+		return e
 	})
 	if err == nil {
 		success = true
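A note on the shape of this change: a bbolt transaction callback can only return an error, so the PutRes computed inside the transaction is handed out through the named return value res of Put. A generic sketch of that pattern (putInTx is a hypothetical stand-in for db.put):

	var res PutRes
	err := boltDB.Batch(func(tx *bbolt.Tx) error {
		var e error
		res, e = putInTx(tx) // assign to the outer variable, return only the error
		return e
	})
	if err != nil {
		return PutRes{}, err
	}
	return res, nil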
@@ -102,10 +106,10 @@ func (db *DB) put(tx *bbolt.Tx,
 	id []byte,
 	si *objectSDK.SplitInfo,
 	currEpoch uint64,
-) error {
+) (PutRes, error) {
 	cnr, ok := obj.ContainerID()
 	if !ok {
-		return errors.New("missing container in object")
+		return PutRes{}, errors.New("missing container in object")
 	}

 	isParent := si != nil
@@ -116,14 +120,14 @@ func (db *DB) put(tx *bbolt.Tx,
 	if errors.As(err, &splitInfoError) {
 		exists = true // object exists, however it is virtual
 	} else if err != nil {
-		return err // return any error besides SplitInfoError
+		return PutRes{}, err // return any error besides SplitInfoError
 	}

 	if exists {
-		return db.updateObj(tx, obj, id, si, isParent)
+		return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
 	}

-	return db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
+	return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
 }

 func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
@@ -153,7 +157,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
 		return err
 	}

-	err = db.put(tx, par, id, parentSI, currEpoch)
+	_, err = db.put(tx, par, id, parentSI, currEpoch)
 	if err != nil {
 		return err
 	}
@@ -84,15 +84,18 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
 		var pPrm meta.PutPrm
 		pPrm.SetObject(prm.obj)
 		pPrm.SetStorageID(res.StorageID)
-		if _, err := s.metaBase.Put(ctx, pPrm); err != nil {
+		res, err := s.metaBase.Put(ctx, pPrm)
+		if err != nil {
 			// may we need to handle this case in a special way
 			// since the object has been successfully written to BlobStor
 			return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
 		}

-		s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
-		s.addToPayloadSize(int64(prm.obj.PayloadSize()))
-		s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
+		if res.Inserted {
+			s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
+			s.addToPayloadSize(int64(prm.obj.PayloadSize()))
+			s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
+		}
 	}

 	return PutRes{}, nil
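Net effect at the shard level, restated as an illustrative test-style sequence (not code from the PR; t, s and pPrm are assumed to be set up as in the shard hunk above, with the same object put twice):

	first, err := s.metaBase.Put(ctx, pPrm)
	require.NoError(t, err)
	require.True(t, first.Inserted) // counters, payload size and container size are updated

	second, err := s.metaBase.Put(ctx, pPrm)
	require.NoError(t, err)
	require.False(t, second.Inserted) // accounting block is skipped, so nothing is double-counted,
	// which is what TestDoublePut above pins down at the metabase level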