[#904] metabase: Return if object was actuall inserted
This requires to count metrics properly. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c1a80235db
commit
57171907e3
3 changed files with 68 additions and 12 deletions
|
@ -316,6 +316,55 @@ func TestCounters(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDoublePut(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
db := newDB(t)
|
||||||
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
obj := testutil.GenerateObject()
|
||||||
|
|
||||||
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
var prm meta.PutPrm
|
||||||
|
prm.SetObject(obj)
|
||||||
|
pr, err := db.Put(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, pr.Inserted)
|
||||||
|
|
||||||
|
c, err := db.ObjectCounters()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(1), c.Phy)
|
||||||
|
require.Equal(t, uint64(1), c.Logic)
|
||||||
|
require.Equal(t, uint64(1), c.User)
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
|
|
||||||
|
pr, err = db.Put(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, pr.Inserted)
|
||||||
|
|
||||||
|
c, err = db.ObjectCounters()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(1), c.Phy)
|
||||||
|
require.Equal(t, uint64(1), c.Logic)
|
||||||
|
require.Equal(t, uint64(1), c.User)
|
||||||
|
|
||||||
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
|
}
|
||||||
|
|
||||||
func TestCounters_Expired(t *testing.T) {
|
func TestCounters_Expired(t *testing.T) {
|
||||||
// That test is about expired objects without
|
// That test is about expired objects without
|
||||||
// GCMark yet. Such objects should be treated as
|
// GCMark yet. Such objects should be treated as
|
||||||
|
|
|
@ -36,7 +36,9 @@ type PutPrm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutRes groups the resulting values of Put operation.
|
// PutRes groups the resulting values of Put operation.
|
||||||
type PutRes struct{}
|
type PutRes struct {
|
||||||
|
Inserted bool
|
||||||
|
}
|
||||||
|
|
||||||
// SetObject is a Put option to set object to save.
|
// SetObject is a Put option to set object to save.
|
||||||
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
|
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
|
||||||
|
@ -85,7 +87,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
return db.put(tx, prm.obj, prm.id, nil, currEpoch)
|
var e error
|
||||||
|
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)
|
||||||
|
return e
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
success = true
|
success = true
|
||||||
|
@ -102,10 +106,10 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
id []byte,
|
id []byte,
|
||||||
si *objectSDK.SplitInfo,
|
si *objectSDK.SplitInfo,
|
||||||
currEpoch uint64,
|
currEpoch uint64,
|
||||||
) error {
|
) (PutRes, error) {
|
||||||
cnr, ok := obj.ContainerID()
|
cnr, ok := obj.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("missing container in object")
|
return PutRes{}, errors.New("missing container in object")
|
||||||
}
|
}
|
||||||
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
|
@ -116,14 +120,14 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
if errors.As(err, &splitInfoError) {
|
if errors.As(err, &splitInfoError) {
|
||||||
exists = true // object exists, however it is virtual
|
exists = true // object exists, however it is virtual
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err // return any error besides SplitInfoError
|
return PutRes{}, err // return any error besides SplitInfoError
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
return db.updateObj(tx, obj, id, si, isParent)
|
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
|
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
||||||
|
@ -153,7 +157,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.put(tx, par, id, parentSI, currEpoch)
|
_, err = db.put(tx, par, id, parentSI, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,16 +84,19 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
var pPrm meta.PutPrm
|
var pPrm meta.PutPrm
|
||||||
pPrm.SetObject(prm.obj)
|
pPrm.SetObject(prm.obj)
|
||||||
pPrm.SetStorageID(res.StorageID)
|
pPrm.SetStorageID(res.StorageID)
|
||||||
if _, err := s.metaBase.Put(ctx, pPrm); err != nil {
|
res, err := s.metaBase.Put(ctx, pPrm)
|
||||||
|
if err != nil {
|
||||||
// may we need to handle this case in a special way
|
// may we need to handle this case in a special way
|
||||||
// since the object has been successfully written to BlobStor
|
// since the object has been successfully written to BlobStor
|
||||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if res.Inserted {
|
||||||
s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
|
s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
|
||||||
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
||||||
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return PutRes{}, nil
|
return PutRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue