[#1523] blobstor: Unify fstree and blobovnicza interfaces

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-07-08 10:09:48 +03:00 committed by fyrchik
parent 5aa3defc67
commit 266458fe5c
21 changed files with 157 additions and 183 deletions

View file

@ -87,6 +87,8 @@ type blobovniczaWithIndex struct {
blz *blobovnicza.Blobovnicza
}
var _ common.Storage = (*Blobovniczas)(nil)
var errPutFailed = errors.New("could not save the object in any blobovnicza")
// NewBlobovniczaTree returns new instance of blobovnizas tree.
@ -143,6 +145,10 @@ func indexSlice(number uint64) []uint64 {
//
// returns error if could not save object in any blobovnicza.
func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
if !prm.DontCompress {
prm.RawData = b.CConfig.Compress(prm.RawData)
}
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(prm.Address)
putPrm.SetMarshaledObject(prm.RawData)

View file

@ -45,7 +45,7 @@ func (b *Blobovniczas) Init() error {
})
}
// closes blobovnicza tree.
// Close implements common.Storage.
func (b *Blobovniczas) Close() error {
b.activeMtx.Lock()

View file

@ -8,6 +8,7 @@ import (
"go.uber.org/zap"
)
// Exists implements common.Storage.
func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
activeCache := make(map[string]struct{})

View file

@ -55,6 +55,7 @@ func initConfig(c *cfg) {
fsTree: fstree.FSTree{
Depth: defaultShallowDepth,
DirNameLen: hex.EncodedLen(fstree.DirNameLen),
CConfig: &c.CConfig,
Info: Info{
Permissions: defaultPerm,
RootPath: "./",

View file

@ -8,8 +8,10 @@ import (
type GetPrm struct {
Address oid.Address
StorageID []byte
Raw bool
}
type GetRes struct {
Object *objectSDK.Object
Object *objectSDK.Object
RawData []byte
}

View file

@ -7,9 +7,10 @@ import (
// PutPrm groups the parameters of Put operation.
type PutPrm struct {
Address oid.Address
Object *objectSDK.Object
RawData []byte
Address oid.Address
Object *objectSDK.Object
RawData []byte
DontCompress bool
}
// PutRes groups the resulting values of Put operation.

View file

@ -0,0 +1,12 @@
package common
// Storage represents key-value object storage.
// It is used as a building block for a blobstor of a shard.
type Storage interface {
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)
Exists(ExistsPrm) (ExistsRes, error)
Put(PutPrm) (PutRes, error)
Delete(DeletePrm) (DeleteRes, error)
Iterate(IteratePrm) (IterateRes, error)
}

View file

@ -83,10 +83,10 @@ func (c *CConfig) Decompress(data []byte) ([]byte, error) {
// Compress compresses data if compression is enabled
// and returns data untouched otherwise.
func (c *CConfig) Compress(data []byte) []byte {
if c.Enabled {
return c.encoder.EncodeAll(data, make([]byte, 0, len(data)))
if c == nil || !c.Enabled {
return data
}
return data
return c.encoder.EncodeAll(data, make([]byte, 0, len(data)))
}
// Close closes encoder and decoder, returns any error occured.

View file

@ -4,7 +4,6 @@ import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
)
@ -31,18 +30,12 @@ func (b *BlobStor) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
//
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
func (b *BlobStor) deleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
err := b.fsTree.Delete(prm.Address)
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
err = errNotFound
}
res, err := b.fsTree.Delete(prm)
if err == nil {
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree DELETE"))
}
return common.DeleteRes{}, err
return res, err
}
// deleteSmall removes an object from blobovnicza of BLOB storage.

View file

@ -1,10 +1,7 @@
package blobstor
import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"go.uber.org/zap"
)
@ -45,12 +42,7 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
// checks if object is presented in shallow dir.
func (b *BlobStor) existsBig(prm common.ExistsPrm) (common.ExistsRes, error) {
_, err := b.fsTree.Exists(prm.Address)
if errors.Is(err, fstree.ErrFileNotFound) {
return common.ExistsRes{}, nil
}
return common.ExistsRes{Exists: err == nil}, err
return b.fsTree.Exists(prm)
}
// existsSmall checks if object is presented in blobovnicza.

View file

@ -10,8 +10,11 @@ import (
"strings"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-node/pkg/util"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)
@ -19,6 +22,7 @@ import (
type FSTree struct {
Info
*compression.CConfig
Depth int
DirNameLen int
}
@ -39,8 +43,7 @@ const (
MaxDepth = (sha256.Size - 1) / DirNameLen
)
// ErrFileNotFound is returned when file is missing.
var ErrFileNotFound = errors.New("file not found")
var _ common.Storage = (*FSTree)(nil)
func stringifyAddress(addr oid.Address) string {
return addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
@ -116,6 +119,9 @@ func (t *FSTree) iterate(depth int, curPath []string, prm common.IteratePrm) err
} else {
var data []byte
data, err = os.ReadFile(filepath.Join(curPath...))
if err == nil {
data, err = t.Decompress(data)
}
if err != nil {
if prm.IgnoreErrors {
if prm.ErrorHandler != nil {
@ -158,25 +164,34 @@ func (t *FSTree) treePath(addr oid.Address) string {
}
// Delete removes the object with the specified address from the storage.
func (t *FSTree) Delete(addr oid.Address) error {
p, err := t.Exists(addr)
func (t *FSTree) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
p, err := t.getPath(prm.Address)
if err != nil {
return err
if os.IsNotExist(err) {
var errNotFound apistatus.ObjectNotFound
err = errNotFound
}
return common.DeleteRes{}, err
}
return os.Remove(p)
return common.DeleteRes{}, os.Remove(p)
}
// Exists returns the path to the file with object contents if it exists in the storage
// and an error otherwise.
func (t *FSTree) Exists(addr oid.Address) (string, error) {
func (t *FSTree) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
_, err := t.getPath(prm.Address)
found := err == nil
if os.IsNotExist(err) {
err = nil
}
return common.ExistsRes{Exists: found}, err
}
func (t *FSTree) getPath(addr oid.Address) (string, error) {
p := t.treePath(addr)
_, err := os.Stat(p)
if os.IsNotExist(err) {
err = ErrFileNotFound
}
return p, err
}
@ -187,7 +202,9 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
return common.PutRes{}, err
}
if !prm.DontCompress {
prm.RawData = t.Compress(prm.RawData)
}
return common.PutRes{}, os.WriteFile(p, prm.RawData, t.Permissions)
}
@ -209,14 +226,50 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error
}
// Get returns an object from the storage by address.
func (t *FSTree) Get(prm common.GetPrm) ([]byte, error) {
func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
p := t.treePath(prm.Address)
if _, err := os.Stat(p); os.IsNotExist(err) {
return nil, ErrFileNotFound
var errNotFound apistatus.ObjectNotFound
return common.GetRes{}, errNotFound
}
return os.ReadFile(p)
data, err := os.ReadFile(p)
if err != nil {
return common.GetRes{}, err
}
data, err = t.Decompress(data)
if err != nil {
return common.GetRes{}, err
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, err
}
return common.GetRes{Object: obj, RawData: data}, err
}
// GetRange implements common.Storage.
func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
res, err := t.Get(common.GetPrm{Address: prm.Address})
if err != nil {
return common.GetRangeRes{}, err
}
payload := res.Object.Payload()
from := prm.Range.GetOffset()
to := from + prm.Range.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return common.GetRangeRes{}, apistatus.ObjectOutOfRange{}
}
return common.GetRangeRes{
Data: payload[from:to],
}, nil
}
// NumberOfObjects walks the file tree rooted at FSTree's root

View file

@ -1,7 +1,6 @@
package fstree
import (
"crypto/rand"
"errors"
"os"
"path/filepath"
@ -11,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
"github.com/stretchr/testify/require"
)
@ -45,10 +45,10 @@ func TestFSTree(t *testing.T) {
a := oidtest.Address()
addrs = append(addrs, a)
data := make([]byte, 10)
_, _ = rand.Read(data[:])
data, err := objecttest.Object().Marshal()
require.NoError(t, err)
_, err := fs.Put(common.PutPrm{Address: a, RawData: data})
_, err = fs.Put(common.PutPrm{Address: a, RawData: data})
require.NoError(t, err)
store[a.EncodeToString()] = data
}
@ -57,7 +57,7 @@ func TestFSTree(t *testing.T) {
for _, a := range addrs {
actual, err := fs.Get(common.GetPrm{Address: a})
require.NoError(t, err)
require.Equal(t, store[a.EncodeToString()], actual)
require.Equal(t, store[a.EncodeToString()], actual.RawData)
}
_, err := fs.Get(common.GetPrm{Address: oidtest.Address()})
@ -66,12 +66,14 @@ func TestFSTree(t *testing.T) {
t.Run("exists", func(t *testing.T) {
for _, a := range addrs {
_, err := fs.Exists(a)
res, err := fs.Exists(common.ExistsPrm{Address: a})
require.NoError(t, err)
require.True(t, res.Exists)
}
_, err := fs.Exists(oidtest.Address())
require.Error(t, err)
res, err := fs.Exists(common.ExistsPrm{Address: oidtest.Address()})
require.NoError(t, err)
require.False(t, res.Exists)
})
t.Run("iterate", func(t *testing.T) {
@ -154,14 +156,18 @@ func TestFSTree(t *testing.T) {
})
t.Run("delete", func(t *testing.T) {
require.NoError(t, fs.Delete(addrs[0]))
_, err := fs.Exists(addrs[0])
require.Error(t, err)
_, err = fs.Exists(addrs[1])
_, err := fs.Delete(common.DeletePrm{Address: addrs[0]})
require.NoError(t, err)
require.Error(t, fs.Delete(oidtest.Address()))
res, err := fs.Exists(common.ExistsPrm{Address: addrs[0]})
require.NoError(t, err)
require.False(t, res.Exists)
res, err = fs.Exists(common.ExistsPrm{Address: addrs[1]})
require.NoError(t, err)
require.True(t, res.Exists)
_, err = fs.Delete(common.DeletePrm{Address: oidtest.Address()})
require.Error(t, err)
})
}

View file

@ -2,12 +2,9 @@ package blobstor
import (
"errors"
"fmt"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
)
// Get reads the object from b.
@ -37,29 +34,7 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
// presented in shallow dir.
func (b *BlobStor) getBig(prm common.GetPrm) (common.GetRes, error) {
// get compressed object data
data, err := b.fsTree.Get(prm)
if err != nil {
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
return common.GetRes{}, errNotFound
}
return common.GetRes{}, fmt.Errorf("could not read object from fs tree: %w", err)
}
data, err = b.Decompress(data)
if err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// unmarshal the object
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
return common.GetRes{Object: obj}, nil
return b.fsTree.Get(prm)
}
func (b *BlobStor) getSmall(prm common.GetPrm) (common.GetRes, error) {

View file

@ -2,12 +2,9 @@ package blobstor
import (
"errors"
"fmt"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
)
// GetRange reads object payload data from b.
@ -37,29 +34,12 @@ func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error)
// Returns an error of type apistatus.ObjectNotFound if object is missing.
func (b *BlobStor) getRangeBig(prm common.GetRangePrm) (common.GetRangeRes, error) {
// get compressed object data
data, err := b.fsTree.Get(common.GetPrm{Address: prm.Address})
res, err := b.fsTree.Get(common.GetPrm{Address: prm.Address})
if err != nil {
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
return common.GetRangeRes{}, errNotFound
}
return common.GetRangeRes{}, fmt.Errorf("could not read object from fs tree: %w", err)
return common.GetRangeRes{}, err
}
data, err = b.Decompress(data)
if err != nil {
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// unmarshal the object
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
payload := obj.Payload()
payload := res.Object.Payload()
ln, off := prm.Range.GetLength(), prm.Range.GetOffset()
if pLen := uint64(len(payload)); ln+off < off || pLen < off || pLen < ln+off {

View file

@ -21,24 +21,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err)
}
// FIXME decompress in the fstree
iPrm := prm
iPrm.Handler = func(element common.IterationElement) error {
data, err := b.Decompress(element.ObjectData)
if err != nil {
if prm.IgnoreErrors {
if prm.ErrorHandler != nil {
return prm.ErrorHandler(element.Address, err)
}
return nil
}
return fmt.Errorf("could not decompress object data: %w", err)
}
element.ObjectData = data
return prm.Handler(element)
}
_, err = b.fsTree.Iterate(iPrm)
_, err = b.fsTree.Iterate(prm)
if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err)
}

View file

@ -66,11 +66,11 @@ func TestIterateObjects(t *testing.T) {
}
for _, v := range mObjs {
_, err := blobStor.PutRaw(common.PutPrm{Address: v.addr, RawData: v.data}, true)
_, err := blobStor.Put(common.PutPrm{Address: v.addr, RawData: v.data})
require.NoError(t, err)
}
err := IterateBinaryObjects(blobStor, func(_ oid.Address, data []byte, descriptor []byte) error {
err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
v, ok := mObjs[string(data)]
require.True(t, ok)

View file

@ -2,9 +2,7 @@ package blobstor
import (
"fmt"
"os"
"github.com/klauspost/compress/zstd"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
@ -20,7 +18,9 @@ import (
// Returns any error encountered that
// did not allow to completely save the object.
func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
prm.Address = object.AddressOf(prm.Object)
if prm.Object != nil {
prm.Address = object.AddressOf(prm.Object)
}
if prm.RawData == nil {
// marshal object
data, err := prm.Object.Marshal()
@ -30,7 +30,21 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
prm.RawData = data
}
return b.PutRaw(prm, b.NeedsCompression(prm.Object))
big := b.isBig(prm.RawData)
if big {
_, err := b.fsTree.Put(prm)
if err != nil {
return common.PutRes{}, err
}
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT"))
return common.PutRes{}, nil
}
// save object in blobovnicza
return b.blobovniczas.Put(prm)
}
// NeedsCompression returns true if the object should be compressed.
@ -41,40 +55,6 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
return b.cfg.CConfig.NeedsCompression(obj)
}
// PutRaw saves an already marshaled object in BLOB storage.
func (b *BlobStor) PutRaw(prm common.PutPrm, compress bool) (common.PutRes, error) {
big := b.isBig(prm.RawData)
if big {
var err error
if compress {
err = b.fsTree.PutStream(prm.Address, func(f *os.File) error {
enc, _ := zstd.NewWriter(f) // nil error if no options are provided
if _, err := enc.Write(prm.RawData); err != nil {
return err
}
return enc.Close()
})
} else {
_, err = b.fsTree.Put(prm)
}
if err != nil {
return common.PutRes{}, err
}
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT"))
return common.PutRes{}, nil
}
if compress {
prm.RawData = b.CConfig.Compress(prm.RawData)
}
// save object in blobovnicza
return b.blobovniczas.Put(prm)
}
// checks if object is "big".
func (b *BlobStor) isBig(data []byte) bool {
return uint64(len(data)) > b.smallSizeLimit

View file

@ -1,11 +1,8 @@
package writecache
import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
@ -44,13 +41,7 @@ func (c *cache) Delete(addr oid.Address) error {
return nil
}
err := c.fsTree.Delete(addr)
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
err = errNotFound
}
_, err := c.fsTree.Delete(common.DeletePrm{Address: addr})
if err == nil {
storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("fstree DELETE"))
c.objCounters.DecFS()

View file

@ -158,8 +158,9 @@ func (c *cache) flushBigObjects() {
var prm common.PutPrm
prm.Address = addr
prm.RawData = data
prm.DontCompress = !compress
if _, err := c.blobstor.PutRaw(common.PutPrm{Address: addr, RawData: data}, compress); err != nil {
if _, err := c.blobstor.Put(prm); err != nil {
c.log.Error("cant flush object to blobstor", zap.Error(err))
return nil
}

View file

@ -22,20 +22,15 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) {
return obj, obj.Unmarshal(value)
}
data, err := c.fsTree.Get(common.GetPrm{Address: addr})
res, err := c.fsTree.Get(common.GetPrm{Address: addr})
if err != nil {
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return nil, err
}
c.flushed.Get(saddr)
return obj, nil
return res.Object, nil
}
// Head returns object header from write-cache.

View file

@ -7,6 +7,7 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
"github.com/nspcc-dev/neofs-node/pkg/util"
@ -145,7 +146,8 @@ func (c *cache) deleteFromDisk(keys [][]byte) error {
continue
}
if err := c.fsTree.Delete(addr); err != nil && !errors.Is(err, fstree.ErrFileNotFound) {
_, err := c.fsTree.Delete(common.DeletePrm{Address: addr})
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
lastErr = err
c.log.Error("can't remove object from write-cache", zap.Error(err))
continue