[#135] get-object: Add tracing spans

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>

Author: Dmitrii Stepanov, 2023-03-13 14:37:35 +03:00 (committed by fyrchik)
parent 5af9f58469
commit 0920d848d0
80 changed files with 523 additions and 231 deletions
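Before the per-file hunks, here is the recurring instrumentation pattern this commit applies to every read operation (Get, GetRange, Exists, Head): accept a context.Context as the first parameter, open a span named "<Layer>.<Operation>" with attributes describing the request, close it with defer, and hand the context down to the next storage layer. The hunks below use the repository's tracing.StartSpanFromContext helper; the standalone sketch here substitutes the stock go.opentelemetry.io/otel tracer API so it compiles on its own — treat it as illustrative, not as code from the commit.

	package main

	import (
		"context"

		"go.opentelemetry.io/otel"
		"go.opentelemetry.io/otel/attribute"
		"go.opentelemetry.io/otel/trace"
	)

	// getObject mirrors the shape of the instrumented methods below:
	// ctx first, one span per layer, attributes describing the request.
	func getObject(ctx context.Context, address string) error {
		ctx, span := otel.Tracer("frostfs-node").Start(ctx, "BlobStor.Get",
			trace.WithAttributes(
				attribute.String("address", address),
			))
		defer span.End() // the span closes on every return path

		// The derived ctx carries the span, so the sub-storage call
		// below shows up as a child span in the same trace.
		return readFromSubstorage(ctx, address)
	}

	func readFromSubstorage(ctx context.Context, address string) error {
		_, span := otel.Tracer("frostfs-node").Start(ctx, "FSTree.Get")
		defer span.End()
		return nil // stand-in for the actual file read
	}

	func main() {
		_ = getObject(context.Background(), "example-address")
	}

Without a configured exporter the tracer is a no-op, so the sketch runs as-is; wiring up an exporter (see the go.mod changes below) is what makes the spans visible.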


@@ -33,7 +33,7 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
 	var prm blobovnicza.GetPrm
 	prm.SetAddress(addr)

-	res, err := blz.Get(prm)
+	res, err := blz.Get(cmd.Context(), prm)
 	common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))

 	data := res.Object()


@@ -102,7 +102,7 @@ func initApp(ctx context.Context, c *cfg) {
 	initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
 	initAndLog(c, "session", initSessionService)
 	initAndLog(c, "reputation", func(c *cfg) { initReputationService(ctx, c) })
-	initAndLog(c, "notification", initNotifications)
+	initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
 	initAndLog(c, "object", initObjectService)
 	initAndLog(c, "tree", initTreeService)
 	initAndLog(c, "control", initControlService)


@@ -23,7 +23,7 @@ type notificationSource struct {
 	defaultTopic string
 }

-func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, addr oid.Address)) {
+func (n *notificationSource) Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address)) {
 	log := n.l.With(zap.Uint64("epoch", epoch))

 	listRes, err := n.e.ListContainers(engine.ListContainersPrm{})
@@ -51,7 +51,7 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad
 	}

 	for _, a := range selectRes.AddressList() {
-		err = n.processAddress(a, handler)
+		err = n.processAddress(ctx, a, handler)
 		if err != nil {
 			log.Error("notificator: could not process object",
 				zap.Stringer("address", a),
@@ -66,13 +66,14 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad
 }

 func (n *notificationSource) processAddress(
+	ctx context.Context,
 	a oid.Address,
 	h func(topic string, addr oid.Address),
 ) error {
 	var prm engine.HeadPrm
 	prm.WithAddress(a)

-	res, err := n.e.Head(prm)
+	res, err := n.e.Head(ctx, prm)
 	if err != nil {
 		return err
 	}
@@ -108,7 +109,7 @@ func (n notificationWriter) Notify(topic string, address oid.Address) {
 	}
 }

-func initNotifications(c *cfg) {
+func initNotifications(ctx context.Context, c *cfg) {
 	if nodeconfig.Notification(c.appCfg).Enabled() {
 		topic := nodeconfig.Notification(c.appCfg).DefaultTopic()
 		pubKey := hex.EncodeToString(c.cfgNodeInfo.localInfo.PublicKey())
@@ -151,7 +152,7 @@ func initNotifications(c *cfg) {
 		addNewEpochAsyncNotificationHandler(c, func(e event.Event) {
 			ev := e.(netmap.NewEpoch)
-			n.ProcessEpoch(ev.EpochNumber())
+			n.ProcessEpoch(ctx, ev.EpochNumber())
 		})
 	}
 }

go.mod

@@ -20,7 +20,6 @@ require (
 	github.com/multiformats/go-multiaddr v0.8.0
 	github.com/nats-io/nats.go v1.22.1
 	github.com/nspcc-dev/neo-go v0.100.1
-	github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb // indirect
 	github.com/olekukonko/tablewriter v0.0.5
 	github.com/panjf2000/ants/v2 v2.4.0
 	github.com/paulmach/orb v0.2.2
@@ -31,6 +30,8 @@ require (
 	github.com/spf13/viper v1.15.0
 	github.com/stretchr/testify v1.8.2
 	go.etcd.io/bbolt v1.3.6
+	go.opentelemetry.io/otel v1.14.0
+	go.opentelemetry.io/otel/trace v1.14.0
 	go.uber.org/atomic v1.10.0
 	go.uber.org/zap v1.24.0
 	golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
@@ -80,6 +81,7 @@ require (
 	github.com/nats-io/nkeys v0.3.0 // indirect
 	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect
+	github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20221202075445-cb5c18dc73eb // indirect
 	github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
 	github.com/pelletier/go-toml/v2 v2.0.7 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -94,13 +96,11 @@ require (
 	github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
 	github.com/twmb/murmur3 v1.1.5 // indirect
 	github.com/urfave/cli v1.22.5 // indirect
-	go.opentelemetry.io/otel v1.14.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
 	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.14.0 // indirect
-	go.opentelemetry.io/otel/trace v1.14.0 // indirect
 	go.opentelemetry.io/proto/otlp v0.19.0 // indirect
 	go.uber.org/multierr v1.9.0 // indirect
 	golang.org/x/crypto v0.4.0 // indirect


@@ -1,6 +1,7 @@
 package blobovnicza

 import (
+	"context"
 	"errors"
 	"math/rand"
 	"os"
@@ -39,7 +40,7 @@ func testGet(t *testing.T, blz *Blobovnicza, addr oid.Address, expObj []byte, as
 	pGet.SetAddress(addr)

 	// try to read object from Blobovnicza
-	res, err := blz.Get(pGet)
+	res, err := blz.Get(context.Background(), pGet)
 	if assertErr != nil {
 		require.True(t, assertErr(err))
 	} else {


@@ -1,12 +1,16 @@
 package blobovnicza

 import (
+	"context"
 	"errors"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/nspcc-dev/neo-go/pkg/util/slice"
 	"go.etcd.io/bbolt"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 )

 // GetPrm groups the parameters of Get operation.
@@ -39,7 +43,13 @@ var errInterruptForEach = errors.New("interrupt for-each")
 //
 // Returns an error of type apistatus.ObjectNotFound if the requested object is not
 // presented in Blobovnicza.
-func (b *Blobovnicza) Get(prm GetPrm) (GetRes, error) {
+func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
+	_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Get",
+		trace.WithAttributes(
+			attribute.String("address", prm.addr.EncodeToString()),
+		))
+	defer span.End()
+
 	var (
 		data    []byte
 		addrKey = addressKey(prm.addr)


@@ -1,6 +1,7 @@
 package blobovnicza

 import (
+	"context"
 	"os"
 	"path/filepath"
 	"testing"
@@ -56,7 +57,7 @@ func TestBlobovnicza_Get(t *testing.T) {
 	prmGet.SetAddress(addr)

 	checkObj := func() {
-		res, err := blz.Get(prmGet)
+		res, err := blz.Get(context.Background(), prmGet)
 		require.NoError(t, err)
 		require.Equal(t, obj, res.Object())


@@ -1,15 +1,27 @@
 package blobovniczatree

 import (
+	"context"
+	"encoding/hex"
 	"path/filepath"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )

 // Exists implements common.Storage.
-func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
+func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Exists",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
+		))
+	defer span.End()
+
 	if prm.StorageID != nil {
 		id := blobovnicza.NewIDFromBytes(prm.StorageID)
 		blz, err := b.openBlobovnicza(id.String())
@@ -32,7 +44,7 @@ func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
 		_, ok := activeCache[dirPath]

-		_, err := b.getObjectFromLevel(gPrm, p, !ok)
+		_, err := b.getObjectFromLevel(ctx, gPrm, p, !ok)
 		if err != nil {
 			if !blobovnicza.IsErrNotFound(err) {
 				b.log.Debug("could not get object from level",


@@ -1,6 +1,7 @@
 package blobovniczatree

 import (
+	"context"
 	"os"
 	"path/filepath"
 	"testing"
@@ -44,7 +45,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
 			storageID[0]--
 		}

-		res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID})
+		res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: storageID})
 		require.NoError(t, err)
 		require.False(t, res.Exists)
 	})
@@ -57,7 +58,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
 		require.NoError(t, os.Chmod(badDir, 0))
 		t.Cleanup(func() { _ = os.Chmod(filepath.Join(dir, "9"), os.ModePerm) })

-		res, err := b.Exists(common.ExistsPrm{Address: addr, StorageID: storageID})
+		res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: storageID})
 		require.Error(t, err)
 		require.False(t, res.Exists)
 	})


@@ -1,14 +1,19 @@
 package blobovniczatree

 import (
+	"context"
+	"encoding/hex"
 	"fmt"
 	"path/filepath"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
@@ -16,7 +21,15 @@ import (
 //
 // If blobocvnicza ID is specified, only this blobovnicza is processed.
 // Otherwise, all Blobovniczas are processed descending weight.
-func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
+func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.GetRes, err error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Get",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
+			attribute.Bool("raw", prm.Raw),
+		))
+	defer span.End()
+
 	var bPrm blobovnicza.GetPrm
 	bPrm.SetAddress(prm.Address)
@@ -27,7 +40,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
 			return res, err
 		}

-		return b.getObject(blz, bPrm)
+		return b.getObject(ctx, blz, bPrm)
 	}

 	activeCache := make(map[string]struct{})
@@ -37,7 +50,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
 		_, ok := activeCache[dirPath]

-		res, err = b.getObjectFromLevel(bPrm, p, !ok)
+		res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok)
 		if err != nil {
 			if !blobovnicza.IsErrNotFound(err) {
 				b.log.Debug("could not get object from level",
@@ -64,7 +77,7 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
 // tries to read object from particular blobovnicza.
 //
 // returns error if object could not be read from any blobovnicza of the same level.
-func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
+func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
 	lvlPath := filepath.Dir(blzPath)

 	// try to read from blobovnicza if it is opened
@@ -72,7 +85,7 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
 	v, ok := b.opened.Get(blzPath)
 	b.lruMtx.Unlock()
 	if ok {
-		if res, err := b.getObject(v, prm); err == nil {
+		if res, err := b.getObject(ctx, v, prm); err == nil {
 			return res, err
 		} else if !blobovnicza.IsErrNotFound(err) {
 			b.log.Debug("could not read object from opened blobovnicza",
@@ -92,7 +105,7 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
 	b.activeMtx.RUnlock()

 	if ok && tryActive {
-		if res, err := b.getObject(active.blz, prm); err == nil {
+		if res, err := b.getObject(ctx, active.blz, prm); err == nil {
 			return res, err
 		} else if !blobovnicza.IsErrNotFound(err) {
 			b.log.Debug("could not get object from active blobovnicza",
@@ -117,12 +130,12 @@ func (b *Blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
 		return common.GetRes{}, err
 	}

-	return b.getObject(blz, prm)
+	return b.getObject(ctx, blz, prm)
 }

 // reads object from blobovnicza and returns GetSmallRes.
-func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) {
-	res, err := blz.Get(prm)
+func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (common.GetRes, error) {
+	res, err := blz.Get(ctx, prm)
 	if err != nil {
 		return common.GetRes{}, err
 	}


@@ -1,14 +1,20 @@
 package blobovniczatree

 import (
+	"context"
+	"encoding/hex"
 	"fmt"
 	"path/filepath"
+	"strconv"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
@@ -16,7 +22,16 @@ import (
 //
 // If blobocvnicza ID is specified, only this blobovnicza is processed.
 // Otherwise, all Blobovniczas are processed descending weight.
-func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes, err error) {
+func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (res common.GetRangeRes, err error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.GetRange",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
+			attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
+			attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
+		))
+	defer span.End()
+
 	if prm.StorageID != nil {
 		id := blobovnicza.NewIDFromBytes(prm.StorageID)
 		blz, err := b.openBlobovnicza(id.String())
@@ -24,7 +39,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
 			return common.GetRangeRes{}, err
 		}

-		return b.getObjectRange(blz, prm)
+		return b.getObjectRange(ctx, blz, prm)
 	}

 	activeCache := make(map[string]struct{})
@@ -35,7 +50,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
 		_, ok := activeCache[dirPath]

-		res, err = b.getRangeFromLevel(prm, p, !ok)
+		res, err = b.getRangeFromLevel(ctx, prm, p, !ok)
 		if err != nil {
 			outOfBounds := isErrOutOfRange(err)
 			if !outOfBounds && !blobovnicza.IsErrNotFound(err) {
@@ -68,7 +83,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
 // tries to read range of object payload data from particular blobovnicza.
 //
 // returns error if object could not be read from any blobovnicza of the same level.
-func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
+func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
 	lvlPath := filepath.Dir(blzPath)

 	// try to read from blobovnicza if it is opened
@@ -76,7 +91,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
 	v, ok := b.opened.Get(blzPath)
 	b.lruMtx.Unlock()
 	if ok {
-		res, err := b.getObjectRange(v, prm)
+		res, err := b.getObjectRange(ctx, v, prm)
 		switch {
 		case err == nil,
 			isErrOutOfRange(err):
@@ -101,7 +116,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
 	b.activeMtx.RUnlock()

 	if ok && tryActive {
-		res, err := b.getObjectRange(active.blz, prm)
+		res, err := b.getObjectRange(ctx, active.blz, prm)
 		switch {
 		case err == nil,
 			isErrOutOfRange(err):
@@ -131,11 +146,11 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
 		return common.GetRangeRes{}, err
 	}

-	return b.getObjectRange(blz, prm)
+	return b.getObjectRange(ctx, blz, prm)
 }

 // reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
-func (b *Blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm common.GetRangePrm) (common.GetRangeRes, error) {
+func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blobovnicza, prm common.GetRangePrm) (common.GetRangeRes, error) {
 	var gPrm blobovnicza.GetPrm
 	gPrm.SetAddress(prm.Address)
@@ -143,7 +158,7 @@ func (b *Blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm common.G
 	// stores data that is compressed on BlobStor side.
 	// If blobovnicza learns to do the compression itself,
 	// we can start using GetRange.
-	res, err := blz.Get(gPrm)
+	res, err := blz.Get(ctx, gPrm)
 	if err != nil {
 		return common.GetRangeRes{}, err
 	}


@@ -1,6 +1,7 @@
 package blobstor

 import (
+	"context"
 	"path/filepath"
 	"testing"
@@ -62,11 +63,11 @@ func TestCompression(t *testing.T) {
 	}

 	testGet := func(t *testing.T, b *BlobStor, i int) {
-		res1, err := b.Get(common.GetPrm{Address: object.AddressOf(smallObj[i])})
+		res1, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(smallObj[i])})
 		require.NoError(t, err)
 		require.Equal(t, smallObj[i], res1.Object)

-		res2, err := b.Get(common.GetPrm{Address: object.AddressOf(bigObj[i])})
+		res2, err := b.Get(context.Background(), common.GetPrm{Address: object.AddressOf(bigObj[i])})
 		require.NoError(t, err)
 		require.Equal(t, bigObj[i], res2.Object)
 	}


@@ -1,6 +1,10 @@
 package common

-import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
+import (
+	"context"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
+)

 // Storage represents key-value object storage.
 // It is used as a building block for a blobstor of a shard.
@@ -16,9 +20,9 @@ type Storage interface {
 	// This function MUST be called before Open.
 	SetReportErrorFunc(f func(string, error))

-	Get(GetPrm) (GetRes, error)
-	GetRange(GetRangePrm) (GetRangeRes, error)
-	Exists(ExistsPrm) (ExistsRes, error)
+	Get(context.Context, GetPrm) (GetRes, error)
+	GetRange(context.Context, GetRangePrm) (GetRangeRes, error)
+	Exists(context.Context, ExistsPrm) (ExistsRes, error)
 	Put(PutPrm) (PutRes, error)
 	Delete(DeletePrm) (DeleteRes, error)
 	Iterate(IteratePrm) (IterateRes, error)
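Since Storage is the interface every sub-storage (FSTree, Blobovniczas, memstore, teststore) must satisfy, the three signature changes above are what ripple through the rest of this diff. A minimal conforming stub, assuming only the signatures shown in this hunk — the empty Prm/Res structs are stand-ins for the real common types, not code from the commit:

	package main

	import "context"

	// Stand-ins for common.GetPrm, common.GetRes, etc.; only the
	// method shapes matter here.
	type (
		GetPrm      struct{}
		GetRes      struct{}
		GetRangePrm struct{}
		GetRangeRes struct{}
		ExistsPrm   struct{}
		ExistsRes   struct{}
	)

	type Storage interface {
		Get(context.Context, GetPrm) (GetRes, error)
		GetRange(context.Context, GetRangePrm) (GetRangeRes, error)
		Exists(context.Context, ExistsPrm) (ExistsRes, error)
	}

	// nopStorage ignores the context, the same way implementations
	// that do no real I/O (like memstore below) take `_ context.Context`.
	type nopStorage struct{}

	func (nopStorage) Get(_ context.Context, _ GetPrm) (GetRes, error) {
		return GetRes{}, nil
	}
	func (nopStorage) GetRange(_ context.Context, _ GetRangePrm) (GetRangeRes, error) {
		return GetRangeRes{}, nil
	}
	func (nopStorage) Exists(_ context.Context, _ ExistsPrm) (ExistsRes, error) {
		return ExistsRes{}, nil
	}

	var _ Storage = nopStorage{} // compile-time conformance check

	func main() {}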


@@ -1,7 +1,13 @@
 package blobstor

 import (
+	"context"
+	"encoding/hex"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
@@ -9,15 +15,22 @@ import (
 //
 // Returns any error encountered that did not allow
 // to completely check object existence.
-func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
+func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Exists",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
+		))
+	defer span.End()
+
 	b.modeMtx.RLock()
 	defer b.modeMtx.RUnlock()

 	if prm.StorageID != nil {
 		if len(prm.StorageID) == 0 {
-			return b.storage[len(b.storage)-1].Storage.Exists(prm)
+			return b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
 		}
-		return b.storage[0].Storage.Exists(prm)
+		return b.storage[0].Storage.Exists(ctx, prm)
 	}

 	// If there was an error during existence check below,
@@ -31,7 +44,7 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
 	// error | error   | log the first error, return the second
 	var errors []error
 	for i := range b.storage {
-		res, err := b.storage[i].Storage.Exists(prm)
+		res, err := b.storage[i].Storage.Exists(ctx, prm)
 		if err == nil && res.Exists {
 			return res, nil
 		} else if err != nil {


@@ -1,6 +1,7 @@
 package blobstor

 import (
+	"context"
 	"os"
 	"testing"
@@ -43,13 +44,13 @@ func TestExists(t *testing.T) {
 	for i := range objects {
 		prm.Address = objectCore.AddressOf(objects[i])

-		res, err := b.Exists(prm)
+		res, err := b.Exists(context.Background(), prm)
 		require.NoError(t, err)
 		require.True(t, res.Exists)
 	}

 	prm.Address = oidtest.Address()
-	res, err := b.Exists(prm)
+	res, err := b.Exists(context.Background(), prm)
 	require.NoError(t, err)
 	require.False(t, res.Exists)
@@ -60,13 +61,13 @@ func TestExists(t *testing.T) {
 		// Object exists, first error is logged.
 		prm.Address = objectCore.AddressOf(objects[0])
-		res, err := b.Exists(prm)
+		res, err := b.Exists(context.Background(), prm)
 		require.NoError(t, err)
 		require.True(t, res.Exists)

 		// Object doesn't exist, first error is returned.
 		prm.Address = objectCore.AddressOf(objects[1])
-		_, err = b.Exists(prm)
+		_, err = b.Exists(context.Background(), prm)
 		require.Error(t, err)
 		require.ErrorIs(t, err, teststore.ErrDiskExploded)
 	})


@@ -1,6 +1,7 @@
 package fstree

 import (
+	"context"
 	"crypto/sha256"
 	"errors"
 	"fmt"
@@ -11,6 +12,7 @@ import (
 	"strings"
 	"syscall"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -19,6 +21,8 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 )

 // FSTree represents an object storage as a filesystem tree.
@@ -208,7 +212,13 @@ func (t *FSTree) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
 // Exists returns the path to the file with object contents if it exists in the storage
 // and an error otherwise.
-func (t *FSTree) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
+func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
+	_, span := tracing.StartSpanFromContext(ctx, "FSTree.Exists",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+		))
+	defer span.End()
+
 	p := t.treePath(prm.Address)

 	_, err := os.Stat(p)
@@ -336,16 +346,30 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error
 }

 // Get returns an object from the storage by address.
-func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
+func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.Get",
+		trace.WithAttributes(
+			attribute.Bool("raw", prm.Raw),
+			attribute.String("address", prm.Address.EncodeToString()),
+		))
+	defer span.End()
+
 	p := t.treePath(prm.Address)

 	if _, err := os.Stat(p); os.IsNotExist(err) {
 		return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
 	}

-	data, err := os.ReadFile(p)
-	if err != nil {
-		return common.GetRes{}, err
+	var data []byte
+	var err error
+	{
+		_, span := tracing.StartSpanFromContext(ctx, "FSTree.Get.ReadFile")
+		defer span.End()
+
+		data, err = os.ReadFile(p)
+		if err != nil {
+			return common.GetRes{}, err
+		}
 	}

 	data, err = t.Decompress(data)
@@ -362,8 +386,16 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
 }

 // GetRange implements common.Storage.
-func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
-	res, err := t.Get(common.GetPrm{Address: prm.Address})
+func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.GetRange",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
+			attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
+		))
+	defer span.End()
+
+	res, err := t.Get(ctx, common.GetPrm{Address: prm.Address})
 	if err != nil {
 		return common.GetRangeRes{}, err
 	}


@@ -1,23 +1,36 @@
 package blobstor

 import (
+	"context"
+	"encoding/hex"
 	"errors"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 )

 // Get reads the object from b.
 // If the descriptor is present, only one sub-storage is tried,
 // Otherwise, each sub-storage is tried in order.
-func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
+func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Get",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.Bool("raw", prm.Raw),
+			attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
+		))
+	defer span.End()
+
 	b.modeMtx.RLock()
 	defer b.modeMtx.RUnlock()

 	if prm.StorageID == nil {
 		for i := range b.storage {
-			res, err := b.storage[i].Storage.Get(prm)
+			res, err := b.storage[i].Storage.Get(ctx, prm)
 			if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
 				return res, err
 			}
@@ -26,7 +39,7 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
 		return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
 	}

 	if len(prm.StorageID) == 0 {
-		return b.storage[len(b.storage)-1].Storage.Get(prm)
+		return b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
 	}
-	return b.storage[0].Storage.Get(prm)
+	return b.storage[0].Storage.Get(ctx, prm)
 }


@@ -1,23 +1,38 @@
 package blobstor

 import (
+	"context"
+	"encoding/hex"
 	"errors"
+	"strconv"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 )

 // GetRange reads object payload data from b.
 // If the descriptor is present, only one sub-storage is tried,
 // Otherwise, each sub-storage is tried in order.
-func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
+func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.GetRange",
+		trace.WithAttributes(
+			attribute.String("address", prm.Address.EncodeToString()),
+			attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
+			attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
+			attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
+		))
+	defer span.End()
+
 	b.modeMtx.RLock()
 	defer b.modeMtx.RUnlock()

 	if prm.StorageID == nil {
 		for i := range b.storage {
-			res, err := b.storage[i].Storage.GetRange(prm)
+			res, err := b.storage[i].Storage.GetRange(ctx, prm)
 			if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
 				return res, err
 			}
@@ -26,7 +41,7 @@ func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error)
 		return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
 	}

 	if len(prm.StorageID) == 0 {
-		return b.storage[len(b.storage)-1].Storage.GetRange(prm)
+		return b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
 	}
-	return b.storage[0].Storage.GetRange(prm)
+	return b.storage[0].Storage.GetRange(ctx, prm)
 }


@@ -1,6 +1,7 @@
 package blobstortest

 import (
+	"context"
 	"math/rand"
 	"testing"
@@ -26,7 +27,7 @@ func TestControl(t *testing.T, cons Constructor, min, max uint64) {
 		prm.StorageID = objects[i].storageID
 		prm.Raw = true

-		_, err := s.Get(prm)
+		_, err := s.Get(context.Background(), prm)
 		require.NoError(t, err)
 	}


@@ -1,6 +1,7 @@
 package blobstortest

 import (
+	"context"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -35,18 +36,18 @@ func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
 		t.Run("exists fail", func(t *testing.T) {
 			prm := common.ExistsPrm{Address: oidtest.Address()}
-			res, err := s.Exists(prm)
+			res, err := s.Exists(context.Background(), prm)
 			require.NoError(t, err)
 			require.False(t, res.Exists)
 		})

 		t.Run("get fail", func(t *testing.T) {
 			prm := common.GetPrm{Address: oidtest.Address()}
-			_, err := s.Get(prm)
+			_, err := s.Get(context.Background(), prm)
 			require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
 		})

 		t.Run("getrange fail", func(t *testing.T) {
 			prm := common.GetRangePrm{Address: oidtest.Address()}
-			_, err := s.GetRange(prm)
+			_, err := s.GetRange(context.Background(), prm)
 			require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
 		})
 	})
@@ -75,7 +76,7 @@ func TestDelete(t *testing.T, cons Constructor, min, max uint64) {
 		prm.Address = objects[3].addr
 		prm.Raw = true

-		res, err := s.Get(prm)
+		res, err := s.Get(context.Background(), prm)
 		require.NoError(t, err)
 		require.Equal(t, objects[3].raw, res.RawData)
 	})


@@ -1,6 +1,7 @@
 package blobstortest

 import (
+	"context"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -18,7 +19,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
 	t.Run("missing object", func(t *testing.T) {
 		prm := common.ExistsPrm{Address: oidtest.Address()}
-		res, err := s.Exists(prm)
+		res, err := s.Exists(context.Background(), prm)
 		require.NoError(t, err)
 		require.False(t, res.Exists)
 	})
@@ -29,7 +30,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
 	t.Run("without storage ID", func(t *testing.T) {
 		prm.StorageID = nil

-		res, err := s.Exists(prm)
+		res, err := s.Exists(context.Background(), prm)
 		require.NoError(t, err)
 		require.True(t, res.Exists)
 	})
@@ -37,7 +38,7 @@ func TestExists(t *testing.T, cons Constructor, min, max uint64) {
 	t.Run("with storage ID", func(t *testing.T) {
 		prm.StorageID = objects[0].storageID

-		res, err := s.Exists(prm)
+		res, err := s.Exists(context.Background(), prm)
 		require.NoError(t, err)
 		require.True(t, res.Exists)
 	})


@@ -1,6 +1,7 @@
 package blobstortest

 import (
+	"context"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -19,7 +20,7 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
 	t.Run("missing object", func(t *testing.T) {
 		gPrm := common.GetPrm{Address: oidtest.Address()}
-		_, err := s.Get(gPrm)
+		_, err := s.Get(context.Background(), gPrm)
 		require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
 	})
@@ -29,13 +30,13 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
 		// With storage ID.
 		gPrm.StorageID = objects[i].storageID

-		res, err := s.Get(gPrm)
+		res, err := s.Get(context.Background(), gPrm)
 		require.NoError(t, err)
 		require.Equal(t, objects[i].obj, res.Object)

 		// Without storage ID.
 		gPrm.StorageID = nil

-		res, err = s.Get(gPrm)
+		res, err = s.Get(context.Background(), gPrm)
 		require.NoError(t, err)
 		require.Equal(t, objects[i].obj, res.Object)
@@ -43,7 +44,7 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
 		gPrm.StorageID = objects[i].storageID
 		gPrm.Raw = true

-		res, err = s.Get(gPrm)
+		res, err = s.Get(context.Background(), gPrm)
 		require.NoError(t, err)
 		require.Equal(t, objects[i].raw, res.RawData)
 	}


@@ -1,6 +1,7 @@
 package blobstortest

 import (
+	"context"
 	"math"
 	"testing"
@@ -20,7 +21,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
 	t.Run("missing object", func(t *testing.T) {
 		gPrm := common.GetRangePrm{Address: oidtest.Address()}
-		_, err := s.GetRange(gPrm)
+		_, err := s.GetRange(context.Background(), gPrm)
 		require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
 	})
@@ -38,14 +39,14 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
 	t.Run("without storage ID", func(t *testing.T) {
 		// Without storage ID.
-		res, err := s.GetRange(gPrm)
+		res, err := s.GetRange(context.Background(), gPrm)
 		require.NoError(t, err)
 		require.Equal(t, payload[start:stop], res.Data)
 	})

 	t.Run("with storage ID", func(t *testing.T) {
 		gPrm.StorageID = objects[0].storageID
-		res, err := s.GetRange(gPrm)
+		res, err := s.GetRange(context.Background(), gPrm)
 		require.NoError(t, err)
 		require.Equal(t, payload[start:stop], res.Data)
 	})
@@ -54,7 +55,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
 		gPrm.Range.SetOffset(uint64(len(payload) + 10))
 		gPrm.Range.SetLength(10)

-		_, err := s.GetRange(gPrm)
+		_, err := s.GetRange(context.Background(), gPrm)
 		require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
 	})
@@ -62,7 +63,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
 		gPrm.Range.SetOffset(10)
 		gPrm.Range.SetLength(uint64(len(payload)))

-		_, err := s.GetRange(gPrm)
+		_, err := s.GetRange(context.Background(), gPrm)
 		require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
 	})
@@ -70,7 +71,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
 		gPrm.Range.SetOffset(0)
 		gPrm.Range.SetLength(1 << 63)

-		_, err := s.GetRange(gPrm)
+		_, err := s.GetRange(context.Background(), gPrm)
 		require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
 	})
@@ -78,7 +79,7 @@ func TestGetRange(t *testing.T, cons Constructor, min, max uint64) {
 		gPrm.Range.SetOffset(10)
 		gPrm.Range.SetLength(math.MaxUint64 - 2)

-		_, err := s.GetRange(gPrm)
+		_, err := s.GetRange(context.Background(), gPrm)
 		require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange))
 	})
 }


@@ -2,6 +2,7 @@
 package memstore

 import (
+	"context"
 	"fmt"
 	"sync"
@@ -32,7 +33,7 @@ func New(opts ...Option) common.Storage {
 	return st
 }

-func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) {
+func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes, error) {
 	key := req.Address.EncodeToString()

 	s.mu.RLock()
@@ -58,8 +59,8 @@ func (s *memstoreImpl) Get(req common.GetPrm) (common.GetRes, error) {
 	return common.GetRes{Object: obj, RawData: data}, nil
 }

-func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
-	getResp, err := s.Get(common.GetPrm{
+func (s *memstoreImpl) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
+	getResp, err := s.Get(ctx, common.GetPrm{
 		Address:   req.Address,
 		StorageID: req.StorageID,
 	})
@@ -80,7 +81,7 @@ func (s *memstoreImpl) GetRange(req common.GetRangePrm) (common.GetRangeRes, err
 	}, nil
 }

-func (s *memstoreImpl) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
+func (s *memstoreImpl) Exists(_ context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
 	key := req.Address.EncodeToString()

 	s.mu.RLock()


@@ -1,6 +1,7 @@
 package memstore

 import (
+	"context"
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -32,13 +33,13 @@ func TestSimpleLifecycle(t *testing.T) {
 	}

 	{
-		resp, err := s.Exists(common.ExistsPrm{Address: addr})
+		resp, err := s.Exists(context.Background(), common.ExistsPrm{Address: addr})
 		require.NoError(t, err)
 		require.True(t, resp.Exists)
 	}

 	{
-		resp, err := s.Get(common.GetPrm{Address: addr})
+		resp, err := s.Get(context.Background(), common.GetPrm{Address: addr})
 		require.NoError(t, err)
 		require.Equal(t, obj.Payload(), resp.Object.Payload())
 	}
@@ -47,7 +48,7 @@ func TestSimpleLifecycle(t *testing.T) {
 		var objRange objectSDK.Range
 		objRange.SetOffset(256)
 		objRange.SetLength(512)
-		resp, err := s.GetRange(common.GetRangePrm{
+		resp, err := s.GetRange(context.Background(), common.GetRangePrm{
 			Address: addr,
 			Range:   objRange,
 		})
@@ -61,7 +62,7 @@ func TestSimpleLifecycle(t *testing.T) {
 	}

 	{
-		resp, err := s.Exists(common.ExistsPrm{Address: addr})
+		resp, err := s.Exists(context.Background(), common.ExistsPrm{Address: addr})
 		require.NoError(t, err)
 		require.False(t, resp.Exists)
 	}


@@ -1,6 +1,7 @@
 package blobstor

 import (
+	"context"
 	"fmt"
 	"os"
 	"testing"
@@ -127,7 +128,7 @@ func BenchmarkSubstorageReadPerf(b *testing.B) {
 			b.ResetTimer()
 			b.RunParallel(func(pb *testing.PB) {
 				for pb.Next() {
-					_, err := st.Get(common.GetPrm{Address: addrGen.Next()})
+					_, err := st.Get(context.Background(), common.GetPrm{Address: addrGen.Next()})
 					require.NoError(b, err)
 				}
 			})


@@ -13,6 +13,7 @@
 package teststore

 import (
+	"context"
 	"errors"
 	"fmt"
 	"sync"
@@ -140,36 +141,36 @@ func (s *TestStore) SetReportErrorFunc(f func(string, error)) {
 	}
 }

-func (s *TestStore) Get(req common.GetPrm) (common.GetRes, error) {
+func (s *TestStore) Get(ctx context.Context, req common.GetPrm) (common.GetRes, error) {
 	switch {
 	case s.overrides.Get != nil:
 		return s.overrides.Get(req)
 	case s.st != nil:
-		return s.st.Get(req)
+		return s.st.Get(ctx, req)
 	default:
 		panic(fmt.Sprintf("unexpected storage call: Get(%+v)", req))
 	}
 }

-func (s *TestStore) GetRange(req common.GetRangePrm) (common.GetRangeRes, error) {
+func (s *TestStore) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 	switch {
 	case s.overrides.GetRange != nil:
 		return s.overrides.GetRange(req)
 	case s.st != nil:
-		return s.st.GetRange(req)
+		return s.st.GetRange(ctx, req)
 	default:
 		panic(fmt.Sprintf("unexpected storage call: GetRange(%+v)", req))
 	}
 }

-func (s *TestStore) Exists(req common.ExistsPrm) (common.ExistsRes, error) {
+func (s *TestStore) Exists(ctx context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
 	switch {
 	case s.overrides.Exists != nil:
 		return s.overrides.Exists(req)
 	case s.st != nil:
-		return s.st.Exists(req)
+		return s.st.Exists(ctx, req)
 	default:
 		panic(fmt.Sprintf("unexpected storage call: Exists(%+v)", req))
 	}

@@ -212,20 +212,20 @@ func TestExecBlocks(t *testing.T) {
 	require.NoError(t, e.BlockExecution(errBlock))

 	// try to exec some op
-	_, err := Head(e, addr)
+	_, err := Head(context.Background(), e, addr)
 	require.ErrorIs(t, err, errBlock)

 	// resume executions
 	require.NoError(t, e.ResumeExecution())

-	_, err = Head(e, addr) // can be any data-related op
+	_, err = Head(context.Background(), e, addr) // can be any data-related op
 	require.NoError(t, err)

 	// close
 	require.NoError(t, e.Close())

 	// try exec after close
-	_, err = Head(e, addr)
+	_, err = Head(context.Background(), e, addr)
 	require.Error(t, err)

 	// try to resume


@@ -72,7 +72,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 		var existsPrm shard.ExistsPrm
 		existsPrm.SetAddress(prm.addr)

-		resExists, err := sh.Exists(existsPrm)
+		resExists, err := sh.Exists(ctx, existsPrm)
 		if err != nil {
 			if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
 				return true


@@ -93,7 +93,7 @@ func checkGetError(t *testing.T, e *StorageEngine, addr oid.Address, expected an
 	var getPrm GetPrm
 	getPrm.WithAddress(addr)

-	_, err := e.Get(getPrm)
+	_, err := e.Get(context.Background(), getPrm)
 	if expected != nil {
 		require.ErrorAs(t, err, expected)
 	} else {


@@ -102,7 +102,7 @@ func TestErrorReporting(t *testing.T) {
 		te.ng.mtx.RUnlock()
 		require.NoError(t, err)

-		_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
+		_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 		require.NoError(t, err)

 		checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
@@ -115,7 +115,7 @@ func TestErrorReporting(t *testing.T) {
 		}

 		for i := uint32(1); i < 3; i++ {
-			_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
+			_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 			require.Error(t, err)
 			checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
 			checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
@@ -136,7 +136,7 @@ func TestErrorReporting(t *testing.T) {
 		te.ng.mtx.RUnlock()
 		require.NoError(t, err)

-		_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
+		_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 		require.NoError(t, err)

 		checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
@@ -149,14 +149,14 @@ func TestErrorReporting(t *testing.T) {
 		}

 		for i := uint32(1); i < errThreshold; i++ {
-			_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
+			_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 			require.Error(t, err)
 			checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
 			checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
 		}

 		for i := uint32(0); i < 2; i++ {
-			_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
+			_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
 			require.Error(t, err)
 			checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
 			checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
@@ -193,9 +193,9 @@ func TestBlobstorFailback(t *testing.T) {
 	for i := range objs {
 		addr := object.AddressOf(objs[i])
-		_, err = te.ng.Get(GetPrm{addr: addr})
+		_, err = te.ng.Get(context.Background(), GetPrm{addr: addr})
 		require.NoError(t, err)
-		_, err = te.ng.GetRange(RngPrm{addr: addr})
+		_, err = te.ng.GetRange(context.Background(), RngPrm{addr: addr})
 		require.NoError(t, err)
 	}
@@ -213,15 +213,15 @@ func TestBlobstorFailback(t *testing.T) {
 	for i := range objs {
 		addr := object.AddressOf(objs[i])

-		getRes, err := te.ng.Get(GetPrm{addr: addr})
+		getRes, err := te.ng.Get(context.Background(), GetPrm{addr: addr})
 		require.NoError(t, err)
 		require.Equal(t, objs[i], getRes.Object())

-		rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
+		rngRes, err := te.ng.GetRange(context.Background(), RngPrm{addr: addr, off: 1, ln: 10})
 		require.NoError(t, err)
 		require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())

-		_, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
+		_, err = te.ng.GetRange(context.Background(), RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
 		require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
 	}


@@ -1,6 +1,7 @@
 package engine

 import (
+	"context"
 	"errors"
 	"fmt"
@@ -58,7 +59,7 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")

 // Evacuate moves data from one shard to the others.
 // The shard being moved must be in read-only mode.
-func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
+func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
 	shardIDs := make([]string, len(prm.shardID))
 	for i := range prm.shardID {
 		shardIDs[i] = prm.shardID[i].String()
@@ -83,7 +84,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
 	var res EvacuateShardRes

 	for _, shardID := range shardIDs {
-		if err = e.evacuateShard(shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
+		if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
 			return res, err
 		}
 	}
@@ -92,7 +93,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
 	return res, nil
 }

-func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
+func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
 	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
 	var listPrm shard.ListWithCursorPrm
 	listPrm.WithCount(defaultEvacuateBatchSize)
@@ -113,7 +114,7 @@ func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res
 			return err
 		}

-		if err = e.evacuateObjects(sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
+		if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
 			return err
 		}
@@ -160,7 +161,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
 	return shards, weights, nil
 }

-func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
+func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
 	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
 	for i := range toEvacuate {
 		addr := toEvacuate[i].Address
@@ -168,7 +169,7 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add
 		var getPrm shard.GetPrm
 		getPrm.SetAddress(addr)

-		getRes, err := sh.Get(getPrm)
+		getRes, err := sh.Get(ctx, getPrm)
 		if err != nil {
 			if prm.ignoreErrors {
 				continue
@@ -176,7 +177,7 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add
 			return err
 		}

-		if e.tryEvacuateObject(addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) {
+		if e.tryEvacuateObject(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) {
 			continue
 		}
@@ -195,14 +196,14 @@ func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.Add
 	return nil
 }

-func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
+func (e *StorageEngine) tryEvacuateObject(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
 	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool {
 	hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
 	for j := range shards {
 		if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
 			continue
 		}
-		putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, object)
+		putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
 		if putDone || exists {
 			if putDone {
 				e.log.Debug("object is moved to another shard",


@@ -91,7 +91,7 @@ func TestEvacuateShard(t *testing.T) {
 			var prm GetPrm
 			prm.WithAddress(objectCore.AddressOf(objects[i]))

-			_, err := e.Get(prm)
+			_, err := e.Get(context.Background(), prm)
 			require.NoError(t, err)
 		}
 	}
@@ -102,14 +102,14 @@ func TestEvacuateShard(t *testing.T) {
 	prm.WithShardIDList(ids[2:3])

 	t.Run("must be read-only", func(t *testing.T) {
-		res, err := e.Evacuate(prm)
+		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, shard.ErrMustBeReadOnly)
 		require.Equal(t, 0, res.Count())
 	})

 	require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))

-	res, err := e.Evacuate(prm)
+	res, err := e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
 	require.Equal(t, objPerShard, res.count)
@@ -120,7 +120,7 @@ func TestEvacuateShard(t *testing.T) {
 	checkHasObjects(t)

 	// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
-	res, err = e.Evacuate(prm)
+	res, err = e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
 	require.Equal(t, 0, res.count)
@@ -165,13 +165,13 @@ func TestEvacuateNetwork(t *testing.T) {
 		var prm EvacuateShardPrm
 		prm.shardID = ids[0:1]

-		res, err := e.Evacuate(prm)
+		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errMustHaveTwoShards)
 		require.Equal(t, 0, res.Count())

 		prm.handler = acceptOneOf(objects, 2)

-		res, err = e.Evacuate(prm)
+		res, err = e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errReplication)
 		require.Equal(t, 2, res.Count())
 	})
@@ -185,14 +185,14 @@ func TestEvacuateNetwork(t *testing.T) {
 		prm.shardID = ids[1:2]
 		prm.handler = acceptOneOf(objects, 2)

-		res, err := e.Evacuate(prm)
+		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errReplication)
 		require.Equal(t, 2, res.Count())

 		t.Run("no errors", func(t *testing.T) {
 			prm.handler = acceptOneOf(objects, 3)

-			res, err := e.Evacuate(prm)
+			res, err := e.Evacuate(context.Background(), prm)
 			require.NoError(t, err)
 			require.Equal(t, 3, res.Count())
 		})
@@ -217,14 +217,14 @@ func TestEvacuateNetwork(t *testing.T) {
 		prm.shardID = evacuateIDs
 		prm.handler = acceptOneOf(objects, totalCount-1)

-		res, err := e.Evacuate(prm)
+		res, err := e.Evacuate(context.Background(), prm)
 		require.ErrorIs(t, err, errReplication)
 		require.Equal(t, totalCount-1, res.Count())

 		t.Run("no errors", func(t *testing.T) {
 			prm.handler = acceptOneOf(objects, totalCount)

-			res, err := e.Evacuate(prm)
+			res, err := e.Evacuate(context.Background(), prm)
 			require.NoError(t, err)
 			require.Equal(t, totalCount, res.Count())
 		})


@@ -1,6 +1,7 @@
 package engine

 import (
+	"context"
 	"errors"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@@ -16,7 +17,7 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
 	exists := false

 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
-		res, err := sh.Exists(shPrm)
+		res, err := sh.Exists(context.TODO(), shPrm)
 		if err != nil {
 			if shard.IsErrRemoved(err) {
 				alreadyRemoved = true


@@ -1,14 +1,18 @@
 package engine

 import (
+	"context"
 	"errors"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
@@ -43,16 +47,22 @@ func (r GetRes) Object() *objectSDK.Object {
 // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
 //
 // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) {
+func (e *StorageEngine) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.get(prm)
+		res, err = e.get(ctx, prm)
 		return err
 	})

 	return
 }

-func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
+func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.get",
+		trace.WithAttributes(
+			attribute.String("address", prm.addr.EncodeToString()),
+		))
+	defer span.End()
+
 	if e.metrics != nil {
 		defer elapsed(e.metrics.AddGetDuration)()
 	}
@@ -69,7 +79,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
 		Engine: e,
 	}

-	it.tryGetWithMeta()
+	it.tryGetWithMeta(ctx)

 	if it.SplitInfo != nil {
 		return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
@@ -84,7 +94,7 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
 		return GetRes{}, it.OutError
 	}

-	it.tryGetFromBlobstore()
+	it.tryGetFromBlobstore(ctx)

 	if it.Object == nil {
 		return GetRes{}, it.OutError
@@ -116,14 +126,14 @@ type getShardIterator struct {
 	splitInfoErr *objectSDK.SplitInfoError
 }

-func (i *getShardIterator) tryGetWithMeta() {
+func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
 	i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) {
 		noMeta := sh.GetMode().NoMetabase()
 		i.ShardPrm.SetIgnoreMeta(noMeta)

 		i.HasDegraded = i.HasDegraded || noMeta

-		res, err := sh.Get(i.ShardPrm)
+		res, err := sh.Get(ctx, i.ShardPrm)
 		if err == nil {
 			i.Object = res.Object()
 			return true
@@ -162,7 +172,7 @@ func (i *getShardIterator) tryGetWithMeta() {
 	})
 }

-func (i *getShardIterator) tryGetFromBlobstore() {
+func (i *getShardIterator) tryGetFromBlobstore(ctx context.Context) {
 	// If the object is not found but is present in metabase,
 	// try to fetch it from blobstor directly. If it is found in any
 	// blobstor, increase the error counter for the shard which contains the meta.
@@ -174,18 +184,18 @@ func (i *getShardIterator) tryGetFromBlobstore() {
 			return false
 		}

-		res, err := sh.Get(i.ShardPrm)
+		res, err := sh.Get(ctx, i.ShardPrm)
 		i.Object = res.Object()
 		return err == nil
 	})
 }

 // Get reads object from local storage by provided address.
-func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
+func Get(ctx context.Context, storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
 	var getPrm GetPrm
 	getPrm.WithAddress(addr)

-	res, err := storage.Get(getPrm)
+	res, err := storage.Get(ctx, getPrm)
 	if err != nil {
 		return nil, err
 	}


@@ -1,8 +1,10 @@
 package engine

 import (
+	"context"
 	"errors"

+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -52,16 +54,19 @@ func (r HeadRes) Header() *objectSDK.Object {
 // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object was inhumed.
 //
 // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) Head(prm HeadPrm) (res HeadRes, err error) {
+func (e *StorageEngine) Head(ctx context.Context, prm HeadPrm) (res HeadRes, err error) {
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.head(prm)
+		res, err = e.head(ctx, prm)
 		return err
 	})

 	return
 }

-func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
+func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.head")
+	defer span.End()
+
 	if e.metrics != nil {
 		defer elapsed(e.metrics.AddHeadDuration)()
 	}
@@ -81,7 +86,7 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
 	shPrm.SetRaw(prm.raw)

 	e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
-		res, err := sh.Head(shPrm)
+		res, err := sh.Head(ctx, shPrm)
 		if err != nil {
 			switch {
 			case shard.IsErrNotFound(err):
@@ -139,11 +144,11 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
 }

 // Head reads object header from local storage by provided address.
-func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
+func Head(ctx context.Context, storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
 	var headPrm HeadPrm
 	headPrm.WithAddress(addr)

-	res, err := storage.Head(headPrm)
+	res, err := storage.Head(ctx, headPrm)
 	if err != nil {
 		return nil, err
 	}
@@ -153,12 +158,12 @@ func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
 // HeadRaw reads object header from local storage by provided address and raw
 // flag.
-func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
+func HeadRaw(ctx context.Context, storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
 	var headPrm HeadPrm
 	headPrm.WithAddress(addr)
 	headPrm.WithRaw(raw)

-	res, err := storage.Head(headPrm)
+	res, err := storage.Head(ctx, headPrm)
 	if err != nil {
 		return nil, err
 	}


@@ -1,6 +1,7 @@
 package engine

 import (
+	"context"
 	"os"
 	"testing"
@@ -66,7 +67,7 @@ func TestHeadRaw(t *testing.T) {
 		headPrm.WithAddress(parentAddr)
 		headPrm.WithRaw(true)

-		_, err = e.Head(headPrm)
+		_, err = e.Head(context.Background(), headPrm)
 		require.Error(t, err)

 		var si *object.SplitInfoError


@@ -134,7 +134,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
 		if checkExists {
 			existPrm.SetAddress(addr)
-			exRes, err := sh.Exists(existPrm)
+			exRes, err := sh.Exists(ctx, existPrm)
 			if err != nil {
 				if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
 					// inhumed once - no need to be inhumed again


@@ -1,6 +1,7 @@
 package engine

 import (
+	"context"
 	"errors"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@@ -69,7 +70,7 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi
 		var existsPrm shard.ExistsPrm
 		existsPrm.SetAddress(addrLocked)

-		exRes, err := sh.Exists(existsPrm)
+		exRes, err := sh.Exists(context.TODO(), existsPrm)
 		if err != nil {
 			var siErr *objectSDK.SplitInfoError
 			if !errors.As(err, &siErr) {


@@ -1,6 +1,7 @@
 package engine

 import (
+	"context"
 	"errors"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -72,7 +73,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
 			return false
 		}

-		putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
+		putDone, exists := e.putToShard(context.TODO(), sh, ind, pool, addr, prm.obj)
 		finished = putDone || exists
 		return finished
 	})
@@ -87,7 +88,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
 // putToShard puts object to sh.
 // First return value is true iff put has been successfully done.
 // Second return value is true iff object already exists.
-func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
+func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
 	var putSuccess, alreadyExists bool

 	exitCh := make(chan struct{})
@@ -98,7 +99,7 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool
 		var existPrm shard.ExistsPrm
 		existPrm.SetAddress(addr)

-		exists, err := sh.Exists(existPrm)
+		exists, err := sh.Exists(ctx, existPrm)
 		if err != nil {
 			if shard.IsErrObjectExpired(err) {
 				// object is already found but


@ -1,14 +1,19 @@
package engine
import (
"context"
"errors"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@@ -56,16 +61,24 @@ func (r RngRes) Object() *objectSDK.Object {
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) GetRange(prm RngPrm) (res RngRes, err error) {
func (e *StorageEngine) GetRange(ctx context.Context, prm RngPrm) (res RngRes, err error) {
err = e.execIfNotBlocked(func() error {
res, err = e.getRange(prm)
res, err = e.getRange(ctx, prm)
return err
})
return
}
func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getRange",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
attribute.String("offset", strconv.FormatUint(prm.off, 10)),
attribute.String("length", strconv.FormatUint(prm.ln, 10)),
))
defer span.End()
if e.metrics != nil {
defer elapsed(e.metrics.AddRangeDuration)()
}
@@ -83,7 +96,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
Engine: e,
}
it.tryGetWithMeta()
it.tryGetWithMeta(ctx)
if it.SplitInfo != nil {
return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
@@ -96,7 +109,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
return RngRes{}, it.OutError
}
it.tryGetFromBlobstor()
it.tryGetFromBlobstor(ctx)
if it.Object == nil {
return RngRes{}, it.OutError
@@ -114,12 +127,12 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
}
// GetRange reads object payload range from local storage by provided address.
func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
func GetRange(ctx context.Context, storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
var rangePrm RngPrm
rangePrm.WithAddress(addr)
rangePrm.WithPayloadRange(rng)
res, err := storage.GetRange(rangePrm)
res, err := storage.GetRange(ctx, rangePrm)
if err != nil {
return nil, err
}
@@ -141,13 +154,13 @@ type getRangeShardIterator struct {
Engine *StorageEngine
}
func (i *getRangeShardIterator) tryGetWithMeta() {
func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) {
noMeta := sh.GetMode().NoMetabase()
i.HasDegraded = i.HasDegraded || noMeta
i.ShardPrm.SetIgnoreMeta(noMeta)
res, err := sh.GetRange(i.ShardPrm)
res, err := sh.GetRange(ctx, i.ShardPrm)
if err == nil {
i.Object = res.Object()
return true
@@ -185,7 +198,7 @@ func (i *getRangeShardIterator) tryGetWithMeta() {
})
}
func (i *getRangeShardIterator) tryGetFromBlobstor() {
func (i *getRangeShardIterator) tryGetFromBlobstor(ctx context.Context) {
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
@@ -197,7 +210,7 @@ func (i *getRangeShardIterator) tryGetFromBlobstor() {
return false
}
res, err := sh.GetRange(i.ShardPrm)
res, err := sh.GetRange(ctx, i.ShardPrm)
if shard.IsErrOutOfRange(err) {
var errOutOfRange apistatus.ObjectOutOfRange

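The instrumentation added throughout this commit follows one shape: derive a child context and span, attach identifying attributes, and end the span on every exit path. A distilled sketch (the operation name and helper are illustrative, not from the diff):

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

func tracedOp(ctx context.Context, address string, do func(context.Context) error) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "example.tracedOp",
		trace.WithAttributes(
			attribute.String("address", address),
		))
	defer span.End() // runs on every return path, including errors

	// Pass the derived ctx onward so nested spans attach to this one.
	return do(ctx)
}
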
View file

@@ -116,7 +116,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addr)
res, err := shards[i].Exists(existsPrm)
res, err := shards[i].Exists(ctx, existsPrm)
if err != nil {
return err
} else if !res.Exists() {

View file

@@ -63,6 +63,7 @@ func TestShardOpen(t *testing.T) {
newShard := func() *Shard {
return New(
WithID(NewIDFromBytes([]byte{})),
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
@@ -146,6 +147,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
require.NoError(t, err)
sh = New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
@@ -155,7 +157,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
var getPrm GetPrm
getPrm.SetAddress(addr)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
require.NoError(t, sh.Close())
}
@@ -176,6 +178,7 @@ func TestRefillMetabase(t *testing.T) {
}
sh := New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta")),
@@ -277,7 +280,7 @@ func TestRefillMetabase(t *testing.T) {
checkObj := func(addr oid.Address, expObj *objectSDK.Object) {
headPrm.SetAddress(addr)
res, err := sh.Head(headPrm)
res, err := sh.Head(context.Background(), headPrm)
if expObj == nil {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
@@ -302,7 +305,7 @@ func TestRefillMetabase(t *testing.T) {
for _, member := range tombMembers {
headPrm.SetAddress(member)
_, err := sh.Head(headPrm)
_, err := sh.Head(context.Background(), headPrm)
if exists {
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
@@ -343,6 +346,7 @@ func TestRefillMetabase(t *testing.T) {
require.NoError(t, err)
sh = New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithMetaBaseOptions(
meta.WithPath(filepath.Join(p, "meta_restored")),

View file

@@ -1,6 +1,7 @@
package shard_test
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -51,7 +52,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Delete(delPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
@@ -69,13 +70,13 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err := sh.Put(putPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
_, err = sh.Delete(delPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
}

View file

@@ -2,6 +2,7 @@ package shard_test
import (
"bytes"
"context"
"io"
"math/rand"
"os"
@@ -276,7 +277,7 @@ func checkRestore(t *testing.T, sh *shard.Shard, prm shard.RestorePrm, objects [
for i := range objects {
getPrm.SetAddress(object.AddressOf(objects[i]))
res, err := sh.Get(getPrm)
res, err := sh.Get(context.Background(), getPrm)
require.NoError(t, err)
require.Equal(t, objects[i], res.Object())
}

View file

@@ -1,6 +1,8 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -33,7 +35,7 @@ func (p ExistsRes) Exists() bool {
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
// Returns the object.ErrObjectIsExpired if the object is present but already expired.
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
var exists bool
var err error
@@ -45,7 +47,7 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
p.Address = prm.addr
var res common.ExistsRes
res, err = s.blobStor.Exists(p)
res, err = s.blobStor.Exists(ctx, p)
exists = res.Exists
} else {
var existsPrm meta.ExistsPrm

View file

@@ -33,6 +33,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
rootPath := t.TempDir()
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
@@ -115,7 +116,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
var getPrm shard.GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
require.Eventually(t, func() bool {
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted")
}

View file

@@ -1,8 +1,10 @@
package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -11,6 +13,8 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@@ -61,7 +65,15 @@ func (r GetRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is present but already expired.
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.Bool("skip_meta", prm.skipMeta),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
@@ -70,7 +82,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
getPrm.Address = prm.addr
getPrm.StorageID = id
res, err := stor.Get(getPrm)
res, err := stor.Get(ctx, getPrm)
if err != nil {
return nil, err
}
@@ -79,7 +91,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
}
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
return c.Get(prm.addr)
return c.Get(ctx, prm.addr)
}
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()

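With the new signature a shard read looks like the following; the parameter object is unchanged, only the context is new (the wrapper is hypothetical):

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

func getObject(ctx context.Context, sh *shard.Shard, addr oid.Address) (*objectSDK.Object, error) {
	var prm shard.GetPrm
	prm.SetAddress(addr)

	// Tests pass context.Background(); servers should hand down the
	// request context so "Shard.Get" nests under the RPC span.
	res, err := sh.Get(ctx, prm)
	if err != nil {
		return nil, err
	}
	return res.Object(), nil
}
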
View file

@@ -2,6 +2,7 @@ package shard_test
import (
"bytes"
"context"
"errors"
"testing"
"time"
@@ -111,11 +112,11 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
}
func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (shard.GetRes, error) {
res, err := sh.Get(getPrm)
res, err := sh.Get(context.Background(), getPrm)
if hasWriteCache {
require.Eventually(t, func() bool {
if shard.IsErrNotFound(err) {
res, err = sh.Get(getPrm)
res, err = sh.Get(context.Background(), getPrm)
}
return !shard.IsErrNotFound(err)
}, time.Second, time.Millisecond*100)

View file

@@ -1,9 +1,14 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// HeadPrm groups the parameters of Head operation.
@@ -43,7 +48,15 @@ func (r HeadRes) Object() *objectSDK.Object {
// Returns an error of type apistatus.ObjectNotFound if the object is missing in Shard.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is present but already expired.
func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Head",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.Bool("raw", prm.raw),
))
defer span.End()
var obj *objectSDK.Object
var err error
if s.GetMode().NoMetabase() {
@@ -52,7 +65,7 @@ func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
getPrm.SetIgnoreMeta(true)
var res GetRes
res, err = s.Get(getPrm)
res, err = s.Get(ctx, getPrm)
obj = res.Object()
} else {
var headParams meta.GetPrm

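In no-metabase (degraded) mode Head is served by a metadata-skipping Get, after which only the header matters. A caller-level equivalent, as a hedged sketch (headDegraded is hypothetical):

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

func headDegraded(ctx context.Context, sh *shard.Shard, addr oid.Address) (*objectSDK.Object, error) {
	var prm shard.GetPrm
	prm.SetAddress(addr)
	prm.SetIgnoreMeta(true) // bypass the metabase entirely

	res, err := sh.Get(ctx, prm)
	if err != nil {
		return nil, err
	}
	return res.Object().CutPayload(), nil // keep the header only
}
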
View file

@@ -1,6 +1,7 @@
package shard_test
import (
"context"
"errors"
"testing"
"time"
@@ -75,18 +76,18 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
headPrm.SetAddress(object.AddressOf(parent))
headPrm.SetRaw(false)
head, err := sh.Head(headPrm)
head, err := sh.Head(context.Background(), headPrm)
require.NoError(t, err)
require.Equal(t, parent.CutPayload(), head.Object())
})
}
func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (shard.HeadRes, error) {
res, err := sh.Head(headPrm)
res, err := sh.Head(context.Background(), headPrm)
if hasWriteCache {
require.Eventually(t, func() bool {
if shard.IsErrNotFound(err) {
res, err = sh.Head(headPrm)
res, err = sh.Head(context.Background(), headPrm)
}
return !shard.IsErrNotFound(err)
}, time.Second, time.Millisecond*100)

View file

@@ -51,6 +51,6 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
_, err = sh.Inhume(context.Background(), inhPrm)
require.NoError(t, err)
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
}

View file

@@ -27,6 +27,7 @@ func TestShard_Lock(t *testing.T) {
rootPath := t.TempDir()
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{
@@ -137,7 +138,7 @@ func TestShard_Lock(t *testing.T) {
var getPrm shard.GetPrm
getPrm.SetAddress(objectcore.AddressOf(obj))
_, err = sh.Get(getPrm)
_, err = sh.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
})
}

View file

@@ -1,6 +1,10 @@
package shard
import (
"context"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -8,6 +12,8 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// RngPrm groups the parameters of GetRange operation.
@@ -66,7 +72,17 @@ func (r RngRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is present but already expired.
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetRange",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
attribute.Bool("skip_meta", prm.skipMeta),
attribute.String("offset", strconv.FormatUint(prm.off, 10)),
attribute.String("length", strconv.FormatUint(prm.ln, 10)),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
@@ -77,7 +93,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
getRngPrm.Range.SetLength(prm.ln)
getRngPrm.StorageID = id
res, err := stor.GetRange(getRngPrm)
res, err := stor.GetRange(ctx, getRngPrm)
if err != nil {
return nil, err
}
@@ -89,7 +105,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
}
wc := func(c writecache.Cache) (*object.Object, error) {
res, err := c.Get(prm.addr)
res, err := c.Get(ctx, prm.addr)
if err != nil {
return nil, err
}

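Offset and length are recorded as string attributes via strconv.FormatUint rather than attribute.Int64; a plausible reason is that attribute integers are signed, so a uint64 needs an overflow check first. Both options, sketched:

package example

import (
	"math"
	"strconv"

	"go.opentelemetry.io/otel/attribute"
)

func rangeAttrs(off, ln uint64) []attribute.KeyValue {
	attrs := []attribute.KeyValue{
		// As in this commit: lossless for the full uint64 range.
		attribute.String("offset", strconv.FormatUint(off, 10)),
	}
	// attribute.Int64 works too, but only when the value fits.
	if ln <= math.MaxInt64 {
		attrs = append(attrs, attribute.Int64("length", int64(ln)))
	}
	return attrs
}
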
View file

@@ -1,6 +1,7 @@
package shard_test
import (
"context"
"math"
"path/filepath"
"testing"
@@ -105,7 +106,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
rngPrm.SetAddress(addr)
rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength())
res, err := sh.GetRange(rngPrm)
res, err := sh.GetRange(context.Background(), rngPrm)
if tc.hasErr {
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
} else {

View file

@@ -66,7 +66,7 @@ func TestShardReload(t *testing.T) {
var prm ExistsPrm
prm.SetAddress(objects[i].addr)
res, err := sh.Exists(prm)
res, err := sh.Exists(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, exists, res.Exists(), "object #%d is missing", i)
}

View file

@@ -63,6 +63,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
}
opts := []shard.Option{
shard.WithID(shard.NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions(

View file

@@ -1,6 +1,7 @@
package shard_test
import (
"context"
"math/rand"
"testing"
@@ -55,7 +56,7 @@ func TestWriteCacheObjectLoss(t *testing.T) {
for i := range objects {
getPrm.SetAddress(object.AddressOf(objects[i]))
_, err := sh.Get(getPrm)
_, err := sh.Get(context.Background(), getPrm)
require.NoError(t, err, i)
}
}

View file

@@ -1,6 +1,7 @@
package writecache
import (
"context"
"os"
"path/filepath"
"testing"
@@ -95,7 +96,7 @@ func TestFlush(t *testing.T) {
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
res, err := bs.Get(prm)
res, err := bs.Get(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
}
@@ -119,7 +120,7 @@ func TestFlush(t *testing.T) {
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
@@ -149,7 +150,7 @@ func TestFlush(t *testing.T) {
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
@@ -266,7 +267,7 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
@@ -275,7 +276,7 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(objects[i].addr)
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {

View file

@@ -1,6 +1,9 @@
package writecache
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -8,14 +11,22 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Get returns object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString()
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
trace.WithAttributes(
attribute.String("address", saddr),
))
defer span.End()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
@@ -23,7 +34,7 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
return obj, obj.Unmarshal(value)
}
res, err := c.fsTree.Get(common.GetPrm{Address: addr})
res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
if err != nil {
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
@@ -35,8 +46,14 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
// Head returns object header from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Head(addr oid.Address) (*objectSDK.Object, error) {
obj, err := c.Get(addr)
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
))
defer span.End()
obj, err := c.Get(ctx, addr)
if err != nil {
return nil, err
}

View file

@@ -1,6 +1,7 @@
package writecache
import (
"context"
"errors"
"sync"
@@ -177,6 +178,6 @@ func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}

View file

@@ -1,6 +1,7 @@
package writecache
import (
"context"
"io/fs"
"os"
"time"
@@ -27,7 +28,7 @@ type metabase interface {
type blob interface {
Put(common.PutPrm) (common.PutRes, error)
NeedsCompression(obj *objectSDK.Object) bool
Exists(res common.ExistsPrm) (common.ExistsRes, error)
Exists(ctx context.Context, res common.ExistsPrm) (common.ExistsRes, error)
}
type options struct {

View file

@@ -1,6 +1,7 @@
package writecache
import (
"context"
"os"
"sync"
@@ -23,8 +24,8 @@ type Info struct {
// Cache represents write-cache for objects.
type Cache interface {
Get(address oid.Address) (*object.Object, error)
Head(oid.Address) (*object.Object, error)
Get(ctx context.Context, address oid.Address) (*object.Object, error)
Head(context.Context, oid.Address) (*object.Object, error)
// Delete removes the object referenced by the given oid.Address from the
// Cache. Returns any error encountered that prevented the object from
// being removed.

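Every Cache implementation and test double must adopt the context-first signatures. A partial stub covering only the two read methods (noopReader is hypothetical and does not satisfy the rest of the interface):

package example

import (
	"context"

	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

type noopReader struct{}

func (noopReader) Get(_ context.Context, _ oid.Address) (*object.Object, error) {
	return nil, apistatus.ObjectNotFound{}
}

func (noopReader) Head(_ context.Context, _ oid.Address) (*object.Object, error) {
	return nil, apistatus.ObjectNotFound{}
}
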
View file

@@ -19,7 +19,7 @@ import (
"google.golang.org/grpc/status"
)
func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
@@ -30,7 +30,7 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)
res, err := s.s.Evacuate(prm)
res, err := s.s.Evacuate(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

View file

@@ -1,6 +1,8 @@
package notificator
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@@ -8,7 +10,7 @@
type NotificationSource interface {
// Iterate must iterate over all notifications for the
// provided epoch and call handler for all of them.
Iterate(epoch uint64, handler func(topic string, addr oid.Address))
Iterate(ctx context.Context, epoch uint64, handler func(topic string, addr oid.Address))
}
// NotificationWriter notifies all the subscribers

View file

@@ -1,6 +1,7 @@
package notificator
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@@ -71,11 +72,11 @@ func New(prm *Prm) *Notificator {
// ProcessEpoch looks for all objects with the defined epoch in the storage
// and passes their addresses to the NotificationWriter.
func (n *Notificator) ProcessEpoch(epoch uint64) {
func (n *Notificator) ProcessEpoch(ctx context.Context, epoch uint64) {
logger := n.l.With(zap.Uint64("epoch", epoch))
logger.Debug("notificator: start processing object notifications")
n.ns.Iterate(epoch, func(topic string, addr oid.Address) {
n.ns.Iterate(ctx, epoch, func(topic string, addr oid.Address) {
n.l.Debug("notificator: processing object notification",
zap.String("topic", topic),
zap.Stringer("address", addr),

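Callers of ProcessEpoch now choose the context under which a whole epoch's notifications are produced; cancelling it stops the engine iteration. An invocation sketch (the import path and the timeout are assumptions, not from the diff):

package example

import (
	"context"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/notificator" // path assumed
)

func onNewEpoch(n *notificator.Notificator, epoch uint64) {
	// Bound the work so a stuck storage iteration cannot outlive the epoch.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	n.ProcessEpoch(ctx, epoch)
}
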
View file

@@ -1,6 +1,7 @@
package v2
import (
"context"
"crypto/ecdsa"
"errors"
"testing"
@@ -26,7 +27,7 @@ type testLocalStorage struct {
err error
}
func (s *testLocalStorage) Head(addr oid.Address) (*object.Object, error) {
func (s *testLocalStorage) Head(ctx context.Context, addr oid.Address) (*object.Object, error) {
require.True(s.t, addr.Container().Equals(s.expAddr.Container()))
require.True(s.t, addr.Object().Equals(s.expAddr.Object()))

View file

@@ -1,6 +1,7 @@
package v2
import (
"context"
"errors"
"fmt"
@@ -27,7 +28,7 @@ }
}
type ObjectStorage interface {
Head(oid.Address) (*object.Object, error)
Head(context.Context, oid.Address) (*object.Object, error)
}
type Request interface {
@@ -207,7 +208,7 @@ func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, b
addr.SetContainer(cnr)
addr.SetObject(*idObj)
obj, err := h.storage.Head(addr)
obj, err := h.storage.Head(context.TODO(), addr)
if err == nil {
return headersFromObject(obj, cnr, idObj), true
}

View file

@@ -1,6 +1,7 @@
package v2
import (
"context"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@@ -12,10 +13,10 @@ type localStorage struct {
ls *engine.StorageEngine
}
func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) {
func (s *localStorage) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
if s.ls == nil {
return nil, io.ErrUnexpectedEOF
}
return engine.Head(s.ls, addr)
return engine.Head(ctx, s.ls, addr)
}

View file

@@ -139,7 +139,7 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro
}{obj: obj, err: err}
}
func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
var (
ok bool
obj *objectSDK.Object

View file

@@ -4,15 +4,21 @@ import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
func (exec *execCtx) executeLocal(ctx context.Context) {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
defer func() {
span.End()
}()
var err error
exec.collectedObject, err = exec.svc.localStorage.get(exec)
exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
var errSplitInfo *objectSDK.SplitInfoError
var errRemoved apistatus.ObjectAlreadyRemoved

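executeLocal ends its span from a deferred closure; for a no-argument call this behaves the same as the plain form. A small sketch of the two spellings:

package example

import "go.opentelemetry.io/otel/trace"

func endSpan(span trace.Span) {
	defer span.End() // plain form: End runs when the function returns
	// defer func() { span.End() }() // closure form, as in executeLocal;
	// useful only when extra logic must run together with End.
}
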
View file

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -11,6 +12,9 @@ import (
)
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
defer span.End()
exec.log.Debug("processing node...")
client, ok := exec.remoteClient(info)

View file

@@ -31,7 +31,7 @@ type cfg struct {
log *logger.Logger
localStorage interface {
get(*execCtx) (*object.Object, error)
get(context.Context, *execCtx) (*object.Object, error)
}
clientCache interface {

View file

@@ -200,13 +200,13 @@ func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.Priva
return res.Object(), nil
}
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
var headPrm engine.HeadPrm
headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.isRaw())
r, err := e.engine.Head(headPrm)
r, err := e.engine.Head(ctx, headPrm)
if err != nil {
return nil, err
}
@@ -217,7 +217,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
getRange.WithAddress(exec.address())
getRange.WithPayloadRange(rng)
r, err := e.engine.GetRange(getRange)
r, err := e.engine.GetRange(ctx, getRange)
if err != nil {
return nil, err
}
@@ -227,7 +227,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
var getPrm engine.GetPrm
getPrm.WithAddress(exec.address())
r, err := e.engine.Get(getPrm)
r, err := e.engine.Get(ctx, getPrm)
if err != nil {
return nil, err
}

View file

@@ -9,6 +9,7 @@ import (
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@@ -18,6 +19,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type getRequestForwarder struct {
@@ -30,6 +33,11 @@ type getRequestForwarder struct {
}
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "getRequestForwarder.forwardRequestToNode",
trace.WithAttributes(attribute.String("address", addr.String())),
)
defer span.End()
var err error
// compose and re-sign the forwarding request once

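All three forwarders (get, range, head) open an identically shaped span keyed by the target node's address, so a cross-node fetch shows one child span per attempted node. The shared shape, extracted as a hypothetical helper (the network import path is assumed):

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" // path assumed
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

func forwardWithSpan(ctx context.Context, name string, addr network.Address, do func(context.Context) error) error {
	ctx, span := tracing.StartSpanFromContext(ctx, name,
		trace.WithAttributes(attribute.String("address", addr.String())),
	)
	defer span.End()

	return do(ctx)
}
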
View file

@@ -9,6 +9,7 @@ import (
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@@ -18,6 +19,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type getRangeRequestForwarder struct {
@@ -29,6 +32,11 @@ type getRangeRequestForwarder struct {
}
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "getRangeRequestForwarder.forwardRequestToNode",
trace.WithAttributes(attribute.String("address", addr.String())),
)
defer span.End()
var err error
// compose and re-sign the forwarding request once

View file

@@ -8,6 +8,7 @@ import (
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@@ -19,6 +20,8 @@ import (
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type headRequestForwarder struct {
@ -30,6 +33,11 @@ type headRequestForwarder struct {
}
func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "headRequestForwarder.forwardRequestToNode",
trace.WithAttributes(attribute.String("address", addr.String())),
)
defer span.End()
var err error
// compose and re-sign the forwarding request once

View file

@@ -96,7 +96,7 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
resp := new(objectV2.HeadResponse)
resp.SetBody(new(objectV2.HeadResponseBody))
p, err := s.toHeadPrm(ctx, req, resp)
p, err := s.toHeadPrm(req, resp)
if err != nil {
return nil, err
}

View file

@@ -215,7 +215,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
return nil
}
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody()
addrV2 := body.GetAddress()

View file

@@ -27,7 +27,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
task.obj, err = engine.Get(ctx, p.localStorage, task.addr)
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),