From 0920d848d0ee8e069b95c8db5bf1043991221d8d Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 13 Mar 2023 14:37:35 +0300 Subject: [PATCH] [#135] get-object: Add tracing spans Signed-off-by: Dmitrii Stepanov --- .../internal/blobovnicza/inspect.go | 2 +- cmd/frostfs-node/main.go | 2 +- cmd/frostfs-node/notificator.go | 11 +++-- go.mod | 6 +-- .../blobovnicza/blobovnicza_test.go | 3 +- pkg/local_object_storage/blobovnicza/get.go | 12 ++++- .../blobovnicza/get_test.go | 3 +- .../blobstor/blobovniczatree/exists.go | 16 ++++++- .../blobstor/blobovniczatree/exists_test.go | 5 +- .../blobstor/blobovniczatree/get.go | 31 +++++++++---- .../blobstor/blobovniczatree/get_range.go | 33 +++++++++---- .../blobstor/blobstor_test.go | 5 +- .../blobstor/common/storage.go | 12 +++-- pkg/local_object_storage/blobstor/exists.go | 21 +++++++-- .../blobstor/exists_test.go | 9 ++-- .../blobstor/fstree/fstree.go | 46 ++++++++++++++++--- pkg/local_object_storage/blobstor/get.go | 21 +++++++-- .../blobstor/get_range.go | 23 ++++++++-- .../blobstor/internal/blobstortest/control.go | 3 +- .../blobstor/internal/blobstortest/delete.go | 9 ++-- .../blobstor/internal/blobstortest/exists.go | 7 +-- .../blobstor/internal/blobstortest/get.go | 9 ++-- .../internal/blobstortest/get_range.go | 15 +++--- .../blobstor/memstore/memstore.go | 9 ++-- .../blobstor/memstore/memstore_test.go | 9 ++-- .../blobstor/perf_test.go | 3 +- .../blobstor/teststore/teststore.go | 13 +++--- .../engine/control_test.go | 6 +-- pkg/local_object_storage/engine/delete.go | 2 +- .../engine/delete_test.go | 2 +- pkg/local_object_storage/engine/error_test.go | 20 ++++---- pkg/local_object_storage/engine/evacuate.go | 19 ++++---- .../engine/evacuate_test.go | 20 ++++---- pkg/local_object_storage/engine/exists.go | 3 +- pkg/local_object_storage/engine/get.go | 32 ++++++++----- pkg/local_object_storage/engine/head.go | 21 +++++---- pkg/local_object_storage/engine/head_test.go | 3 +- pkg/local_object_storage/engine/inhume.go | 2 +- pkg/local_object_storage/engine/lock.go | 3 +- pkg/local_object_storage/engine/put.go | 7 +-- pkg/local_object_storage/engine/range.go | 35 +++++++++----- .../engine/remove_copies.go | 2 +- .../shard/control_test.go | 10 ++-- pkg/local_object_storage/shard/delete_test.go | 7 +-- pkg/local_object_storage/shard/dump_test.go | 3 +- pkg/local_object_storage/shard/exists.go | 6 ++- pkg/local_object_storage/shard/gc_test.go | 3 +- pkg/local_object_storage/shard/get.go | 18 ++++++-- pkg/local_object_storage/shard/get_test.go | 5 +- pkg/local_object_storage/shard/head.go | 17 ++++++- pkg/local_object_storage/shard/head_test.go | 7 +-- pkg/local_object_storage/shard/inhume_test.go | 2 +- pkg/local_object_storage/shard/lock_test.go | 3 +- pkg/local_object_storage/shard/range.go | 22 +++++++-- pkg/local_object_storage/shard/range_test.go | 3 +- pkg/local_object_storage/shard/reload_test.go | 2 +- pkg/local_object_storage/shard/shard_test.go | 1 + .../shard/shutdown_test.go | 3 +- .../writecache/flush_test.go | 11 +++-- pkg/local_object_storage/writecache/get.go | 25 ++++++++-- pkg/local_object_storage/writecache/init.go | 3 +- .../writecache/options.go | 3 +- .../writecache/writecache.go | 5 +- pkg/services/control/server/evacuate.go | 4 +- pkg/services/notificator/deps.go | 4 +- pkg/services/notificator/service.go | 5 +- pkg/services/object/acl/eacl/v2/eacl_test.go | 3 +- pkg/services/object/acl/eacl/v2/headers.go | 5 +- pkg/services/object/acl/eacl/v2/localstore.go | 5 +- pkg/services/object/get/get_test.go | 2 +- pkg/services/object/get/local.go | 8 +++- pkg/services/object/get/remote.go | 4 ++ pkg/services/object/get/service.go | 2 +- pkg/services/object/get/util.go | 8 ++-- pkg/services/object/get/v2/get_forwarder.go | 8 ++++ .../object/get/v2/get_range_forwarder.go | 8 ++++ pkg/services/object/get/v2/head_forwarder.go | 8 ++++ pkg/services/object/get/v2/service.go | 2 +- pkg/services/object/get/v2/util.go | 2 +- pkg/services/replicator/process.go | 2 +- 80 files changed, 523 insertions(+), 231 deletions(-) diff --git a/cmd/frostfs-lens/internal/blobovnicza/inspect.go b/cmd/frostfs-lens/internal/blobovnicza/inspect.go index 3f4e8cfe..13442a4b 100644 --- a/cmd/frostfs-lens/internal/blobovnicza/inspect.go +++ b/cmd/frostfs-lens/internal/blobovnicza/inspect.go @@ -33,7 +33,7 @@ func inspectFunc(cmd *cobra.Command, _ []string) { var prm blobovnicza.GetPrm prm.SetAddress(addr) - res, err := blz.Get(prm) + res, err := blz.Get(cmd.Context(), prm) common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err)) data := res.Object() diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index 2f4c9853..a97ad387 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -102,7 +102,7 @@ func initApp(ctx context.Context, c *cfg) { initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) }) initAndLog(c, "session", initSessionService) initAndLog(c, "reputation", func(c *cfg) { initReputationService(ctx, c) }) - initAndLog(c, "notification", initNotifications) + initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) }) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) initAndLog(c, "control", initControlService) diff --git a/cmd/frostfs-node/notificator.go b/cmd/frostfs-node/notificator.go index d5cb1ded..4a310e5b 100644 --- a/cmd/frostfs-node/notificator.go +++ b/cmd/frostfs-node/notificator.go @@ -23,7 +23,7 @@ type notificationSource struct { defaultTopic string } -func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, addr oid.Address)) { +func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) { log := n.l.With(zap.Uint64("epoch", epoch)) listRes, err := n.e.ListContainers(engine.ListContainersPrm{}) @@ -51,7 +51,7 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad } for _, a := range selectRes.AddressList() { - err = n.processAddress(a, handler) + err = n.processAddress(ctx, a, handler) if err != nil { log.Error("notificator: could not process object", zap.Stringer("address", a), @@ -66,13 +66,14 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad } func (n *notificationSource) processAddress( + ctx context.Context, a oid.Address, h func(topic string, addr oid.Address), ) error { var prm engine.HeadPrm prm.WithAddress(a) - res, err := n.e.Head(prm) + res, err := n.e.Head(ctx, prm) if err != nil { return err } @@ -108,7 +109,7 @@ func (n notificationWriter) Notify(topic string, address oid.Address) { } } -func initNotifications(c *cfg) { +func initNotifications(ctx context.Context, c *cfg) { if nodeconfig.Notification(c.appCfg).Enabled() { topic := nodeconfig.Notification(c.appCfg).DefaultTopic() pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey()) @@ -151,7 +152,7 @@ func initNotifications(c *cfg) { addNewEpochAsyncNotificationHandler(c, func(e event.Event) { ev := e.(netmap.NewEpoch) - n.ProcessEpoch(ev.EpochNumber()) + n.ProcessEpoch(ctx, ev.EpochNumber()) }) } } diff --git a/go.mod b/go.mod index af85e69d..301be693 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/multiformats/go-multiaddr v0.8.0 github.com/nats-io/nats.go v1.22.1 github.com/nspcc-dev/neo-go v0.100.1 - github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb // indirect github.com/olekukonko/tablewriter v0.0.5 github.com/panjf2000/ants/v2 v2.4.0 github.com/paulmach/orb v0.2.2 @@ -31,6 +30,8 @@ require ( github.com/spf13/viper v1.15.0 github.com/stretchr/testify v1.8.2 go.etcd.io/bbolt v1.3.6 + go.opentelemetry.io/otel v1.14.0 + go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 @@ -80,6 +81,7 @@ require ( github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect + github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect github.com/pelletier/go-toml/v2 v2.0.7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -94,13 +96,11 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect github.com/twmb/murmur3 v1.1.5 // indirect github.com/urfave/cli v1.22.5 // indirect - go.opentelemetry.io/otel v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect - go.opentelemetry.io/otel/trace v1.14.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.4.0 // indirect diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go index 4499c5d1..853628fb 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "context" "errors" "math/rand" "os" @@ -39,7 +40,7 @@ func testGet(t *testing.T, blz *Blobovnicza, addr oid.Address, expObj []byte, as pGet.SetAddress(addr) // try to read object from Blobovnicza - res, err := blz.Get(pGet) + res, err := blz.Get(context.Background(), pGet) if assertErr != nil { require.True(t, assertErr(err)) } else { diff --git a/pkg/local_object_storage/blobovnicza/get.go b/pkg/local_object_storage/blobovnicza/get.go index 776f08d2..c1cd19e5 100644 --- a/pkg/local_object_storage/blobovnicza/get.go +++ b/pkg/local_object_storage/blobovnicza/get.go @@ -1,12 +1,16 @@ package blobovnicza import ( + "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/nspcc-dev/neo-go/pkg/util/slice" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // GetPrm groups the parameters of Get operation. @@ -39,7 +43,13 @@ var errInterruptForEach = errors.New("interrupt for-each") // // Returns an error of type apistatus.ObjectNotFound if the requested object is not // presented in Blobovnicza. -func (b *Blobovnicza) Get(prm GetPrm) (GetRes, error) { +func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Get", + trace.WithAttributes( + attribute.String("address", prm.addr.EncodeToString()), + )) + defer span.End() + var ( data []byte addrKey = addressKey(prm.addr) diff --git a/pkg/local_object_storage/blobovnicza/get_test.go b/pkg/local_object_storage/blobovnicza/get_test.go index 98097e9c..ad30e8d9 100644 --- a/pkg/local_object_storage/blobovnicza/get_test.go +++ b/pkg/local_object_storage/blobovnicza/get_test.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "context" "os" "path/filepath" "testing" @@ -56,7 +57,7 @@ func TestBlobovnicza_Get(t *testing.T) { prmGet.SetAddress(addr) checkObj := func() { - res, err := blz.Get(prmGet) + res, err := blz.Get(context.Background(), prmGet) require.NoError(t, err) require.Equal(t, obj, res.Object()) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go index e13e4935..748843ee 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go @@ -1,15 +1,27 @@ package blobovniczatree import ( + "context" + "encoding/hex" "path/filepath" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) // Exists implements common.Storage. -func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { +func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Exists", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + )) + defer span.End() + if prm.StorageID != nil { id := blobovnicza.NewIDFromBytes(prm.StorageID) blz, err := b.openBlobovnicza(id.String()) @@ -32,7 +44,7 @@ func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { _, ok := activeCache[dirPath] - _, err := b.getObjectFromLevel(gPrm, p, !ok) + _, err := b.getObjectFromLevel(ctx, gPrm, p, !ok) if err != nil { if !blobovnicza.IsErrNotFound(err) { b.log.Debug("could not get object from level", diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/exists_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/exists_test.go index 4f466a81..08fd2223 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/exists_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/exists_test.go @@ -1,6 +1,7 @@ package blobovniczatree import ( + "context" "os" "path/filepath" "testing" @@ -44,7 +45,7 @@ func TestExistsInvalidStorageID(t *testing.T) { storageID[0]-- } - res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID}) + res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: storageID}) require.NoError(t, err) require.False(t, res.Exists) }) @@ -57,7 +58,7 @@ func TestExistsInvalidStorageID(t *testing.T) { require.NoError(t, os.Chmod(badDir, 0)) t.Cleanup(func() { _ = os.Chmod(filepath.Join(dir, "9"), os.ModePerm) }) - res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID}) + res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: storageID}) require.Error(t, err) require.False(t, res.Exists) }) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get.go b/pkg/local_object_storage/blobstor/blobovniczatree/get.go index 89ea9b64..bb84db08 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get.go @@ -1,14 +1,19 @@ package blobovniczatree import ( + "context" + "encoding/hex" "fmt" "path/filepath" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -16,7 +21,15 @@ import ( // // If blobocvnicza ID is specified, only this blobovnicza is processed. // Otherwise, all Blobovniczas are processed descending weight. -func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) { +func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.GetRes, err error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Get", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.Bool("raw", prm.Raw), + )) + defer span.End() + var bPrm blobovnicza.GetPrm bPrm.SetAddress(prm.Address) @@ -27,7 +40,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) { return res, err } - return b.getObject(blz, bPrm) + return b.getObject(ctx, blz, bPrm) } activeCache := make(map[string]struct{}) @@ -37,7 +50,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) { _, ok := activeCache[dirPath] - res, err = b.getObjectFromLevel(bPrm, p, !ok) + res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok) if err != nil { if !blobovnicza.IsErrNotFound(err) { b.log.Debug("could not get object from level", @@ -64,7 +77,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) { // tries to read object from particular blobovnicza. // // returns error if object could not be read from any blobovnicza of the same level. -func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) { +func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) { lvlPath := filepath.Dir(blzPath) // try to read from blobovnicza if it is opened @@ -72,7 +85,7 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string v, ok := b.opened.Get(blzPath) b.lruMtx.Unlock() if ok { - if res, err := b.getObject(v, prm); err == nil { + if res, err := b.getObject(ctx, v, prm); err == nil { return res, err } else if !blobovnicza.IsErrNotFound(err) { b.log.Debug("could not read object from opened blobovnicza", @@ -92,7 +105,7 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string b.activeMtx.RUnlock() if ok && tryActive { - if res, err := b.getObject(active.blz, prm); err == nil { + if res, err := b.getObject(ctx, active.blz, prm); err == nil { return res, err } else if !blobovnicza.IsErrNotFound(err) { b.log.Debug("could not get object from active blobovnicza", @@ -117,12 +130,12 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string return common.GetRes{}, err } - return b.getObject(blz, prm) + return b.getObject(ctx, blz, prm) } // reads object from blobovnicza and returns GetSmallRes. -func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) { - res, err := blz.Get(prm) +func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) { + res, err := blz.Get(ctx, prm) if err != nil { return common.GetRes{}, err } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go b/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go index 29df2394..b12cb32d 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go @@ -1,14 +1,20 @@ package blobovniczatree import ( + "context" + "encoding/hex" "fmt" "path/filepath" + "strconv" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -16,7 +22,16 @@ import ( // // If blobocvnicza ID is specified, only this blobovnicza is processed. // Otherwise, all Blobovniczas are processed descending weight. -func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes, err error) { +func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (res common.GetRangeRes, err error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.GetRange", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), + attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), + )) + defer span.End() + if prm.StorageID != nil { id := blobovnicza.NewIDFromBytes(prm.StorageID) blz, err := b.openBlobovnicza(id.String()) @@ -24,7 +39,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes, return common.GetRangeRes{}, err } - return b.getObjectRange(blz, prm) + return b.getObjectRange(ctx, blz, prm) } activeCache := make(map[string]struct{}) @@ -35,7 +50,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes, _, ok := activeCache[dirPath] - res, err = b.getRangeFromLevel(prm, p, !ok) + res, err = b.getRangeFromLevel(ctx, prm, p, !ok) if err != nil { outOfBounds := isErrOutOfRange(err) if !outOfBounds && !blobovnicza.IsErrNotFound(err) { @@ -68,7 +83,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes, // tries to read range of object payload data from particular blobovnicza. // // returns error if object could not be read from any blobovnicza of the same level. -func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) { +func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) { lvlPath := filepath.Dir(blzPath) // try to read from blobovnicza if it is opened @@ -76,7 +91,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, v, ok := b.opened.Get(blzPath) b.lruMtx.Unlock() if ok { - res, err := b.getObjectRange(v, prm) + res, err := b.getObjectRange(ctx, v, prm) switch { case err == nil, isErrOutOfRange(err): @@ -101,7 +116,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, b.activeMtx.RUnlock() if ok && tryActive { - res, err := b.getObjectRange(active.blz, prm) + res, err := b.getObjectRange(ctx, active.blz, prm) switch { case err == nil, isErrOutOfRange(err): @@ -131,11 +146,11 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, return common.GetRangeRes{}, err } - return b.getObjectRange(blz, prm) + return b.getObjectRange(ctx, blz, prm) } // reads range of object payload data from blobovnicza and returns GetRangeSmallRes. -func (b *Blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm common.GetRangePrm) (common.GetRangeRes, error) { +func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blobovnicza, prm common.GetRangePrm) (common.GetRangeRes, error) { var gPrm blobovnicza.GetPrm gPrm.SetAddress(prm.Address) @@ -143,7 +158,7 @@ func (b *Blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm common.G // stores data that is compressed on BlobStor side. // If blobovnicza learns to do the compression itself, // we can start using GetRange. - res, err := blz.Get(gPrm) + res, err := blz.Get(ctx, gPrm) if err != nil { return common.GetRangeRes{}, err } diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go index df001a36..738cd7ee 100644 --- a/pkg/local_object_storage/blobstor/blobstor_test.go +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -1,6 +1,7 @@ package blobstor import ( + "context" "path/filepath" "testing" @@ -62,11 +63,11 @@ func TestCompression(t *testing.T) { } testGet := func(t *testing.T, b *BlobStor, i int) { - res1, err := b.Get(common.GetPrm{Address: object.AddressOf(smallObj[i])}) + res1, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(smallObj[i])}) require.NoError(t, err) require.Equal(t, smallObj[i], res1.Object) - res2, err := b.Get(common.GetPrm{Address: object.AddressOf(bigObj[i])}) + res2, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(bigObj[i])}) require.NoError(t, err) require.Equal(t, bigObj[i], res2.Object) } diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 76dd6d96..b5d18624 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -1,6 +1,10 @@ package common -import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" +) // Storage represents key-value object storage. // It is used as a building block for a blobstor of a shard. @@ -16,9 +20,9 @@ type Storage interface { // This function MUST be called before Open. SetReportErrorFunc(f func(string, error)) - Get(GetPrm) (GetRes, error) - GetRange(GetRangePrm) (GetRangeRes, error) - Exists(ExistsPrm) (ExistsRes, error) + Get(context.Context, GetPrm) (GetRes, error) + GetRange(context.Context, GetRangePrm) (GetRangeRes, error) + Exists(context.Context, ExistsPrm) (ExistsRes, error) Put(PutPrm) (PutRes, error) Delete(DeletePrm) (DeleteRes, error) Iterate(IteratePrm) (IterateRes, error) diff --git a/pkg/local_object_storage/blobstor/exists.go b/pkg/local_object_storage/blobstor/exists.go index 7a5a0095..5882c33e 100644 --- a/pkg/local_object_storage/blobstor/exists.go +++ b/pkg/local_object_storage/blobstor/exists.go @@ -1,7 +1,13 @@ package blobstor import ( + "context" + "encoding/hex" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -9,15 +15,22 @@ import ( // // Returns any error encountered that did not allow // to completely check object existence. -func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { +func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Exists", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + )) + defer span.End() + b.modeMtx.RLock() defer b.modeMtx.RUnlock() if prm.StorageID != nil { if len(prm.StorageID) == 0 { - return b.storage[len(b.storage)-1].Storage.Exists(prm) + return b.storage[len(b.storage)-1].Storage.Exists(ctx, prm) } - return b.storage[0].Storage.Exists(prm) + return b.storage[0].Storage.Exists(ctx, prm) } // If there was an error during existence check below, @@ -31,7 +44,7 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { // error | error | log the first error, return the second var errors []error for i := range b.storage { - res, err := b.storage[i].Storage.Exists(prm) + res, err := b.storage[i].Storage.Exists(ctx, prm) if err == nil && res.Exists { return res, nil } else if err != nil { diff --git a/pkg/local_object_storage/blobstor/exists_test.go b/pkg/local_object_storage/blobstor/exists_test.go index 4bbc256a..805d7829 100644 --- a/pkg/local_object_storage/blobstor/exists_test.go +++ b/pkg/local_object_storage/blobstor/exists_test.go @@ -1,6 +1,7 @@ package blobstor import ( + "context" "os" "testing" @@ -43,13 +44,13 @@ func TestExists(t *testing.T) { for i := range objects { prm.Address = objectCore.AddressOf(objects[i]) - res, err := b.Exists(prm) + res, err := b.Exists(context.Background(), prm) require.NoError(t, err) require.True(t, res.Exists) } prm.Address = oidtest.Address() - res, err := b.Exists(prm) + res, err := b.Exists(context.Background(), prm) require.NoError(t, err) require.False(t, res.Exists) @@ -60,13 +61,13 @@ func TestExists(t *testing.T) { // Object exists, first error is logged. prm.Address = objectCore.AddressOf(objects[0]) - res, err := b.Exists(prm) + res, err := b.Exists(context.Background(), prm) require.NoError(t, err) require.True(t, res.Exists) // Object doesn't exist, first error is returned. prm.Address = objectCore.AddressOf(objects[1]) - _, err = b.Exists(prm) + _, err = b.Exists(context.Background(), prm) require.Error(t, err) require.ErrorIs(t, err, teststore.ErrDiskExploded) }) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 99484860..462fbd63 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -1,6 +1,7 @@ package fstree import ( + "context" "crypto/sha256" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "strings" "syscall" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -19,6 +21,8 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // FSTree represents an object storage as a filesystem tree. @@ -208,7 +212,13 @@ func (t *FSTree) Delete(prm common.DeletePrm) (common.DeleteRes, error) { // Exists returns the path to the file with object contents if it exists in the storage // and an error otherwise. -func (t *FSTree) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { +func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "FSTree.Exists", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + )) + defer span.End() + p := t.treePath(prm.Address) _, err := os.Stat(p) @@ -336,16 +346,30 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error } // Get returns an object from the storage by address. -func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) { +func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.Get", + trace.WithAttributes( + attribute.Bool("raw", prm.Raw), + attribute.String("address", prm.Address.EncodeToString()), + )) + defer span.End() + p := t.treePath(prm.Address) if _, err := os.Stat(p); os.IsNotExist(err) { return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{}) } - data, err := os.ReadFile(p) - if err != nil { - return common.GetRes{}, err + var data []byte + var err error + { + _, span := tracing.StartSpanFromContext(ctx, "FSTree.Get.ReadFile") + defer span.End() + + data, err = os.ReadFile(p) + if err != nil { + return common.GetRes{}, err + } } data, err = t.Decompress(data) @@ -362,8 +386,16 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) { } // GetRange implements common.Storage. -func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { - res, err := t.Get(common.GetPrm{Address: prm.Address}) +func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.GetRange", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), + attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), + )) + defer span.End() + + res, err := t.Get(ctx, common.GetPrm{Address: prm.Address}) if err != nil { return common.GetRangeRes{}, err } diff --git a/pkg/local_object_storage/blobstor/get.go b/pkg/local_object_storage/blobstor/get.go index 6caa61b8..65bc87c0 100644 --- a/pkg/local_object_storage/blobstor/get.go +++ b/pkg/local_object_storage/blobstor/get.go @@ -1,23 +1,36 @@ package blobstor import ( + "context" + "encoding/hex" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // Get reads the object from b. // If the descriptor is present, only one sub-storage is tried, // Otherwise, each sub-storage is tried in order. -func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) { +func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Get", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.Bool("raw", prm.Raw), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + )) + defer span.End() + b.modeMtx.RLock() defer b.modeMtx.RUnlock() if prm.StorageID == nil { for i := range b.storage { - res, err := b.storage[i].Storage.Get(prm) + res, err := b.storage[i].Storage.Get(ctx, prm) if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { return res, err } @@ -26,7 +39,7 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) { return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{}) } if len(prm.StorageID) == 0 { - return b.storage[len(b.storage)-1].Storage.Get(prm) + return b.storage[len(b.storage)-1].Storage.Get(ctx, prm) } - return b.storage[0].Storage.Get(prm) + return b.storage[0].Storage.Get(ctx, prm) } diff --git a/pkg/local_object_storage/blobstor/get_range.go b/pkg/local_object_storage/blobstor/get_range.go index 93939cab..ff9e72e9 100644 --- a/pkg/local_object_storage/blobstor/get_range.go +++ b/pkg/local_object_storage/blobstor/get_range.go @@ -1,23 +1,38 @@ package blobstor import ( + "context" + "encoding/hex" "errors" + "strconv" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // GetRange reads object payload data from b. // If the descriptor is present, only one sub-storage is tried, // Otherwise, each sub-storage is tried in order. -func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { +func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.GetRange", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), + attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), + )) + defer span.End() + b.modeMtx.RLock() defer b.modeMtx.RUnlock() if prm.StorageID == nil { for i := range b.storage { - res, err := b.storage[i].Storage.GetRange(prm) + res, err := b.storage[i].Storage.GetRange(ctx, prm) if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { return res, err } @@ -26,7 +41,7 @@ func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectNotFound{}) } if len(prm.StorageID) == 0 { - return b.storage[len(b.storage)-1].Storage.GetRange(prm) + return b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm) } - return b.storage[0].Storage.GetRange(prm) + return b.storage[0].Storage.GetRange(ctx, prm) } diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/control.go b/pkg/local_object_storage/blobstor/internal/blobstortest/control.go index 0a74495d..350bea96 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/control.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/control.go @@ -1,6 +1,7 @@ package blobstortest import ( + "context" "math/rand" "testing" @@ -26,7 +27,7 @@ func TestControl(t *testing.T, cons Constructor, min, max uint64) { prm.StorageID = objects[i].storageID prm.Raw = true - _, err := s.Get(prm) + _, err := s.Get(context.Background(), prm) require.NoError(t, err) } diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/delete.go b/pkg/local_object_storage/blobstor/internal/blobstortest/delete.go index f3bb4c3f..ad004531 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/delete.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/delete.go @@ -1,6 +1,7 @@ package blobstortest import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -35,18 +36,18 @@ func TestDelete(t *testing.T, cons Constructor, min, max uint64) { t.Run("exists fail", func(t *testing.T) { prm := common.ExistsPrm{Address: oidtest.Address()} - res, err := s.Exists(prm) + res, err := s.Exists(context.Background(), prm) require.NoError(t, err) require.False(t, res.Exists) }) t.Run("get fail", func(t *testing.T) { prm := common.GetPrm{Address: oidtest.Address()} - _, err := s.Get(prm) + _, err := s.Get(context.Background(), prm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) t.Run("getrange fail", func(t *testing.T) { prm := common.GetRangePrm{Address: oidtest.Address()} - _, err := s.GetRange(prm) + _, err := s.GetRange(context.Background(), prm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) }) @@ -75,7 +76,7 @@ func TestDelete(t *testing.T, cons Constructor, min, max uint64) { prm.Address = objects[3].addr prm.Raw = true - res, err := s.Get(prm) + res, err := s.Get(context.Background(), prm) require.NoError(t, err) require.Equal(t, objects[3].raw, res.RawData) }) diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/exists.go b/pkg/local_object_storage/blobstor/internal/blobstortest/exists.go index ee16ddcb..99f6a79e 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/exists.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/exists.go @@ -1,6 +1,7 @@ package blobstortest import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -18,7 +19,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) { t.Run("missing object", func(t *testing.T) { prm := common.ExistsPrm{Address: oidtest.Address()} - res, err := s.Exists(prm) + res, err := s.Exists(context.Background(), prm) require.NoError(t, err) require.False(t, res.Exists) }) @@ -29,7 +30,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) { t.Run("without storage ID", func(t *testing.T) { prm.StorageID = nil - res, err := s.Exists(prm) + res, err := s.Exists(context.Background(), prm) require.NoError(t, err) require.True(t, res.Exists) }) @@ -37,7 +38,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) { t.Run("with storage ID", func(t *testing.T) { prm.StorageID = objects[0].storageID - res, err := s.Exists(prm) + res, err := s.Exists(context.Background(), prm) require.NoError(t, err) require.True(t, res.Exists) }) diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/get.go b/pkg/local_object_storage/blobstor/internal/blobstortest/get.go index cc3da6b4..c5755dfb 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/get.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/get.go @@ -1,6 +1,7 @@ package blobstortest import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -19,7 +20,7 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) { t.Run("missing object", func(t *testing.T) { gPrm := common.GetPrm{Address: oidtest.Address()} - _, err := s.Get(gPrm) + _, err := s.Get(context.Background(), gPrm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) @@ -29,13 +30,13 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) { // With storage ID. gPrm.StorageID = objects[i].storageID - res, err := s.Get(gPrm) + res, err := s.Get(context.Background(), gPrm) require.NoError(t, err) require.Equal(t, objects[i].obj, res.Object) // Without storage ID. gPrm.StorageID = nil - res, err = s.Get(gPrm) + res, err = s.Get(context.Background(), gPrm) require.NoError(t, err) require.Equal(t, objects[i].obj, res.Object) @@ -43,7 +44,7 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) { gPrm.StorageID = objects[i].storageID gPrm.Raw = true - res, err = s.Get(gPrm) + res, err = s.Get(context.Background(), gPrm) require.NoError(t, err) require.Equal(t, objects[i].raw, res.RawData) } diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/get_range.go b/pkg/local_object_storage/blobstor/internal/blobstortest/get_range.go index e105fe6e..b0c8aa95 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/get_range.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/get_range.go @@ -1,6 +1,7 @@ package blobstortest import ( + "context" "math" "testing" @@ -20,7 +21,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) { t.Run("missing object", func(t *testing.T) { gPrm := common.GetRangePrm{Address: oidtest.Address()} - _, err := s.GetRange(gPrm) + _, err := s.GetRange(context.Background(), gPrm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) @@ -38,14 +39,14 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) { t.Run("without storage ID", func(t *testing.T) { // Without storage ID. - res, err := s.GetRange(gPrm) + res, err := s.GetRange(context.Background(), gPrm) require.NoError(t, err) require.Equal(t, payload[start:stop], res.Data) }) t.Run("with storage ID", func(t *testing.T) { gPrm.StorageID = objects[0].storageID - res, err := s.GetRange(gPrm) + res, err := s.GetRange(context.Background(), gPrm) require.NoError(t, err) require.Equal(t, payload[start:stop], res.Data) }) @@ -54,7 +55,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) { gPrm.Range.SetOffset(uint64(len(payload) + 10)) gPrm.Range.SetLength(10) - _, err := s.GetRange(gPrm) + _, err := s.GetRange(context.Background(), gPrm) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) @@ -62,7 +63,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) { gPrm.Range.SetOffset(10) gPrm.Range.SetLength(uint64(len(payload))) - _, err := s.GetRange(gPrm) + _, err := s.GetRange(context.Background(), gPrm) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) @@ -70,7 +71,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) { gPrm.Range.SetOffset(0) gPrm.Range.SetLength(1 << 63) - _, err := s.GetRange(gPrm) + _, err := s.GetRange(context.Background(), gPrm) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) @@ -78,7 +79,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) { gPrm.Range.SetOffset(10) gPrm.Range.SetLength(math.MaxUint64 - 2) - _, err := s.GetRange(gPrm) + _, err := s.GetRange(context.Background(), gPrm) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) } diff --git a/pkg/local_object_storage/blobstor/memstore/memstore.go b/pkg/local_object_storage/blobstor/memstore/memstore.go index 5f623847..4068d742 100644 --- a/pkg/local_object_storage/blobstor/memstore/memstore.go +++ b/pkg/local_object_storage/blobstor/memstore/memstore.go @@ -2,6 +2,7 @@ package memstore import ( + "context" "fmt" "sync" @@ -32,7 +33,7 @@ func New(opts ...Option) common.Storage { return st } -func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) { +func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes, error) { key := req.Address.EncodeToString() s.mu.RLock() @@ -58,8 +59,8 @@ func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) { return common.GetRes{Object: obj, RawData: data}, nil } -func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) { - getResp, err := s.Get(common.GetPrm{ +func (s *memstoreImpl) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) { + getResp, err := s.Get(ctx, common.GetPrm{ Address: req.Address, StorageID: req.StorageID, }) @@ -80,7 +81,7 @@ func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, err }, nil } -func (s *memstoreImpl) Exists(req common.ExistsPrm) (common.ExistsRes, error) { +func (s *memstoreImpl) Exists(_ context.Context, req common.ExistsPrm) (common.ExistsRes, error) { key := req.Address.EncodeToString() s.mu.RLock() diff --git a/pkg/local_object_storage/blobstor/memstore/memstore_test.go b/pkg/local_object_storage/blobstor/memstore/memstore_test.go index 531a7d9e..6482b2cf 100644 --- a/pkg/local_object_storage/blobstor/memstore/memstore_test.go +++ b/pkg/local_object_storage/blobstor/memstore/memstore_test.go @@ -1,6 +1,7 @@ package memstore import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -32,13 +33,13 @@ func TestSimpleLifecycle(t *testing.T) { } { - resp, err := s.Exists(common.ExistsPrm{Address: addr}) + resp, err := s.Exists(context.Background(), common.ExistsPrm{Address: addr}) require.NoError(t, err) require.True(t, resp.Exists) } { - resp, err := s.Get(common.GetPrm{Address: addr}) + resp, err := s.Get(context.Background(), common.GetPrm{Address: addr}) require.NoError(t, err) require.Equal(t, obj.Payload(), resp.Object.Payload()) } @@ -47,7 +48,7 @@ func TestSimpleLifecycle(t *testing.T) { var objRange objectSDK.Range objRange.SetOffset(256) objRange.SetLength(512) - resp, err := s.GetRange(common.GetRangePrm{ + resp, err := s.GetRange(context.Background(), common.GetRangePrm{ Address: addr, Range: objRange, }) @@ -61,7 +62,7 @@ func TestSimpleLifecycle(t *testing.T) { } { - resp, err := s.Exists(common.ExistsPrm{Address: addr}) + resp, err := s.Exists(context.Background(), common.ExistsPrm{Address: addr}) require.NoError(t, err) require.False(t, resp.Exists) } diff --git a/pkg/local_object_storage/blobstor/perf_test.go b/pkg/local_object_storage/blobstor/perf_test.go index 0351eb56..d2359335 100644 --- a/pkg/local_object_storage/blobstor/perf_test.go +++ b/pkg/local_object_storage/blobstor/perf_test.go @@ -1,6 +1,7 @@ package blobstor import ( + "context" "fmt" "os" "testing" @@ -127,7 +128,7 @@ func BenchmarkSubstorageReadPerf(b *testing.B) { b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, err := st.Get(common.GetPrm{Address: addrGen.Next()}) + _, err := st.Get(context.Background(), common.GetPrm{Address: addrGen.Next()}) require.NoError(b, err) } }) diff --git a/pkg/local_object_storage/blobstor/teststore/teststore.go b/pkg/local_object_storage/blobstor/teststore/teststore.go index f6ab4607..03f64f0f 100644 --- a/pkg/local_object_storage/blobstor/teststore/teststore.go +++ b/pkg/local_object_storage/blobstor/teststore/teststore.go @@ -13,6 +13,7 @@ package teststore import ( + "context" "errors" "fmt" "sync" @@ -140,36 +141,36 @@ func (s *TestStore) SetReportErrorFunc(f func(string, error)) { } } -func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) { +func (s *TestStore) Get(ctx context.Context, req common.GetPrm) (common.GetRes, error) { switch { case s.overrides.Get != nil: return s.overrides.Get(req) case s.st != nil: - return s.st.Get(req) + return s.st.Get(ctx, req) default: panic(fmt.Sprintf("unexpected storage call: Get(%+v)", req)) } } -func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) { +func (s *TestStore) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) { s.mu.RLock() defer s.mu.RUnlock() switch { case s.overrides.GetRange != nil: return s.overrides.GetRange(req) case s.st != nil: - return s.st.GetRange(req) + return s.st.GetRange(ctx, req) default: panic(fmt.Sprintf("unexpected storage call: GetRange(%+v)", req)) } } -func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) { +func (s *TestStore) Exists(ctx context.Context, req common.ExistsPrm) (common.ExistsRes, error) { switch { case s.overrides.Exists != nil: return s.overrides.Exists(req) case s.st != nil: - return s.st.Exists(req) + return s.st.Exists(ctx, req) default: panic(fmt.Sprintf("unexpected storage call: Exists(%+v)", req)) } diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 12771340..91bec63a 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -212,20 +212,20 @@ func TestExecBlocks(t *testing.T) { require.NoError(t, e.BlockExecution(errBlock)) // try to exec some op - _, err := Head(e, addr) + _, err := Head(context.Background(), e, addr) require.ErrorIs(t, err, errBlock) // resume executions require.NoError(t, e.ResumeExecution()) - _, err = Head(e, addr) // can be any data-related op + _, err = Head(context.Background(), e, addr) // can be any data-related op require.NoError(t, err) // close require.NoError(t, e.Close()) // try exec after close - _, err = Head(e, addr) + _, err = Head(context.Background(), e, addr) require.Error(t, err) // try to resume diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 6ea5728b..2105c452 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -72,7 +72,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e var existsPrm shard.ExistsPrm existsPrm.SetAddress(prm.addr) - resExists, err := sh.Exists(existsPrm) + resExists, err := sh.Exists(ctx, existsPrm) if err != nil { if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { return true diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 54d73cee..259a40a7 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -93,7 +93,7 @@ func checkGetError(t *testing.T, e *StorageEngine, addr oid.Address, expected an var getPrm GetPrm getPrm.WithAddress(addr) - _, err := e.Get(getPrm) + _, err := e.Get(context.Background(), getPrm) if expected != nil { require.ErrorAs(t, err, expected) } else { diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index c9b194f6..4ff019e4 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -102,7 +102,7 @@ func TestErrorReporting(t *testing.T) { te.ng.mtx.RUnlock() require.NoError(t, err) - _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) require.NoError(t, err) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) @@ -115,7 +115,7 @@ func TestErrorReporting(t *testing.T) { } for i := uint32(1); i < 3; i++ { - _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) require.Error(t, err) checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) @@ -136,7 +136,7 @@ func TestErrorReporting(t *testing.T) { te.ng.mtx.RUnlock() require.NoError(t, err) - _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) require.NoError(t, err) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) @@ -149,14 +149,14 @@ func TestErrorReporting(t *testing.T) { } for i := uint32(1); i < errThreshold; i++ { - _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) require.Error(t, err) checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) } for i := uint32(0); i < 2; i++ { - _, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) require.Error(t, err) checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) @@ -193,9 +193,9 @@ func TestBlobstorFailback(t *testing.T) { for i := range objs { addr := object.AddressOf(objs[i]) - _, err = te.ng.Get(GetPrm{addr: addr}) + _, err = te.ng.Get(context.Background(), GetPrm{addr: addr}) require.NoError(t, err) - _, err = te.ng.GetRange(RngPrm{addr: addr}) + _, err = te.ng.GetRange(context.Background(), RngPrm{addr: addr}) require.NoError(t, err) } @@ -213,15 +213,15 @@ func TestBlobstorFailback(t *testing.T) { for i := range objs { addr := object.AddressOf(objs[i]) - getRes, err := te.ng.Get(GetPrm{addr: addr}) + getRes, err := te.ng.Get(context.Background(), GetPrm{addr: addr}) require.NoError(t, err) require.Equal(t, objs[i], getRes.Object()) - rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10}) + rngRes, err := te.ng.GetRange(context.Background(), RngPrm{addr: addr, off: 1, ln: 10}) require.NoError(t, err) require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload()) - _, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1}) + _, err = te.ng.GetRange(context.Background(), RngPrm{addr: addr, off: errSmallSize + 10, ln: 1}) require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{}) } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 457228bb..f16413ea 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "fmt" @@ -58,7 +59,7 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. -func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) { +func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) { shardIDs := make([]string, len(prm.shardID)) for i := range prm.shardID { shardIDs[i] = prm.shardID[i].String() @@ -83,7 +84,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) var res EvacuateShardRes for _, shardID := range shardIDs { - if err = e.evacuateShard(shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil { + if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil { return res, err } } @@ -92,7 +93,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) return res, nil } -func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, +func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) @@ -113,7 +114,7 @@ func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res return err } - if err = e.evacuateObjects(sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil { + if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil { return err } @@ -160,7 +161,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) return shards, weights, nil } -func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, +func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { for i := range toEvacuate { addr := toEvacuate[i].Address @@ -168,7 +169,7 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add var getPrm shard.GetPrm getPrm.SetAddress(addr) - getRes, err := sh.Get(getPrm) + getRes, err := sh.Get(ctx, getPrm) if err != nil { if prm.ignoreErrors { continue @@ -176,7 +177,7 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add return err } - if e.tryEvacuateObject(addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) { + if e.tryEvacuateObject(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) { continue } @@ -195,14 +196,14 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add return nil } -func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes, +func (e *StorageEngine) tryEvacuateObject(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool { hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString()))) for j := range shards { if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, object) + putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 51abc4b1..c116aeff 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -91,7 +91,7 @@ func TestEvacuateShard(t *testing.T) { var prm GetPrm prm.WithAddress(objectCore.AddressOf(objects[i])) - _, err := e.Get(prm) + _, err := e.Get(context.Background(), prm) require.NoError(t, err) } } @@ -102,14 +102,14 @@ func TestEvacuateShard(t *testing.T) { prm.WithShardIDList(ids[2:3]) t.Run("must be read-only", func(t *testing.T) { - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, shard.ErrMustBeReadOnly) require.Equal(t, 0, res.Count()) }) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) require.Equal(t, objPerShard, res.count) @@ -120,7 +120,7 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. - res, err = e.Evacuate(prm) + res, err = e.Evacuate(context.Background(), prm) require.NoError(t, err) require.Equal(t, 0, res.count) @@ -165,13 +165,13 @@ func TestEvacuateNetwork(t *testing.T) { var prm EvacuateShardPrm prm.shardID = ids[0:1] - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) require.Equal(t, 0, res.Count()) prm.handler = acceptOneOf(objects, 2) - res, err = e.Evacuate(prm) + res, err = e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) require.Equal(t, 2, res.Count()) }) @@ -185,14 +185,14 @@ func TestEvacuateNetwork(t *testing.T) { prm.shardID = ids[1:2] prm.handler = acceptOneOf(objects, 2) - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) require.Equal(t, 2, res.Count()) t.Run("no errors", func(t *testing.T) { prm.handler = acceptOneOf(objects, 3) - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) require.Equal(t, 3, res.Count()) }) @@ -217,14 +217,14 @@ func TestEvacuateNetwork(t *testing.T) { prm.shardID = evacuateIDs prm.handler = acceptOneOf(objects, totalCount-1) - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) require.Equal(t, totalCount-1, res.Count()) t.Run("no errors", func(t *testing.T) { prm.handler = acceptOneOf(objects, totalCount) - res, err := e.Evacuate(prm) + res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) require.Equal(t, totalCount, res.Count()) }) diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index a43c7f23..3a8e09a6 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" @@ -16,7 +17,7 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) { exists := false e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - res, err := sh.Exists(shPrm) + res, err := sh.Exists(context.TODO(), shPrm) if err != nil { if shard.IsErrRemoved(err) { alreadyRemoved = true diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 4d0a30bc..7d17b50f 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -1,14 +1,18 @@ package engine import ( + "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -43,16 +47,22 @@ func (r GetRes) Object() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) { +func (e *StorageEngine) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { err = e.execIfNotBlocked(func() error { - res, err = e.get(prm) + res, err = e.get(ctx, prm) return err }) return } -func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { +func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.get", + trace.WithAttributes( + attribute.String("address", prm.addr.EncodeToString()), + )) + defer span.End() + if e.metrics != nil { defer elapsed(e.metrics.AddGetDuration)() } @@ -69,7 +79,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { Engine: e, } - it.tryGetWithMeta() + it.tryGetWithMeta(ctx) if it.SplitInfo != nil { return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) @@ -84,7 +94,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { return GetRes{}, it.OutError } - it.tryGetFromBlobstore() + it.tryGetFromBlobstore(ctx) if it.Object == nil { return GetRes{}, it.OutError @@ -116,14 +126,14 @@ type getShardIterator struct { splitInfoErr *objectSDK.SplitInfoError } -func (i *getShardIterator) tryGetWithMeta() { +func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() i.ShardPrm.SetIgnoreMeta(noMeta) i.HasDegraded = i.HasDegraded || noMeta - res, err := sh.Get(i.ShardPrm) + res, err := sh.Get(ctx, i.ShardPrm) if err == nil { i.Object = res.Object() return true @@ -162,7 +172,7 @@ func (i *getShardIterator) tryGetWithMeta() { }) } -func (i *getShardIterator) tryGetFromBlobstore() { +func (i *getShardIterator) tryGetFromBlobstore(ctx context.Context) { // If the object is not found but is present in metabase, // try to fetch it from blobstor directly. If it is found in any // blobstor, increase the error counter for the shard which contains the meta. @@ -174,18 +184,18 @@ func (i *getShardIterator) tryGetFromBlobstore() { return false } - res, err := sh.Get(i.ShardPrm) + res, err := sh.Get(ctx, i.ShardPrm) i.Object = res.Object() return err == nil }) } // Get reads object from local storage by provided address. -func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { +func Get(ctx context.Context, storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { var getPrm GetPrm getPrm.WithAddress(addr) - res, err := storage.Get(getPrm) + res, err := storage.Get(ctx, getPrm) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 689b46de..130e76c3 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -1,8 +1,10 @@ package engine import ( + "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -52,16 +54,19 @@ func (r HeadRes) Header() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object was inhumed. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Head(prm HeadPrm) (res HeadRes, err error) { +func (e *StorageEngine) Head(ctx context.Context, prm HeadPrm) (res HeadRes, err error) { err = e.execIfNotBlocked(func() error { - res, err = e.head(prm) + res, err = e.head(ctx, prm) return err }) return } -func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { +func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.head") + defer span.End() + if e.metrics != nil { defer elapsed(e.metrics.AddHeadDuration)() } @@ -81,7 +86,7 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { shPrm.SetRaw(prm.raw) e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { - res, err := sh.Head(shPrm) + res, err := sh.Head(ctx, shPrm) if err != nil { switch { case shard.IsErrNotFound(err): @@ -139,11 +144,11 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { } // Head reads object header from local storage by provided address. -func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { +func Head(ctx context.Context, storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { var headPrm HeadPrm headPrm.WithAddress(addr) - res, err := storage.Head(headPrm) + res, err := storage.Head(ctx, headPrm) if err != nil { return nil, err } @@ -153,12 +158,12 @@ func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { // HeadRaw reads object header from local storage by provided address and raw // flag. -func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) { +func HeadRaw(ctx context.Context, storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) { var headPrm HeadPrm headPrm.WithAddress(addr) headPrm.WithRaw(raw) - res, err := storage.Head(headPrm) + res, err := storage.Head(ctx, headPrm) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/engine/head_test.go b/pkg/local_object_storage/engine/head_test.go index e2a1edc9..e5fd4b04 100644 --- a/pkg/local_object_storage/engine/head_test.go +++ b/pkg/local_object_storage/engine/head_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "os" "testing" @@ -66,7 +67,7 @@ func TestHeadRaw(t *testing.T) { headPrm.WithAddress(parentAddr) headPrm.WithRaw(true) - _, err = e.Head(headPrm) + _, err = e.Head(context.Background(), headPrm) require.Error(t, err) var si *object.SplitInfoError diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 2ecca525..db998833 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -134,7 +134,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh if checkExists { existPrm.SetAddress(addr) - exRes, err := sh.Exists(existPrm) + exRes, err := sh.Exists(ctx, existPrm) if err != nil { if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { // inhumed once - no need to be inhumed again diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 20a4d68e..60a1d9c9 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" @@ -69,7 +70,7 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi var existsPrm shard.ExistsPrm existsPrm.SetAddress(addrLocked) - exRes, err := sh.Exists(existsPrm) + exRes, err := sh.Exists(context.TODO(), existsPrm) if err != nil { var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 68a4467f..5f9105ef 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -72,7 +73,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return false } - putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj) + putDone, exists := e.putToShard(context.TODO(), sh, ind, pool, addr, prm.obj) finished = putDone || exists return finished }) @@ -87,7 +88,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) { +func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) { var putSuccess, alreadyExists bool exitCh := make(chan struct{}) @@ -98,7 +99,7 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool var existPrm shard.ExistsPrm existPrm.SetAddress(addr) - exists, err := sh.Exists(existPrm) + exists, err := sh.Exists(ctx, existPrm) if err != nil { if shard.IsErrObjectExpired(err) { // object is already found but diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 25b533bd..3d119ac6 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -1,14 +1,19 @@ package engine import ( + "context" "errors" + "strconv" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -56,16 +61,24 @@ func (r RngRes) Object() *objectSDK.Object { // Returns ErrRangeOutOfBounds if the requested object range is out of bounds. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) GetRange(prm RngPrm) (res RngRes, err error) { +func (e *StorageEngine) GetRange(ctx context.Context, prm RngPrm) (res RngRes, err error) { err = e.execIfNotBlocked(func() error { - res, err = e.getRange(prm) + res, err = e.getRange(ctx, prm) return err }) return } -func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { +func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getRange", + trace.WithAttributes( + attribute.String("address", prm.addr.EncodeToString()), + attribute.String("offset", strconv.FormatUint(prm.off, 10)), + attribute.String("length", strconv.FormatUint(prm.ln, 10)), + )) + defer span.End() + if e.metrics != nil { defer elapsed(e.metrics.AddRangeDuration)() } @@ -83,7 +96,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { Engine: e, } - it.tryGetWithMeta() + it.tryGetWithMeta(ctx) if it.SplitInfo != nil { return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) @@ -96,7 +109,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { return RngRes{}, it.OutError } - it.tryGetFromBlobstor() + it.tryGetFromBlobstor(ctx) if it.Object == nil { return RngRes{}, it.OutError @@ -114,12 +127,12 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { } // GetRange reads object payload range from local storage by provided address. -func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) { +func GetRange(ctx context.Context, storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) { var rangePrm RngPrm rangePrm.WithAddress(addr) rangePrm.WithPayloadRange(rng) - res, err := storage.GetRange(rangePrm) + res, err := storage.GetRange(ctx, rangePrm) if err != nil { return nil, err } @@ -141,13 +154,13 @@ type getRangeShardIterator struct { Engine *StorageEngine } -func (i *getRangeShardIterator) tryGetWithMeta() { +func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) { i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() i.HasDegraded = i.HasDegraded || noMeta i.ShardPrm.SetIgnoreMeta(noMeta) - res, err := sh.GetRange(i.ShardPrm) + res, err := sh.GetRange(ctx, i.ShardPrm) if err == nil { i.Object = res.Object() return true @@ -185,7 +198,7 @@ func (i *getRangeShardIterator) tryGetWithMeta() { }) } -func (i *getRangeShardIterator) tryGetFromBlobstor() { +func (i *getRangeShardIterator) tryGetFromBlobstor(ctx context.Context) { // If the object is not found but is present in metabase, // try to fetch it from blobstor directly. If it is found in any // blobstor, increase the error counter for the shard which contains the meta. @@ -197,7 +210,7 @@ func (i *getRangeShardIterator) tryGetFromBlobstor() { return false } - res, err := sh.GetRange(i.ShardPrm) + res, err := sh.GetRange(ctx, i.ShardPrm) if shard.IsErrOutOfRange(err) { var errOutOfRange apistatus.ObjectOutOfRange diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go index d881a52d..c50c0844 100644 --- a/pkg/local_object_storage/engine/remove_copies.go +++ b/pkg/local_object_storage/engine/remove_copies.go @@ -116,7 +116,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address var existsPrm shard.ExistsPrm existsPrm.SetAddress(addr) - res, err := shards[i].Exists(existsPrm) + res, err := shards[i].Exists(ctx, existsPrm) if err != nil { return err } else if !res.Exists() { diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index df7e536c..50ea20bb 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -63,6 +63,7 @@ func TestShardOpen(t *testing.T) { newShard := func() *Shard { return New( + WithID(NewIDFromBytes([]byte{})), WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), WithBlobStorOptions( blobstor.WithStorages([]blobstor.SubStorage{ @@ -146,6 +147,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { require.NoError(t, err) sh = New( + WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), @@ -155,7 +157,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { var getPrm GetPrm getPrm.SetAddress(addr) - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) require.NoError(t, sh.Close()) } @@ -176,6 +178,7 @@ func TestRefillMetabase(t *testing.T) { } sh := New( + WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), WithMetaBaseOptions( meta.WithPath(filepath.Join(p, "meta")), @@ -277,7 +280,7 @@ func TestRefillMetabase(t *testing.T) { checkObj := func(addr oid.Address, expObj *objectSDK.Object) { headPrm.SetAddress(addr) - res, err := sh.Head(headPrm) + res, err := sh.Head(context.Background(), headPrm) if expObj == nil { require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) @@ -302,7 +305,7 @@ func TestRefillMetabase(t *testing.T) { for _, member := range tombMembers { headPrm.SetAddress(member) - _, err := sh.Head(headPrm) + _, err := sh.Head(context.Background(), headPrm) if exists { require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) @@ -343,6 +346,7 @@ func TestRefillMetabase(t *testing.T) { require.NoError(t, err) sh = New( + WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), WithMetaBaseOptions( meta.WithPath(filepath.Join(p, "meta_restored")), diff --git a/pkg/local_object_storage/shard/delete_test.go b/pkg/local_object_storage/shard/delete_test.go index 9115f3e0..c37dfa28 100644 --- a/pkg/local_object_storage/shard/delete_test.go +++ b/pkg/local_object_storage/shard/delete_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -51,7 +52,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) { _, err = sh.Delete(delPrm) require.NoError(t, err) - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) @@ -69,13 +70,13 @@ func testShardDelete(t *testing.T, hasWriteCache bool) { _, err := sh.Put(putPrm) require.NoError(t, err) - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) require.NoError(t, err) _, err = sh.Delete(delPrm) require.NoError(t, err) - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) } diff --git a/pkg/local_object_storage/shard/dump_test.go b/pkg/local_object_storage/shard/dump_test.go index 65427dd5..9d585cc0 100644 --- a/pkg/local_object_storage/shard/dump_test.go +++ b/pkg/local_object_storage/shard/dump_test.go @@ -2,6 +2,7 @@ package shard_test import ( "bytes" + "context" "io" "math/rand" "os" @@ -276,7 +277,7 @@ func checkRestore(t *testing.T, sh *shard.Shard, prm shard.RestorePrm, objects [ for i := range objects { getPrm.SetAddress(object.AddressOf(objects[i])) - res, err := sh.Get(getPrm) + res, err := sh.Get(context.Background(), getPrm) require.NoError(t, err) require.Equal(t, objects[i], res.Object()) } diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 35b9cba9..76e4347d 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -1,6 +1,8 @@ package shard import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -33,7 +35,7 @@ func (p ExistsRes) Exists() bool { // // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. -func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) { +func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) { var exists bool var err error @@ -45,7 +47,7 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) { p.Address = prm.addr var res common.ExistsRes - res, err = s.blobStor.Exists(p) + res, err = s.blobStor.Exists(ctx, p) exists = res.Exists } else { var existsPrm meta.ExistsPrm diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index e7aa3614..8012e60f 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -33,6 +33,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) { rootPath := t.TempDir() opts := []shard.Option{ + shard.WithID(shard.NewIDFromBytes([]byte{})), shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}), shard.WithBlobStorOptions( blobstor.WithStorages([]blobstor.SubStorage{ @@ -115,7 +116,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) { var getPrm shard.GetPrm getPrm.SetAddress(objectCore.AddressOf(obj)) require.Eventually(t, func() bool { - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) return shard.IsErrNotFound(err) }, 3*time.Second, 1*time.Second, "expired object must be deleted") } diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 4231c01d..3406b933 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -1,8 +1,10 @@ package shard import ( + "context" "fmt" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" @@ -11,6 +13,8 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -61,7 +65,15 @@ func (r GetRes) HasMeta() bool { // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard. // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. -func (s *Shard) Get(prm GetPrm) (GetRes, error) { +func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("address", prm.addr.EncodeToString()), + attribute.Bool("skip_meta", prm.skipMeta), + )) + defer span.End() + s.m.RLock() defer s.m.RUnlock() @@ -70,7 +82,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) { getPrm.Address = prm.addr getPrm.StorageID = id - res, err := stor.Get(getPrm) + res, err := stor.Get(ctx, getPrm) if err != nil { return nil, err } @@ -79,7 +91,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) { } wc := func(c writecache.Cache) (*objectSDK.Object, error) { - return c.Get(prm.addr) + return c.Get(ctx, prm.addr) } skipMeta := prm.skipMeta || s.info.Mode.NoMetabase() diff --git a/pkg/local_object_storage/shard/get_test.go b/pkg/local_object_storage/shard/get_test.go index 9d197533..f670b286 100644 --- a/pkg/local_object_storage/shard/get_test.go +++ b/pkg/local_object_storage/shard/get_test.go @@ -2,6 +2,7 @@ package shard_test import ( "bytes" + "context" "errors" "testing" "time" @@ -111,11 +112,11 @@ func testShardGet(t *testing.T, hasWriteCache bool) { } func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (shard.GetRes, error) { - res, err := sh.Get(getPrm) + res, err := sh.Get(context.Background(), getPrm) if hasWriteCache { require.Eventually(t, func() bool { if shard.IsErrNotFound(err) { - res, err = sh.Get(getPrm) + res, err = sh.Get(context.Background(), getPrm) } return !shard.IsErrNotFound(err) }, time.Second, time.Millisecond*100) diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go index 6913d316..8e8ff943 100644 --- a/pkg/local_object_storage/shard/head.go +++ b/pkg/local_object_storage/shard/head.go @@ -1,9 +1,14 @@ package shard import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // HeadPrm groups the parameters of Head operation. @@ -43,7 +48,15 @@ func (r HeadRes) Object() *objectSDK.Object { // Returns an error of type apistatus.ObjectNotFound if object is missing in Shard. // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. -func (s *Shard) Head(prm HeadPrm) (HeadRes, error) { +func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Head", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("address", prm.addr.EncodeToString()), + attribute.Bool("raw", prm.raw), + )) + defer span.End() + var obj *objectSDK.Object var err error if s.GetMode().NoMetabase() { @@ -52,7 +65,7 @@ func (s *Shard) Head(prm HeadPrm) (HeadRes, error) { getPrm.SetIgnoreMeta(true) var res GetRes - res, err = s.Get(getPrm) + res, err = s.Get(ctx, getPrm) obj = res.Object() } else { var headParams meta.GetPrm diff --git a/pkg/local_object_storage/shard/head_test.go b/pkg/local_object_storage/shard/head_test.go index 36c8915b..449626e9 100644 --- a/pkg/local_object_storage/shard/head_test.go +++ b/pkg/local_object_storage/shard/head_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "context" "errors" "testing" "time" @@ -75,18 +76,18 @@ func testShardHead(t *testing.T, hasWriteCache bool) { headPrm.SetAddress(object.AddressOf(parent)) headPrm.SetRaw(false) - head, err := sh.Head(headPrm) + head, err := sh.Head(context.Background(), headPrm) require.NoError(t, err) require.Equal(t, parent.CutPayload(), head.Object()) }) } func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (shard.HeadRes, error) { - res, err := sh.Head(headPrm) + res, err := sh.Head(context.Background(), headPrm) if hasWriteCache { require.Eventually(t, func() bool { if shard.IsErrNotFound(err) { - res, err = sh.Head(headPrm) + res, err = sh.Head(context.Background(), headPrm) } return !shard.IsErrNotFound(err) }, time.Second, time.Millisecond*100) diff --git a/pkg/local_object_storage/shard/inhume_test.go b/pkg/local_object_storage/shard/inhume_test.go index 191afab0..41845c41 100644 --- a/pkg/local_object_storage/shard/inhume_test.go +++ b/pkg/local_object_storage/shard/inhume_test.go @@ -51,6 +51,6 @@ func testShardInhume(t *testing.T, hasWriteCache bool) { _, err = sh.Inhume(context.Background(), inhPrm) require.NoError(t, err) - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) } diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index 995aa147..2bee6629 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -27,6 +27,7 @@ func TestShard_Lock(t *testing.T) { rootPath := t.TempDir() opts := []shard.Option{ + shard.WithID(shard.NewIDFromBytes([]byte{})), shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}), shard.WithBlobStorOptions( blobstor.WithStorages([]blobstor.SubStorage{ @@ -137,7 +138,7 @@ func TestShard_Lock(t *testing.T) { var getPrm shard.GetPrm getPrm.SetAddress(objectcore.AddressOf(obj)) - _, err = sh.Get(getPrm) + _, err = sh.Get(context.Background(), getPrm) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) } diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index c9106c23..4355c31a 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -1,6 +1,10 @@ package shard import ( + "context" + "strconv" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -8,6 +12,8 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // RngPrm groups the parameters of GetRange operation. @@ -66,7 +72,17 @@ func (r RngRes) HasMeta() bool { // Returns an error of type apistatus.ObjectNotFound if the requested object is missing. // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard. // Returns the object.ErrObjectIsExpired if the object is presented but already expired. -func (s *Shard) GetRange(prm RngPrm) (RngRes, error) { +func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetRange", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("address", prm.addr.EncodeToString()), + attribute.Bool("skip_meta", prm.skipMeta), + attribute.String("offset", strconv.FormatUint(prm.off, 10)), + attribute.String("length", strconv.FormatUint(prm.ln, 10)), + )) + defer span.End() + s.m.RLock() defer s.m.RUnlock() @@ -77,7 +93,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) { getRngPrm.Range.SetLength(prm.ln) getRngPrm.StorageID = id - res, err := stor.GetRange(getRngPrm) + res, err := stor.GetRange(ctx, getRngPrm) if err != nil { return nil, err } @@ -89,7 +105,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) { } wc := func(c writecache.Cache) (*object.Object, error) { - res, err := c.Get(prm.addr) + res, err := c.Get(ctx, prm.addr) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/shard/range_test.go b/pkg/local_object_storage/shard/range_test.go index 6782dca1..16418121 100644 --- a/pkg/local_object_storage/shard/range_test.go +++ b/pkg/local_object_storage/shard/range_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "context" "math" "path/filepath" "testing" @@ -105,7 +106,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) { rngPrm.SetAddress(addr) rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength()) - res, err := sh.GetRange(rngPrm) + res, err := sh.GetRange(context.Background(), rngPrm) if tc.hasErr { require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{}) } else { diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index 7aa331c7..1bfa33dd 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -66,7 +66,7 @@ func TestShardReload(t *testing.T) { var prm ExistsPrm prm.SetAddress(objects[i].addr) - res, err := sh.Exists(prm) + res, err := sh.Exists(context.Background(), prm) require.NoError(t, err) require.Equal(t, exists, res.Exists(), "object #%d is missing", i) } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 027e6ca7..fea34276 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -63,6 +63,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts } opts := []shard.Option{ + shard.WithID(shard.NewIDFromBytes([]byte{})), shard.WithLogger(&logger.Logger{Logger: zap.L()}), shard.WithBlobStorOptions(bsOpts...), shard.WithMetaBaseOptions( diff --git a/pkg/local_object_storage/shard/shutdown_test.go b/pkg/local_object_storage/shard/shutdown_test.go index 57a98268..5fd13221 100644 --- a/pkg/local_object_storage/shard/shutdown_test.go +++ b/pkg/local_object_storage/shard/shutdown_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "context" "math/rand" "testing" @@ -55,7 +56,7 @@ func TestWriteCacheObjectLoss(t *testing.T) { for i := range objects { getPrm.SetAddress(object.AddressOf(objects[i])) - _, err := sh.Get(getPrm) + _, err := sh.Get(context.Background(), getPrm) require.NoError(t, err, i) } } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index a6c2035d..9dc216fb 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -1,6 +1,7 @@ package writecache import ( + "context" "os" "path/filepath" "testing" @@ -95,7 +96,7 @@ func TestFlush(t *testing.T) { prm.Address = objects[i].addr prm.StorageID = mRes.StorageID() - res, err := bs.Get(prm) + res, err := bs.Get(context.Background(), prm) require.NoError(t, err) require.Equal(t, objects[i].obj, res.Object) } @@ -119,7 +120,7 @@ func TestFlush(t *testing.T) { _, err := mb.Get(mPrm) require.Error(t, err) - _, err = bs.Get(common.GetPrm{Address: objects[i].addr}) + _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) require.Error(t, err) } @@ -149,7 +150,7 @@ func TestFlush(t *testing.T) { _, err := mb.Get(mPrm) require.Error(t, err) - _, err = bs.Get(common.GetPrm{Address: objects[i].addr}) + _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) require.Error(t, err) } @@ -266,7 +267,7 @@ func TestFlush(t *testing.T) { require.NoError(t, wc.Open(true)) initWC(t, wc) for i := range objects { - _, err := wc.Get(objects[i].addr) + _, err := wc.Get(context.Background(), objects[i].addr) require.NoError(t, err, i) } require.NoError(t, wc.Close()) @@ -275,7 +276,7 @@ func TestFlush(t *testing.T) { require.NoError(t, wc.Open(false)) initWC(t, wc) for i := range objects { - _, err := wc.Get(objects[i].addr) + _, err := wc.Get(context.Background(), objects[i].addr) if i < 2 { require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i) } else { diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index a15f42e1..6af1bd18 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -1,6 +1,9 @@ package writecache import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -8,14 +11,22 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/nspcc-dev/neo-go/pkg/util/slice" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // Get returns object from write-cache. // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. -func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) { +func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { saddr := addr.EncodeToString() + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get", + trace.WithAttributes( + attribute.String("address", saddr), + )) + defer span.End() + value, err := Get(c.db, []byte(saddr)) if err == nil { obj := objectSDK.New() @@ -23,7 +34,7 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) { return obj, obj.Unmarshal(value) } - res, err := c.fsTree.Get(common.GetPrm{Address: addr}) + res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr}) if err != nil { return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) } @@ -35,8 +46,14 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) { // Head returns object header from write-cache. // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. -func (c *cache) Head(addr oid.Address) (*objectSDK.Object, error) { - obj, err := c.Get(addr) +func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head", + trace.WithAttributes( + attribute.String("address", addr.EncodeToString()), + )) + defer span.End() + + obj, err := c.Get(ctx, addr) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index 56b27ec4..ffe7a012 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -1,6 +1,7 @@ package writecache import ( + "context" "errors" "sync" @@ -177,6 +178,6 @@ func (c *cache) flushStatus(addr oid.Address) (bool, bool) { prm.SetAddress(addr) mRes, _ := c.metabase.StorageID(prm) - res, err := c.blobstor.Exists(common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()}) + res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()}) return err == nil && res.Exists, false } diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 59c7c9d4..cca8986b 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -1,6 +1,7 @@ package writecache import ( + "context" "io/fs" "os" "time" @@ -27,7 +28,7 @@ type metabase interface { type blob interface { Put(common.PutPrm) (common.PutRes, error) NeedsCompression(obj *objectSDK.Object) bool - Exists(res common.ExistsPrm) (common.ExistsRes, error) + Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error) } type options struct { diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index a5b8ff0a..24070dbd 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -1,6 +1,7 @@ package writecache import ( + "context" "os" "sync" @@ -23,8 +24,8 @@ type Info struct { // Cache represents write-cache for objects. type Cache interface { - Get(address oid.Address) (*object.Object, error) - Head(oid.Address) (*object.Object, error) + Get(ctx context.Context, address oid.Address) (*object.Object, error) + Head(context.Context, oid.Address) (*object.Object, error) // Delete removes object referenced by the given oid.Address from the // Cache. Returns any error encountered that prevented the object to be // removed. diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 6c064efa..b64a9188 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -19,7 +19,7 @@ import ( "google.golang.org/grpc/status" ) -func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) { +func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) { err := s.isValidRequest(req) if err != nil { return nil, status.Error(codes.PermissionDenied, err.Error()) @@ -30,7 +30,7 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) prm.WithFaultHandler(s.replicate) - res, err := s.s.Evacuate(prm) + res, err := s.s.Evacuate(ctx, prm) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/notificator/deps.go b/pkg/services/notificator/deps.go index ded4b4b7..d6330f78 100644 --- a/pkg/services/notificator/deps.go +++ b/pkg/services/notificator/deps.go @@ -1,6 +1,8 @@ package notificator import ( + "context" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) @@ -8,7 +10,7 @@ import ( type NotificationSource interface { // Iterate must iterate over all notifications for the // provided epoch and call handler for all of them. - Iterate(epoch uint64, handler func(topic string, addr oid.Address)) + Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) } // NotificationWriter notifies all the subscribers diff --git a/pkg/services/notificator/service.go b/pkg/services/notificator/service.go index 09661830..0a8a5d96 100644 --- a/pkg/services/notificator/service.go +++ b/pkg/services/notificator/service.go @@ -1,6 +1,7 @@ package notificator import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -71,11 +72,11 @@ func New(prm *Prm) *Notificator { // ProcessEpoch looks for all objects with defined epoch in the storage // and passes their addresses to the NotificationWriter. -func (n *Notificator) ProcessEpoch(epoch uint64) { +func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) { logger := n.l.With(zap.Uint64("epoch", epoch)) logger.Debug("notificator: start processing object notifications") - n.ns.Iterate(epoch, func(topic string, addr oid.Address) { + n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) { n.l.Debug("notificator: processing object notification", zap.String("topic", topic), zap.Stringer("address", addr), diff --git a/pkg/services/object/acl/eacl/v2/eacl_test.go b/pkg/services/object/acl/eacl/v2/eacl_test.go index 4570e271..ce5d98d5 100644 --- a/pkg/services/object/acl/eacl/v2/eacl_test.go +++ b/pkg/services/object/acl/eacl/v2/eacl_test.go @@ -1,6 +1,7 @@ package v2 import ( + "context" "crypto/ecdsa" "errors" "testing" @@ -26,7 +27,7 @@ type testLocalStorage struct { err error } -func (s *testLocalStorage) Head(addr oid.Address) (*object.Object, error) { +func (s *testLocalStorage) Head(ctx context.Context, addr oid.Address) (*object.Object, error) { require.True(s.t, addr.Container().Equals(s.expAddr.Container())) require.True(s.t, addr.Object().Equals(s.expAddr.Object())) diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index 736c0576..09581084 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -1,6 +1,7 @@ package v2 import ( + "context" "errors" "fmt" @@ -27,7 +28,7 @@ type cfg struct { } type ObjectStorage interface { - Head(oid.Address) (*object.Object, error) + Head(context.Context, oid.Address) (*object.Object, error) } type Request interface { @@ -207,7 +208,7 @@ func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, b addr.SetContainer(cnr) addr.SetObject(*idObj) - obj, err := h.storage.Head(addr) + obj, err := h.storage.Head(context.TODO(), addr) if err == nil { return headersFromObject(obj, cnr, idObj), true } diff --git a/pkg/services/object/acl/eacl/v2/localstore.go b/pkg/services/object/acl/eacl/v2/localstore.go index 40271f1c..0f23e988 100644 --- a/pkg/services/object/acl/eacl/v2/localstore.go +++ b/pkg/services/object/acl/eacl/v2/localstore.go @@ -1,6 +1,7 @@ package v2 import ( + "context" "io" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" @@ -12,10 +13,10 @@ type localStorage struct { ls *engine.StorageEngine } -func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) { +func (s *localStorage) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { if s.ls == nil { return nil, io.ErrUnexpectedEOF } - return engine.Head(s.ls, addr) + return engine.Head(ctx, s.ls, addr) } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 3d1a95cb..319bc6b5 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -139,7 +139,7 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro }{obj: obj, err: err} } -func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) { +func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) { var ( ok bool obj *objectSDK.Object diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index a6a77729..82ed911e 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -4,15 +4,21 @@ import ( "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" ) func (exec *execCtx) executeLocal(ctx context.Context) { + ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal") + defer func() { + span.End() + }() + var err error - exec.collectedObject, err = exec.svc.localStorage.get(exec) + exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec) var errSplitInfo *objectSDK.SplitInfoError var errRemoved apistatus.ObjectAlreadyRemoved diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 1532bade..697e48ee 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -11,6 +12,9 @@ import ( ) func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool { + ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode") + defer span.End() + exec.log.Debug("processing node...") client, ok := exec.remoteClient(info) diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index dfa3b48a..a9391d01 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -31,7 +31,7 @@ type cfg struct { log *logger.Logger localStorage interface { - get(*execCtx) (*object.Object, error) + get(context.Context, *execCtx) (*object.Object, error) } clientCache interface { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 09d8c67a..dd4ace40 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -200,13 +200,13 @@ func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.Priva return res.Object(), nil } -func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { +func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) { if exec.headOnly() { var headPrm engine.HeadPrm headPrm.WithAddress(exec.address()) headPrm.WithRaw(exec.isRaw()) - r, err := e.engine.Head(headPrm) + r, err := e.engine.Head(ctx, headPrm) if err != nil { return nil, err } @@ -217,7 +217,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { getRange.WithAddress(exec.address()) getRange.WithPayloadRange(rng) - r, err := e.engine.GetRange(getRange) + r, err := e.engine.GetRange(ctx, getRange) if err != nil { return nil, err } @@ -227,7 +227,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { var getPrm engine.GetPrm getPrm.WithAddress(exec.address()) - r, err := e.engine.Get(getPrm) + r, err := e.engine.Get(ctx, getPrm) if err != nil { return nil, err } diff --git a/pkg/services/object/get/v2/get_forwarder.go b/pkg/services/object/get/v2/get_forwarder.go index 330a0642..8163ae92 100644 --- a/pkg/services/object/get/v2/get_forwarder.go +++ b/pkg/services/object/get/v2/get_forwarder.go @@ -9,6 +9,7 @@ import ( "sync" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" @@ -18,6 +19,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type getRequestForwarder struct { @@ -30,6 +33,11 @@ type getRequestForwarder struct { } func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "getRequestForwarder.forwardRequestToNode", + trace.WithAttributes(attribute.String("address", addr.String())), + ) + defer span.End() + var err error // once compose and resign forwarding request diff --git a/pkg/services/object/get/v2/get_range_forwarder.go b/pkg/services/object/get/v2/get_range_forwarder.go index 5893f8de..9cf6384e 100644 --- a/pkg/services/object/get/v2/get_range_forwarder.go +++ b/pkg/services/object/get/v2/get_range_forwarder.go @@ -9,6 +9,7 @@ import ( "sync" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" @@ -18,6 +19,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type getRangeRequestForwarder struct { @@ -29,6 +32,11 @@ type getRangeRequestForwarder struct { } func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "getRangeRequestForwarder.forwardRequestToNode", + trace.WithAttributes(attribute.String("address", addr.String())), + ) + defer span.End() + var err error // once compose and resign forwarding request diff --git a/pkg/services/object/get/v2/head_forwarder.go b/pkg/services/object/get/v2/head_forwarder.go index 45c0174f..e1d4c02d 100644 --- a/pkg/services/object/get/v2/head_forwarder.go +++ b/pkg/services/object/get/v2/head_forwarder.go @@ -8,6 +8,7 @@ import ( "sync" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" @@ -19,6 +20,8 @@ import ( frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type headRequestForwarder struct { @@ -30,6 +33,11 @@ type headRequestForwarder struct { } func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "headRequestForwarder.forwardRequestToNode", + trace.WithAttributes(attribute.String("address", addr.String())), + ) + defer span.End() + var err error // once compose and resign forwarding request diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index 3fd8cd04..1bd8befa 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -96,7 +96,7 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV resp := new(objectV2.HeadResponse) resp.SetBody(new(objectV2.HeadResponseBody)) - p, err := s.toHeadPrm(ctx, req, resp) + p, err := s.toHeadPrm(req, resp) if err != nil { return nil, err } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 3a50a6ca..69bed23f 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -215,7 +215,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object) return nil } -func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { +func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { body := req.GetBody() addrV2 := body.GetAddress() diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 476a5bc0..53df81b7 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -27,7 +27,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) if task.obj == nil { var err error - task.obj, err = engine.Get(p.localStorage, task.addr) + task.obj, err = engine.Get(ctx, p.localStorage, task.addr) if err != nil { p.log.Error("could not get object from local storage", zap.Stringer("object", task.addr),