forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
11 changed files with 157 additions and 15 deletions
|
@ -54,7 +54,7 @@ func defaultCfg(c *cfg) {
|
|||
fullSizeLimit: 1 << 30, // 1GB
|
||||
objSizeLimit: 1 << 20, // 1MB
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
metrics: &noopMetrics{},
|
||||
metrics: &NoopMetrics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,9 +8,9 @@ type Metrics interface {
|
|||
DecSize(size uint64)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
type NoopMetrics struct{}
|
||||
|
||||
func (m *noopMetrics) IncOpenCount() {}
|
||||
func (m *noopMetrics) DecOpenCount() {}
|
||||
func (m *noopMetrics) IncSize(uint64) {}
|
||||
func (m *noopMetrics) DecSize(uint64) {}
|
||||
func (m *NoopMetrics) IncOpenCount() {}
|
||||
func (m *NoopMetrics) DecOpenCount() {}
|
||||
func (m *NoopMetrics) IncSize(uint64) {}
|
||||
func (m *NoopMetrics) DecSize(uint64) {}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
// Open opens blobovnicza tree.
|
||||
func (b *Blobovniczas) Open(readOnly bool) error {
|
||||
b.readOnly = readOnly
|
||||
b.metrics.SetMode(readOnly)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -70,6 +71,7 @@ func (b *Blobovniczas) Close() error {
|
|||
}
|
||||
|
||||
b.active = make(map[string]blobovniczaWithIndex)
|
||||
b.metrics.Close()
|
||||
|
||||
b.lruMtx.Unlock()
|
||||
|
||||
|
@ -123,9 +125,12 @@ func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicz
|
|||
b.openMtx.Lock()
|
||||
defer b.openMtx.Unlock()
|
||||
|
||||
path := filepath.Join(b.rootPath, p)
|
||||
|
||||
blz := blobovnicza.New(append(b.blzOpts,
|
||||
blobovnicza.WithReadOnly(b.readOnly),
|
||||
blobovnicza.WithPath(filepath.Join(b.rootPath, p)),
|
||||
blobovnicza.WithPath(path),
|
||||
blobovnicza.WithMetrics(b.metrics.BlobovnicaMetrics(path)),
|
||||
)...)
|
||||
|
||||
if err := blz.Open(); err != nil {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
|
@ -21,8 +22,17 @@ import (
|
|||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||
// Otherwise, all Blobovniczas are processed descending weight.
|
||||
func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res common.DeleteRes, err error) {
|
||||
var (
|
||||
success = false
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Delete(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
))
|
||||
|
@ -42,7 +52,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
return res, err
|
||||
}
|
||||
|
||||
return b.deleteObject(ctx, blz, bPrm)
|
||||
if res, err = b.deleteObject(ctx, blz, bPrm); err == nil {
|
||||
success = true
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
@ -78,6 +91,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
// not found in any blobovnicza
|
||||
return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
success = err == nil
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
|
@ -16,8 +17,18 @@ import (
|
|||
|
||||
// Exists implements common.Storage.
|
||||
func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
found = false
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Exists(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
))
|
||||
|
@ -39,7 +50,6 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
|||
var gPrm blobovnicza.GetPrm
|
||||
gPrm.SetAddress(prm.Address)
|
||||
|
||||
var found bool
|
||||
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
||||
|
@ -59,5 +69,6 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
|||
return found, nil
|
||||
})
|
||||
|
||||
success = err == nil
|
||||
return common.ExistsRes{Exists: found}, err
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/hex"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
|
@ -23,8 +24,18 @@ import (
|
|||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||
// Otherwise, all Blobovniczas are processed descending weight.
|
||||
func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.GetRes, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
found = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Get(time.Since(startedAt), size, found, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
attribute.Bool("raw", prm.Raw),
|
||||
|
@ -41,7 +52,11 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
return res, err
|
||||
}
|
||||
|
||||
return b.getObject(ctx, blz, bPrm)
|
||||
res, err = b.getObject(ctx, blz, bPrm)
|
||||
if err == nil {
|
||||
found = true
|
||||
size = len(res.RawData)
|
||||
}
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
@ -72,6 +87,9 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
return res, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
found = true
|
||||
size = len(res.RawData)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
|
@ -24,8 +25,18 @@ import (
|
|||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||
// Otherwise, all Blobovniczas are processed descending weight.
|
||||
func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (res common.GetRangeRes, err error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
found = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.GetRange(time.Since(startedAt), size, found, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
||||
|
@ -40,11 +51,15 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
return common.GetRangeRes{}, err
|
||||
}
|
||||
|
||||
return b.getObjectRange(ctx, blz, prm)
|
||||
res, err := b.getObjectRange(ctx, blz, prm)
|
||||
if err == nil {
|
||||
size = len(res.Data)
|
||||
found = true
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
objectFound := false
|
||||
|
||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
@ -67,17 +82,21 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
objectFound = err == nil
|
||||
found = err == nil
|
||||
|
||||
// abort iterator if found, otherwise process all Blobovniczas
|
||||
return err == nil, nil
|
||||
})
|
||||
|
||||
if err == nil && !objectFound {
|
||||
if err == nil && !found {
|
||||
// not found in any blobovnicza
|
||||
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
size = len(res.Data)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -4,16 +4,35 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Iterate iterates over all objects in b.
|
||||
func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
return common.IterateRes{}, b.iterateBlobovniczas(ctx, prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
err error
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Iterate(time.Since(startedAt), err == nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.rootPath),
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
err = b.iterateBlobovniczas(ctx, prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||
var subPrm blobovnicza.IteratePrm
|
||||
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
|
||||
data, err := b.compression.Decompress(elem.ObjectData())
|
||||
|
@ -43,6 +62,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
|||
_, err := blz.Iterate(ctx, subPrm)
|
||||
return err
|
||||
})
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
|
||||
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
||||
|
|
35
pkg/local_object_storage/blobstor/blobovniczatree/metrics.go
Normal file
35
pkg/local_object_storage/blobstor/blobovniczatree/metrics.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
)
|
||||
|
||||
type Metrics interface {
|
||||
BlobovnicaMetrics(path string) blobovnicza.Metrics
|
||||
|
||||
SetMode(readOnly bool)
|
||||
Close()
|
||||
|
||||
Delete(d time.Duration, success, withStorageID bool)
|
||||
Exists(d time.Duration, success, withStorageID bool)
|
||||
GetRange(d time.Duration, size int, success, withStorageID bool)
|
||||
Get(d time.Duration, size int, success, withStorageID bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (m *noopMetrics) BlobovnicaMetrics(string) blobovnicza.Metrics {
|
||||
return &blobovnicza.NoopMetrics{}
|
||||
}
|
||||
func (m *noopMetrics) SetMode(bool) {}
|
||||
func (m *noopMetrics) Close() {}
|
||||
func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
|
||||
func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
|
||||
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
|
@ -21,6 +21,7 @@ type cfg struct {
|
|||
blzOpts []blobovnicza.Option
|
||||
// reportError is the function called when encountering disk errors.
|
||||
reportError func(string, error)
|
||||
metrics Metrics
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
@ -40,6 +41,7 @@ func initConfig(c *cfg) {
|
|||
blzShallowDepth: defaultBlzShallowDepth,
|
||||
blzShallowWidth: defaultBlzShallowWidth,
|
||||
reportError: func(string, error) {},
|
||||
metrics: &noopMetrics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -91,3 +93,9 @@ func WithObjectSizeLimit(sz uint64) Option {
|
|||
c.blzOpts = append(c.blzOpts, blobovnicza.WithObjectSizeLimit(sz))
|
||||
}
|
||||
}
|
||||
|
||||
func WithMetrics(m Metrics) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
|
@ -19,6 +20,15 @@ import (
|
|||
//
|
||||
// returns error if could not save object in any blobovnicza.
|
||||
func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
var (
|
||||
success bool
|
||||
size int
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.Put(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
|
@ -33,6 +43,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
|
|||
if !prm.DontCompress {
|
||||
prm.RawData = b.compression.Compress(prm.RawData)
|
||||
}
|
||||
size = len(prm.RawData)
|
||||
|
||||
var putPrm blobovnicza.PutPrm
|
||||
putPrm.SetAddress(prm.Address)
|
||||
|
@ -54,6 +65,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
|
|||
return common.PutRes{}, errPutFailed
|
||||
}
|
||||
|
||||
success = true
|
||||
return common.PutRes{StorageID: it.ID.Bytes()}, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue