From 059e9e88a25ab86752629d12165c02a3c2c32f46 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 6 Jun 2023 12:27:19 +0300 Subject: [PATCH] [#373] metabase: Add metrics Signed-off-by: Dmitrii Stepanov --- .../internal/meta/list-garbage.go | 2 +- .../internal/meta/list-graveyard.go | 2 +- cmd/frostfs-node/container.go | 2 +- cmd/frostfs-node/notificator.go | 2 +- pkg/local_object_storage/engine/container.go | 14 +++--- pkg/local_object_storage/engine/evacuate.go | 2 +- .../engine/evacuate_test.go | 4 +- pkg/local_object_storage/engine/list.go | 5 +- pkg/local_object_storage/engine/list_test.go | 2 +- .../engine/remove_copies.go | 2 +- pkg/local_object_storage/engine/select.go | 12 ++--- pkg/local_object_storage/metabase/children.go | 24 +++++++++- .../metabase/containers.go | 18 +++++++- .../metabase/containers_test.go | 13 +++--- pkg/local_object_storage/metabase/control.go | 10 +++- pkg/local_object_storage/metabase/db.go | 2 + pkg/local_object_storage/metabase/delete.go | 10 ++++ pkg/local_object_storage/metabase/exists.go | 11 ++++- pkg/local_object_storage/metabase/expired.go | 20 ++++++++ pkg/local_object_storage/metabase/get.go | 11 ++++- .../metabase/graveyard.go | 46 +++++++++++++++++-- .../metabase/graveyard_test.go | 30 ++++++------ pkg/local_object_storage/metabase/inhume.go | 10 +++- .../metabase/iterators.go | 36 +++++++++++++-- .../metabase/iterators_test.go | 6 +-- pkg/local_object_storage/metabase/list.go | 24 +++++++++- .../metabase/list_test.go | 5 +- pkg/local_object_storage/metabase/lock.go | 42 ++++++++++++++--- pkg/local_object_storage/metabase/metrics.go | 20 ++++++++ pkg/local_object_storage/metabase/mode.go | 1 + pkg/local_object_storage/metabase/put.go | 10 ++++ pkg/local_object_storage/metabase/select.go | 12 ++++- pkg/local_object_storage/shard/control.go | 2 +- pkg/local_object_storage/shard/gc.go | 14 +++--- pkg/local_object_storage/shard/list.go | 36 ++++++++++++--- pkg/local_object_storage/shard/list_test.go | 2 +- pkg/local_object_storage/shard/shard.go | 4 +- pkg/services/policer/process.go | 2 +- pkg/services/policer/queue.go | 5 +- 39 files changed, 379 insertions(+), 96 deletions(-) create mode 100644 pkg/local_object_storage/metabase/metrics.go diff --git a/cmd/frostfs-lens/internal/meta/list-garbage.go b/cmd/frostfs-lens/internal/meta/list-garbage.go index 3ab9a8f88..61b10ca1f 100644 --- a/cmd/frostfs-lens/internal/meta/list-garbage.go +++ b/cmd/frostfs-lens/internal/meta/list-garbage.go @@ -28,6 +28,6 @@ func listGarbageFunc(cmd *cobra.Command, _ []string) { return nil }) - err := db.IterateOverGarbage(garbPrm) + err := db.IterateOverGarbage(cmd.Context(), garbPrm) common.ExitOnErr(cmd, common.Errf("could not iterate over garbage bucket: %w", err)) } diff --git a/cmd/frostfs-lens/internal/meta/list-graveyard.go b/cmd/frostfs-lens/internal/meta/list-graveyard.go index db90513eb..19a93691c 100644 --- a/cmd/frostfs-lens/internal/meta/list-graveyard.go +++ b/cmd/frostfs-lens/internal/meta/list-graveyard.go @@ -33,6 +33,6 @@ func listGraveyardFunc(cmd *cobra.Command, _ []string) { return nil }) - err := db.IterateOverGraveyard(gravePrm) + err := db.IterateOverGraveyard(cmd.Context(), gravePrm) common.ExitOnErr(cmd, common.Errf("could not iterate over graveyard bucket: %w", err)) } diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 569e4a7ca..d5a5afce1 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -452,7 +452,7 @@ type localStorageLoad struct { } func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { - idList, err := engine.ListContainers(d.engine) + idList, err := engine.ListContainers(context.TODO(), d.engine) if err != nil { return fmt.Errorf("list containers on engine failure: %w", err) } diff --git a/cmd/frostfs-node/notificator.go b/cmd/frostfs-node/notificator.go index 358b39a72..3fa486955 100644 --- a/cmd/frostfs-node/notificator.go +++ b/cmd/frostfs-node/notificator.go @@ -27,7 +27,7 @@ type notificationSource struct { func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) { log := n.l.With(zap.Uint64("epoch", epoch)) - listRes, err := n.e.ListContainers(engine.ListContainersPrm{}) + listRes, err := n.e.ListContainers(ctx, engine.ListContainersPrm{}) if err != nil { log.Error(logs.FrostFSNodeNotificatorCouldNotListContainers, zap.Error(err)) return diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 061e2fea0..e45f502ac 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -1,6 +1,8 @@ package engine import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" @@ -92,9 +94,9 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRe // ListContainers returns a unique container IDs presented in the engine objects. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) ListContainers(_ ListContainersPrm) (res ListContainersRes, err error) { +func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm) (res ListContainersRes, err error) { err = e.execIfNotBlocked(func() error { - res, err = e.listContainers() + res, err = e.listContainers(ctx) return err }) @@ -102,10 +104,10 @@ func (e *StorageEngine) ListContainers(_ ListContainersPrm) (res ListContainersR } // ListContainers calls ListContainers method on engine to get a unique container IDs presented in the engine objects. -func ListContainers(e *StorageEngine) ([]cid.ID, error) { +func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) { var prm ListContainersPrm - res, err := e.ListContainers(prm) + res, err := e.ListContainers(ctx, prm) if err != nil { return nil, err } @@ -113,7 +115,7 @@ func ListContainers(e *StorageEngine) ([]cid.ID, error) { return res.Containers(), nil } -func (e *StorageEngine) listContainers() (ListContainersRes, error) { +func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) { if e.metrics != nil { defer elapsed("ListContainers", e.metrics.AddMethodDuration)() } @@ -121,7 +123,7 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) { uniqueIDs := make(map[string]cid.ID) e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - res, err := sh.Shard.ListContainers(shard.ListContainersPrm{}) + res, err := sh.Shard.ListContainers(ctx, shard.ListContainersPrm{}) if err != nil { e.reportShardError(sh, "can't get list of containers", err) return false diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 73b7a7830..98a3a202d 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -261,7 +261,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes // because ListWithCursor works only with the metabase. - listRes, err := sh.ListWithCursor(listPrm) + listRes, err := sh.ListWithCursor(ctx, listPrm) if err != nil { if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { break diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 43737e7f7..13ac59f24 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -69,7 +69,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng err := e.Put(context.Background(), putPrm) require.NoError(t, err) - res, err := e.shards[ids[len(ids)-1].String()].List() + res, err := e.shards[ids[len(ids)-1].String()].List(context.Background()) require.NoError(t, err) if len(res.AddressList()) == objPerShard { break @@ -209,7 +209,7 @@ func TestEvacuateNetwork(t *testing.T) { var totalCount uint64 for i := range evacuateIDs { - res, err := e.shards[ids[i].String()].List() + res, err := e.shards[ids[i].String()].List(context.Background()) require.NoError(t, err) totalCount += uint64(len(res.AddressList())) diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 8781416f4..f9229a2b1 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -1,6 +1,7 @@ package engine import ( + "context" "math/rand" "sort" @@ -96,7 +97,7 @@ func (l ListWithCursorRes) Cursor() *Cursor { // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { +func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) { result := make([]objectcore.AddressWithType, 0, prm.count) // Set initial cursors @@ -142,7 +143,7 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes shardPrm.WithCount(count) shardPrm.WithCursor(cursor.getCurrentShardCursor()) - res, err := shardInstance.ListWithCursor(shardPrm) + res, err := shardInstance.ListWithCursor(ctx, shardPrm) if err != nil { cursor.setShardRead(curr) continue diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 5b927cf11..6cea2d0f4 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -107,7 +107,7 @@ func TestListWithCursor(t *testing.T) { var prm ListWithCursorPrm prm.count = tt.batchSize for { - res, err := e.ListWithCursor(prm) + res, err := e.ListWithCursor(context.Background(), prm) if err == ErrEndOfListing { require.Empty(t, res.AddressList()) break diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go index 7681e0e50..4b48d179c 100644 --- a/pkg/local_object_storage/engine/remove_copies.go +++ b/pkg/local_object_storage/engine/remove_copies.go @@ -69,7 +69,7 @@ func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicat var listPrm shard.ListWithCursorPrm listPrm.WithCount(uint32(prm.Concurrency)) listPrm.WithCursor(cursor) - res, err := sh.ListWithCursor(listPrm) + res, err := sh.ListWithCursor(ctx, listPrm) if err != nil { if errors.Is(err, meta.ErrEndOfListing) { return nil diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 9f651845f..6a7bf1b7f 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -98,16 +98,16 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, // If limit is zero, then returns all available object addresses. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) List(limit uint64) (res SelectRes, err error) { +func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) { err = e.execIfNotBlocked(func() error { - res, err = e.list(limit) + res, err = e.list(ctx, limit) return err }) return } -func (e *StorageEngine) list(limit uint64) (SelectRes, error) { +func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) { if e.metrics != nil { defer elapsed("ListObjects", e.metrics.AddMethodDuration)() } @@ -118,7 +118,7 @@ func (e *StorageEngine) list(limit uint64) (SelectRes, error) { // consider iterating over shuffled shards e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - res, err := sh.List() // consider limit result of shard iterator + res, err := sh.List(ctx) // consider limit result of shard iterator if err != nil { e.reportShardError(sh, "could not select objects from shard", err) } else { @@ -159,8 +159,8 @@ func Select(ctx context.Context, storage *StorageEngine, cnr cid.ID, fs object.S // List returns `limit` available physically storage object addresses in // engine. If limit is zero, then returns all available object addresses. -func List(storage *StorageEngine, limit uint64) ([]oid.Address, error) { - res, err := storage.List(limit) +func List(ctx context.Context, storage *StorageEngine, limit uint64) ([]oid.Address, error) { + res, err := storage.List(ctx, limit) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/metabase/children.go b/pkg/local_object_storage/metabase/children.go index 500e83e7a..6816358d2 100644 --- a/pkg/local_object_storage/metabase/children.go +++ b/pkg/local_object_storage/metabase/children.go @@ -1,14 +1,34 @@ package meta import ( + "context" + "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // GetChildren returns parent -> children map. // If an object has no children, then map will contain addr -> empty slice value. -func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) { +func (db *DB) GetChildren(ctx context.Context, addresses []oid.Address) (map[oid.Address][]oid.Address, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("GetChildren", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.GetChildren", + trace.WithAttributes( + attribute.Int("addr_count", len(addresses)), + )) + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -53,6 +73,6 @@ func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Addres if err != nil { return nil, metaerr.Wrap(err) } - + success = true return result, nil } diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index fe38e0b6d..472b2affc 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -1,14 +1,28 @@ package meta import ( + "context" "encoding/binary" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.etcd.io/bbolt" ) -func (db *DB) Containers() (list []cid.ID, err error) { +func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Containers", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.Containers") + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -21,7 +35,7 @@ func (db *DB) Containers() (list []cid.ID, err error) { return err }) - + success = err == nil return list, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/metabase/containers_test.go b/pkg/local_object_storage/metabase/containers_test.go index c0565b35a..4e2dd550d 100644 --- a/pkg/local_object_storage/metabase/containers_test.go +++ b/pkg/local_object_storage/metabase/containers_test.go @@ -1,6 +1,7 @@ package meta_test import ( + "context" "math/rand" "sort" "testing" @@ -34,7 +35,7 @@ func TestDB_Containers(t *testing.T) { require.NoError(t, err) } - lst, err := db.Containers() + lst, err := db.Containers(context.Background()) require.NoError(t, err) for _, cnr := range lst { @@ -60,7 +61,7 @@ func TestDB_Containers(t *testing.T) { require.NoError(t, putBig(db, obj)) - cnrs, err := db.Containers() + cnrs, err := db.Containers(context.Background()) require.NoError(t, err) cnr, _ := obj.ContainerID() @@ -68,7 +69,7 @@ func TestDB_Containers(t *testing.T) { require.NoError(t, metaInhume(db, object.AddressOf(obj), oidtest.Address())) - cnrs, err = db.Containers() + cnrs, err = db.Containers(context.Background()) require.NoError(t, err) assertContains(cnrs, cnr) }) @@ -78,14 +79,14 @@ func TestDB_Containers(t *testing.T) { require.NoError(t, putBig(db, obj)) - cnrs, err := db.Containers() + cnrs, err := db.Containers(context.Background()) require.NoError(t, err) cnr, _ := obj.ContainerID() assertContains(cnrs, cnr) require.NoError(t, metaToMoveIt(db, object.AddressOf(obj))) - cnrs, err = db.Containers() + cnrs, err = db.Containers(context.Background()) require.NoError(t, err) assertContains(cnrs, cnr) }) @@ -126,7 +127,7 @@ func TestDB_ContainersCount(t *testing.T) { return expected[i].EncodeToString() < expected[j].EncodeToString() }) - got, err := db.Containers() + got, err := db.Containers(context.Background()) require.NoError(t, err) sort.Slice(got, func(i, j int) bool { diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 370fddda1..d0a9c4723 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -175,10 +175,14 @@ func (db *DB) SyncCounters() error { // Close closes boltDB instance. func (db *DB) Close() error { + var err error if db.boltDB != nil { - return metaerr.Wrap(db.boltDB.Close()) + err = metaerr.Wrap(db.boltDB.Close()) } - return nil + if err == nil { + db.metrics.Close() + } + return err } // Reload reloads part of the configuration. @@ -202,12 +206,14 @@ func (db *DB) Reload(opts ...Option) (bool, error) { } db.mode = mode.Degraded + db.metrics.SetMode(mode.Degraded) db.info.Path = c.info.Path if err := db.openBolt(); err != nil { return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err)) } db.mode = mode.ReadWrite + db.metrics.SetMode(mode.ReadWrite) return true, nil } diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 5a9ca3aa9..c54ed98d0 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -60,6 +60,7 @@ type cfg struct { log *logger.Logger epochState EpochState + metrics Metrics } func defaultCfg() *cfg { @@ -70,6 +71,7 @@ func defaultCfg() *cfg { boltBatchDelay: bbolt.DefaultMaxBatchDelay, boltBatchSize: bbolt.DefaultMaxBatchSize, log: &logger.Logger{Logger: zap.L()}, + metrics: &noopMetrics{}, } } diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 7e8e0e5dd..d387b3d04 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" @@ -71,6 +72,14 @@ type referenceCounter map[string]*referenceNumber // Delete removed object records from metabase indexes. func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { + var ( + startedAt = time.Now() + deleted = false + ) + defer func() { + db.metrics.AddMethodDuration("Delete", time.Since(startedAt), deleted) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.Delete", trace.WithAttributes( attribute.Int("addr_count", len(prm.addrs)), @@ -98,6 +107,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { return err }) if err == nil { + deleted = true for i := range prm.addrs { storagelog.Write(db.log, storagelog.AddressField(prm.addrs[i]), diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index f9c563bdd..b6e5ea052 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -3,6 +3,7 @@ package meta import ( "context" "fmt" + "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -45,6 +46,14 @@ func (p ExistsRes) Exists() bool { // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Exists", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.Exists", trace.WithAttributes( attribute.String("address", prm.addr.EncodeToString()), @@ -65,7 +74,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err return err }) - + success = err == nil return res, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go index aac158ba4..43933d12d 100644 --- a/pkg/local_object_storage/metabase/expired.go +++ b/pkg/local_object_storage/metabase/expired.go @@ -5,18 +5,37 @@ import ( "errors" "fmt" "strconv" + "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // FilterExpired return expired items from addresses. // Address considered expired if metabase does contain information about expiration and // expiration epoch is less than epoch. func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) { + var ( + startedAt = time.Now() + success = true + ) + defer func() { + db.metrics.AddMethodDuration("FilterExpired", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.FilterExpired", + trace.WithAttributes( + attribute.String("epoch", strconv.FormatUint(epoch, 10)), + attribute.Int("addr_count", len(addresses)), + )) + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -68,6 +87,7 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A if err != nil { return nil, metaerr.Wrap(err) } + success = true return result, nil } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 2ce82bcee..ad35b4c18 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -3,6 +3,7 @@ package meta import ( "context" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -52,6 +53,14 @@ func (r GetRes) Header() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Get", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.Get", trace.WithAttributes( attribute.String("address", prm.addr.EncodeToString()), @@ -74,7 +83,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { return err }) - + success = err == nil return res, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index e2530bd31..df9a3d302 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -2,10 +2,13 @@ package meta import ( "bytes" + "context" "errors" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -58,7 +61,18 @@ func (g *GarbageIterationPrm) SetOffset(offset oid.Address) { // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. -func (db *DB) IterateOverGarbage(p GarbageIterationPrm) error { +func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverGarbage", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverGarbage") + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -66,9 +80,11 @@ func (db *DB) IterateOverGarbage(p GarbageIterationPrm) error { return ErrDegradedMode } - return metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset) })) + success = err == nil + return err } // TombstonedObject represents descriptor of the @@ -125,7 +141,18 @@ func (g *GraveyardIterationPrm) SetOffset(offset oid.Address) { // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. -func (db *DB) IterateOverGraveyard(p GraveyardIterationPrm) error { +func (db *DB) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverGraveyard", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverGraveyard") + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -232,7 +259,18 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) { // graveyard bucket. // // Returns any error appeared during deletion process. -func (db *DB) DropGraves(tss []TombstonedObject) error { +func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves") + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() diff --git a/pkg/local_object_storage/metabase/graveyard_test.go b/pkg/local_object_storage/metabase/graveyard_test.go index 8cd09e3f7..7476608f2 100644 --- a/pkg/local_object_storage/metabase/graveyard_test.go +++ b/pkg/local_object_storage/metabase/graveyard_test.go @@ -23,7 +23,7 @@ func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) { return nil }) - err := db.IterateOverGraveyard(iterGravePRM) + err := db.IterateOverGraveyard(context.Background(), iterGravePRM) require.NoError(t, err) require.Zero(t, counter) @@ -33,7 +33,7 @@ func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGCPRM) + err = db.IterateOverGarbage(context.Background(), iterGCPRM) require.NoError(t, err) require.Zero(t, counter) } @@ -83,7 +83,7 @@ func TestDB_Iterate_OffsetNotFound(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGCPRM) + err = db.IterateOverGarbage(context.Background(), iterGCPRM) require.NoError(t, err) // the second object would be put after the @@ -99,7 +99,7 @@ func TestDB_Iterate_OffsetNotFound(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGCPRM) + err = db.IterateOverGarbage(context.Background(), iterGCPRM) require.NoError(t, err) // the third object would be put before the @@ -164,7 +164,7 @@ func TestDB_IterateDeletedObjects(t *testing.T) { return nil }) - err = db.IterateOverGraveyard(iterGravePRM) + err = db.IterateOverGraveyard(context.Background(), iterGravePRM) require.NoError(t, err) var iterGCPRM meta.GarbageIterationPrm @@ -175,7 +175,7 @@ func TestDB_IterateDeletedObjects(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGCPRM) + err = db.IterateOverGarbage(context.Background(), iterGCPRM) require.NoError(t, err) // objects covered with a tombstone @@ -255,7 +255,7 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) { return nil }) - err = db.IterateOverGraveyard(iterGraveyardPrm) + err = db.IterateOverGraveyard(context.Background(), iterGraveyardPrm) require.NoError(t, err) require.Equal(t, firstIterationSize, counter) require.Equal(t, firstIterationSize, len(gotGraveyard)) @@ -272,7 +272,7 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) { return nil }) - err = db.IterateOverGraveyard(iterGraveyardPrm) + err = db.IterateOverGraveyard(context.Background(), iterGraveyardPrm) require.NoError(t, err) require.Equal(t, len(expectedGraveyard), counter) require.ElementsMatch(t, gotGraveyard, expectedGraveyard) @@ -287,7 +287,7 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) { return nil }) - err = db.IterateOverGraveyard(iterGraveyardPrm) + err = db.IterateOverGraveyard(context.Background(), iterGraveyardPrm) require.NoError(t, err) require.False(t, iWasCalled) } @@ -348,7 +348,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGarbagePrm) + err = db.IterateOverGarbage(context.Background(), iterGarbagePrm) require.NoError(t, err) require.Equal(t, firstIterationSize, counter) require.Equal(t, firstIterationSize, len(gotGarbage)) @@ -363,7 +363,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGarbagePrm) + err = db.IterateOverGarbage(context.Background(), iterGarbagePrm) require.NoError(t, err) require.Equal(t, len(expectedGarbage), counter) require.ElementsMatch(t, gotGarbage, expectedGarbage) @@ -378,7 +378,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) { return nil }) - err = db.IterateOverGarbage(iterGarbagePrm) + err = db.IterateOverGarbage(context.Background(), iterGarbagePrm) require.NoError(t, err) require.False(t, iWasCalled) } @@ -418,11 +418,11 @@ func TestDB_DropGraves(t *testing.T) { return nil }) - err = db.IterateOverGraveyard(iterGravePRM) + err = db.IterateOverGraveyard(context.Background(), iterGravePRM) require.NoError(t, err) require.Equal(t, 2, counter) - err = db.DropGraves(buriedTS) + err = db.DropGraves(context.Background(), buriedTS) require.NoError(t, err) counter = 0 @@ -431,7 +431,7 @@ func TestDB_DropGraves(t *testing.T) { return nil }) - err = db.IterateOverGraveyard(iterGravePRM) + err = db.IterateOverGraveyard(context.Background(), iterGravePRM) require.NoError(t, err) require.Zero(t, counter) } diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 7865a8771..76744aa33 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -122,6 +123,13 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal") // NOTE: Marks any object with GC mark (despite any prohibitions on operations // with that object) if WithForceGCMark option has been provided. func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Inhume", time.Since(startedAt), success) + }() _, span := tracing.StartSpanFromContext(ctx, "metabase.Inhume") defer span.End() @@ -138,7 +146,7 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err err = db.boltDB.Update(func(tx *bbolt.Tx) error { return db.inhumeTx(tx, currEpoch, prm, &res) }) - + success = err == nil return res, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index dfa048621..78bfd2914 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -1,17 +1,22 @@ package meta import ( + "context" "errors" "fmt" "strconv" + "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // ExpiredObject is a descriptor of expired object from DB. @@ -44,7 +49,20 @@ var ErrInterruptIterator = logicerr.New("iterator is interrupted") // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. -func (db *DB) IterateExpired(epoch uint64, h ExpiredObjectHandler) error { +func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired", + trace.WithAttributes( + attribute.String("epoch", strconv.FormatUint(epoch, 10)), + )) + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -52,9 +70,11 @@ func (db *DB) IterateExpired(epoch uint64, h ExpiredObjectHandler) error { return ErrDegradedMode } - return metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { return db.iterateExpired(tx, epoch, h) })) + success = err == nil + return err } func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { @@ -125,7 +145,17 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) // Returns other errors of h directly. // // Does not modify tss. -func (db *DB) IterateCoveredByTombstones(tss map[string]oid.Address, h func(oid.Address) error) error { +func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid.Address, h func(oid.Address) error) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateCoveredByTombstones", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateCoveredByTombstones") + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() diff --git a/pkg/local_object_storage/metabase/iterators_test.go b/pkg/local_object_storage/metabase/iterators_test.go index 69bf2bee5..e07184eb6 100644 --- a/pkg/local_object_storage/metabase/iterators_test.go +++ b/pkg/local_object_storage/metabase/iterators_test.go @@ -36,7 +36,7 @@ func TestDB_IterateExpired(t *testing.T) { require.NoError(t, db.Lock(context.Background(), expiredLocked.Container(), oidtest.ID(), []oid.ID{expiredLocked.Object()})) - err := db.IterateExpired(epoch, func(exp *meta.ExpiredObject) error { + err := db.IterateExpired(context.Background(), epoch, func(exp *meta.ExpiredObject) error { if addr, ok := mAlive[exp.Type()]; ok { require.NotEqual(t, addr, exp.Address()) } @@ -96,7 +96,7 @@ func TestDB_IterateCoveredByTombstones(t *testing.T) { ts.EncodeToString(): ts, } - err = db.IterateCoveredByTombstones(tss, func(addr oid.Address) error { + err = db.IterateCoveredByTombstones(context.Background(), tss, func(addr oid.Address) error { handled = append(handled, addr) return nil }) @@ -112,7 +112,7 @@ func TestDB_IterateCoveredByTombstones(t *testing.T) { handled = handled[:0] - err = db.IterateCoveredByTombstones(tss, func(addr oid.Address) error { + err = db.IterateCoveredByTombstones(context.Background(), tss, func(addr oid.Address) error { handled = append(handled, addr) return nil }) diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 7c77cd703..337265318 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -1,13 +1,19 @@ package meta import ( + "context" + "time" + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // ErrEndOfListing is returned from object listing with cursor @@ -61,7 +67,21 @@ func (l ListRes) Cursor() *Cursor { // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) { +func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ListWithCursor", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.ListWithCursor", + trace.WithAttributes( + attribute.Int("count", prm.count), + attribute.Bool("has_cursor", prm.cursor != nil), + )) + defer span.End() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -75,7 +95,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) { res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) return err }) - + success = err == nil return res, metaerr.Wrap(err) } diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go index 4bf3ca827..abb55c9d1 100644 --- a/pkg/local_object_storage/metabase/list_test.go +++ b/pkg/local_object_storage/metabase/list_test.go @@ -1,6 +1,7 @@ package meta_test import ( + "context" "errors" "sort" "testing" @@ -51,7 +52,7 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - res, err := db.ListWithCursor(prm) + res, err := db.ListWithCursor(context.Background(), prm) if err != nil { if err != meta.ErrEndOfListing { b.Fatalf("error: %v", err) @@ -225,6 +226,6 @@ func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]objec listPrm.SetCount(count) listPrm.SetCursor(cursor) - r, err := db.ListWithCursor(listPrm) + r, err := db.ListWithCursor(context.Background(), listPrm) return r.AddressList(), r.Cursor(), err } diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index b84e0bf46..f3388daf0 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -36,6 +37,14 @@ func bucketNameLockers(idCnr cid.ID, key []byte) []byte { // // Locked list should be unique. Panics if it is empty. func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Lock", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.Lock", trace.WithAttributes( attribute.String("container_id", cnr.EncodeToString()), @@ -57,7 +66,12 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid. panic("empty locked list") } - // check if all objects are regular + err := db.lockInternal(locked, cnr, locker) + success = err == nil + return err +} + +func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error { bucketKeysLocked := make([][]byte, len(locked)) for i := range locked { bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize)) @@ -83,7 +97,6 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid. loop: for i := range bucketKeysLocked { - // decode list of already existing lockers exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i])) if err != nil { return fmt.Errorf("decode list of object lockers: %w", err) @@ -95,14 +108,11 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid. } } - // update the list of lockers updLockers, err = encodeList(append(exLockers, keyLocker)) if err != nil { - // maybe continue for the best effort? return fmt.Errorf("encode list of object lockers: %w", err) } - // write updated list of lockers err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers) if err != nil { return fmt.Errorf("update list of object lockers: %w", err) @@ -116,6 +126,14 @@ func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid. // FreeLockedBy unlocks all objects in DB which are locked by lockers. // Returns slice of unlocked object ID's or an error. func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("FreeLockedBy", time.Since(startedAt), success) + }() + db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -138,6 +156,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { }); err != nil { return nil, metaerr.Wrap(err) } + success = true return unlockedObjects, nil } @@ -280,6 +299,14 @@ func (i IsLockedRes) Locked() bool { // // Returns only non-logical errors related to underlying database. func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IsLocked", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IsLocked", trace.WithAttributes( attribute.String("address", prm.addr.EncodeToString()), @@ -292,9 +319,10 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e if db.mode.NoMetabase() { return res, ErrDegradedMode } - - return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object()) return nil })) + success = err == nil + return res, err } diff --git a/pkg/local_object_storage/metabase/metrics.go b/pkg/local_object_storage/metabase/metrics.go new file mode 100644 index 000000000..120579e4c --- /dev/null +++ b/pkg/local_object_storage/metabase/metrics.go @@ -0,0 +1,20 @@ +package meta + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" +) + +type Metrics interface { + SetMode(m mode.Mode) + Close() + + AddMethodDuration(method string, d time.Duration, success bool) +} + +type noopMetrics struct{} + +func (m *noopMetrics) SetMode(mode.Mode) {} +func (m *noopMetrics) Close() {} +func (m *noopMetrics) AddMethodDuration(string, time.Duration, bool) {} diff --git a/pkg/local_object_storage/metabase/mode.go b/pkg/local_object_storage/metabase/mode.go index dd1cdc900..28beca8f3 100644 --- a/pkg/local_object_storage/metabase/mode.go +++ b/pkg/local_object_storage/metabase/mode.go @@ -40,5 +40,6 @@ func (db *DB) SetMode(m mode.Mode) error { } db.mode = m + db.metrics.SetMode(m) return nil } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index b7cff68dc..7a24485b3 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" gio "io" + "time" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" @@ -58,6 +59,14 @@ var ( // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Put", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.Put", trace.WithAttributes( attribute.String("address", objectCore.AddressOf(prm.obj).EncodeToString()), @@ -79,6 +88,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { return db.put(tx, prm.obj, prm.id, nil, currEpoch) }) if err == nil { + success = true storagelog.Write(db.log, storagelog.AddressField(objectCore.AddressOf(prm.obj)), storagelog.OpField("metabase PUT")) diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 466476d4e..1f32d2cde 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "time" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -62,6 +63,14 @@ func (r SelectRes) AddressList() []oid.Address { // Select returns list of addresses of objects that match search filters. func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("Select", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.Select", trace.WithAttributes( attribute.String("container_id", prm.cnr.EncodeToString()), @@ -76,6 +85,7 @@ func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err err } if blindlyProcess(prm.filters) { + success = true return res, nil } @@ -83,7 +93,7 @@ func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err err return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch) - + success = err == nil return err })) } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 84efa1d31..bc514933b 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -138,7 +138,7 @@ func (s *Shard) Init(ctx context.Context) error { } } - s.updateMetrics() + s.updateMetrics(ctx) s.gc = &gc{ gcCfg: &s.gcCfg, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 2580173c2..b8d516871 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -270,7 +270,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) { // iterate over metabase's objects with GC mark // (no more than s.rmBatchSize objects) - err := s.metaBase.IterateOverGarbage(iterPrm) + err := s.metaBase.IterateOverGarbage(ctx, iterPrm) if err != nil { s.log.Warn(logs.ShardIteratorOverMetabaseGraveyardFailed, zap.String("error", err.Error()), @@ -382,7 +382,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) return } - expired, err := s.getExpiredWithLinked(expired) + expired, err := s.getExpiredWithLinked(ctx, expired) if err != nil { s.log.Warn(logs.ShardGCFailedToGetExpiredWithLinked, zap.Error(err)) return @@ -414,9 +414,9 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) } } -func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error) { +func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) { result := make([]oid.Address, 0, len(source)) - parentToChildren, err := s.metaBase.GetChildren(source) + parentToChildren, err := s.metaBase.GetChildren(ctx, source) if err != nil { return nil, err } @@ -469,7 +469,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { return } - err = s.metaBase.IterateOverGraveyard(iterPrm) + err = s.metaBase.IterateOverGraveyard(ctx, iterPrm) if err != nil { log.Error(logs.ShardIteratorOverGraveyardFailed, zap.Error(err)) s.m.RUnlock() @@ -560,7 +560,7 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo return ErrDegradedMode } - err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error { + err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error { select { case <-ctx.Done(): return meta.ErrInterruptIterator @@ -628,7 +628,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston // drop just processed expired tombstones // from graveyard - err = s.metaBase.DropGraves(tss) + err = s.metaBase.DropGraves(ctx, tss) if err != nil { s.log.Warn(logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err)) } diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index aaa1112cd..dd21745cc 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -7,8 +7,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -65,7 +68,13 @@ func (r ListWithCursorRes) Cursor() *Cursor { } // List returns all objects physically stored in the Shard. -func (s *Shard) List() (res SelectRes, err error) { +func (s *Shard) List(ctx context.Context) (res SelectRes, err error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.List", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + )) + defer span.End() + s.m.RLock() defer s.m.RUnlock() @@ -73,7 +82,7 @@ func (s *Shard) List() (res SelectRes, err error) { return SelectRes{}, ErrDegradedMode } - lst, err := s.metaBase.Containers() + lst, err := s.metaBase.Containers(ctx) if err != nil { return res, fmt.Errorf("can't list stored containers: %w", err) } @@ -86,7 +95,7 @@ func (s *Shard) List() (res SelectRes, err error) { sPrm.SetContainerID(lst[i]) sPrm.SetFilters(filters) - sRes, err := s.metaBase.Select(context.TODO(), sPrm) // consider making List in metabase + sRes, err := s.metaBase.Select(ctx, sPrm) // consider making List in metabase if err != nil { s.log.Debug(logs.ShardCantSelectAllObjects, zap.Stringer("cid", lst[i]), @@ -101,12 +110,18 @@ func (s *Shard) List() (res SelectRes, err error) { return res, nil } -func (s *Shard) ListContainers(_ ListContainersPrm) (ListContainersRes, error) { +func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListContainersRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ListContainers", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + )) + defer span.End() + if s.GetMode().NoMetabase() { return ListContainersRes{}, ErrDegradedMode } - containers, err := s.metaBase.Containers() + containers, err := s.metaBase.Containers(ctx) if err != nil { return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err) } @@ -122,7 +137,14 @@ func (s *Shard) ListContainers(_ ListContainersPrm) (ListContainersRes, error) { // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { +func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "shard.ListWithCursor", + trace.WithAttributes( + attribute.Int64("count", int64(prm.count)), + attribute.Bool("has_cursor", prm.cursor != nil), + )) + defer span.End() + if s.GetMode().NoMetabase() { return ListWithCursorRes{}, ErrDegradedMode } @@ -130,7 +152,7 @@ func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) var metaPrm meta.ListPrm metaPrm.SetCount(prm.count) metaPrm.SetCursor(prm.cursor) - res, err := s.metaBase.ListWithCursor(metaPrm) + res, err := s.metaBase.ListWithCursor(ctx, metaPrm) if err != nil { return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err) } diff --git a/pkg/local_object_storage/shard/list_test.go b/pkg/local_object_storage/shard/list_test.go index bbce28430..63e7651c8 100644 --- a/pkg/local_object_storage/shard/list_test.go +++ b/pkg/local_object_storage/shard/list_test.go @@ -71,7 +71,7 @@ func testShardList(t *testing.T, sh *shard.Shard) { } require.NoError(t, errG.Wait()) - res, err := sh.List() + res, err := sh.List(context.Background()) require.NoError(t, err) for _, objID := range res.AddressList() { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 1d2cab9f2..10c1acd40 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -370,7 +370,7 @@ const ( logical = "logic" ) -func (s *Shard) updateMetrics() { +func (s *Shard) updateMetrics(ctx context.Context) { if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { cc, err := s.metaBase.ObjectCounters() if err != nil { @@ -384,7 +384,7 @@ func (s *Shard) updateMetrics() { s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) - cnrList, err := s.metaBase.Containers() + cnrList, err := s.metaBase.Containers(ctx) if err != nil { s.log.Warn(logs.ShardMetaCantReadContainerList, zap.Error(err)) return diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 4a40f00ba..60e924755 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -34,7 +34,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { default: } - addrs, cursor, err = p.jobQueue.Select(cursor, p.batchSize) + addrs, cursor, err = p.jobQueue.Select(ctx, cursor, p.batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { time.Sleep(time.Second) // finished whole cycle, sleep a bit diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go index b8af44049..22012c835 100644 --- a/pkg/services/policer/queue.go +++ b/pkg/services/policer/queue.go @@ -1,6 +1,7 @@ package policer import ( + "context" "fmt" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -11,12 +12,12 @@ type jobQueue struct { localStorage *engine.StorageEngine } -func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) { +func (q *jobQueue) Select(ctx context.Context, cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) { var prm engine.ListWithCursorPrm prm.WithCursor(cursor) prm.WithCount(count) - res, err := q.localStorage.ListWithCursor(prm) + res, err := q.localStorage.ListWithCursor(ctx, prm) if err != nil { return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err) }