forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
11 changed files with 134 additions and 19 deletions
|
@ -26,11 +26,11 @@ import (
|
|||
func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.GetRes, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
found = false
|
||||
success = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Get(time.Since(startedAt), size, found, prm.StorageID != nil)
|
||||
b.metrics.Get(time.Since(startedAt), size, success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Get",
|
||||
|
@ -54,7 +54,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
|
||||
res, err = b.getObject(ctx, blz, bPrm)
|
||||
if err == nil {
|
||||
found = true
|
||||
success = true
|
||||
size = len(res.RawData)
|
||||
}
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
return res, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
found = true
|
||||
success = true
|
||||
size = len(res.RawData)
|
||||
|
||||
return
|
||||
|
|
|
@ -27,11 +27,11 @@ import (
|
|||
func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (res common.GetRangeRes, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
found = false
|
||||
success = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.GetRange(time.Since(startedAt), size, found, prm.StorageID != nil)
|
||||
b.metrics.GetRange(time.Since(startedAt), size, success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.GetRange",
|
||||
|
@ -54,12 +54,13 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
res, err := b.getObjectRange(ctx, blz, prm)
|
||||
if err == nil {
|
||||
size = len(res.Data)
|
||||
found = true
|
||||
success = true
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
objectFound := false
|
||||
|
||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
@ -82,18 +83,19 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
found = err == nil
|
||||
objectFound = err == nil
|
||||
|
||||
// abort iterator if found, otherwise process all Blobovniczas
|
||||
return err == nil, nil
|
||||
})
|
||||
|
||||
if err == nil && !found {
|
||||
if err == nil && !objectFound {
|
||||
// not found in any blobovnicza
|
||||
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
success = true
|
||||
size = len(res.Data)
|
||||
}
|
||||
|
||||
|
|
|
@ -43,10 +43,12 @@ type cfg struct {
|
|||
compression compression.Config
|
||||
log *logger.Logger
|
||||
storage []SubStorage
|
||||
metrics Metrics
|
||||
}
|
||||
|
||||
func initConfig(c *cfg) {
|
||||
c.log = &logger.Logger{Logger: zap.L()}
|
||||
c.metrics = &noopMetrics{}
|
||||
}
|
||||
|
||||
// New creates, initializes and returns new BlobStor instance.
|
||||
|
@ -113,3 +115,9 @@ func (b *BlobStor) SetReportErrorFunc(f func(string, error)) {
|
|||
b.storage[i].Storage.SetReportErrorFunc(f)
|
||||
}
|
||||
}
|
||||
|
||||
func WithMetrics(m Metrics) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ func (b *BlobStor) Open(readOnly bool) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
b.metrics.SetMode(readOnly)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -65,5 +66,8 @@ func (b *BlobStor) Close() error {
|
|||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
if firstErr == nil {
|
||||
b.metrics.Close()
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -13,6 +14,14 @@ import (
|
|||
)
|
||||
|
||||
func (b *BlobStor) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Delete(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
|
@ -28,6 +37,7 @@ func (b *BlobStor) Delete(ctx context.Context, prm common.DeletePrm) (common.Del
|
|||
res, err := b.storage[i].Storage.Delete(ctx, prm)
|
||||
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
if err == nil {
|
||||
success = true
|
||||
logOp(b.log, deleteOp, prm.Address, b.storage[i].Storage.Type(), prm.StorageID)
|
||||
}
|
||||
return res, err
|
||||
|
@ -45,6 +55,7 @@ func (b *BlobStor) Delete(ctx context.Context, prm common.DeletePrm) (common.Del
|
|||
|
||||
res, err := st.Delete(ctx, prm)
|
||||
if err == nil {
|
||||
success = true
|
||||
logOp(b.log, deleteOp, prm.Address, st.Type(), prm.StorageID)
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package blobstor
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -17,6 +18,14 @@ import (
|
|||
// Returns any error encountered that did not allow
|
||||
// to completely check object existence.
|
||||
func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
var (
|
||||
exists = false
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Exists(time.Since(startedAt), exists, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
|
@ -29,9 +38,13 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
|
|||
|
||||
if prm.StorageID != nil {
|
||||
if len(prm.StorageID) == 0 {
|
||||
return b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
|
||||
res, err := b.storage[len(b.storage)-1].Storage.Exists(ctx, prm)
|
||||
exists = err == nil && res.Exists
|
||||
return res, err
|
||||
}
|
||||
return b.storage[0].Storage.Exists(ctx, prm)
|
||||
res, err := b.storage[0].Storage.Exists(ctx, prm)
|
||||
exists = err == nil && res.Exists
|
||||
return res, err
|
||||
}
|
||||
|
||||
// If there was an error during existence check below,
|
||||
|
@ -47,6 +60,7 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
|
|||
for i := range b.storage {
|
||||
res, err := b.storage[i].Storage.Exists(ctx, prm)
|
||||
if err == nil && res.Exists {
|
||||
exists = true
|
||||
return res, nil
|
||||
} else if err != nil {
|
||||
errors = append(errors, err)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -16,7 +17,14 @@ import (
|
|||
// Get reads the object from b.
|
||||
// If the descriptor is present, only one sub-storage is tried,
|
||||
// Otherwise, each sub-storage is tried in order.
|
||||
func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (res common.GetRes, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Get(time.Since(startedAt), len(res.RawData), err == nil, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
|
@ -30,7 +38,7 @@ func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, e
|
|||
|
||||
if prm.StorageID == nil {
|
||||
for i := range b.storage {
|
||||
res, err := b.storage[i].Storage.Get(ctx, prm)
|
||||
res, err = b.storage[i].Storage.Get(ctx, prm)
|
||||
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
return res, err
|
||||
}
|
||||
|
@ -39,7 +47,9 @@ func (b *BlobStor) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, e
|
|||
return common.GetRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
if len(prm.StorageID) == 0 {
|
||||
return b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
|
||||
res, err = b.storage[len(b.storage)-1].Storage.Get(ctx, prm)
|
||||
} else {
|
||||
res, err = b.storage[0].Storage.Get(ctx, prm)
|
||||
}
|
||||
return b.storage[0].Storage.Get(ctx, prm)
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/hex"
|
||||
"errors"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -17,7 +18,14 @@ import (
|
|||
// GetRange reads object payload data from b.
|
||||
// If the descriptor is present, only one sub-storage is tried,
|
||||
// Otherwise, each sub-storage is tried in order.
|
||||
func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (res common.GetRangeRes, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.GetRange(time.Since(startedAt), len(res.Data), err == nil, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
|
@ -32,7 +40,7 @@ func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (common
|
|||
|
||||
if prm.StorageID == nil {
|
||||
for i := range b.storage {
|
||||
res, err := b.storage[i].Storage.GetRange(ctx, prm)
|
||||
res, err = b.storage[i].Storage.GetRange(ctx, prm)
|
||||
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
return res, err
|
||||
}
|
||||
|
@ -41,7 +49,9 @@ func (b *BlobStor) GetRange(ctx context.Context, prm common.GetRangePrm) (common
|
|||
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
if len(prm.StorageID) == 0 {
|
||||
return b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
|
||||
res, err = b.storage[len(b.storage)-1].Storage.GetRange(ctx, prm)
|
||||
} else {
|
||||
res, err = b.storage[0].Storage.GetRange(ctx, prm)
|
||||
}
|
||||
return b.storage[0].Storage.GetRange(ctx, prm)
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -3,10 +3,14 @@ package blobstor
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -18,6 +22,19 @@ import (
|
|||
//
|
||||
// If handler returns an error, method wraps and returns it immediately.
|
||||
func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Iterate(time.Since(startedAt), success)
|
||||
}()
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
|
@ -27,6 +44,7 @@ func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.I
|
|||
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
|
||||
}
|
||||
}
|
||||
success = true
|
||||
return common.IterateRes{}, nil
|
||||
}
|
||||
|
||||
|
|
26
pkg/local_object_storage/blobstor/metrics.go
Normal file
26
pkg/local_object_storage/blobstor/metrics.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
package blobstor
|
||||
|
||||
import "time"
|
||||
|
||||
type Metrics interface {
|
||||
SetMode(readOnly bool)
|
||||
Close()
|
||||
|
||||
Delete(d time.Duration, success, withStorageID bool)
|
||||
Exists(d time.Duration, success, withStorageID bool)
|
||||
GetRange(d time.Duration, size int, success, withStorageID bool)
|
||||
Get(d time.Duration, size int, success, withStorageID bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (m *noopMetrics) SetMode(bool) {}
|
||||
func (m *noopMetrics) Close() {}
|
||||
func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
|
||||
func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
|
||||
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
|
@ -3,6 +3,7 @@ package blobstor
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -26,6 +27,15 @@ var ErrNoPlaceFound = logicerr.New("couldn't find a place to store an object")
|
|||
// Returns any error encountered that
|
||||
// did not allow to completely save the object.
|
||||
func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Put(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
|
@ -47,11 +57,13 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
|
|||
}
|
||||
prm.RawData = data
|
||||
}
|
||||
size = len(prm.RawData)
|
||||
|
||||
for i := range b.storage {
|
||||
if b.storage[i].Policy == nil || b.storage[i].Policy(prm.Object, prm.RawData) {
|
||||
res, err := b.storage[i].Storage.Put(ctx, prm)
|
||||
if err == nil {
|
||||
success = true
|
||||
logOp(b.log, putOp, prm.Address, b.storage[i].Storage.Type(), res.StorageID)
|
||||
}
|
||||
return res, err
|
||||
|
|
Loading…
Reference in a new issue