forked from TrueCloudLab/frostfs-node
[#373] blobovnizca: Add missed/fix tracing spans
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
028d4a8058
commit
a8526d45e9
10 changed files with 58 additions and 20 deletions
|
@ -21,7 +21,7 @@ func testPutGet(t *testing.T, blz *Blobovnicza, addr oid.Address, sz uint64, ass
|
|||
var pPut PutPrm
|
||||
pPut.SetAddress(addr)
|
||||
pPut.SetMarshaledObject(data)
|
||||
_, err := blz.Put(pPut)
|
||||
_, err := blz.Put(context.Background(), pPut)
|
||||
if assertErrPut != nil {
|
||||
require.True(t, assertErrPut(err))
|
||||
} else {
|
||||
|
|
|
@ -38,13 +38,14 @@ func (p *DeletePrm) SetAddress(addr oid.Address) {
|
|||
func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.path),
|
||||
attribute.String("address", prm.addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
addrKey := addressKey(prm.addr)
|
||||
|
||||
removed := false
|
||||
found := false
|
||||
|
||||
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||
|
@ -56,9 +57,6 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
|||
|
||||
sz := uint64(len(objData))
|
||||
|
||||
// decrease fullness counter
|
||||
b.decSize(sz)
|
||||
|
||||
// remove object from the bucket
|
||||
err := buck.Delete(addrKey)
|
||||
|
||||
|
@ -67,16 +65,18 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
|||
zap.String("binary size", stringifyByteSize(sz)),
|
||||
zap.String("range", stringifyBounds(lower, upper)),
|
||||
)
|
||||
// decrease fullness counter
|
||||
b.decSize(sz)
|
||||
}
|
||||
|
||||
removed = true
|
||||
found = true
|
||||
|
||||
// stop iteration
|
||||
return true, err
|
||||
})
|
||||
})
|
||||
|
||||
if err == nil && !removed {
|
||||
if err == nil && !found {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
return DeleteRes{}, errNotFound
|
||||
|
|
|
@ -1,17 +1,30 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Exists check if object with the specified address is stored in b.
|
||||
func (b *Blobovnicza) Exists(addr oid.Address) (bool, error) {
|
||||
func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error) {
|
||||
var (
|
||||
exists bool
|
||||
addrKey = addressKey(addr)
|
||||
exists = false
|
||||
)
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.path),
|
||||
attribute.String("address", addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
addrKey := addressKey(addr)
|
||||
|
||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
|
||||
exists = buck.Get(addrKey) != nil
|
||||
|
|
|
@ -46,6 +46,7 @@ var errInterruptForEach = errors.New("interrupt for-each")
|
|||
func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.path),
|
||||
attribute.String("address", prm.addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
|
|
@ -41,7 +41,7 @@ func TestBlobovnicza_Get(t *testing.T) {
|
|||
addr := oidtest.Address()
|
||||
obj := make([]byte, firstBucketBound+1)
|
||||
|
||||
exists, err := blz.Exists(addr)
|
||||
exists, err := blz.Exists(context.Background(), addr)
|
||||
require.NoError(t, err)
|
||||
require.False(t, exists)
|
||||
|
||||
|
@ -50,7 +50,7 @@ func TestBlobovnicza_Get(t *testing.T) {
|
|||
prmPut.SetMarshaledObject(obj)
|
||||
|
||||
// place object to [32K:64K] bucket
|
||||
_, err = blz.Put(prmPut)
|
||||
_, err = blz.Put(context.Background(), prmPut)
|
||||
require.NoError(t, err)
|
||||
|
||||
var prmGet GetPrm
|
||||
|
@ -61,7 +61,7 @@ func TestBlobovnicza_Get(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, obj, res.Object())
|
||||
|
||||
exists, err := blz.Exists(addr)
|
||||
exists, err := blz.Exists(context.Background(), addr)
|
||||
require.NoError(t, err)
|
||||
require.True(t, exists)
|
||||
}
|
||||
|
|
|
@ -4,8 +4,11 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
|
||||
|
@ -119,6 +122,15 @@ type IterateRes struct {
|
|||
//
|
||||
// Handler should not retain object data. Handler must not be nil.
|
||||
func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.path),
|
||||
attribute.Bool("decode_addresses", prm.decodeAddresses),
|
||||
attribute.Bool("without_data", prm.withoutData),
|
||||
attribute.Bool("ignore_errors", prm.ignoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var elem IterationElement
|
||||
|
||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestBlobovniczaIterate(t *testing.T) {
|
|||
|
||||
data := [][]byte{{0, 1, 2, 3}, {5, 6, 7, 8}}
|
||||
addr := oidtest.Address()
|
||||
_, err := b.Put(PutPrm{addr: addr, objData: data[0]})
|
||||
_, err := b.Put(context.Background(), PutPrm{addr: addr, objData: data[0]})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// PutPrm groups the parameters of Put operation.
|
||||
|
@ -47,7 +51,15 @@ func (p *PutPrm) SetMarshaledObject(data []byte) {
|
|||
// Returns ErrFull if blobovnicza is filled.
|
||||
//
|
||||
// Should not be called in read-only configuration.
|
||||
func (b *Blobovnicza) Put(prm PutPrm) (PutRes, error) {
|
||||
func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.path),
|
||||
attribute.String("address", prm.addr.EncodeToString()),
|
||||
attribute.Int("size", len(prm.objData)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
sz := uint64(len(prm.objData))
|
||||
bucketName := bucketForSize(sz)
|
||||
key := addressKey(prm.addr)
|
||||
|
|
|
@ -30,7 +30,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
|||
return common.ExistsRes{}, err
|
||||
}
|
||||
|
||||
exists, err := blz.Exists(prm.Address)
|
||||
exists, err := blz.Exists(ctx, prm.Address)
|
||||
return common.ExistsRes{Exists: exists}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
|
|||
PutPrm: putPrm,
|
||||
}
|
||||
|
||||
if err := b.iterateDeepest(ctx, prm.Address, it.iterate); err != nil {
|
||||
if err := b.iterateDeepest(ctx, prm.Address, func(s string) (bool, error) { return it.iterate(ctx, s) }); err != nil {
|
||||
return common.PutRes{}, err
|
||||
} else if it.ID == nil {
|
||||
if it.AllFull {
|
||||
|
@ -64,7 +64,7 @@ type putIterator struct {
|
|||
PutPrm blobovnicza.PutPrm
|
||||
}
|
||||
|
||||
func (i *putIterator) iterate(path string) (bool, error) {
|
||||
func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
|
||||
active, err := i.B.getActivated(path)
|
||||
if err != nil {
|
||||
if !isLogical(err) {
|
||||
|
@ -77,7 +77,7 @@ func (i *putIterator) iterate(path string) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
if _, err := active.blz.Put(i.PutPrm); err != nil {
|
||||
if _, err := active.blz.Put(ctx, i.PutPrm); err != nil {
|
||||
// Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
|
||||
// or update active blobovnicza in other thread. In the latter case the database will be closed
|
||||
// and `updateActive` takes care of not updating the active blobovnicza twice.
|
||||
|
@ -99,7 +99,7 @@ func (i *putIterator) iterate(path string) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
return i.iterate(path)
|
||||
return i.iterate(ctx, path)
|
||||
}
|
||||
|
||||
i.AllFull = false
|
||||
|
|
Loading…
Reference in a new issue