forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
4 changed files with 126 additions and 12 deletions
|
@ -7,6 +7,7 @@ import (
|
||||||
// Open implements common.Storage.
|
// Open implements common.Storage.
|
||||||
func (t *FSTree) Open(ro bool) error {
|
func (t *FSTree) Open(ro bool) error {
|
||||||
t.readOnly = ro
|
t.readOnly = ro
|
||||||
|
t.metrics.SetMode(ro)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,4 +17,7 @@ func (t *FSTree) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements common.Storage.
|
// Close implements common.Storage.
|
||||||
func (*FSTree) Close() error { return nil }
|
func (t *FSTree) Close() error {
|
||||||
|
t.metrics.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
|
@ -35,6 +36,7 @@ type FSTree struct {
|
||||||
|
|
||||||
noSync bool
|
noSync bool
|
||||||
readOnly bool
|
readOnly bool
|
||||||
|
metrics Metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// Info groups the information about file storage.
|
// Info groups the information about file storage.
|
||||||
|
@ -64,6 +66,7 @@ func New(opts ...Option) *FSTree {
|
||||||
Config: nil,
|
Config: nil,
|
||||||
Depth: 4,
|
Depth: 4,
|
||||||
DirNameLen: DirNameLen,
|
DirNameLen: DirNameLen,
|
||||||
|
metrics: &noopMetrics{},
|
||||||
}
|
}
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](f)
|
opts[i](f)
|
||||||
|
@ -101,7 +104,24 @@ func addressFromString(s string) (oid.Address, error) {
|
||||||
|
|
||||||
// Iterate iterates over all stored objects.
|
// Iterate iterates over all stored objects.
|
||||||
func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
return common.IterateRes{}, t.iterate(ctx, 0, []string{t.RootPath}, prm)
|
var (
|
||||||
|
err error
|
||||||
|
startedAt = time.Now()
|
||||||
|
)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
t.metrics.Iterate(time.Since(startedAt), err == nil)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Iterate",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
|
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
err = t.iterate(ctx, 0, []string{t.RootPath}, prm)
|
||||||
|
return common.IterateRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
|
func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error {
|
||||||
|
@ -202,19 +222,29 @@ func (t *FSTree) treePath(addr oid.Address) string {
|
||||||
|
|
||||||
// Delete removes the object with the specified address from the storage.
|
// Delete removes the object with the specified address from the storage.
|
||||||
func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
startedAt = time.Now()
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.Delete(time.Since(startedAt), err == nil)
|
||||||
|
}()
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Delete",
|
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
attribute.String("address", prm.Address.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if t.readOnly {
|
if t.readOnly {
|
||||||
return common.DeleteRes{}, common.ErrReadOnly
|
err = common.ErrReadOnly
|
||||||
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p := t.treePath(prm.Address)
|
p := t.treePath(prm.Address)
|
||||||
|
|
||||||
err := os.Remove(p)
|
err = os.Remove(p)
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
err = logicerr.Wrap(apistatus.ObjectNotFound{})
|
err = logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||||
}
|
}
|
||||||
|
@ -224,8 +254,17 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
|
||||||
// Exists returns the path to the file with object contents if it exists in the storage
|
// Exists returns the path to the file with object contents if it exists in the storage
|
||||||
// and an error otherwise.
|
// and an error otherwise.
|
||||||
func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||||
|
var (
|
||||||
|
success = false
|
||||||
|
startedAt = time.Now()
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.Exists(time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Exists",
|
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Exists",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
attribute.String("address", prm.Address.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -237,27 +276,40 @@ func (t *FSTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exist
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
|
success = err == nil
|
||||||
return common.ExistsRes{Exists: found}, err
|
return common.ExistsRes{Exists: found}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put puts an object in the storage.
|
// Put puts an object in the storage.
|
||||||
func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||||
|
var (
|
||||||
|
size int
|
||||||
|
startedAt = time.Now()
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.Put(time.Since(startedAt), size, err == nil)
|
||||||
|
}()
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Put",
|
_, span := tracing.StartSpanFromContext(ctx, "FSTree.Put",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
attribute.String("address", prm.Address.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
attribute.Bool("dont_compress", prm.DontCompress),
|
attribute.Bool("dont_compress", prm.DontCompress),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if t.readOnly {
|
if t.readOnly {
|
||||||
return common.PutRes{}, common.ErrReadOnly
|
err = common.ErrReadOnly
|
||||||
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p := t.treePath(prm.Address)
|
p := t.treePath(prm.Address)
|
||||||
|
|
||||||
if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
|
if err = util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
|
||||||
if errors.Is(err, syscall.ENOSPC) {
|
if errors.Is(err, syscall.ENOSPC) {
|
||||||
return common.PutRes{}, common.ErrNoSpace
|
err = common.ErrNoSpace
|
||||||
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
return common.PutRes{}, err
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
|
@ -287,17 +339,19 @@ func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, err
|
||||||
// to be so hecking simple.
|
// to be so hecking simple.
|
||||||
// In a very rare situation we can have multiple partially written copies on disk,
|
// In a very rare situation we can have multiple partially written copies on disk,
|
||||||
// this will be fixed in another issue (we should remove garbage on start).
|
// this will be fixed in another issue (we should remove garbage on start).
|
||||||
|
size = len(prm.RawData)
|
||||||
const retryCount = 5
|
const retryCount = 5
|
||||||
for i := 0; i < retryCount; i++ {
|
for i := 0; i < retryCount; i++ {
|
||||||
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
|
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
|
||||||
err := t.writeAndRename(tmpPath, p, prm.RawData)
|
err = t.writeAndRename(tmpPath, p, prm.RawData)
|
||||||
if err != syscall.EEXIST || i == retryCount-1 {
|
if err != syscall.EEXIST || i == retryCount-1 {
|
||||||
return common.PutRes{StorageID: []byte{}}, err
|
return common.PutRes{StorageID: []byte{}}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = fmt.Errorf("couldn't read file after %d retries", retryCount)
|
||||||
// unreachable, but precaution never hurts, especially 1 day before release.
|
// unreachable, but precaution never hurts, especially 1 day before release.
|
||||||
return common.PutRes{StorageID: []byte{}}, fmt.Errorf("couldn't read file after %d retries", retryCount)
|
return common.PutRes{StorageID: []byte{}}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
||||||
|
@ -365,8 +419,18 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error
|
||||||
|
|
||||||
// Get returns an object from the storage by address.
|
// Get returns an object from the storage by address.
|
||||||
func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
size = 0
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.Get(time.Since(startedAt), size, success)
|
||||||
|
}()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.Get",
|
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.Get",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
attribute.Bool("raw", prm.Raw),
|
attribute.Bool("raw", prm.Raw),
|
||||||
attribute.String("address", prm.Address.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
))
|
))
|
||||||
|
@ -394,19 +458,30 @@ func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRes{}, err
|
return common.GetRes{}, err
|
||||||
}
|
}
|
||||||
|
size = len(data)
|
||||||
|
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
return common.GetRes{}, err
|
return common.GetRes{}, err
|
||||||
}
|
}
|
||||||
|
success = true
|
||||||
return common.GetRes{Object: obj, RawData: data}, err
|
return common.GetRes{Object: obj, RawData: data}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRange implements common.Storage.
|
// GetRange implements common.Storage.
|
||||||
func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
size = 0
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.GetRange(time.Since(startedAt), size, success)
|
||||||
|
}()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.GetRange",
|
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.GetRange",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
attribute.String("address", prm.Address.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
||||||
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
|
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
|
||||||
|
@ -426,8 +501,11 @@ func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.G
|
||||||
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{})
|
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
success = true
|
||||||
|
data := payload[from:to]
|
||||||
|
size = len(data)
|
||||||
return common.GetRangeRes{
|
return common.GetRangeRes{
|
||||||
Data: payload[from:to],
|
Data: data,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
26
pkg/local_object_storage/blobstor/fstree/metrics.go
Normal file
26
pkg/local_object_storage/blobstor/fstree/metrics.go
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
package fstree
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type Metrics interface {
|
||||||
|
SetMode(readOnly bool)
|
||||||
|
Close()
|
||||||
|
|
||||||
|
Iterate(d time.Duration, success bool)
|
||||||
|
Delete(d time.Duration, success bool)
|
||||||
|
Exists(d time.Duration, success bool)
|
||||||
|
Put(d time.Duration, size int, success bool)
|
||||||
|
Get(d time.Duration, size int, success bool)
|
||||||
|
GetRange(d time.Duration, size int, success bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopMetrics struct{}
|
||||||
|
|
||||||
|
func (m *noopMetrics) SetMode(bool) {}
|
||||||
|
func (m *noopMetrics) Close() {}
|
||||||
|
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||||
|
func (m *noopMetrics) Delete(time.Duration, bool) {}
|
||||||
|
func (m *noopMetrics) Exists(time.Duration, bool) {}
|
||||||
|
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||||
|
func (m *noopMetrics) Get(time.Duration, int, bool) {}
|
||||||
|
func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
|
|
@ -35,3 +35,9 @@ func WithNoSync(noSync bool) Option {
|
||||||
f.noSync = noSync
|
f.noSync = noSync
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithMetrics(m Metrics) Option {
|
||||||
|
return func(f *FSTree) {
|
||||||
|
f.metrics = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue