From 9f51c850547cf3b3f04f045268e7bbe0f8afc33d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 2 Sep 2020 15:50:51 +0300 Subject: [PATCH] [#17] localstorage: Adopt local object storage for new types Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/localstore/alias.go | 35 -- pkg/local_object_storage/localstore/del.go | 38 -- pkg/local_object_storage/localstore/filter.go | 21 - .../localstore/filter_funcs.go | 39 -- .../localstore/filter_test.go | 6 - pkg/local_object_storage/localstore/get.go | 30 -- pkg/local_object_storage/localstore/has.go | 20 - .../localstore/interface.go | 102 ----- pkg/local_object_storage/localstore/list.go | 41 -- .../localstore/localstore.pb.go | Bin 11986 -> 0 bytes .../localstore/localstore.proto | 14 - .../localstore/localstore_test.go | 418 ------------------ pkg/local_object_storage/localstore/meta.go | 104 +++-- .../localstore/methods.go | 107 +++++ pkg/local_object_storage/localstore/put.go | 47 -- pkg/local_object_storage/localstore/range.go | 36 -- .../localstore/storage.go | 53 +++ 17 files changed, 227 insertions(+), 884 deletions(-) delete mode 100644 pkg/local_object_storage/localstore/alias.go delete mode 100644 pkg/local_object_storage/localstore/del.go delete mode 100644 pkg/local_object_storage/localstore/filter_funcs.go delete mode 100644 pkg/local_object_storage/localstore/get.go delete mode 100644 pkg/local_object_storage/localstore/has.go delete mode 100644 pkg/local_object_storage/localstore/interface.go delete mode 100644 pkg/local_object_storage/localstore/list.go delete mode 100644 pkg/local_object_storage/localstore/localstore.pb.go delete mode 100644 pkg/local_object_storage/localstore/localstore.proto delete mode 100644 pkg/local_object_storage/localstore/localstore_test.go create mode 100644 pkg/local_object_storage/localstore/methods.go delete mode 100644 pkg/local_object_storage/localstore/put.go delete mode 100644 pkg/local_object_storage/localstore/range.go create mode 100644 pkg/local_object_storage/localstore/storage.go diff --git a/pkg/local_object_storage/localstore/alias.go b/pkg/local_object_storage/localstore/alias.go deleted file mode 100644 index 03053f48..00000000 --- a/pkg/local_object_storage/localstore/alias.go +++ /dev/null @@ -1,35 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/hash" - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-api-go/refs" -) - -// CID is a type alias of -// CID from refs package of neofs-api-go. -type CID = refs.CID - -// SGID is a type alias of -// SGID from refs package of neofs-api-go. -type SGID = refs.ObjectID - -// Header is a type alias of -// Header from object package of neofs-api-go. -type Header = object.Header - -// Object is a type alias of -// Object from object package of neofs-api-go. -type Object = object.Object - -// ObjectID is a type alias of -// ObjectID from refs package of neofs-api-go. -type ObjectID = refs.ObjectID - -// Address is a type alias of -// Address from refs package of neofs-api-go. -type Address = refs.Address - -// Hash is a type alias of -// Hash from hash package of neofs-api-go. -type Hash = hash.Hash diff --git a/pkg/local_object_storage/localstore/del.go b/pkg/local_object_storage/localstore/del.go deleted file mode 100644 index 1a1859f1..00000000 --- a/pkg/local_object_storage/localstore/del.go +++ /dev/null @@ -1,38 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/refs" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -func (l *localstore) Del(key refs.Address) error { - k, err := key.Hash() - if err != nil { - return errors.Wrap(err, "Localstore Del failed on key.Marshal") - } - - // try to fetch object for metrics - obj, err := l.Get(key) - if err != nil { - l.log.Warn("localstore Del failed on localstore.Get", zap.Error(err)) - } - - if err := l.blobBucket.Del(k); err != nil { - l.log.Warn("Localstore Del failed on BlobBucket.Del", zap.Error(err)) - } - - if err := l.metaBucket.Del(k); err != nil { - return errors.Wrap(err, "Localstore Del failed on MetaBucket.Del") - } - - if obj != nil { - l.col.UpdateContainer( - key.CID, - obj.SystemHeader.PayloadLength, - metrics2.RemSpace) - } - - return nil -} diff --git a/pkg/local_object_storage/localstore/filter.go b/pkg/local_object_storage/localstore/filter.go index f27a3767..7f7891bd 100644 --- a/pkg/local_object_storage/localstore/filter.go +++ b/pkg/local_object_storage/localstore/filter.go @@ -175,27 +175,6 @@ func NewFilter(p *FilterParams) FilterPipeline { } } -// AllPassIncludingFilter returns FilterPipeline with sub-filters composed from parameters. -// Result filter fails with CodeFail code if any of the sub-filters returns not a CodePass code. -func AllPassIncludingFilter(name string, params ...*FilterParams) (FilterPipeline, error) { - res := NewFilter(&FilterParams{ - Name: name, - FilterFunc: SkippingFilterFunc, - }) - - for i := range params { - if err := res.PutSubFilter(SubFilterParams{ - FilterPipeline: NewFilter(params[i]), - OnIgnore: CodeFail, - OnFail: CodeFail, - }); err != nil { - return nil, errors.Wrap(err, "could not create all pass including filter") - } - } - - return res, nil -} - func (p *filterPipeline) Pass(ctx context.Context, meta *ObjectMeta) *FilterResult { p.RLock() defer p.RUnlock() diff --git a/pkg/local_object_storage/localstore/filter_funcs.go b/pkg/local_object_storage/localstore/filter_funcs.go deleted file mode 100644 index c92610c2..00000000 --- a/pkg/local_object_storage/localstore/filter_funcs.go +++ /dev/null @@ -1,39 +0,0 @@ -package localstore - -import ( - "context" -) - -// SkippingFilterFunc is a FilterFunc that always returns result with -// CodePass code and nil error. -func SkippingFilterFunc(_ context.Context, _ *ObjectMeta) *FilterResult { - return ResultPass() -} - -// ContainerFilterFunc returns a FilterFunc that returns: -// - result with CodePass code and nil error if CID of ObjectMeta if from the CID list; -// - result with CodeFail code an nil error otherwise. -func ContainerFilterFunc(cidList []CID) FilterFunc { - return func(_ context.Context, meta *ObjectMeta) *FilterResult { - for i := range cidList { - if meta.Object.SystemHeader.CID.Equal(cidList[i]) { - return ResultPass() - } - } - - return ResultFail() - } -} - -// StoredEarlierThanFilterFunc returns a FilterFunc that returns: -// - result with CodePass code and nil error if StoreEpoch is less that argument; -// - result with CodeFail code and nil error otherwise. -func StoredEarlierThanFilterFunc(epoch uint64) FilterFunc { - return func(_ context.Context, meta *ObjectMeta) *FilterResult { - if meta.StoreEpoch < epoch { - return ResultPass() - } - - return ResultFail() - } -} diff --git a/pkg/local_object_storage/localstore/filter_test.go b/pkg/local_object_storage/localstore/filter_test.go index 2da66b7f..96fd8c6a 100644 --- a/pkg/local_object_storage/localstore/filter_test.go +++ b/pkg/local_object_storage/localstore/filter_test.go @@ -1,18 +1,12 @@ package localstore import ( - "context" "errors" "testing" "github.com/stretchr/testify/require" ) -func TestSkippingFilterFunc(t *testing.T) { - res := SkippingFilterFunc(context.TODO(), &ObjectMeta{}) - require.Equal(t, CodePass, res.Code()) -} - func TestFilterResult(t *testing.T) { var ( r *FilterResult diff --git a/pkg/local_object_storage/localstore/get.go b/pkg/local_object_storage/localstore/get.go deleted file mode 100644 index 4e4090f4..00000000 --- a/pkg/local_object_storage/localstore/get.go +++ /dev/null @@ -1,30 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/pkg/errors" -) - -func (l *localstore) Get(key refs.Address) (*Object, error) { - var ( - err error - k, v []byte - o = new(Object) - ) - - k, err = key.Hash() - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on key.Marshal") - } - - v, err = l.blobBucket.Get(k) - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on blobBucket.Get") - } - - if err = o.Unmarshal(v); err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on Object.Unmarshal") - } - - return o, nil -} diff --git a/pkg/local_object_storage/localstore/has.go b/pkg/local_object_storage/localstore/has.go deleted file mode 100644 index 831e77de..00000000 --- a/pkg/local_object_storage/localstore/has.go +++ /dev/null @@ -1,20 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/pkg/errors" -) - -func (l *localstore) Has(key refs.Address) (bool, error) { - var ( - err error - k []byte - ) - - k, err = key.Hash() - if err != nil { - return false, errors.Wrap(err, "localstore.Has failed on key.Marshal") - } - - return l.metaBucket.Has(k) && l.blobBucket.Has(k), nil -} diff --git a/pkg/local_object_storage/localstore/interface.go b/pkg/local_object_storage/localstore/interface.go deleted file mode 100644 index 236c8952..00000000 --- a/pkg/local_object_storage/localstore/interface.go +++ /dev/null @@ -1,102 +0,0 @@ -package localstore - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type ( - // Localstore is an interface of local object storage. - Localstore interface { - Put(context.Context, *Object) error - Get(Address) (*Object, error) - Del(Address) error - Meta(Address) (*ObjectMeta, error) - Iterator - Has(Address) (bool, error) - ObjectsCount() (uint64, error) - - object.PositionReader - Size() int64 - } - - // MetaHandler is a function that handles ObjectMeta. - MetaHandler func(*ObjectMeta) bool - - // Iterator is an interface of the iterator over local object storage. - Iterator interface { - Iterate(FilterPipeline, MetaHandler) error - } - - // ListItem is an ObjectMeta wrapper. - ListItem struct { - ObjectMeta - } - - // Params groups the parameters of - // local object storage constructor. - Params struct { - BlobBucket bucket.Bucket - MetaBucket bucket.Bucket - Logger *zap.Logger - Collector metrics2.Collector - } - - localstore struct { - metaBucket bucket.Bucket - blobBucket bucket.Bucket - - log *zap.Logger - col metrics2.Collector - } -) - -// ErrOutOfRange is returned when requested object payload range is -// out of object payload bounds. -var ErrOutOfRange = errors.New("range is out of payload bounds") - -// ErrEmptyMetaHandler is returned by functions that expect -// a non-nil MetaHandler, but received nil. -var ErrEmptyMetaHandler = errors.New("meta handler is nil") - -var errNilLogger = errors.New("logger is nil") - -var errNilCollector = errors.New("metrics collector is nil") - -// New is a local object storage constructor. -func New(p Params) (Localstore, error) { - switch { - case p.MetaBucket == nil: - return nil, errors.New("meta bucket is nil") - case p.BlobBucket == nil: - return nil, errors.New("blob bucket is nil") - case p.Logger == nil: - return nil, errNilLogger - case p.Collector == nil: - return nil, errNilCollector - } - - return &localstore{ - metaBucket: p.MetaBucket, - blobBucket: p.BlobBucket, - log: p.Logger, - col: p.Collector, - }, nil -} - -func (l localstore) Size() int64 { return l.blobBucket.Size() } - -// TODO: implement less costly method of counting. -func (l localstore) ObjectsCount() (uint64, error) { - items, err := l.metaBucket.List() - if err != nil { - return 0, err - } - - return uint64(len(items)), nil -} diff --git a/pkg/local_object_storage/localstore/list.go b/pkg/local_object_storage/localstore/list.go deleted file mode 100644 index c4e1ec62..00000000 --- a/pkg/local_object_storage/localstore/list.go +++ /dev/null @@ -1,41 +0,0 @@ -package localstore - -import ( - "context" - - "go.uber.org/zap" -) - -func (l *localstore) Iterate(filter FilterPipeline, handler MetaHandler) error { - if handler == nil { - return ErrEmptyMetaHandler - } else if filter == nil { - filter = NewFilter(&FilterParams{ - Name: "SKIPPING_FILTER", - FilterFunc: SkippingFilterFunc, - }) - } - - return l.metaBucket.Iterate(func(_, v []byte) bool { - meta := new(ObjectMeta) - if err := meta.Unmarshal(v); err != nil { - l.log.Error("unmarshal meta bucket item failure", zap.Error(err)) - } else if filter.Pass(context.TODO(), meta).Code() == CodePass { - return !handler(meta) - } - return true - }) -} - -// ListItems iterates over Iterator with FilterPipeline and returns all passed items. -func ListItems(it Iterator, f FilterPipeline) ([]ListItem, error) { - res := make([]ListItem, 0) - err := it.Iterate(f, func(meta *ObjectMeta) (stop bool) { - res = append(res, ListItem{ - ObjectMeta: *meta, - }) - return - }) - - return res, err -} diff --git a/pkg/local_object_storage/localstore/localstore.pb.go b/pkg/local_object_storage/localstore/localstore.pb.go deleted file mode 100644 index 8700f28ea2a06d30f060bf807786415f046641d5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11986 zcmds7d2icB7XMrN6te&%FGNrA3UMwcN)-=iNa!T^8NMiS>nmbQxmQQorYEa@I17X&e?hUKWq7-y02jeanfXR#x>#k2 zN zu&2^ef!<7=*{L7Q2F}Qz1><0DFy8dvHZ7M8n~*DhPGTvDZ5JHK3msEeDn@ay2Gu!4 z%@o&?R#*8N5fO(;m=~d^+%%19Dscg0l~BXiA;I11`S9>?6N}YQl+&BaXSSHP-L8r^ zcy*eMw@ODSPejUvuiaN?p0jMboog&4T~bq>U8vYnxk@RY{I6C4HXVQg3Stq|KQ2@f zMoh>Gg=eM!pttyETmlx!b(es_{h*H3HB_^Jty;#&q?|^0SQ>|fIu7IF$u-{=zbid@$&?MkvNgjoIJZHb-&exxE#`K zY!DC*E{f|8GMHH12w{vjq+8GSb%75r`%$Sf*r z5R-Sp-j8~LRMXiNHm7#GURK&rztdH%s#i27q5|1s$8qJ{NAVk!n|A9%`qWleJyEO5 zuz3OwSu8~g`lA0BPG@Mc(HKyJ21JRn(s$^9iGy<#}vkz?)_Pp7u6U_`n_R)+;k60pi#4MSp!}lY;7owu^h#8ES zq?#VK7<1|9HKGo)s- z2>THW;V|im=*QZeilHW~!GIq|{9+=8V$f42C+Q|^L@t+MK5svA|iYH+z&XKtG*qVK@J3CyY7A>c|K9XEv z$SIphGGAmL^LoIdOvMLCXe5!yB|Z>PSAh7fWV^)sDLYmQhTzJ&Sz!-JEKMa341*dW zN1~foBm=ZqnayP~Ky+mo$r#Z_j9C_Z@_{@^eV6N;-8*JeI7}iCQ?TpAHx-F}t%PlY z(dhbL7GIhjOZv^Kwrk|kDh(6h^67H)rg1!d$cH3&YqN;1H zx*|cfZR-$CqWT4#j+Af_t@JSNo)OnsE&nFyafij4LV(6Ca=OXH2yn*U}|bodsQn( z{c4}bi4cO}d<*A>zwr3~9UTGYG{iI2t_r)R6K&TCbclC&e2gnwFuVpG^ap)z5}8}A zIsHI(H41z94d%=b%~+#yyg{Rb5j$@{Rx%^OklcZ~sZC#X_%cRHn|40mH-RBnkxigS z^VfUWk)$Af$Sq{Sx&a46<0~0B>T?At$y(M#7pUFyqeON{>BuT?&XIN~Db-*-)(rI_ zT8TBbBEzXq4`pJgV~Vf(Sacv3%Q}_@0nSn=jk@aIL}xi_nwRA5^u89&_ppOYhlNN? zfoV2^Gy{PzF4W-se8%a(Mw8bR(KLh2+xMShp9$2T2=IBI%n2MI&2T56M@OvPZe$?X z;T-JujlNV_Fh;c#Qj04YC6T(Q27g0DGHg#F*^ZhBTG&yv%7#Q8L1}e_Vsx}&CmB22 zk%>^jucwGeiY?*YXzZ^AC4qHE83E$nM*1JBVo#B(DxXa}s)vWlE>om9-3Eh0A&cG6 z%2|6|i-$5UYc4f7Z93oQFAlx$bTS>cAO0M4;D~8Qogk{rVOgI3Q48Ng^wv2W`?Vj` z`MCn!f7$v?dlWJ2AbQnw)WM(SwRu=s9k)?TZ=cIW8{1p>!oPf=HAE*Rl{}v#yr^p7 zW6ATx*9^dKD21>Sz7FxtLd3-i;pMksP!JtjZTC@oKR0WLSIJPS-U5ac#6dQfMeB{r zEPI|5FYk30MCt3wd?0VCTVP}ZvN=z)xM514VSY~Z!xEprw14^Xn-a|>nK->!{%d-{ zca-^!1}pNXNDT|SR4=~iko{Rwh7Odg&bfjZk=;x?ca{SJsu6@b@?WfC;sy0b!#2}} zvqn^O=z$JZ%xii1DYd}#7FH<%P&ABPOu$v2m~ehVA5+e1=gw!0$lpnH;;TiLu9k|N zk)-%$NKZJMF`Ha~a}*=wx$PjV{hnYFMv-1Pkt&>p`ovcvK6CxV@3iQEMhJIV9Lq7s zhbnwZurJwk9#!$d8Dv`18ZFImJsNgM^g2s%fL>XOL?K<7fiVh%u>d571P;Y(oG(xj zq#2undZfMi9k6NdWLuf@gejfT}9xF_sM+H%YF^4ZN`m8 zwW>Z%`TL>)!zUFRG(gt&jNi0s>y`BtJNB%U?RrPIXhBurR!@nXe zYP=Elz45?5b_^BgXk? zSDbIuo%b&PwT5$NoUgT{H1Cq_-?eS|zupGwy|y~s$h_l*z}0B{S9q!F<@Y$L0{DCR zsHWFPagnbBXB8FVBfFuPR-Ga{zr)w8$MUp+9;>a}oR1LN*^@^MYg-rEcP32T@Ow83 z*iqZk3}J3%GHA0u5}o(z3%18X1e=4ee~G_iWW-Mc=zLUmd%hlt^w+&!kDE-R6izKz zE7%`x1kNKV-Q=AW&-C;=J3rvqxs*RO^Y>JuxZ${RqLblNd<-v{L)1g#ov)Yp>Daze z@2?r_a716i(kgp+E-R~&kgSymkNPlX`{Rg%h~};;=c^Or2~elI)Q_P5fkB4WF|hT?NT@j8UwDreVL*M4m+AY zd-70b)+hc(=Rgx%8S|y7DxzF`_F2jxZ)$DJGzGEwK^AFNRcHvit16-9Q-I^w)BCyF z9}!;cQa8adOA+4Vr??VJLAt`Rj=$oc4zBF4?eIH7I@Z}&RxJmGU2T_1@!P3|&Qxkn b+6<|~zTHh~RUWwEJCskqqp@dBo#wv*>AuD& diff --git a/pkg/local_object_storage/localstore/localstore.proto b/pkg/local_object_storage/localstore/localstore.proto deleted file mode 100644 index b1fd6067..00000000 --- a/pkg/local_object_storage/localstore/localstore.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; -option go_package = "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"; - -package localstore; - -import "object/types.proto"; -import "github.com/gogo/protobuf/gogoproto/gogo.proto"; - -message ObjectMeta { - object.Object Object = 1; - bytes PayloadHash = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "Hash"]; - uint64 PayloadSize = 3; - uint64 StoreEpoch = 4; -} diff --git a/pkg/local_object_storage/localstore/localstore_test.go b/pkg/local_object_storage/localstore/localstore_test.go deleted file mode 100644 index 761e2739..00000000 --- a/pkg/local_object_storage/localstore/localstore_test.go +++ /dev/null @@ -1,418 +0,0 @@ -package localstore - -import ( - "context" - "sync" - "testing" - - "github.com/google/uuid" - "github.com/nspcc-dev/neofs-api-go/container" - "github.com/nspcc-dev/neofs-api-go/hash" - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/test" - meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -type ( - fakeCollector struct { - sync.Mutex - items map[refs.CID]uint64 - } -) - -func (f *fakeCollector) Start(_ context.Context) { panic("implement me") } -func (f *fakeCollector) UpdateSpaceUsage() { panic("implement me") } -func (f *fakeCollector) SetIterator(_ meta2.Iterator) { panic("implement me") } -func (f *fakeCollector) SetCounter(counter metrics2.ObjectCounter) { panic("implement me") } - -func (f *fakeCollector) UpdateContainer(cid refs.CID, size uint64, op metrics2.SpaceOp) { - f.Lock() - defer f.Unlock() - - switch op { - case metrics2.AddSpace: - f.items[cid] += size - case metrics2.RemSpace: - if val, ok := f.items[cid]; !ok || val < size { - return - } - - f.items[cid] -= size - default: - return - } -} - -func newCollector() metrics2.Collector { - return &fakeCollector{ - items: make(map[refs.CID]uint64), - } -} - -func testObject(t *testing.T) *Object { - var ( - uid refs.UUID - cid CID - ) - - t.Run("Prepare object", func(t *testing.T) { - cnr, err := container.NewTestContainer() - require.NoError(t, err) - - cid, err = cnr.ID() - require.NoError(t, err) - - id, err := uuid.NewRandom() - uid = refs.UUID(id) - require.NoError(t, err) - }) - - obj := &Object{ - SystemHeader: object.SystemHeader{ - Version: 1, - ID: uid, - CID: cid, - OwnerID: refs.OwnerID([refs.OwnerIDSize]byte{}), // TODO: avoid hardcode - }, - Headers: []Header{ - { - Value: &object.Header_UserHeader{ - UserHeader: &object.UserHeader{ - Key: "Profession", - Value: "Developer", - }, - }, - }, - { - Value: &object.Header_UserHeader{ - UserHeader: &object.UserHeader{ - Key: "Language", - Value: "GO", - }, - }, - }, - }, - } - - return obj -} - -func newLocalstore(t *testing.T) Localstore { - ls, err := New(Params{ - BlobBucket: test.Bucket(), - MetaBucket: test.Bucket(), - Logger: zap.L(), - Collector: newCollector(), - }) - require.NoError(t, err) - - return ls -} - -func TestNew(t *testing.T) { - t.Run("New localstore", func(t *testing.T) { - var err error - - _, err = New(Params{}) - require.Error(t, err) - - _, err = New(Params{ - BlobBucket: test.Bucket(), - MetaBucket: test.Bucket(), - Logger: zap.L(), - Collector: newCollector(), - }) - require.NoError(t, err) - }) -} - -func TestLocalstore_Del(t *testing.T) { - t.Run("Del method", func(t *testing.T) { - var ( - err error - ls Localstore - obj *Object - ) - - ls = newLocalstore(t) - - obj = testObject(t) - obj.SetPayload([]byte("Hello, world")) - - k := *obj.Address() - - store, ok := ls.(*localstore) - require.True(t, ok) - require.NotNil(t, store) - - metric, ok := store.col.(*fakeCollector) - require.True(t, ok) - require.NotNil(t, metric) - - err = ls.Put(context.Background(), obj) - require.NoError(t, err) - require.NotEmpty(t, obj.Payload) - require.Contains(t, metric.items, obj.SystemHeader.CID) - require.Equal(t, obj.SystemHeader.PayloadLength, metric.items[obj.SystemHeader.CID]) - - err = ls.Del(k) - require.NoError(t, err) - require.Contains(t, metric.items, obj.SystemHeader.CID) - require.Equal(t, uint64(0), metric.items[obj.SystemHeader.CID]) - - _, err = ls.Get(k) - require.Error(t, err) - }) -} - -func TestLocalstore_Get(t *testing.T) { - t.Run("Get method (default)", func(t *testing.T) { - var ( - err error - ls Localstore - obj *Object - ) - - ls = newLocalstore(t) - - obj = testObject(t) - - err = ls.Put(context.Background(), obj) - require.NoError(t, err) - - k := *obj.Address() - - o, err := ls.Get(k) - require.NoError(t, err) - require.Equal(t, obj, o) - }) -} - -func TestLocalstore_Put(t *testing.T) { - t.Run("Put method", func(t *testing.T) { - var ( - err error - ls Localstore - obj *Object - ) - - ls = newLocalstore(t) - store, ok := ls.(*localstore) - require.True(t, ok) - require.NotNil(t, store) - - metric, ok := store.col.(*fakeCollector) - require.True(t, ok) - require.NotNil(t, metric) - - obj = testObject(t) - - err = ls.Put(context.Background(), obj) - require.NoError(t, err) - require.Contains(t, metric.items, obj.SystemHeader.CID) - require.Equal(t, obj.SystemHeader.PayloadLength, metric.items[obj.SystemHeader.CID]) - - o, err := ls.Get(*obj.Address()) - require.NoError(t, err) - require.Equal(t, obj, o) - }) -} - -func TestLocalstore_List(t *testing.T) { - t.Run("List method (no filters)", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - items, err := ListItems(ls, nil) - require.NoError(t, err) - require.Len(t, items, objCount) - - for i := range items { - require.Contains(t, objs, *items[i].Object) - } - }) - - t.Run("List method ('bad' filter)", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - items, err := ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: ContainerFilterFunc([]CID{}), - })) - require.NoError(t, err) - require.Len(t, items, 0) - }) - - t.Run("List method (filter by cid)", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - cidVals := []CID{objs[0].SystemHeader.CID} - - items, err := ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: ContainerFilterFunc(cidVals), - })) - require.NoError(t, err) - require.Len(t, items, 1) - - for i := range items { - require.Contains(t, objs, *items[i].Object) - } - }) - - t.Run("Filter stored earlier", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - epoch uint64 = 100 - list []ListItem - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - ctx := context.WithValue(context.Background(), StoreEpochValue, epoch) - - for i := range objs { - err = ls.Put(ctx, &objs[i]) - require.NoError(t, err) - } - - list, err = ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: StoredEarlierThanFilterFunc(epoch - 1), - })) - require.NoError(t, err) - require.Empty(t, list) - - list, err = ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: StoredEarlierThanFilterFunc(epoch), - })) - require.NoError(t, err) - require.Empty(t, list) - - list, err = ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: StoredEarlierThanFilterFunc(epoch + 1), - })) - require.NoError(t, err) - require.Len(t, list, objCount) - }) - - t.Run("Filter with complex filter", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - cidVals := []CID{objs[0].SystemHeader.CID} - - mainF, err := AllPassIncludingFilter("TEST_FILTER", &FilterParams{ - Name: "CID_LIST", - FilterFunc: ContainerFilterFunc(cidVals), - }) - - items, err := ListItems(ls, mainF) - require.NoError(t, err) - require.Len(t, items, 1) - }) - - t.Run("Meta info", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - epoch uint64 = 100 - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - ctx := context.WithValue(context.Background(), StoreEpochValue, epoch) - - for i := range objs { - err = ls.Put(ctx, &objs[i]) - require.NoError(t, err) - - meta, err := ls.Meta(*objs[i].Address()) - require.NoError(t, err) - - noPayload := objs[i] - noPayload.Payload = nil - - require.Equal(t, *meta.Object, noPayload) - require.Equal(t, meta.PayloadHash, hash.Sum(objs[i].Payload)) - require.Equal(t, meta.PayloadSize, uint64(len(objs[i].Payload))) - require.Equal(t, epoch, meta.StoreEpoch) - } - }) -} diff --git a/pkg/local_object_storage/localstore/meta.go b/pkg/local_object_storage/localstore/meta.go index ba1acd14..e270f84e 100644 --- a/pkg/local_object_storage/localstore/meta.go +++ b/pkg/local_object_storage/localstore/meta.go @@ -1,52 +1,82 @@ package localstore import ( - "context" + "encoding/binary" + "io" - "github.com/nspcc-dev/neofs-api-go/hash" - "github.com/nspcc-dev/neofs-api-go/refs" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/pkg/errors" ) -// StoreEpochValue is a context key of object storing epoch number. -const StoreEpochValue = "store epoch" +// ObjectMeta represents meta information about +// the object that is stored in meta storage. +type ObjectMeta struct { + head *object.Object -func (l *localstore) Meta(key refs.Address) (*ObjectMeta, error) { - var ( - err error - meta ObjectMeta - k, v []byte - ) - - k, err = key.Hash() - if err != nil { - return nil, errors.Wrap(err, "Localstore Meta failed on key.Marshal") - } - - v, err = l.metaBucket.Get(k) - if err != nil { - return nil, errors.Wrap(err, "Localstore Meta failed on metaBucket.Get") - } - - if err := meta.Unmarshal(v); err != nil { - return nil, errors.Wrap(err, "Localstore Metafailed on ObjectMeta.Unmarshal") - } - - return &meta, nil + savedAtEpoch uint64 } -func metaFromObject(ctx context.Context, obj *Object) *ObjectMeta { - meta := new(ObjectMeta) - o := *obj - meta.Object = &o - meta.Object.Payload = nil - meta.PayloadSize = uint64(len(obj.Payload)) - meta.PayloadHash = hash.Sum(obj.Payload) +// SavedAtEpoch returns the number of epoch +// at which the object was saved locally. +func (m *ObjectMeta) SavedAtEpoch() uint64 { + if m != nil { + return m.savedAtEpoch + } - storeEpoch, ok := ctx.Value(StoreEpochValue).(uint64) - if ok { - meta.StoreEpoch = storeEpoch + return 0 +} + +// Head returns the object w/o payload. +func (m *ObjectMeta) Head() *object.Object { + if m != nil { + return m.head + } + + return nil +} + +// AddressFromMeta extracts the Address from object meta. +func AddressFromMeta(m *ObjectMeta) *object.Address { + return m.Head().Address() +} + +func metaFromObject(o *object.Object) *ObjectMeta { + // FIXME: remove hard-code + meta := new(ObjectMeta) + meta.savedAtEpoch = 10 + meta.head = &object.Object{ + Object: o.CutPayload(), } return meta } + +func metaToBytes(m *ObjectMeta) ([]byte, error) { + data := make([]byte, 8) + + binary.BigEndian.PutUint64(data, m.savedAtEpoch) + + addrBytes, err := m.head.MarshalStableV2() + if err != nil { + return nil, err + } + + return append(data, addrBytes...), nil +} + +func metaFromBytes(data []byte) (*ObjectMeta, error) { + if len(data) < 8 { + return nil, io.ErrUnexpectedEOF + } + + obj, err := object.FromBytes(data[8:]) + if err != nil { + return nil, errors.Wrap(err, "could not get object address from bytes") + } + + meta := new(ObjectMeta) + meta.savedAtEpoch = binary.BigEndian.Uint64(data) + meta.head = obj + + return meta, nil +} diff --git a/pkg/local_object_storage/localstore/methods.go b/pkg/local_object_storage/localstore/methods.go new file mode 100644 index 00000000..39521abb --- /dev/null +++ b/pkg/local_object_storage/localstore/methods.go @@ -0,0 +1,107 @@ +package localstore + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func (s *Storage) Put(obj *object.Object) error { + addrBytes, err := obj.Address().MarshalStableV2() + if err != nil { + return errors.Wrap(err, "could not marshal object address") + } + + objBytes, err := obj.MarshalStableV2() + if err != nil { + return errors.Wrap(err, "could not marshal the object") + } + + metaBytes, err := metaToBytes(metaFromObject(obj)) + if err != nil { + return errors.Wrap(err, "could not marshal object meta") + } + + if err := s.blobBucket.Set(addrBytes, objBytes); err != nil { + return errors.Wrap(err, "could no save object in BLOB storage") + } + + if err := s.metaBucket.Set(addrBytes, metaBytes); err != nil { + return errors.Wrap(err, "could not save object in meta storage") + } + + return nil +} + +func (s *Storage) Delete(addr *object.Address) error { + addrBytes, err := addr.MarshalStableV2() + if err != nil { + return errors.Wrap(err, "could not marshal object address") + } + + if err := s.blobBucket.Del(addrBytes); err != nil { + s.log.Warn("could not remove object from BLOB storage", + zap.Error(err), + ) + } + + if err := s.metaBucket.Del(addrBytes); err != nil { + return errors.Wrap(err, "could not remove object from meta storage") + } + + return nil +} + +func (s *Storage) Get(addr *object.Address) (*object.Object, error) { + addrBytes, err := addr.MarshalStableV2() + if err != nil { + return nil, errors.Wrap(err, "could not marshal object address") + } + + objBytes, err := s.blobBucket.Get(addrBytes) + if err != nil { + return nil, errors.Wrap(err, "could not get object from BLOB storage") + } + + return object.FromBytes(objBytes) +} + +func (s *Storage) Head(addr *object.Address) (*ObjectMeta, error) { + addrBytes, err := addr.MarshalStableV2() + if err != nil { + return nil, errors.Wrap(err, "could not marshal object address") + } + + metaBytes, err := s.metaBucket.Get(addrBytes) + if err != nil { + return nil, errors.Wrap(err, "could not get object from meta storage") + } + + return metaFromBytes(metaBytes) +} + +func (s *Storage) Iterate(filter FilterPipeline, handler func(*ObjectMeta) bool) error { + if filter == nil { + filter = NewFilter(&FilterParams{ + Name: "SKIPPING_FILTER", + FilterFunc: func(context.Context, *ObjectMeta) *FilterResult { + return ResultPass() + }, + }) + } + + return s.metaBucket.Iterate(func(_, v []byte) bool { + meta, err := metaFromBytes(v) + if err != nil { + s.log.Error("unmarshal meta bucket item failure", + zap.Error(err), + ) + } else if filter.Pass(context.TODO(), meta).Code() == CodePass { + return !handler(meta) + } + + return true + }) +} diff --git a/pkg/local_object_storage/localstore/put.go b/pkg/local_object_storage/localstore/put.go deleted file mode 100644 index bc18475d..00000000 --- a/pkg/local_object_storage/localstore/put.go +++ /dev/null @@ -1,47 +0,0 @@ -package localstore - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/refs" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/pkg/errors" -) - -func (l *localstore) Put(ctx context.Context, obj *Object) error { - var ( - oa refs.Address - k, v []byte - err error - ) - - oa = *obj.Address() - k, err = oa.Hash() - - if err != nil { - return errors.Wrap(err, "Localstore Put failed on StorageKey.marshal") - } - - if v, err = obj.Marshal(); err != nil { - return errors.Wrap(err, "Localstore Put failed on blobValue") - } - - if err = l.blobBucket.Set(k, v); err != nil { - return errors.Wrap(err, "Localstore Put failed on BlobBucket.Set") - } - - if v, err = metaFromObject(ctx, obj).Marshal(); err != nil { - return errors.Wrap(err, "Localstore Put failed on metaValue") - } - - if err = l.metaBucket.Set(k, v); err != nil { - return errors.Wrap(err, "Localstore Put failed on MetaBucket.Set") - } - - l.col.UpdateContainer( - obj.SystemHeader.CID, - obj.SystemHeader.PayloadLength, - metrics2.AddSpace) - - return nil -} diff --git a/pkg/local_object_storage/localstore/range.go b/pkg/local_object_storage/localstore/range.go deleted file mode 100644 index 05e43f53..00000000 --- a/pkg/local_object_storage/localstore/range.go +++ /dev/null @@ -1,36 +0,0 @@ -package localstore - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/pkg/errors" -) - -func (l *localstore) PRead(ctx context.Context, key Address, rng object.Range) ([]byte, error) { - var ( - err error - k, v []byte - obj Object - ) - - k, err = key.Hash() - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on key.Marshal") - } - - v, err = l.blobBucket.Get(k) - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on blobBucket.Get") - } - - if err := obj.Unmarshal(v); err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on object.Unmarshal") - } - - if rng.Offset+rng.Length > uint64(len(obj.Payload)) { - return nil, ErrOutOfRange - } - - return obj.Payload[rng.Offset : rng.Offset+rng.Length], nil -} diff --git a/pkg/local_object_storage/localstore/storage.go b/pkg/local_object_storage/localstore/storage.go new file mode 100644 index 00000000..284ac385 --- /dev/null +++ b/pkg/local_object_storage/localstore/storage.go @@ -0,0 +1,53 @@ +package localstore + +import ( + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Storage represents NeoFS local object storage. +type Storage struct { + log *logger.Logger + + metaBucket bucket.Bucket + + blobBucket bucket.Bucket +} + +// Option is an option of Storage constructor. +type Option func(*cfg) + +type cfg struct { + logger *logger.Logger +} + +func defaultCfg() *cfg { + return &cfg{ + logger: zap.L(), + } +} + +// New is a local object storage constructor. +func New(blob, meta bucket.Bucket, opts ...Option) *Storage { + cfg := defaultCfg() + + for i := range opts { + opts[i](cfg) + } + + return &Storage{ + metaBucket: meta, + blobBucket: blob, + log: cfg.logger, + } +} + +// WithLogger returns Storage option of used logger. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + if l != nil { + c.logger = l + } + } +}