From 57171907e3485dc29d4b375ce5139758fdeef718 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 15 Jan 2024 09:04:25 +0300 Subject: [PATCH] [#904] metabase: Return if object was actuall inserted This requires to count metrics properly. Signed-off-by: Dmitrii Stepanov --- .../metabase/counter_test.go | 49 +++++++++++++++++++ pkg/local_object_storage/metabase/put.go | 20 +++++--- pkg/local_object_storage/shard/put.go | 11 +++-- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 306bb0cc0..1797fc0aa 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -316,6 +316,55 @@ func TestCounters(t *testing.T) { }) } +func TestDoublePut(t *testing.T) { + t.Parallel() + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + obj := testutil.GenerateObject() + + exp := make(map[cid.ID]meta.ObjectCounters) + cnrID, _ := obj.ContainerID() + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } + + var prm meta.PutPrm + prm.SetObject(obj) + pr, err := db.Put(context.Background(), prm) + require.NoError(t, err) + require.True(t, pr.Inserted) + + c, err := db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(1), c.Phy) + require.Equal(t, uint64(1), c.Logic) + require.Equal(t, uint64(1), c.User) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) + + pr, err = db.Put(context.Background(), prm) + require.NoError(t, err) + require.False(t, pr.Inserted) + + c, err = db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(1), c.Phy) + require.Equal(t, uint64(1), c.Logic) + require.Equal(t, uint64(1), c.User) + + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) +} + func TestCounters_Expired(t *testing.T) { // That test is about expired objects without // GCMark yet. Such objects should be treated as diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 2f5108953..1f0022389 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -36,7 +36,9 @@ type PutPrm struct { } // PutRes groups the resulting values of Put operation. -type PutRes struct{} +type PutRes struct { + Inserted bool +} // SetObject is a Put option to set object to save. func (p *PutPrm) SetObject(obj *objectSDK.Object) { @@ -85,7 +87,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - return db.put(tx, prm.obj, prm.id, nil, currEpoch) + var e error + res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch) + return e }) if err == nil { success = true @@ -102,10 +106,10 @@ func (db *DB) put(tx *bbolt.Tx, id []byte, si *objectSDK.SplitInfo, currEpoch uint64, -) error { +) (PutRes, error) { cnr, ok := obj.ContainerID() if !ok { - return errors.New("missing container in object") + return PutRes{}, errors.New("missing container in object") } isParent := si != nil @@ -116,14 +120,14 @@ func (db *DB) put(tx *bbolt.Tx, if errors.As(err, &splitInfoError) { exists = true // object exists, however it is virtual } else if err != nil { - return err // return any error besides SplitInfoError + return PutRes{}, err // return any error besides SplitInfoError } if exists { - return db.updateObj(tx, obj, id, si, isParent) + return PutRes{}, db.updateObj(tx, obj, id, si, isParent) } - return db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch) + return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch) } func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error { @@ -153,7 +157,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o return err } - err = db.put(tx, par, id, parentSI, currEpoch) + _, err = db.put(tx, par, id, parentSI, currEpoch) if err != nil { return err } diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 09a98260d..d7a9e7012 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -84,15 +84,18 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) { var pPrm meta.PutPrm pPrm.SetObject(prm.obj) pPrm.SetStorageID(res.StorageID) - if _, err := s.metaBase.Put(ctx, pPrm); err != nil { + res, err := s.metaBase.Put(ctx, pPrm) + if err != nil { // may we need to handle this case in a special way // since the object has been successfully written to BlobStor return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) } - s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj)) - s.addToPayloadSize(int64(prm.obj.PayloadSize())) - s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) + if res.Inserted { + s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj)) + s.addToPayloadSize(int64(prm.obj.PayloadSize())) + s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) + } } return PutRes{}, nil