From e41aba610df29542e6c6d862f12966021cd9579f Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 27 Oct 2021 13:52:12 +0300 Subject: [PATCH] [#945] metabase: Fix containers listing Container listing should not ignore tombstone and storage group objects which are not stored in primary buckets. Signed-off-by: Alex Vanin --- .../metabase/containers.go | 22 ++++----- .../metabase/containers_test.go | 45 +++++++++++++++++++ 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index d5857083..6593733f 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -20,15 +20,14 @@ func (db *DB) Containers() (list []*cid.ID, err error) { func (db *DB) containers(tx *bbolt.Tx) ([]*cid.ID, error) { result := make([]*cid.ID, 0) + unique := make(map[string]struct{}) err := tx.ForEach(func(name []byte, _ *bbolt.Bucket) error { - id, err := parseContainerID(name) - if err != nil { - return err - } + id := parseContainerID(name, unique) if id != nil { result = append(result, id) + unique[id.String()] = struct{}{} } return nil @@ -58,16 +57,19 @@ func (db *DB) containerSize(tx *bbolt.Tx, id *cid.ID) (uint64, error) { return parseContainerSize(containerVolume.Get(key)), nil } -func parseContainerID(name []byte) (*cid.ID, error) { - strName := string(name) +func parseContainerID(name []byte, ignore map[string]struct{}) *cid.ID { + containerID := cid.New() + strContainerID := strings.Split(string(name), invalidBase58String)[0] - if strings.Contains(strName, invalidBase58String) { - return nil, nil + if _, ok := ignore[strContainerID]; ok { + return nil } - id := cid.New() + if err := containerID.Parse(strContainerID); err != nil { + return nil + } - return id, id.Parse(strName) + return containerID } func parseContainerSize(v []byte) uint64 { diff --git a/pkg/local_object_storage/metabase/containers_test.go b/pkg/local_object_storage/metabase/containers_test.go index 5c8eecad..d5d8e091 100644 --- a/pkg/local_object_storage/metabase/containers_test.go +++ b/pkg/local_object_storage/metabase/containers_test.go @@ -2,10 +2,12 @@ package meta_test import ( "math/rand" + "sort" "testing" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/stretchr/testify/require" @@ -72,6 +74,49 @@ func TestDB_Containers(t *testing.T) { }) } +func TestDB_ContainersCount(t *testing.T) { + db := newDB(t) + t.Cleanup(func() { releaseDB(db) }) + + const R, T, SG = 10, 11, 12 // amount of object per type + + uploadObjects := [...]struct { + amount int + typ objectSDK.Type + }{ + {R, objectSDK.TypeRegular}, + {T, objectSDK.TypeTombstone}, + {SG, objectSDK.TypeTombstone}, + } + + expected := make([]*cid.ID, 0, R+T+SG) + + for _, upload := range uploadObjects { + for i := 0; i < upload.amount; i++ { + obj := generateRawObject(t) + obj.SetType(upload.typ) + + err := putBig(db, obj.Object()) + require.NoError(t, err) + + expected = append(expected, obj.ContainerID()) + } + } + + sort.Slice(expected, func(i, j int) bool { + return expected[i].String() < expected[j].String() + }) + + got, err := db.Containers() + require.NoError(t, err) + + sort.Slice(got, func(i, j int) bool { + return got[i].String() < got[j].String() + }) + + require.Equal(t, expected, got) +} + func TestDB_ContainerSize(t *testing.T) { db := newDB(t) defer releaseDB(db)