From 266458fe5c496a055dc8abcd9ec3282fb80a34f2 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 8 Jul 2022 10:09:48 +0300 Subject: [PATCH] [#1523] blobstor: Unify fstree and blobovnicza interfaces Signed-off-by: Evgenii Stratonikov --- .../blobstor/blobovniczatree/blobovnicza.go | 6 ++ .../blobstor/blobovniczatree/control.go | 2 +- .../blobstor/blobovniczatree/exists.go | 1 + pkg/local_object_storage/blobstor/blobstor.go | 1 + .../blobstor/common/get.go | 4 +- .../blobstor/common/put.go | 7 +- .../blobstor/common/storage.go | 12 +++ .../blobstor/compression/compress.go | 6 +- pkg/local_object_storage/blobstor/delete.go | 11 +-- pkg/local_object_storage/blobstor/exists.go | 10 +-- .../blobstor/fstree/fstree.go | 83 +++++++++++++++---- .../blobstor/fstree/fstree_test.go | 36 ++++---- pkg/local_object_storage/blobstor/get.go | 27 +----- .../blobstor/get_range.go | 26 +----- pkg/local_object_storage/blobstor/iterate.go | 19 +---- .../blobstor/iterate_test.go | 4 +- pkg/local_object_storage/blobstor/put.go | 56 ++++--------- pkg/local_object_storage/writecache/delete.go | 13 +-- pkg/local_object_storage/writecache/flush.go | 3 +- pkg/local_object_storage/writecache/get.go | 9 +- .../writecache/storage.go | 4 +- 21 files changed, 157 insertions(+), 183 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/common/storage.go diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index c68b7ed77..28a9c6547 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -87,6 +87,8 @@ type blobovniczaWithIndex struct { blz *blobovnicza.Blobovnicza } +var _ common.Storage = (*Blobovniczas)(nil) + var errPutFailed = errors.New("could not save the object in any blobovnicza") // NewBlobovniczaTree returns new instance of blobovnizas tree. @@ -143,6 +145,10 @@ func indexSlice(number uint64) []uint64 { // // returns error if could not save object in any blobovnicza. func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) { + if !prm.DontCompress { + prm.RawData = b.CConfig.Compress(prm.RawData) + } + var putPrm blobovnicza.PutPrm putPrm.SetAddress(prm.Address) putPrm.SetMarshaledObject(prm.RawData) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index 4db7df0e5..58450d9a0 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -45,7 +45,7 @@ func (b *Blobovniczas) Init() error { }) } -// closes blobovnicza tree. +// Close implements common.Storage. func (b *Blobovniczas) Close() error { b.activeMtx.Lock() diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go index d4391a6c8..586e5bc01 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" ) +// Exists implements common.Storage. func (b *Blobovniczas) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { activeCache := make(map[string]struct{}) diff --git a/pkg/local_object_storage/blobstor/blobstor.go b/pkg/local_object_storage/blobstor/blobstor.go index 2505278c9..80ace0067 100644 --- a/pkg/local_object_storage/blobstor/blobstor.go +++ b/pkg/local_object_storage/blobstor/blobstor.go @@ -55,6 +55,7 @@ func initConfig(c *cfg) { fsTree: fstree.FSTree{ Depth: defaultShallowDepth, DirNameLen: hex.EncodedLen(fstree.DirNameLen), + CConfig: &c.CConfig, Info: Info{ Permissions: defaultPerm, RootPath: "./", diff --git a/pkg/local_object_storage/blobstor/common/get.go b/pkg/local_object_storage/blobstor/common/get.go index 414c39ea1..c8a96c5ad 100644 --- a/pkg/local_object_storage/blobstor/common/get.go +++ b/pkg/local_object_storage/blobstor/common/get.go @@ -8,8 +8,10 @@ import ( type GetPrm struct { Address oid.Address StorageID []byte + Raw bool } type GetRes struct { - Object *objectSDK.Object + Object *objectSDK.Object + RawData []byte } diff --git a/pkg/local_object_storage/blobstor/common/put.go b/pkg/local_object_storage/blobstor/common/put.go index 053036ba5..bcbbdce2e 100644 --- a/pkg/local_object_storage/blobstor/common/put.go +++ b/pkg/local_object_storage/blobstor/common/put.go @@ -7,9 +7,10 @@ import ( // PutPrm groups the parameters of Put operation. type PutPrm struct { - Address oid.Address - Object *objectSDK.Object - RawData []byte + Address oid.Address + Object *objectSDK.Object + RawData []byte + DontCompress bool } // PutRes groups the resulting values of Put operation. diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go new file mode 100644 index 000000000..5674333bd --- /dev/null +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -0,0 +1,12 @@ +package common + +// Storage represents key-value object storage. +// It is used as a building block for a blobstor of a shard. +type Storage interface { + Get(GetPrm) (GetRes, error) + GetRange(GetRangePrm) (GetRangeRes, error) + Exists(ExistsPrm) (ExistsRes, error) + Put(PutPrm) (PutRes, error) + Delete(DeletePrm) (DeleteRes, error) + Iterate(IteratePrm) (IterateRes, error) +} diff --git a/pkg/local_object_storage/blobstor/compression/compress.go b/pkg/local_object_storage/blobstor/compression/compress.go index f433d26ea..e71dc8a1a 100644 --- a/pkg/local_object_storage/blobstor/compression/compress.go +++ b/pkg/local_object_storage/blobstor/compression/compress.go @@ -83,10 +83,10 @@ func (c *CConfig) Decompress(data []byte) ([]byte, error) { // Compress compresses data if compression is enabled // and returns data untouched otherwise. func (c *CConfig) Compress(data []byte) []byte { - if c.Enabled { - return c.encoder.EncodeAll(data, make([]byte, 0, len(data))) + if c == nil || !c.Enabled { + return data } - return data + return c.encoder.EncodeAll(data, make([]byte, 0, len(data))) } // Close closes encoder and decoder, returns any error occured. diff --git a/pkg/local_object_storage/blobstor/delete.go b/pkg/local_object_storage/blobstor/delete.go index 8b675ea48..6d9ef6738 100644 --- a/pkg/local_object_storage/blobstor/delete.go +++ b/pkg/local_object_storage/blobstor/delete.go @@ -4,7 +4,6 @@ import ( "errors" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" ) @@ -31,18 +30,12 @@ func (b *BlobStor) Delete(prm common.DeletePrm) (common.DeleteRes, error) { // // Returns an error of type apistatus.ObjectNotFound if there is no object to delete. func (b *BlobStor) deleteBig(prm common.DeletePrm) (common.DeleteRes, error) { - err := b.fsTree.Delete(prm.Address) - if errors.Is(err, fstree.ErrFileNotFound) { - var errNotFound apistatus.ObjectNotFound - - err = errNotFound - } - + res, err := b.fsTree.Delete(prm) if err == nil { storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree DELETE")) } - return common.DeleteRes{}, err + return res, err } // deleteSmall removes an object from blobovnicza of BLOB storage. diff --git a/pkg/local_object_storage/blobstor/exists.go b/pkg/local_object_storage/blobstor/exists.go index e8dd46eb4..3300e988e 100644 --- a/pkg/local_object_storage/blobstor/exists.go +++ b/pkg/local_object_storage/blobstor/exists.go @@ -1,10 +1,7 @@ package blobstor import ( - "errors" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "go.uber.org/zap" ) @@ -45,12 +42,7 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { // checks if object is presented in shallow dir. func (b *BlobStor) existsBig(prm common.ExistsPrm) (common.ExistsRes, error) { - _, err := b.fsTree.Exists(prm.Address) - if errors.Is(err, fstree.ErrFileNotFound) { - return common.ExistsRes{}, nil - } - - return common.ExistsRes{Exists: err == nil}, err + return b.fsTree.Exists(prm) } // existsSmall checks if object is presented in blobovnicza. diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 6a08dc87c..3865b567c 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -10,8 +10,11 @@ import ( "strings" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) @@ -19,6 +22,7 @@ import ( type FSTree struct { Info + *compression.CConfig Depth int DirNameLen int } @@ -39,8 +43,7 @@ const ( MaxDepth = (sha256.Size - 1) / DirNameLen ) -// ErrFileNotFound is returned when file is missing. -var ErrFileNotFound = errors.New("file not found") +var _ common.Storage = (*FSTree)(nil) func stringifyAddress(addr oid.Address) string { return addr.Object().EncodeToString() + "." + addr.Container().EncodeToString() @@ -116,6 +119,9 @@ func (t *FSTree) iterate(depth int, curPath []string, prm common.IteratePrm) err } else { var data []byte data, err = os.ReadFile(filepath.Join(curPath...)) + if err == nil { + data, err = t.Decompress(data) + } if err != nil { if prm.IgnoreErrors { if prm.ErrorHandler != nil { @@ -158,25 +164,34 @@ func (t *FSTree) treePath(addr oid.Address) string { } // Delete removes the object with the specified address from the storage. -func (t *FSTree) Delete(addr oid.Address) error { - p, err := t.Exists(addr) +func (t *FSTree) Delete(prm common.DeletePrm) (common.DeleteRes, error) { + p, err := t.getPath(prm.Address) if err != nil { - return err + if os.IsNotExist(err) { + var errNotFound apistatus.ObjectNotFound + err = errNotFound + } + return common.DeleteRes{}, err } - return os.Remove(p) + return common.DeleteRes{}, os.Remove(p) } // Exists returns the path to the file with object contents if it exists in the storage // and an error otherwise. -func (t *FSTree) Exists(addr oid.Address) (string, error) { +func (t *FSTree) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { + _, err := t.getPath(prm.Address) + found := err == nil + if os.IsNotExist(err) { + err = nil + } + return common.ExistsRes{Exists: found}, err +} + +func (t *FSTree) getPath(addr oid.Address) (string, error) { p := t.treePath(addr) _, err := os.Stat(p) - if os.IsNotExist(err) { - err = ErrFileNotFound - } - return p, err } @@ -187,7 +202,9 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) { if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil { return common.PutRes{}, err } - + if !prm.DontCompress { + prm.RawData = t.Compress(prm.RawData) + } return common.PutRes{}, os.WriteFile(p, prm.RawData, t.Permissions) } @@ -209,14 +226,50 @@ func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error } // Get returns an object from the storage by address. -func (t *FSTree) Get(prm common.GetPrm) ([]byte, error) { +func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) { p := t.treePath(prm.Address) if _, err := os.Stat(p); os.IsNotExist(err) { - return nil, ErrFileNotFound + var errNotFound apistatus.ObjectNotFound + return common.GetRes{}, errNotFound } - return os.ReadFile(p) + data, err := os.ReadFile(p) + if err != nil { + return common.GetRes{}, err + } + + data, err = t.Decompress(data) + if err != nil { + return common.GetRes{}, err + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, err + } + + return common.GetRes{Object: obj, RawData: data}, err +} + +// GetRange implements common.Storage. +func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { + res, err := t.Get(common.GetPrm{Address: prm.Address}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := res.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, apistatus.ObjectOutOfRange{} + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil } // NumberOfObjects walks the file tree rooted at FSTree's root diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 9c54dffba..1c7103d7c 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -1,7 +1,6 @@ package fstree import ( - "crypto/rand" "errors" "os" "path/filepath" @@ -11,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/stretchr/testify/require" ) @@ -45,10 +45,10 @@ func TestFSTree(t *testing.T) { a := oidtest.Address() addrs = append(addrs, a) - data := make([]byte, 10) - _, _ = rand.Read(data[:]) + data, err := objecttest.Object().Marshal() + require.NoError(t, err) - _, err := fs.Put(common.PutPrm{Address: a, RawData: data}) + _, err = fs.Put(common.PutPrm{Address: a, RawData: data}) require.NoError(t, err) store[a.EncodeToString()] = data } @@ -57,7 +57,7 @@ func TestFSTree(t *testing.T) { for _, a := range addrs { actual, err := fs.Get(common.GetPrm{Address: a}) require.NoError(t, err) - require.Equal(t, store[a.EncodeToString()], actual) + require.Equal(t, store[a.EncodeToString()], actual.RawData) } _, err := fs.Get(common.GetPrm{Address: oidtest.Address()}) @@ -66,12 +66,14 @@ func TestFSTree(t *testing.T) { t.Run("exists", func(t *testing.T) { for _, a := range addrs { - _, err := fs.Exists(a) + res, err := fs.Exists(common.ExistsPrm{Address: a}) require.NoError(t, err) + require.True(t, res.Exists) } - _, err := fs.Exists(oidtest.Address()) - require.Error(t, err) + res, err := fs.Exists(common.ExistsPrm{Address: oidtest.Address()}) + require.NoError(t, err) + require.False(t, res.Exists) }) t.Run("iterate", func(t *testing.T) { @@ -154,14 +156,18 @@ func TestFSTree(t *testing.T) { }) t.Run("delete", func(t *testing.T) { - require.NoError(t, fs.Delete(addrs[0])) - - _, err := fs.Exists(addrs[0]) - require.Error(t, err) - - _, err = fs.Exists(addrs[1]) + _, err := fs.Delete(common.DeletePrm{Address: addrs[0]}) require.NoError(t, err) - require.Error(t, fs.Delete(oidtest.Address())) + res, err := fs.Exists(common.ExistsPrm{Address: addrs[0]}) + require.NoError(t, err) + require.False(t, res.Exists) + + res, err = fs.Exists(common.ExistsPrm{Address: addrs[1]}) + require.NoError(t, err) + require.True(t, res.Exists) + + _, err = fs.Delete(common.DeletePrm{Address: oidtest.Address()}) + require.Error(t, err) }) } diff --git a/pkg/local_object_storage/blobstor/get.go b/pkg/local_object_storage/blobstor/get.go index fe9a8223b..97b7ebea7 100644 --- a/pkg/local_object_storage/blobstor/get.go +++ b/pkg/local_object_storage/blobstor/get.go @@ -2,12 +2,9 @@ package blobstor import ( "errors" - "fmt" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" ) // Get reads the object from b. @@ -37,29 +34,7 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) { // presented in shallow dir. func (b *BlobStor) getBig(prm common.GetPrm) (common.GetRes, error) { // get compressed object data - data, err := b.fsTree.Get(prm) - if err != nil { - if errors.Is(err, fstree.ErrFileNotFound) { - var errNotFound apistatus.ObjectNotFound - - return common.GetRes{}, errNotFound - } - - return common.GetRes{}, fmt.Errorf("could not read object from fs tree: %w", err) - } - - data, err = b.Decompress(data) - if err != nil { - return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err) - } - - // unmarshal the object - obj := objectSDK.New() - if err := obj.Unmarshal(data); err != nil { - return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err) - } - - return common.GetRes{Object: obj}, nil + return b.fsTree.Get(prm) } func (b *BlobStor) getSmall(prm common.GetPrm) (common.GetRes, error) { diff --git a/pkg/local_object_storage/blobstor/get_range.go b/pkg/local_object_storage/blobstor/get_range.go index f059b2b0d..a161c6381 100644 --- a/pkg/local_object_storage/blobstor/get_range.go +++ b/pkg/local_object_storage/blobstor/get_range.go @@ -2,12 +2,9 @@ package blobstor import ( "errors" - "fmt" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" ) // GetRange reads object payload data from b. @@ -37,29 +34,12 @@ func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) // Returns an error of type apistatus.ObjectNotFound if object is missing. func (b *BlobStor) getRangeBig(prm common.GetRangePrm) (common.GetRangeRes, error) { // get compressed object data - data, err := b.fsTree.Get(common.GetPrm{Address: prm.Address}) + res, err := b.fsTree.Get(common.GetPrm{Address: prm.Address}) if err != nil { - if errors.Is(err, fstree.ErrFileNotFound) { - var errNotFound apistatus.ObjectNotFound - - return common.GetRangeRes{}, errNotFound - } - - return common.GetRangeRes{}, fmt.Errorf("could not read object from fs tree: %w", err) + return common.GetRangeRes{}, err } - data, err = b.Decompress(data) - if err != nil { - return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err) - } - - // unmarshal the object - obj := objectSDK.New() - if err := obj.Unmarshal(data); err != nil { - return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err) - } - - payload := obj.Payload() + payload := res.Object.Payload() ln, off := prm.Range.GetLength(), prm.Range.GetOffset() if pLen := uint64(len(payload)); ln+off < off || pLen < off || pLen < ln+off { diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index bdf33d608..0a737a393 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -21,24 +21,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) { return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err) } - // FIXME decompress in the fstree - iPrm := prm - iPrm.Handler = func(element common.IterationElement) error { - data, err := b.Decompress(element.ObjectData) - if err != nil { - if prm.IgnoreErrors { - if prm.ErrorHandler != nil { - return prm.ErrorHandler(element.Address, err) - } - return nil - } - return fmt.Errorf("could not decompress object data: %w", err) - } - element.ObjectData = data - return prm.Handler(element) - } - - _, err = b.fsTree.Iterate(iPrm) + _, err = b.fsTree.Iterate(prm) if err != nil && !prm.IgnoreErrors { return common.IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err) } diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index 2aad4741c..f5dfd5f33 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -66,11 +66,11 @@ func TestIterateObjects(t *testing.T) { } for _, v := range mObjs { - _, err := blobStor.PutRaw(common.PutPrm{Address: v.addr, RawData: v.data}, true) + _, err := blobStor.Put(common.PutPrm{Address: v.addr, RawData: v.data}) require.NoError(t, err) } - err := IterateBinaryObjects(blobStor, func(_ oid.Address, data []byte, descriptor []byte) error { + err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { v, ok := mObjs[string(data)] require.True(t, ok) diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go index 1869b04a6..af8db5861 100644 --- a/pkg/local_object_storage/blobstor/put.go +++ b/pkg/local_object_storage/blobstor/put.go @@ -2,9 +2,7 @@ package blobstor import ( "fmt" - "os" - "github.com/klauspost/compress/zstd" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" @@ -20,7 +18,9 @@ import ( // Returns any error encountered that // did not allow to completely save the object. func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) { - prm.Address = object.AddressOf(prm.Object) + if prm.Object != nil { + prm.Address = object.AddressOf(prm.Object) + } if prm.RawData == nil { // marshal object data, err := prm.Object.Marshal() @@ -30,7 +30,21 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) { prm.RawData = data } - return b.PutRaw(prm, b.NeedsCompression(prm.Object)) + big := b.isBig(prm.RawData) + + if big { + _, err := b.fsTree.Put(prm) + if err != nil { + return common.PutRes{}, err + } + + storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT")) + + return common.PutRes{}, nil + } + + // save object in blobovnicza + return b.blobovniczas.Put(prm) } // NeedsCompression returns true if the object should be compressed. @@ -41,40 +55,6 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool { return b.cfg.CConfig.NeedsCompression(obj) } -// PutRaw saves an already marshaled object in BLOB storage. -func (b *BlobStor) PutRaw(prm common.PutPrm, compress bool) (common.PutRes, error) { - big := b.isBig(prm.RawData) - - if big { - var err error - if compress { - err = b.fsTree.PutStream(prm.Address, func(f *os.File) error { - enc, _ := zstd.NewWriter(f) // nil error if no options are provided - if _, err := enc.Write(prm.RawData); err != nil { - return err - } - return enc.Close() - }) - } else { - _, err = b.fsTree.Put(prm) - } - if err != nil { - return common.PutRes{}, err - } - - storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT")) - - return common.PutRes{}, nil - } - - if compress { - prm.RawData = b.CConfig.Compress(prm.RawData) - } - - // save object in blobovnicza - return b.blobovniczas.Put(prm) -} - // checks if object is "big". func (b *BlobStor) isBig(data []byte) bool { return uint64(len(data)) > b.smallSizeLimit diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index 27f31cde5..f9879b799 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -1,11 +1,8 @@ package writecache import ( - "errors" - - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -44,13 +41,7 @@ func (c *cache) Delete(addr oid.Address) error { return nil } - err := c.fsTree.Delete(addr) - if errors.Is(err, fstree.ErrFileNotFound) { - var errNotFound apistatus.ObjectNotFound - - err = errNotFound - } - + _, err := c.fsTree.Delete(common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("fstree DELETE")) c.objCounters.DecFS() diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 162261fe4..cf9216a14 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -158,8 +158,9 @@ func (c *cache) flushBigObjects() { var prm common.PutPrm prm.Address = addr prm.RawData = data + prm.DontCompress = !compress - if _, err := c.blobstor.PutRaw(common.PutPrm{Address: addr, RawData: data}, compress); err != nil { + if _, err := c.blobstor.Put(prm); err != nil { c.log.Error("cant flush object to blobstor", zap.Error(err)) return nil } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 6b18fea67..dc7bb2d1c 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -22,20 +22,15 @@ func (c *cache) Get(addr oid.Address) (*objectSDK.Object, error) { return obj, obj.Unmarshal(value) } - data, err := c.fsTree.Get(common.GetPrm{Address: addr}) + res, err := c.fsTree.Get(common.GetPrm{Address: addr}) if err != nil { var errNotFound apistatus.ObjectNotFound return nil, errNotFound } - obj := objectSDK.New() - if err := obj.Unmarshal(data); err != nil { - return nil, err - } - c.flushed.Get(saddr) - return obj, nil + return res.Object, nil } // Head returns object header from write-cache. diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index cc7905dc1..ed7370723 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -7,6 +7,7 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru/simplelru" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" "github.com/nspcc-dev/neofs-node/pkg/util" @@ -145,7 +146,8 @@ func (c *cache) deleteFromDisk(keys [][]byte) error { continue } - if err := c.fsTree.Delete(addr); err != nil && !errors.Is(err, fstree.ErrFileNotFound) { + _, err := c.fsTree.Delete(common.DeletePrm{Address: addr}) + if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) { lastErr = err c.log.Error("can't remove object from write-cache", zap.Error(err)) continue