Tracing for get object #135

Merged
fyrchik merged 5 commits from dstepanov-yadro/frostfs-node:feat/OBJECT-3310 into master 2023-04-12 06:52:02 +00:00
95 changed files with 766 additions and 255 deletions

View file

@ -15,6 +15,7 @@ Changelog for FrostFS Node
- Multiple configs support (#44)
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
- Tree service now saves the last synchronization height which persists across restarts (#82)
- Add tracing support (#135)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects

View file

@ -5,6 +5,7 @@ import (
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
@ -22,8 +23,15 @@ func _client(ctx context.Context) (tree.TreeServiceClient, error) {
return nil, err
}
opts := make([]grpc.DialOption, 1, 2)
opts[0] = grpc.WithBlock()
opts := []grpc.DialOption{

params 3, 4 seems a little bit inconvinient :). 3 is enough (keeping a room for grpc.DialOption doesn't make big profit)

How about to initialize it like that?

opts = []grpc.DialOption {  
     grpc.WithBlock(), 
     grpc.WithChainUnaryInterceptor(
		tracing.NewGRPCUnaryClientInteceptor(),
	 ),
     grpc.WithChainStreamInterceptor(
		tracing.NewGRPCStreamClientInterceptor(),
	),
}
params `3, 4` seems a little bit inconvinient :). `3` is enough (keeping a room for `grpc.DialOption` doesn't make big profit) How about to initialize it like that? ``` opts = []grpc.DialOption { grpc.WithBlock(), grpc.WithChainUnaryInterceptor( tracing.NewGRPCUnaryClientInteceptor(), ), grpc.WithChainStreamInterceptor( tracing.NewGRPCStreamClientInterceptor(), ), } ```

Agree, fixed. I hope it wasn't the most important optimization.

Agree, fixed. I hope it wasn't the most important optimization.
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(
tracing.NewGRPCUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
tracing.NewGRPCStreamClientInterceptor(),
),
}
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

View file

@ -33,7 +33,7 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
var prm blobovnicza.GetPrm
prm.SetAddress(addr)
res, err := blz.Get(prm)
res, err := blz.Get(cmd.Context(), prm)
common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))
data := res.Object()

View file

@ -16,6 +16,7 @@ import (
"time"
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
@ -27,6 +28,7 @@ import (
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -1055,6 +1057,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
}
components = append(components, dCmp{"logger", logPrm.Reload})
components = append(components, dCmp{"tracing", func() error {
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
if updated {
c.log.Info("tracing configation updated")
}
return err
}})
if cmp, updated := metricsComponent(c); updated {
if cmp.enabled {
cmp.preReload = enableMetricsSvc

View file

@ -0,0 +1,31 @@
package tracing
import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
)
const (
subsection = "tracing"
)
// ToTracingConfig extracts tracing config.
func ToTracingConfig(c *config.Config) *tracing.Config {
return &tracing.Config{
Enabled: config.BoolSafe(c.Sub(subsection), "enabled"),
Exporter: tracing.Exporter(config.StringSafe(c.Sub(subsection), "exporter")),
Endpoint: config.StringSafe(c.Sub(subsection), "endpoint"),
Service: "frostfs-node",
InstanceID: getInstanceIDOrDefault(c),
Version: misc.Version,
}
}
func getInstanceIDOrDefault(c *config.Config) string {
s := config.StringSlice(c.Sub("node"), "addresses")
if len(s) > 0 {
return s[0]
}
return ""
}

View file

@ -7,6 +7,7 @@ import (
"net"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
grpcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
@ -19,6 +20,12 @@ func initGRPC(c *cfg) {
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
grpc.ChainUnaryInterceptor(
tracing.NewGRPCUnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
tracing.NewGRPCStreamServerInterceptor(),
),
}
tlsCfg := sc.TLS()

View file

@ -87,6 +87,8 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, pprof.name, pprof.init)
initAndLog(c, metrics.name, metrics.init)
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
initLocalStorage(c)
initAndLog(c, "storage engine", func(c *cfg) {
@ -100,7 +102,7 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
initAndLog(c, "session", initSessionService)
initAndLog(c, "reputation", func(c *cfg) { initReputationService(ctx, c) })
initAndLog(c, "notification", initNotifications)
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)

View file

@ -23,7 +23,7 @@ type notificationSource struct {
defaultTopic string
}
func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, addr oid.Address)) {
func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) {
log := n.l.With(zap.Uint64("epoch", epoch))
listRes, err := n.e.ListContainers(engine.ListContainersPrm{})
@ -51,7 +51,7 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad
}
for _, a := range selectRes.AddressList() {
err = n.processAddress(a, handler)
err = n.processAddress(ctx, a, handler)
if err != nil {
log.Error("notificator: could not process object",
zap.Stringer("address", a),
@ -66,13 +66,14 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad
}
func (n *notificationSource) processAddress(
ctx context.Context,
a oid.Address,
h func(topic string, addr oid.Address),
) error {
var prm engine.HeadPrm
prm.WithAddress(a)
res, err := n.e.Head(prm)
res, err := n.e.Head(ctx, prm)
if err != nil {
return err
}
@ -108,7 +109,7 @@ func (n notificationWriter) Notify(topic string, address oid.Address) {
}
}
func initNotifications(c *cfg) {
func initNotifications(ctx context.Context, c *cfg) {
if nodeconfig.Notification(c.appCfg).Enabled() {
topic := nodeconfig.Notification(c.appCfg).DefaultTopic()
pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey())
@ -151,7 +152,7 @@ func initNotifications(c *cfg) {
addNewEpochAsyncNotificationHandler(c, func(e event.Event) {
ev := e.(netmap.NewEpoch)
n.ProcessEpoch(ev.EpochNumber())
n.ProcessEpoch(ctx, ev.EpochNumber())
})
}
}

View file

@ -0,0 +1,31 @@
package main
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"go.uber.org/zap"
)
func initTracing(ctx context.Context, c *cfg) {
conf := tracingconfig.ToTracingConfig(c.appCfg)
_, err := tracing.Setup(ctx, *conf)
if err != nil {
c.log.Error("failed init tracing", zap.Error(err))
}
c.closers = append(c.closers, closer{
name: "tracing",
fn: func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err := tracing.Shutdown(ctx) //cfg context cancels before close
fyrchik marked this conversation as resolved Outdated

What is the comment about? Possible problems?

What is the comment about? Possible problems?

It's about using context.Background() for shutting down. If e use the main context of the application, then it will already be canceled at this point.

It's about using `context.Background()` for shutting down. If e use the main context of the application, then it will already be canceled at this point.
if err != nil {
c.log.Error("failed shutdown tracing", zap.Error(err))
}
},
})
}

View file

@ -184,3 +184,7 @@ FROSTFS_STORAGE_SHARD_1_PILORAMA_MAX_BATCH_SIZE=100
FROSTFS_STORAGE_SHARD_1_GC_REMOVER_BATCH_SIZE=200
#### Sleep interval between data remover tacts
FROSTFS_STORAGE_SHARD_1_GC_REMOVER_SLEEP_INTERVAL=5m
FROSTFS_TRACING_ENABLED=true
FROSTFS_TRACING_ENDPOINT="localhost"
FROSTFS_TRACING_EXPORTER="otlp_grpc"

View file

@ -243,5 +243,10 @@
}
}
}
},
"tracing": {
"enabled": true,
"endpoint": "localhost:9090",
"exporter": "otlp_grpc"
}
}

View file

@ -214,3 +214,9 @@ storage:
path: tmp/1/blob/pilorama.db
no_sync: true # USE WITH CAUTION. Return to user before pages have been persisted.
perm: 0644 # permission to use for the database file and intermediate directories
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost"

28
go.mod
View file

@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.18
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.11.2-0.20230315095236-9dc375346703
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
git.frostfs.info/TrueCloudLab/hrw v1.2.0
@ -20,7 +20,6 @@ require (
github.com/multiformats/go-multiaddr v0.8.0
github.com/nats-io/nats.go v1.22.1
github.com/nspcc-dev/neo-go v0.100.1
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb // indirect
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.4.0
github.com/paulmach/orb v0.2.2
@ -29,14 +28,16 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
golang.org/x/sync v0.1.0
golang.org/x/term v0.3.0
google.golang.org/grpc v1.52.0
golang.org/x/term v0.5.0
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
)
@ -47,15 +48,19 @@ require (
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
@ -76,6 +81,7 @@ require (
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@ -90,12 +96,18 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
github.com/twmb/murmur3 v1.1.5 // indirect
github.com/urfave/cli v1.22.5 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)

BIN
go.sum

Binary file not shown.

View file

@ -1,6 +1,7 @@
package blobovnicza
import (
"context"
"errors"
"math/rand"
"os"
@ -39,7 +40,7 @@ func testGet(t *testing.T, blz *Blobovnicza, addr oid.Address, expObj []byte, as
pGet.SetAddress(addr)
// try to read object from Blobovnicza
res, err := blz.Get(pGet)
res, err := blz.Get(context.Background(), pGet)
if assertErr != nil {
require.True(t, assertErr(err))
} else {

View file

@ -1,12 +1,16 @@
package blobovnicza
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// GetPrm groups the parameters of Get operation.
@ -39,7 +43,13 @@ var errInterruptForEach = errors.New("interrupt for-each")
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
// presented in Blobovnicza.
func (b *Blobovnicza) Get(prm GetPrm) (GetRes, error) {
func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Get",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
))
defer span.End()
var (
data []byte
addrKey = addressKey(prm.addr)

View file

@ -1,6 +1,7 @@
package blobovnicza
import (
"context"
"os"
"path/filepath"
"testing"
@ -56,7 +57,7 @@ func TestBlobovnicza_Get(t *testing.T) {
prmGet.SetAddress(addr)
checkObj := func() {
res, err := blz.Get(prmGet)
res, err := blz.Get(context.Background(), prmGet)
require.NoError(t, err)
require.Equal(t, obj, res.Object())

View file

@ -1,15 +1,27 @@
package blobovniczatree
import (
"context"
"encoding/hex"
"path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Exists implements common.Storage.
func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Exists",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
))
defer span.End()
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
blz, err := b.openBlobovnicza(id.String())
@ -32,7 +44,7 @@ func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
_, ok := activeCache[dirPath]
_, err := b.getObjectFromLevel(gPrm, p, !ok)
_, err := b.getObjectFromLevel(ctx, gPrm, p, !ok)
if err != nil {
if !blobovnicza.IsErrNotFound(err) {
b.log.Debug("could not get object from level",

View file

@ -1,6 +1,7 @@
package blobovniczatree
import (
"context"
"os"
"path/filepath"
"testing"
@ -44,7 +45,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
storageID[0]--
}
res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID})
res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: storageID})
require.NoError(t, err)
require.False(t, res.Exists)
})
@ -57,7 +58,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
require.NoError(t, os.Chmod(badDir, 0))
t.Cleanup(func() { _ = os.Chmod(filepath.Join(dir, "9"), os.ModePerm) })
res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID})
res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: storageID})
require.Error(t, err)
require.False(t, res.Exists)
})

View file

@ -1,14 +1,19 @@
package blobovniczatree
import (
"context"
"encoding/hex"
"fmt"
"path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -16,7 +21,15 @@ import (
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
// Otherwise, all Blobovniczas are processed descending weight.
func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.GetRes, err error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Get",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
attribute.Bool("raw", prm.Raw),
))
defer span.End()
var bPrm blobovnicza.GetPrm
bPrm.SetAddress(prm.Address)
@ -27,7 +40,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
return res, err
}
return b.getObject(blz, bPrm)
return b.getObject(ctx, blz, bPrm)
}
activeCache := make(map[string]struct{})
@ -37,7 +50,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
_, ok := activeCache[dirPath]
res, err = b.getObjectFromLevel(bPrm, p, !ok)
res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok)
if err != nil {
if !blobovnicza.IsErrNotFound(err) {
b.log.Debug("could not get object from level",
@ -64,7 +77,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
// tries to read object from particular blobovnicza.
//
// returns error if object could not be read from any blobovnicza of the same level.
func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
lvlPath := filepath.Dir(blzPath)
// try to read from blobovnicza if it is opened
@ -72,7 +85,7 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
v, ok := b.opened.Get(blzPath)
b.lruMtx.Unlock()
if ok {
if res, err := b.getObject(v, prm); err == nil {
if res, err := b.getObject(ctx, v, prm); err == nil {
return res, err
} else if !blobovnicza.IsErrNotFound(err) {
b.log.Debug("could not read object from opened blobovnicza",
@ -92,7 +105,7 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
b.activeMtx.RUnlock()
if ok && tryActive {
if res, err := b.getObject(active.blz, prm); err == nil {
if res, err := b.getObject(ctx, active.blz, prm); err == nil {
return res, err
} else if !blobovnicza.IsErrNotFound(err) {
b.log.Debug("could not get object from active blobovnicza",
@ -117,12 +130,12 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
return common.GetRes{}, err
}
return b.getObject(blz, prm)
return b.getObject(ctx, blz, prm)
}
// reads object from blobovnicza and returns GetSmallRes.
func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) {
res, err := blz.Get(prm)
func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) {
res, err := blz.Get(ctx, prm)
if err != nil {
return common.GetRes{}, err
}

View file

@ -1,14 +1,20 @@
package blobovniczatree
import (
"context"
"encoding/hex"
"fmt"
"path/filepath"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -16,7 +22,16 @@ import (
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
// Otherwise, all Blobovniczas are processed descending weight.
func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes, err error) {
func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (res common.GetRangeRes, err error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.GetRange",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
))
defer span.End()
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
blz, err := b.openBlobovnicza(id.String())
@ -24,7 +39,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
return common.GetRangeRes{}, err
}
return b.getObjectRange(blz, prm)
return b.getObjectRange(ctx, blz, prm)
}
activeCache := make(map[string]struct{})
@ -35,7 +50,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
_, ok := activeCache[dirPath]
res, err = b.getRangeFromLevel(prm, p, !ok)
res, err = b.getRangeFromLevel(ctx, prm, p, !ok)
if err != nil {
outOfBounds := isErrOutOfRange(err)
if !outOfBounds && !blobovnicza.IsErrNotFound(err) {
@ -68,7 +83,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
// tries to read range of object payload data from particular blobovnicza.
//
// returns error if object could not be read from any blobovnicza of the same level.
func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
lvlPath := filepath.Dir(blzPath)
// try to read from blobovnicza if it is opened
@ -76,7 +91,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
v, ok := b.opened.Get(blzPath)
b.lruMtx.Unlock()
if ok {
res, err := b.getObjectRange(v, prm)
res, err := b.getObjectRange(ctx, v, prm)
switch {
case err == nil,
isErrOutOfRange(err):
@ -101,7 +116,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
b.activeMtx.RUnlock()
if ok && tryActive {
res, err := b.getObjectRange(active.blz, prm)
res, err := b.getObjectRange(ctx, active.blz, prm)
switch {
case err == nil,
isErrOutOfRange(err):
@ -131,11 +146,11 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
return common.GetRangeRes{}, err
}
return b.getObjectRange(blz, prm)
return b.getObjectRange(ctx, blz, prm)
}
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
func (b *Blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm common.GetRangePrm) (common.GetRangeRes, error) {
func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blobovnicza, prm common.GetRangePrm) (common.GetRangeRes, error) {
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(prm.Address)
@ -143,7 +158,7 @@ func (b *Blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm common.G
// stores data that is compressed on BlobStor side.
// If blobovnicza learns to do the compression itself,
// we can start using GetRange.
res, err := blz.Get(gPrm)
res, err := blz.Get(ctx, gPrm)
if err != nil {
return common.GetRangeRes{}, err
}

View file

@ -1,6 +1,7 @@
package blobstor
import (
"context"
"path/filepath"
"testing"
@ -62,11 +63,11 @@ func TestCompression(t *testing.T) {
}
testGet := func(t *testing.T, b *BlobStor, i int) {
res1, err := b.Get(common.GetPrm{Address: object.AddressOf(smallObj[i])})
res1, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(smallObj[i])})
require.NoError(t, err)
require.Equal(t, smallObj[i], res1.Object)
res2, err := b.Get(common.GetPrm{Address: object.AddressOf(bigObj[i])})
res2, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(bigObj[i])})
require.NoError(t, err)
require.Equal(t, bigObj[i], res2.Object)
}

View file

@ -1,6 +1,10 @@
package common
import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
)
// Storage represents key-value object storage.
// It is used as a building block for a blobstor of a shard.
@ -16,9 +20,9 @@ type Storage interface {
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)
Exists(ExistsPrm) (ExistsRes, error)
Get(context.Context, GetPrm) (GetRes, error)
GetRange(context.Context, GetRangePrm) (GetRangeRes, error)
Exists(context.Context, ExistsPrm) (ExistsRes, error)
Put(PutPrm) (PutRes, error)
Delete(DeletePrm) (DeleteRes, error)
Iterate(IteratePrm) (IterateRes, error)

View file

@ -1,7 +1,13 @@
package blobstor
import (
"context"
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -9,15 +15,22 @@ import (
//
// Returns any error encountered that did not allow
// to completely check object existence.
func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Exists",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
))
defer span.End()
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID != nil {
if len(prm.StorageID) == 0 {
return b.storage[len(b.storage)-1].Storage.Exists(prm)
return b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
}
return b.storage[0].Storage.Exists(prm)
return b.storage[0].Storage.Exists(ctx, prm)
}
// If there was an error during existence check below,
@ -31,7 +44,7 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
// error | error | log the first error, return the second
var errors []error
for i := range b.storage {
res, err := b.storage[i].Storage.Exists(prm)
res, err := b.storage[i].Storage.Exists(ctx, prm)
if err == nil && res.Exists {
return res, nil
} else if err != nil {

View file

@ -1,6 +1,7 @@
package blobstor
import (
"context"
"os"
"testing"
@ -43,13 +44,13 @@ func TestExists(t *testing.T) {
for i := range objects {
prm.Address = objectCore.AddressOf(objects[i])
res, err := b.Exists(prm)
res, err := b.Exists(context.Background(), prm)
require.NoError(t, err)
require.True(t, res.Exists)
}
prm.Address = oidtest.Address()
res, err := b.Exists(prm)
res, err := b.Exists(context.Background(), prm)
require.NoError(t, err)
require.False(t, res.Exists)
@ -60,13 +61,13 @@ func TestExists(t *testing.T) {
// Object exists, first error is logged.
prm.Address = objectCore.AddressOf(objects[0])
res, err := b.Exists(prm)
res, err := b.Exists(context.Background(), prm)
require.NoError(t, err)
require.True(t, res.Exists)
// Object doesn't exist, first error is returned.
prm.Address = objectCore.AddressOf(objects[1])
_, err = b.Exists(prm)
_, err = b.Exists(context.Background(), prm)
require.Error(t, err)
require.ErrorIs(t, err, teststore.ErrDiskExploded)
})

View file

@ -1,6 +1,7 @@
package fstree
import (
"context"
"crypto/sha256"
"errors"
"fmt"
@ -11,6 +12,7 @@ import (
"strings"
"syscall"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -19,6 +21,8 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// FSTree represents an object storage as a filesystem tree.
@ -208,7 +212,13 @@ func (t *FSTree) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
// Exists returns the path to the file with object contents if it exists in the storage
// and an error otherwise.
func (t *FSTree) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Exists",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
))
defer span.End()
p := t.treePath(prm.Address)
_, err := os.Stat(p)
@ -336,16 +346,30 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error
}
// Get returns an object from the storage by address.
func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.Get",
trace.WithAttributes(
attribute.Bool("raw", prm.Raw),
attribute.String("address", prm.Address.EncodeToString()),
))
defer span.End()
fyrchik marked this conversation as resolved Outdated

Can we create a custom attributes for this? Like zap.Stringer.
When tracing is disabled it seems not worth doing this allocations.

Also, storage_id is not needed for FSTree.

Can we create a custom attributes for this? Like `zap.Stringer`. When tracing is disabled it seems not worth doing this allocations. Also, `storage_id` is not needed for FSTree.

Can we create a custom attributes for this? Like zap.Stringer.
When tracing is disabled it seems not worth doing this allocations.

Also, storage_id is not needed for FSTree.

Can we create a custom attributes for this? Like `zap.Stringer`. When tracing is disabled it seems not worth doing this allocations. Also, `storage_id` is not needed for FSTree.

Can we create a custom attributes for this? Like zap.Stringer.
When tracing is disabled it seems not worth doing this allocations.

WithAttributes receives KeyValue struct. attributes.Stringer already exists, but has such implementation:

// Stringer creates a new key-value pair with a passed name and a string
// value generated by the passed Stringer interface.
func Stringer(k string, v fmt.Stringer) KeyValue {
	return Key(k).String(v.String())
}
> Can we create a custom attributes for this? Like zap.Stringer. When tracing is disabled it seems not worth doing this allocations. `WithAttributes` receives `KeyValue` struct. `attributes.Stringer` already exists, but has such implementation: ``` // Stringer creates a new key-value pair with a passed name and a string // value generated by the passed Stringer interface. func Stringer(k string, v fmt.Stringer) KeyValue { return Key(k).String(v.String()) } ```

Also, storage_id is not needed for FSTree.

fixed

> Also, storage_id is not needed for FSTree. fixed
p := t.treePath(prm.Address)
if _, err := os.Stat(p); os.IsNotExist(err) {
return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
}
data, err := os.ReadFile(p)
if err != nil {
return common.GetRes{}, err
var data []byte
var err error
{
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Get.ReadFile")
defer span.End()
data, err = os.ReadFile(p)
if err != nil {
return common.GetRes{}, err
}
}
data, err = t.Decompress(data)
@ -362,8 +386,16 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
}
// GetRange implements common.Storage.
func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
res, err := t.Get(common.GetPrm{Address: prm.Address})
func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.GetRange",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
))
defer span.End()
res, err := t.Get(ctx, common.GetPrm{Address: prm.Address})
if err != nil {
return common.GetRangeRes{}, err
}

View file

@ -1,23 +1,36 @@
package blobstor
import (
"context"
"encoding/hex"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Get reads the object from b.
// If the descriptor is present, only one sub-storage is tried,
// Otherwise, each sub-storage is tried in order.
func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Get",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.Bool("raw", prm.Raw),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
))
defer span.End()
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID == nil {
for i := range b.storage {
res, err := b.storage[i].Storage.Get(prm)
res, err := b.storage[i].Storage.Get(ctx, prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
}
@ -26,7 +39,7 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
}
if len(prm.StorageID) == 0 {
return b.storage[len(b.storage)-1].Storage.Get(prm)
return b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
}
return b.storage[0].Storage.Get(prm)
return b.storage[0].Storage.Get(ctx, prm)
}

View file

@ -1,23 +1,38 @@
package blobstor
import (
"context"
"encoding/hex"
"errors"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// GetRange reads object payload data from b.
// If the descriptor is present, only one sub-storage is tried,
// Otherwise, each sub-storage is tried in order.
func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.GetRange",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
))
defer span.End()
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
if prm.StorageID == nil {
for i := range b.storage {
res, err := b.storage[i].Storage.GetRange(prm)
res, err := b.storage[i].Storage.GetRange(ctx, prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
}
@ -26,7 +41,7 @@ func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error)
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
}
if len(prm.StorageID) == 0 {
return b.storage[len(b.storage)-1].Storage.GetRange(prm)
return b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
}
return b.storage[0].Storage.GetRange(prm)
return b.storage[0].Storage.GetRange(ctx, prm)
}

View file

@ -1,6 +1,7 @@
package blobstortest
import (
"context"
"math/rand"
"testing"
@ -26,7 +27,7 @@ func TestControl(t *testing.T, cons Constructor, min, max uint64) {
prm.StorageID = objects[i].storageID
prm.Raw = true
_, err := s.Get(prm)
_, err := s.Get(context.Background(), prm)
require.NoError(t, err)
}

View file

@ -1,6 +1,7 @@
package blobstortest
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -35,18 +36,18 @@ func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
t.Run("exists fail", func(t *testing.T) {
prm := common.ExistsPrm{Address: oidtest.Address()}
res, err := s.Exists(prm)
res, err := s.Exists(context.Background(), prm)
require.NoError(t, err)
require.False(t, res.Exists)
})
t.Run("get fail", func(t *testing.T) {
prm := common.GetPrm{Address: oidtest.Address()}
_, err := s.Get(prm)
_, err := s.Get(context.Background(), prm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
t.Run("getrange fail", func(t *testing.T) {
prm := common.GetRangePrm{Address: oidtest.Address()}
_, err := s.GetRange(prm)
_, err := s.GetRange(context.Background(), prm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
})
@ -75,7 +76,7 @@ func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
prm.Address = objects[3].addr
prm.Raw = true
res, err := s.Get(prm)
res, err := s.Get(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objects[3].raw, res.RawData)
})

View file

@ -1,6 +1,7 @@
package blobstortest
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -18,7 +19,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
t.Run("missing object", func(t *testing.T) {
prm := common.ExistsPrm{Address: oidtest.Address()}
res, err := s.Exists(prm)
res, err := s.Exists(context.Background(), prm)
require.NoError(t, err)
require.False(t, res.Exists)
})
@ -29,7 +30,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
t.Run("without storage ID", func(t *testing.T) {
prm.StorageID = nil
res, err := s.Exists(prm)
res, err := s.Exists(context.Background(), prm)
require.NoError(t, err)
require.True(t, res.Exists)
})
@ -37,7 +38,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
t.Run("with storage ID", func(t *testing.T) {
prm.StorageID = objects[0].storageID
res, err := s.Exists(prm)
res, err := s.Exists(context.Background(), prm)
require.NoError(t, err)
require.True(t, res.Exists)
})

View file

@ -1,6 +1,7 @@
package blobstortest
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -19,7 +20,7 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
t.Run("missing object", func(t *testing.T) {
gPrm := common.GetPrm{Address: oidtest.Address()}
_, err := s.Get(gPrm)
_, err := s.Get(context.Background(), gPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
@ -29,13 +30,13 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
// With storage ID.
gPrm.StorageID = objects[i].storageID
res, err := s.Get(gPrm)
res, err := s.Get(context.Background(), gPrm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
// Without storage ID.
gPrm.StorageID = nil
res, err = s.Get(gPrm)
res, err = s.Get(context.Background(), gPrm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
@ -43,7 +44,7 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
gPrm.StorageID = objects[i].storageID
gPrm.Raw = true
res, err = s.Get(gPrm)
res, err = s.Get(context.Background(), gPrm)
require.NoError(t, err)
require.Equal(t, objects[i].raw, res.RawData)
}

View file

@ -1,6 +1,7 @@
package blobstortest
import (
"context"
"math"
"testing"
@ -20,7 +21,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
t.Run("missing object", func(t *testing.T) {
gPrm := common.GetRangePrm{Address: oidtest.Address()}
_, err := s.GetRange(gPrm)
_, err := s.GetRange(context.Background(), gPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
@ -38,14 +39,14 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
t.Run("without storage ID", func(t *testing.T) {
// Without storage ID.
res, err := s.GetRange(gPrm)
res, err := s.GetRange(context.Background(), gPrm)
require.NoError(t, err)
require.Equal(t, payload[start:stop], res.Data)
})
t.Run("with storage ID", func(t *testing.T) {
gPrm.StorageID = objects[0].storageID
res, err := s.GetRange(gPrm)
res, err := s.GetRange(context.Background(), gPrm)
require.NoError(t, err)
require.Equal(t, payload[start:stop], res.Data)
})
@ -54,7 +55,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
gPrm.Range.SetOffset(uint64(len(payload) + 10))
gPrm.Range.SetLength(10)
_, err := s.GetRange(gPrm)
_, err := s.GetRange(context.Background(), gPrm)
require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
})
@ -62,7 +63,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
gPrm.Range.SetOffset(10)
gPrm.Range.SetLength(uint64(len(payload)))
_, err := s.GetRange(gPrm)
_, err := s.GetRange(context.Background(), gPrm)
require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
})
@ -70,7 +71,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
gPrm.Range.SetOffset(0)
gPrm.Range.SetLength(1 << 63)
_, err := s.GetRange(gPrm)
_, err := s.GetRange(context.Background(), gPrm)
require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
})
@ -78,7 +79,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
gPrm.Range.SetOffset(10)
gPrm.Range.SetLength(math.MaxUint64 - 2)
_, err := s.GetRange(gPrm)
_, err := s.GetRange(context.Background(), gPrm)
require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
})
}

View file

@ -2,6 +2,7 @@
package memstore
import (
"context"
"fmt"
"sync"
@ -32,7 +33,7 @@ func New(opts ...Option) common.Storage {
return st
}
func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) {
func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes, error) {
key := req.Address.EncodeToString()
s.mu.RLock()
@ -58,8 +59,8 @@ func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, nil
}
func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
getResp, err := s.Get(common.GetPrm{
func (s *memstoreImpl) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
getResp, err := s.Get(ctx, common.GetPrm{
Address: req.Address,
StorageID: req.StorageID,
})
@ -80,7 +81,7 @@ func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, err
}, nil
}
func (s *memstoreImpl) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
func (s *memstoreImpl) Exists(_ context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
key := req.Address.EncodeToString()
s.mu.RLock()

View file

@ -1,6 +1,7 @@
package memstore
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -32,13 +33,13 @@ func TestSimpleLifecycle(t *testing.T) {
}
{
resp, err := s.Exists(common.ExistsPrm{Address: addr})
resp, err := s.Exists(context.Background(), common.ExistsPrm{Address: addr})
require.NoError(t, err)
require.True(t, resp.Exists)
}
{
resp, err := s.Get(common.GetPrm{Address: addr})
resp, err := s.Get(context.Background(), common.GetPrm{Address: addr})
require.NoError(t, err)
require.Equal(t, obj.Payload(), resp.Object.Payload())
}
@ -47,7 +48,7 @@ func TestSimpleLifecycle(t *testing.T) {
var objRange objectSDK.Range
objRange.SetOffset(256)
objRange.SetLength(512)
resp, err := s.GetRange(common.GetRangePrm{
resp, err := s.GetRange(context.Background(), common.GetRangePrm{
Address: addr,
Range: objRange,
})
@ -61,7 +62,7 @@ func TestSimpleLifecycle(t *testing.T) {
}
{
resp, err := s.Exists(common.ExistsPrm{Address: addr})
resp, err := s.Exists(context.Background(), common.ExistsPrm{Address: addr})
require.NoError(t, err)
require.False(t, resp.Exists)
}

View file

@ -1,6 +1,7 @@
package blobstor
import (
"context"
"fmt"
"os"
"testing"
@ -119,7 +120,7 @@ func BenchmarkSubstorageReadPerf(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := st.Get(common.GetPrm{Address: addrGen.Next()})
_, err := st.Get(context.Background(), common.GetPrm{Address: addrGen.Next()})
require.NoError(b, err)
}
})

View file

@ -13,6 +13,7 @@
package teststore
import (
"context"
"errors"
"fmt"
"sync"
@ -140,36 +141,36 @@ func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
}
}
func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) {
func (s *TestStore) Get(ctx context.Context, req common.GetPrm) (common.GetRes, error) {
switch {
case s.overrides.Get != nil:
return s.overrides.Get(req)
case s.st != nil:
return s.st.Get(req)
return s.st.Get(ctx, req)
default:
panic(fmt.Sprintf("unexpected storage call: Get(%+v)", req))
}
}
func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
func (s *TestStore) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
s.mu.RLock()
defer s.mu.RUnlock()
switch {
case s.overrides.GetRange != nil:
return s.overrides.GetRange(req)
case s.st != nil:
return s.st.GetRange(req)
return s.st.GetRange(ctx, req)
default:
panic(fmt.Sprintf("unexpected storage call: GetRange(%+v)", req))
}
}
func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
func (s *TestStore) Exists(ctx context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
switch {
case s.overrides.Exists != nil:
return s.overrides.Exists(req)
case s.st != nil:
return s.st.Exists(req)
return s.st.Exists(ctx, req)
default:
panic(fmt.Sprintf("unexpected storage call: Exists(%+v)", req))
}

View file

@ -212,20 +212,20 @@ func TestExecBlocks(t *testing.T) {
require.NoError(t, e.BlockExecution(errBlock))
// try to exec some op
_, err := Head(e, addr)
_, err := Head(context.Background(), e, addr)
require.ErrorIs(t, err, errBlock)
// resume executions
require.NoError(t, e.ResumeExecution())
_, err = Head(e, addr) // can be any data-related op
_, err = Head(context.Background(), e, addr) // can be any data-related op
require.NoError(t, err)
// close
require.NoError(t, e.Close())
// try exec after close
_, err = Head(e, addr)
_, err = Head(context.Background(), e, addr)
require.Error(t, err)
// try to resume

View file

@ -72,7 +72,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(prm.addr)
resExists, err := sh.Exists(existsPrm)
resExists, err := sh.Exists(ctx, existsPrm)
if err != nil {
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
return true

View file

@ -93,7 +93,7 @@ func checkGetError(t *testing.T, e *StorageEngine, addr oid.Address, expected an
var getPrm GetPrm
getPrm.WithAddress(addr)
_, err := e.Get(getPrm)
_, err := e.Get(context.Background(), getPrm)
if expected != nil {
require.ErrorAs(t, err, expected)
} else {

View file

@ -102,7 +102,7 @@ func TestErrorReporting(t *testing.T) {
te.ng.mtx.RUnlock()
require.NoError(t, err)
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.NoError(t, err)
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
@ -115,7 +115,7 @@ func TestErrorReporting(t *testing.T) {
}
for i := uint32(1); i < 3; i++ {
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
@ -136,7 +136,7 @@ func TestErrorReporting(t *testing.T) {
te.ng.mtx.RUnlock()
require.NoError(t, err)
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.NoError(t, err)
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
@ -149,14 +149,14 @@ func TestErrorReporting(t *testing.T) {
}
for i := uint32(1); i < errThreshold; i++ {
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
for i := uint32(0); i < 2; i++ {
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err)
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
@ -193,9 +193,9 @@ func TestBlobstorFailback(t *testing.T) {
for i := range objs {
addr := object.AddressOf(objs[i])
_, err = te.ng.Get(GetPrm{addr: addr})
_, err = te.ng.Get(context.Background(), GetPrm{addr: addr})
require.NoError(t, err)
_, err = te.ng.GetRange(RngPrm{addr: addr})
_, err = te.ng.GetRange(context.Background(), RngPrm{addr: addr})
require.NoError(t, err)
}
@ -213,15 +213,15 @@ func TestBlobstorFailback(t *testing.T) {
for i := range objs {
addr := object.AddressOf(objs[i])
getRes, err := te.ng.Get(GetPrm{addr: addr})
getRes, err := te.ng.Get(context.Background(), GetPrm{addr: addr})
require.NoError(t, err)
require.Equal(t, objs[i], getRes.Object())
rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
rngRes, err := te.ng.GetRange(context.Background(), RngPrm{addr: addr, off: 1, ln: 10})
require.NoError(t, err)
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
_, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
_, err = te.ng.GetRange(context.Background(), RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
}

View file

@ -1,6 +1,7 @@
package engine
import (
"context"
"errors"
"fmt"
@ -58,7 +59,7 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
shardIDs := make([]string, len(prm.shardID))
for i := range prm.shardID {
shardIDs[i] = prm.shardID[i].String()
@ -83,7 +84,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
var res EvacuateShardRes
for _, shardID := range shardIDs {
if err = e.evacuateShard(shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
return res, err
}
}
@ -92,7 +93,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
return res, nil
}
func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
@ -113,7 +114,7 @@ func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res
return err
}
if err = e.evacuateObjects(sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
@ -160,7 +161,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
return shards, weights, nil
}
func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
for i := range toEvacuate {
addr := toEvacuate[i].Address
@ -168,7 +169,7 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getRes, err := sh.Get(getPrm)
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
continue
@ -176,7 +177,7 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add
return err
}
if e.tryEvacuateObject(addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) {
if e.tryEvacuateObject(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) {
continue
}
@ -195,14 +196,14 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add
return nil
}
func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
func (e *StorageEngine) tryEvacuateObject(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, object)
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",

View file

@ -91,7 +91,7 @@ func TestEvacuateShard(t *testing.T) {
var prm GetPrm
prm.WithAddress(objectCore.AddressOf(objects[i]))
_, err := e.Get(prm)
_, err := e.Get(context.Background(), prm)
require.NoError(t, err)
}
}
@ -102,14 +102,14 @@ func TestEvacuateShard(t *testing.T) {
prm.WithShardIDList(ids[2:3])
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, shard.ErrMustBeReadOnly)
require.Equal(t, 0, res.Count())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objPerShard, res.count)
@ -120,7 +120,7 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t)
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(prm)
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, 0, res.count)
@ -165,13 +165,13 @@ func TestEvacuateNetwork(t *testing.T) {
var prm EvacuateShardPrm
prm.shardID = ids[0:1]
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, 0, res.Count())
prm.handler = acceptOneOf(objects, 2)
res, err = e.Evacuate(prm)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
})
@ -185,14 +185,14 @@ func TestEvacuateNetwork(t *testing.T) {
prm.shardID = ids[1:2]
prm.handler = acceptOneOf(objects, 2)
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, 3)
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, 3, res.Count())
})
@ -217,14 +217,14 @@ func TestEvacuateNetwork(t *testing.T) {
prm.shardID = evacuateIDs
prm.handler = acceptOneOf(objects, totalCount-1)
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Count())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(prm)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Count())
})

View file

@ -1,6 +1,7 @@
package engine
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -16,7 +17,7 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
exists := false
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Exists(shPrm)
res, err := sh.Exists(context.TODO(), shPrm)
if err != nil {
if shard.IsErrRemoved(err) {
alreadyRemoved = true

View file

@ -1,14 +1,18 @@
package engine
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -43,16 +47,22 @@ func (r GetRes) Object() *objectSDK.Object {
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) {
func (e *StorageEngine) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
err = e.execIfNotBlocked(func() error {
res, err = e.get(prm)
res, err = e.get(ctx, prm)
return err
})
return
}
func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.get",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
))
defer span.End()
if e.metrics != nil {
defer elapsed(e.metrics.AddGetDuration)()
}
@ -69,7 +79,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
Engine: e,
}
it.tryGetWithMeta()
it.tryGetWithMeta(ctx)
if it.SplitInfo != nil {
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
@ -84,7 +94,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
return GetRes{}, it.OutError
}
it.tryGetFromBlobstore()
it.tryGetFromBlobstore(ctx)
if it.Object == nil {
return GetRes{}, it.OutError
@ -116,14 +126,14 @@ type getShardIterator struct {
splitInfoErr *objectSDK.SplitInfoError
}
func (i *getShardIterator) tryGetWithMeta() {
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) {
noMeta := sh.GetMode().NoMetabase()
i.ShardPrm.SetIgnoreMeta(noMeta)
i.HasDegraded = i.HasDegraded || noMeta
res, err := sh.Get(i.ShardPrm)
res, err := sh.Get(ctx, i.ShardPrm)
if err == nil {
i.Object = res.Object()
return true
@ -162,7 +172,7 @@ func (i *getShardIterator) tryGetWithMeta() {
})
}
func (i *getShardIterator) tryGetFromBlobstore() {
func (i *getShardIterator) tryGetFromBlobstore(ctx context.Context) {
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
@ -174,18 +184,18 @@ func (i *getShardIterator) tryGetFromBlobstore() {
return false
}
res, err := sh.Get(i.ShardPrm)
res, err := sh.Get(ctx, i.ShardPrm)
i.Object = res.Object()
return err == nil
})
}
// Get reads object from local storage by provided address.
func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
func Get(ctx context.Context, storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
var getPrm GetPrm
getPrm.WithAddress(addr)
res, err := storage.Get(getPrm)
res, err := storage.Get(ctx, getPrm)
if err != nil {
return nil, err
}

View file

@ -1,8 +1,10 @@
package engine
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -52,16 +54,19 @@ func (r HeadRes) Header() *objectSDK.Object {
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object was inhumed.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) Head(prm HeadPrm) (res HeadRes, err error) {
func (e *StorageEngine) Head(ctx context.Context, prm HeadPrm) (res HeadRes, err error) {
err = e.execIfNotBlocked(func() error {
res, err = e.head(prm)
res, err = e.head(ctx, prm)
return err
})
return
}
func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.head")
defer span.End()
if e.metrics != nil {
defer elapsed(e.metrics.AddHeadDuration)()
}
@ -81,7 +86,7 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
shPrm.SetRaw(prm.raw)
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Head(shPrm)
res, err := sh.Head(ctx, shPrm)
if err != nil {
switch {
case shard.IsErrNotFound(err):
@ -139,11 +144,11 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
}
// Head reads object header from local storage by provided address.
func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
func Head(ctx context.Context, storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
var headPrm HeadPrm
headPrm.WithAddress(addr)
res, err := storage.Head(headPrm)
res, err := storage.Head(ctx, headPrm)
if err != nil {
return nil, err
}
@ -153,12 +158,12 @@ func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
// HeadRaw reads object header from local storage by provided address and raw
// flag.
func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
func HeadRaw(ctx context.Context, storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
var headPrm HeadPrm
headPrm.WithAddress(addr)
headPrm.WithRaw(raw)
res, err := storage.Head(headPrm)
res, err := storage.Head(ctx, headPrm)
if err != nil {
return nil, err
}

View file

@ -1,6 +1,7 @@
package engine
import (
"context"
"os"
"testing"
@ -66,7 +67,7 @@ func TestHeadRaw(t *testing.T) {
headPrm.WithAddress(parentAddr)
headPrm.WithRaw(true)
_, err = e.Head(headPrm)
_, err = e.Head(context.Background(), headPrm)
require.Error(t, err)
var si *object.SplitInfoError

View file

@ -134,7 +134,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
if checkExists {
existPrm.SetAddress(addr)
exRes, err := sh.Exists(existPrm)
exRes, err := sh.Exists(ctx, existPrm)
if err != nil {
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
// inhumed once - no need to be inhumed again

View file

@ -1,6 +1,7 @@
package engine
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -69,7 +70,7 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addrLocked)
exRes, err := sh.Exists(existsPrm)
exRes, err := sh.Exists(context.TODO(), existsPrm)
if err != nil {
var siErr *objectSDK.SplitInfoError
if !errors.As(err, &siErr) {

View file

@ -1,6 +1,7 @@
package engine
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -72,7 +73,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
return false
}
putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
putDone, exists := e.putToShard(context.TODO(), sh, ind, pool, addr, prm.obj)
finished = putDone || exists
return finished
})
@ -87,7 +88,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
var putSuccess, alreadyExists bool
exitCh := make(chan struct{})
@ -98,7 +99,7 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool
var existPrm shard.ExistsPrm
existPrm.SetAddress(addr)
exists, err := sh.Exists(existPrm)
exists, err := sh.Exists(ctx, existPrm)
if err != nil {
if shard.IsErrObjectExpired(err) {
// object is already found but

View file

@ -1,14 +1,19 @@
package engine
import (
"context"
"errors"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -56,16 +61,24 @@ func (r RngRes) Object() *objectSDK.Object {
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) GetRange(prm RngPrm) (res RngRes, err error) {
func (e *StorageEngine) GetRange(ctx context.Context, prm RngPrm) (res RngRes, err error) {
err = e.execIfNotBlocked(func() error {
res, err = e.getRange(prm)
res, err = e.getRange(ctx, prm)
return err
})
return
}
func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getRange",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
attribute.String("offset", strconv.FormatUint(prm.off, 10)),
attribute.String("length", strconv.FormatUint(prm.ln, 10)),
))
defer span.End()
if e.metrics != nil {
defer elapsed(e.metrics.AddRangeDuration)()
}
@ -83,7 +96,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
Engine: e,
}
it.tryGetWithMeta()
it.tryGetWithMeta(ctx)
if it.SplitInfo != nil {
return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
@ -96,7 +109,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
return RngRes{}, it.OutError
}
it.tryGetFromBlobstor()
it.tryGetFromBlobstor(ctx)
if it.Object == nil {
return RngRes{}, it.OutError
@ -114,12 +127,12 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
}
// GetRange reads object payload range from local storage by provided address.
func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
func GetRange(ctx context.Context, storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
var rangePrm RngPrm
rangePrm.WithAddress(addr)
rangePrm.WithPayloadRange(rng)
res, err := storage.GetRange(rangePrm)
res, err := storage.GetRange(ctx, rangePrm)
if err != nil {
return nil, err
}
@ -141,13 +154,13 @@ type getRangeShardIterator struct {
Engine *StorageEngine
}
func (i *getRangeShardIterator) tryGetWithMeta() {
func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) {
noMeta := sh.GetMode().NoMetabase()
i.HasDegraded = i.HasDegraded || noMeta
i.ShardPrm.SetIgnoreMeta(noMeta)
res, err := sh.GetRange(i.ShardPrm)
res, err := sh.GetRange(ctx, i.ShardPrm)
if err == nil {
i.Object = res.Object()
return true
@ -185,7 +198,7 @@ func (i *getRangeShardIterator) tryGetWithMeta() {
})
}
func (i *getRangeShardIterator) tryGetFromBlobstor() {
func (i *getRangeShardIterator) tryGetFromBlobstor(ctx context.Context) {
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
@ -197,7 +210,7 @@ func (i *getRangeShardIterator) tryGetFromBlobstor() {
return false
}
res, err := sh.GetRange(i.ShardPrm)
res, err := sh.GetRange(ctx, i.ShardPrm)
if shard.IsErrOutOfRange(err) {
var errOutOfRange apistatus.ObjectOutOfRange

View file

@ -116,7 +116,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addr)
res, err := shards[i].Exists(existsPrm)
res, err := shards[i].Exists(ctx, existsPrm)
if err != nil {
return err
} else if !res.Exists() {

View file

@ -63,6 +63,7 @@ func TestShardOpen(t *testing.T) {
newShard := func() *Shard {
return New(
WithID(NewIDFromBytes([]byte{})),
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
@ -146,6 +147,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
require.NoError(t, err)
sh = New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
@ -155,7 +157,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
var getPrm GetPrm
getPrm.SetAddress(addr)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
require.NoError(t, sh.Close())
}
@ -176,6 +178,7 @@ func TestRefillMetabase(t *testing.T) {
}
sh := New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta")),
@ -277,7 +280,7 @@ func TestRefillMetabase(t *testing.T) {
checkObj := func(addr oid.Address, expObj *objectSDK.Object) {
headPrm.SetAddress(addr)
res, err := sh.Head(headPrm)
res, err := sh.Head(context.Background(), headPrm)
if expObj == nil {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
@ -302,7 +305,7 @@ func TestRefillMetabase(t *testing.T) {
for _, member := range tombMembers {
headPrm.SetAddress(member)
_, err := sh.Head(headPrm)
_, err := sh.Head(context.Background(), headPrm)
if exists {
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
@ -343,6 +346,7 @@ func TestRefillMetabase(t *testing.T) {
require.NoError(t, err)
sh = New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta_restored")),

View file

@ -1,6 +1,7 @@
package shard_test
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -51,7 +52,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Delete(delPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
@ -69,13 +70,13 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err := sh.Put(putPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
_, err = sh.Delete(delPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
}

View file

@ -2,6 +2,7 @@ package shard_test
import (
"bytes"
"context"
"io"
"math/rand"
"os"
@ -276,7 +277,7 @@ func checkRestore(t *testing.T, sh *shard.Shard, prm shard.RestorePrm, objects [
for i := range objects {
getPrm.SetAddress(object.AddressOf(objects[i]))
res, err := sh.Get(getPrm)
res, err := sh.Get(context.Background(), getPrm)
require.NoError(t, err)
require.Equal(t, objects[i], res.Object())
}

View file

@ -1,6 +1,8 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,7 +35,7 @@ func (p ExistsRes) Exists() bool {
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
var exists bool
var err error
@ -45,7 +47,7 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
p.Address = prm.addr
var res common.ExistsRes
res, err = s.blobStor.Exists(p)
res, err = s.blobStor.Exists(ctx, p)
exists = res.Exists
} else {
var existsPrm meta.ExistsPrm

View file

@ -33,6 +33,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
rootPath := t.TempDir()
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
@ -115,7 +116,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
var getPrm shard.GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
require.Eventually(t, func() bool {
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted")
}

View file

@ -1,8 +1,10 @@
package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -11,6 +13,8 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -61,7 +65,15 @@ func (r GetRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.Bool("skip_meta", prm.skipMeta),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
@ -70,7 +82,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
getPrm.Address = prm.addr
getPrm.StorageID = id
res, err := stor.Get(getPrm)
res, err := stor.Get(ctx, getPrm)
if err != nil {
return nil, err
}
@ -79,7 +91,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
}
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
return c.Get(prm.addr)
return c.Get(ctx, prm.addr)
}
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()

View file

@ -2,6 +2,7 @@ package shard_test
import (
"bytes"
"context"
"errors"
"testing"
"time"
@ -111,11 +112,11 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
}
func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (shard.GetRes, error) {
res, err := sh.Get(getPrm)
res, err := sh.Get(context.Background(), getPrm)
if hasWriteCache {
require.Eventually(t, func() bool {
if shard.IsErrNotFound(err) {
res, err = sh.Get(getPrm)
res, err = sh.Get(context.Background(), getPrm)
}
return !shard.IsErrNotFound(err)
}, time.Second, time.Millisecond*100)

View file

@ -1,9 +1,14 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// HeadPrm groups the parameters of Head operation.
@ -43,7 +48,15 @@ func (r HeadRes) Object() *objectSDK.Object {
// Returns an error of type apistatus.ObjectNotFound if object is missing in Shard.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Head",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.Bool("raw", prm.raw),
))
defer span.End()
var obj *objectSDK.Object
var err error
if s.GetMode().NoMetabase() {
@ -52,7 +65,7 @@ func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
getPrm.SetIgnoreMeta(true)
var res GetRes
res, err = s.Get(getPrm)
res, err = s.Get(ctx, getPrm)
obj = res.Object()
} else {
var headParams meta.GetPrm

View file

@ -1,6 +1,7 @@
package shard_test
import (
"context"
"errors"
"testing"
"time"
@ -75,18 +76,18 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
headPrm.SetAddress(object.AddressOf(parent))
headPrm.SetRaw(false)
head, err := sh.Head(headPrm)
head, err := sh.Head(context.Background(), headPrm)
require.NoError(t, err)
require.Equal(t, parent.CutPayload(), head.Object())
})
}
func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (shard.HeadRes, error) {
res, err := sh.Head(headPrm)
res, err := sh.Head(context.Background(), headPrm)
if hasWriteCache {
require.Eventually(t, func() bool {
if shard.IsErrNotFound(err) {
res, err = sh.Head(headPrm)
res, err = sh.Head(context.Background(), headPrm)
}
return !shard.IsErrNotFound(err)
}, time.Second, time.Millisecond*100)

View file

@ -51,6 +51,6 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
_, err = sh.Inhume(context.Background(), inhPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
}

View file

@ -27,6 +27,7 @@ func TestShard_Lock(t *testing.T) {
rootPath := t.TempDir()
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
@ -137,7 +138,7 @@ func TestShard_Lock(t *testing.T) {
var getPrm shard.GetPrm
getPrm.SetAddress(objectcore.AddressOf(obj))
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
}

View file

@ -1,6 +1,10 @@
package shard
import (
"context"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -8,6 +12,8 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// RngPrm groups the parameters of GetRange operation.
@ -66,7 +72,17 @@ func (r RngRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetRange",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.Bool("skip_meta", prm.skipMeta),
attribute.String("offset", strconv.FormatUint(prm.off, 10)),
attribute.String("length", strconv.FormatUint(prm.ln, 10)),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
@ -77,7 +93,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
getRngPrm.Range.SetLength(prm.ln)
getRngPrm.StorageID = id
res, err := stor.GetRange(getRngPrm)
res, err := stor.GetRange(ctx, getRngPrm)
if err != nil {
return nil, err
}
@ -89,7 +105,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
}
wc := func(c writecache.Cache) (*object.Object, error) {
res, err := c.Get(prm.addr)
res, err := c.Get(ctx, prm.addr)
if err != nil {
return nil, err
}

View file

@ -1,6 +1,7 @@
package shard_test
import (
"context"
"math"
"path/filepath"
"testing"
@ -105,7 +106,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
rngPrm.SetAddress(addr)
rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength())
res, err := sh.GetRange(rngPrm)
res, err := sh.GetRange(context.Background(), rngPrm)
if tc.hasErr {
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
} else {

View file

@ -66,7 +66,7 @@ func TestShardReload(t *testing.T) {
var prm ExistsPrm
prm.SetAddress(objects[i].addr)
res, err := sh.Exists(prm)
res, err := sh.Exists(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, exists, res.Exists(), "object #%d is missing", i)
}

View file

@ -63,6 +63,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
}
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions(

View file

@ -1,6 +1,7 @@
package shard_test
import (
"context"
"math/rand"
"testing"
@ -55,7 +56,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
for i := range objects {
getPrm.SetAddress(object.AddressOf(objects[i]))
_, err := sh.Get(getPrm)
_, err := sh.Get(context.Background(), getPrm)
require.NoError(t, err, i)
}
}

View file

@ -1,6 +1,7 @@
package writecache
import (
"context"
"os"
"path/filepath"
"testing"
@ -95,7 +96,7 @@ func TestFlush(t *testing.T) {
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
res, err := bs.Get(prm)
res, err := bs.Get(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
}
@ -119,7 +120,7 @@ func TestFlush(t *testing.T) {
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
@ -149,7 +150,7 @@ func TestFlush(t *testing.T) {
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
@ -266,7 +267,7 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
@ -275,7 +276,7 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {

View file

@ -1,6 +1,9 @@
package writecache
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -8,14 +11,22 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Get returns object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString()
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
trace.WithAttributes(
attribute.String("address", saddr),
))
defer span.End()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
@ -23,7 +34,7 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
return obj, obj.Unmarshal(value)
}
res, err := c.fsTree.Get(common.GetPrm{Address: addr})
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if err != nil {
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
@ -35,8 +46,14 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
// Head returns object header from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Head(addr oid.Address) (*objectSDK.Object, error) {
obj, err := c.Get(addr)
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
obj, err := c.Get(ctx, addr)
if err != nil {
return nil, err
}

View file

@ -1,6 +1,7 @@
package writecache
import (
"context"
"errors"
"sync"
@ -177,6 +178,6 @@ func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}

View file

@ -1,6 +1,7 @@
package writecache
import (
"context"
"io/fs"
"os"
"time"
@ -27,7 +28,7 @@ type metabase interface {
type blob interface {
Put(common.PutPrm) (common.PutRes, error)
NeedsCompression(obj *objectSDK.Object) bool
Exists(res common.ExistsPrm) (common.ExistsRes, error)
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
}
type options struct {

View file

@ -1,6 +1,7 @@
package writecache
import (
"context"
"os"
"sync"
@ -23,8 +24,8 @@ type Info struct {
// Cache represents write-cache for objects.
type Cache interface {
Get(address oid.Address) (*object.Object, error)
Head(oid.Address) (*object.Object, error)
Get(ctx context.Context, address oid.Address) (*object.Object, error)
Head(context.Context, oid.Address) (*object.Object, error)
// Delete removes object referenced by the given oid.Address from the
// Cache. Returns any error encountered that prevented the object to be
// removed.

View file

@ -19,7 +19,7 @@ import (
"google.golang.org/grpc/status"
)
func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
@ -30,7 +30,7 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)
res, err := s.s.Evacuate(prm)
res, err := s.s.Evacuate(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

View file

@ -1,6 +1,8 @@
package notificator
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -8,7 +10,7 @@ import (
type NotificationSource interface {
// Iterate must iterate over all notifications for the
// provided epoch and call handler for all of them.
Iterate(epoch uint64, handler func(topic string, addr oid.Address))
Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address))
}
// NotificationWriter notifies all the subscribers

View file

@ -1,6 +1,7 @@
package notificator
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -71,11 +72,11 @@ func New(prm *Prm) *Notificator {
// ProcessEpoch looks for all objects with defined epoch in the storage
// and passes their addresses to the NotificationWriter.
func (n *Notificator) ProcessEpoch(epoch uint64) {
func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) {
logger := n.l.With(zap.Uint64("epoch", epoch))
logger.Debug("notificator: start processing object notifications")
n.ns.Iterate(epoch, func(topic string, addr oid.Address) {
n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) {
n.l.Debug("notificator: processing object notification",
zap.String("topic", topic),
zap.Stringer("address", addr),

View file

@ -1,6 +1,7 @@
package v2
import (
"context"
"crypto/ecdsa"
"errors"
"testing"
@ -26,7 +27,7 @@ type testLocalStorage struct {
err error
}
func (s *testLocalStorage) Head(addr oid.Address) (*object.Object, error) {
func (s *testLocalStorage) Head(ctx context.Context, addr oid.Address) (*object.Object, error) {
require.True(s.t, addr.Container().Equals(s.expAddr.Container()))
require.True(s.t, addr.Object().Equals(s.expAddr.Object()))

View file

@ -1,6 +1,7 @@
package v2
import (
"context"
"errors"
"fmt"
@ -27,7 +28,7 @@ type cfg struct {
}
type ObjectStorage interface {
Head(oid.Address) (*object.Object, error)
Head(context.Context, oid.Address) (*object.Object, error)
}
type Request interface {
@ -207,7 +208,7 @@ func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, b
addr.SetContainer(cnr)
addr.SetObject(*idObj)
obj, err := h.storage.Head(addr)
obj, err := h.storage.Head(context.TODO(), addr)
if err == nil {
return headersFromObject(obj, cnr, idObj), true
}

View file

@ -1,6 +1,7 @@
package v2
import (
"context"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@ -12,10 +13,10 @@ type localStorage struct {
ls *engine.StorageEngine
}
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
func (s *localStorage) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
if s.ls == nil {
return nil, io.ErrUnexpectedEOF
}
return engine.Head(s.ls, addr)
return engine.Head(ctx, s.ls, addr)
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
@ -111,6 +112,14 @@ func New(opts ...Option) Service {
// Get implements ServiceServer interface, makes ACL checks and calls
// next Get method in the ServiceServer pipeline.
func (b Service) Get(request *objectV2.GetRequest, stream object.GetObjectStream) error {
spanClosed := false
_, span := tracing.StartSpanFromContext(stream.Context(), "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
cnr, err := getContainerIDFromRequest(request)
if err != nil {
return err
@ -158,6 +167,9 @@ func (b Service) Get(request *objectV2.GetRequest, stream object.GetObjectStream
return eACLErr(reqInfo, err)
}
span.End()
spanClosed = true
return b.next.Get(request, &getStreamBasicChecker{
GetObjectStream: stream,
info: reqInfo,
@ -177,6 +189,14 @@ func (b Service) Put() (object.PutObjectStream, error) {
func (b Service) Head(
ctx context.Context,
request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
spanClosed := false
_, span := tracing.StartSpanFromContext(ctx, "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
cnr, err := getContainerIDFromRequest(request)
if err != nil {
return nil, err
@ -224,6 +244,9 @@ func (b Service) Head(
return nil, eACLErr(reqInfo, err)
}
span.End()
spanClosed = true
resp, err := b.next.Head(ctx, request)
if err == nil {
if err = b.checker.CheckEACL(resp, reqInfo); err != nil {
@ -235,6 +258,14 @@ func (b Service) Head(
}
func (b Service) Search(request *objectV2.SearchRequest, stream object.SearchStream) error {
spanClosed := false
_, span := tracing.StartSpanFromContext(stream.Context(), "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
id, err := getContainerIDFromRequest(request)
if err != nil {
return err
@ -275,6 +306,9 @@ func (b Service) Search(request *objectV2.SearchRequest, stream object.SearchStr
return eACLErr(reqInfo, err)
}
span.End()
spanClosed = true
return b.next.Search(request, &searchStreamBasicChecker{
checker: b.checker,
SearchStream: stream,
@ -285,6 +319,14 @@ func (b Service) Search(request *objectV2.SearchRequest, stream object.SearchStr
func (b Service) Delete(
ctx context.Context,
request *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) {
spanClosed := false
_, span := tracing.StartSpanFromContext(ctx, "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
cnr, err := getContainerIDFromRequest(request)
if err != nil {
return nil, err
@ -332,10 +374,21 @@ func (b Service) Delete(
return nil, eACLErr(reqInfo, err)
}
span.End()
spanClosed = true
return b.next.Delete(ctx, request)
}
func (b Service) GetRange(request *objectV2.GetRangeRequest, stream object.GetObjectRangeStream) error {
spanClosed := false
_, span := tracing.StartSpanFromContext(stream.Context(), "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
cnr, err := getContainerIDFromRequest(request)
if err != nil {
return err
@ -383,6 +436,9 @@ func (b Service) GetRange(request *objectV2.GetRangeRequest, stream object.GetOb
return eACLErr(reqInfo, err)
}
span.End()
spanClosed = true
return b.next.GetRange(request, &rangeStreamBasicChecker{
checker: b.checker,
GetObjectRangeStream: stream,
@ -393,6 +449,14 @@ func (b Service) GetRange(request *objectV2.GetRangeRequest, stream object.GetOb
func (b Service) GetRangeHash(
ctx context.Context,
request *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
spanClosed := false
_, span := tracing.StartSpanFromContext(ctx, "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
cnr, err := getContainerIDFromRequest(request)
if err != nil {
return nil, err
@ -440,10 +504,21 @@ func (b Service) GetRangeHash(
return nil, eACLErr(reqInfo, err)
}
span.End()
spanClosed = true
return b.next.GetRangeHash(ctx, request)
}
func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
spanClosed := false
_, span := tracing.StartSpanFromContext(ctx, "checkACL")
defer func() {
if !spanClosed {
span.End()
}
}()
body := request.GetBody()
if body == nil {
return errEmptyBody
@ -512,6 +587,9 @@ func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRe
}
}
span.End()
spanClosed = true
return p.next.Send(ctx, request)
}

View file

@ -139,7 +139,7 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro
}{obj: obj, err: err}
}
func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
var (
ok bool
obj *objectSDK.Object

View file

@ -4,15 +4,21 @@ import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
func (exec *execCtx) executeLocal(ctx context.Context) {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
defer func() {
span.End()
}()
var err error
exec.collectedObject, err = exec.svc.localStorage.get(exec)
exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
var errSplitInfo *objectSDK.SplitInfoError
var errRemoved apistatus.ObjectAlreadyRemoved

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -11,6 +12,9 @@ import (
)
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
defer span.End()
exec.log.Debug("processing node...")
client, ok := exec.remoteClient(info)

View file

@ -31,7 +31,7 @@ type cfg struct {
log *logger.Logger
localStorage interface {
get(*execCtx) (*object.Object, error)
get(context.Context, *execCtx) (*object.Object, error)
}
clientCache interface {

View file

@ -200,13 +200,13 @@ func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.Priva
return res.Object(), nil
}
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
var headPrm engine.HeadPrm
headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.isRaw())
r, err := e.engine.Head(headPrm)
r, err := e.engine.Head(ctx, headPrm)
if err != nil {
return nil, err
}
@ -217,7 +217,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
getRange.WithAddress(exec.address())
getRange.WithPayloadRange(rng)
r, err := e.engine.GetRange(getRange)
r, err := e.engine.GetRange(ctx, getRange)
if err != nil {
return nil, err
}
@ -227,7 +227,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
var getPrm engine.GetPrm
getPrm.WithAddress(exec.address())
r, err := e.engine.Get(getPrm)
r, err := e.engine.Get(ctx, getPrm)
if err != nil {
return nil, err
}

View file

@ -9,6 +9,7 @@ import (
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -18,6 +19,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type getRequestForwarder struct {
@ -30,6 +33,11 @@ type getRequestForwarder struct {
}
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "getRequestForwarder.forwardRequestToNode",
trace.WithAttributes(attribute.String("address", addr.String())),
)
defer span.End()
var err error
// once compose and resign forwarding request

View file

@ -9,6 +9,7 @@ import (
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -18,6 +19,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type getRangeRequestForwarder struct {
@ -29,6 +32,11 @@ type getRangeRequestForwarder struct {
}
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "getRangeRequestForwarder.forwardRequestToNode",
trace.WithAttributes(attribute.String("address", addr.String())),
)
defer span.End()
var err error
// once compose and resign forwarding request

View file

@ -8,6 +8,7 @@ import (
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@ -19,6 +20,8 @@ import (
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type headRequestForwarder struct {
@ -30,6 +33,11 @@ type headRequestForwarder struct {
}
func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "headRequestForwarder.forwardRequestToNode",
trace.WithAttributes(attribute.String("address", addr.String())),
)
defer span.End()
var err error
// once compose and resign forwarding request

View file

@ -96,7 +96,7 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
resp := new(objectV2.HeadResponse)
resp.SetBody(new(objectV2.HeadResponseBody))
p, err := s.toHeadPrm(ctx, req, resp)
p, err := s.toHeadPrm(req, resp)
if err != nil {
return nil, err
}

View file

@ -215,7 +215,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
return nil
}
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody()
addrV2 := body.GetAddress()

View file

@ -54,7 +54,7 @@ func (s *getStreamSigner) Send(resp *object.GetResponse) error {
}
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
return s.sigSvc.HandleServerStreamRequest(req,
return s.sigSvc.HandleServerStreamRequest(stream.Context(), req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
},
@ -126,7 +126,7 @@ func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
}
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
return s.sigSvc.HandleServerStreamRequest(req,
return s.sigSvc.HandleServerStreamRequest(stream.Context(), req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
},
@ -176,7 +176,7 @@ func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
}
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return s.sigSvc.HandleServerStreamRequest(req,
return s.sigSvc.HandleServerStreamRequest(stream.Context(), req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
},

View file

@ -27,7 +27,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
task.obj, err = engine.Get(ctx, p.localStorage, task.addr)
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),

View file

@ -8,6 +8,7 @@ import (
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"github.com/hashicorp/golang-lru/v2/simplelru"
"google.golang.org/grpc"
@ -84,8 +85,15 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn,
return nil, err
}
opts := make([]grpc.DialOption, 1, 2)
opts[0] = grpc.WithBlock()
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(
tracing.NewGRPCUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
tracing.NewGRPCStreamClientInterceptor(),
),
}
// FIXME(@fyrchik): ugly hack #1322
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {

View file

@ -10,6 +10,7 @@ import (
"math/rand"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -146,7 +147,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return false
}
cc, err := grpc.DialContext(egCtx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.DialContext(egCtx, a.URIAddr(),
grpc.WithChainUnaryInterceptor(
tracing.NewGRPCUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
tracing.NewGRPCStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
// Failed to connect, try the next address.
return false

View file

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -67,8 +68,7 @@ func (s *RequestMessageStreamer) Send(ctx context.Context, req any) error {
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
if err = verifyRequestSignature(ctx, req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = s.send(ctx, req)
@ -112,7 +112,7 @@ func (s *RequestMessageStreamer) CloseAndRecv(ctx context.Context) (ResponseMess
setStatusV2(resp, err)
}
if err = signResponse(s.key, resp, s.statusSupported); err != nil {
if err = signResponse(ctx, s.key, resp, s.statusSupported); err != nil {
return nil, err
}
@ -130,6 +130,7 @@ func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer
}
func (s *SignService) HandleServerStreamRequest(
ctx context.Context,
req any,
respWriter ResponseMessageWriter,
blankResp ResponseConstructor,
@ -142,12 +143,11 @@ func (s *SignService) HandleServerStreamRequest(
var err error
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
if err = verifyRequestSignature(ctx, req); err != nil {
err = fmt.Errorf("could not verify request: %w", err)
} else {
err = respWriterCaller(func(resp ResponseMessage) error {
if err := signResponse(s.key, resp, statusSupported); err != nil {
if err := signResponse(ctx, s.key, resp, statusSupported); err != nil {
return err
}
@ -164,7 +164,7 @@ func (s *SignService) HandleServerStreamRequest(
setStatusV2(resp, err)
_ = signResponse(s.key, resp, false) // panics or returns nil with false arg
_ = signResponse(ctx, s.key, resp, false) // panics or returns nil with false arg
return respWriter(resp)
}
@ -183,8 +183,7 @@ func (s *SignService) HandleUnaryRequest(ctx context.Context, req any, handler U
err error
)
// verify request signatures
if err = signature.VerifyServiceMessage(req); err != nil {
if err = verifyRequestSignature(ctx, req); err != nil {
var sigErr apistatus.SignatureVerification
sigErr.SetMessage(err.Error())
@ -205,7 +204,7 @@ func (s *SignService) HandleUnaryRequest(ctx context.Context, req any, handler U
}
// sign the response
if err = signResponse(s.key, resp, statusSupported); err != nil {
if err = signResponse(ctx, s.key, resp, statusSupported); err != nil {
return nil, err
}
@ -233,7 +232,10 @@ func setStatusV2(resp ResponseMessage, err error) {
// The signature error affects the result depending on the protocol version:
// - if status return is supported, panics since we cannot return the failed status, because it will not be signed;
// - otherwise, returns error in order to transport it directly.
func signResponse(key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
func signResponse(ctx context.Context, key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
_, span := tracing.StartSpanFromContext(ctx, "signResponse")
defer span.End()
err := signature.SignServiceMessage(key, resp)
if err != nil {
err = fmt.Errorf("could not sign response: %w", err)
@ -247,3 +249,10 @@ func signResponse(key *ecdsa.PrivateKey, resp any, statusSupported bool) error {
return err
}
func verifyRequestSignature(ctx context.Context, req any) error {
_, span := tracing.StartSpanFromContext(ctx, "verifyRequestSignature")
defer span.End()
return signature.VerifyServiceMessage(req)
}