diff --git a/pkg/local_object_storage/localstore/alias.go b/pkg/local_object_storage/localstore/alias.go deleted file mode 100644 index 03053f48b..000000000 --- a/pkg/local_object_storage/localstore/alias.go +++ /dev/null @@ -1,35 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/hash" - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-api-go/refs" -) - -// CID is a type alias of -// CID from refs package of neofs-api-go. -type CID = refs.CID - -// SGID is a type alias of -// SGID from refs package of neofs-api-go. -type SGID = refs.ObjectID - -// Header is a type alias of -// Header from object package of neofs-api-go. -type Header = object.Header - -// Object is a type alias of -// Object from object package of neofs-api-go. -type Object = object.Object - -// ObjectID is a type alias of -// ObjectID from refs package of neofs-api-go. -type ObjectID = refs.ObjectID - -// Address is a type alias of -// Address from refs package of neofs-api-go. -type Address = refs.Address - -// Hash is a type alias of -// Hash from hash package of neofs-api-go. -type Hash = hash.Hash diff --git a/pkg/local_object_storage/localstore/del.go b/pkg/local_object_storage/localstore/del.go deleted file mode 100644 index 1a1859f1f..000000000 --- a/pkg/local_object_storage/localstore/del.go +++ /dev/null @@ -1,38 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/refs" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -func (l *localstore) Del(key refs.Address) error { - k, err := key.Hash() - if err != nil { - return errors.Wrap(err, "Localstore Del failed on key.Marshal") - } - - // try to fetch object for metrics - obj, err := l.Get(key) - if err != nil { - l.log.Warn("localstore Del failed on localstore.Get", zap.Error(err)) - } - - if err := l.blobBucket.Del(k); err != nil { - l.log.Warn("Localstore Del failed on BlobBucket.Del", zap.Error(err)) - } - - if err := l.metaBucket.Del(k); err != nil { - return errors.Wrap(err, "Localstore Del failed on MetaBucket.Del") - } - - if obj != nil { - l.col.UpdateContainer( - key.CID, - obj.SystemHeader.PayloadLength, - metrics2.RemSpace) - } - - return nil -} diff --git a/pkg/local_object_storage/localstore/filter.go b/pkg/local_object_storage/localstore/filter.go index f27a37674..7f7891bdd 100644 --- a/pkg/local_object_storage/localstore/filter.go +++ b/pkg/local_object_storage/localstore/filter.go @@ -175,27 +175,6 @@ func NewFilter(p *FilterParams) FilterPipeline { } } -// AllPassIncludingFilter returns FilterPipeline with sub-filters composed from parameters. -// Result filter fails with CodeFail code if any of the sub-filters returns not a CodePass code. -func AllPassIncludingFilter(name string, params ...*FilterParams) (FilterPipeline, error) { - res := NewFilter(&FilterParams{ - Name: name, - FilterFunc: SkippingFilterFunc, - }) - - for i := range params { - if err := res.PutSubFilter(SubFilterParams{ - FilterPipeline: NewFilter(params[i]), - OnIgnore: CodeFail, - OnFail: CodeFail, - }); err != nil { - return nil, errors.Wrap(err, "could not create all pass including filter") - } - } - - return res, nil -} - func (p *filterPipeline) Pass(ctx context.Context, meta *ObjectMeta) *FilterResult { p.RLock() defer p.RUnlock() diff --git a/pkg/local_object_storage/localstore/filter_funcs.go b/pkg/local_object_storage/localstore/filter_funcs.go deleted file mode 100644 index c92610c20..000000000 --- a/pkg/local_object_storage/localstore/filter_funcs.go +++ /dev/null @@ -1,39 +0,0 @@ -package localstore - -import ( - "context" -) - -// SkippingFilterFunc is a FilterFunc that always returns result with -// CodePass code and nil error. -func SkippingFilterFunc(_ context.Context, _ *ObjectMeta) *FilterResult { - return ResultPass() -} - -// ContainerFilterFunc returns a FilterFunc that returns: -// - result with CodePass code and nil error if CID of ObjectMeta if from the CID list; -// - result with CodeFail code an nil error otherwise. -func ContainerFilterFunc(cidList []CID) FilterFunc { - return func(_ context.Context, meta *ObjectMeta) *FilterResult { - for i := range cidList { - if meta.Object.SystemHeader.CID.Equal(cidList[i]) { - return ResultPass() - } - } - - return ResultFail() - } -} - -// StoredEarlierThanFilterFunc returns a FilterFunc that returns: -// - result with CodePass code and nil error if StoreEpoch is less that argument; -// - result with CodeFail code and nil error otherwise. -func StoredEarlierThanFilterFunc(epoch uint64) FilterFunc { - return func(_ context.Context, meta *ObjectMeta) *FilterResult { - if meta.StoreEpoch < epoch { - return ResultPass() - } - - return ResultFail() - } -} diff --git a/pkg/local_object_storage/localstore/filter_test.go b/pkg/local_object_storage/localstore/filter_test.go index 2da66b7f5..96fd8c6a3 100644 --- a/pkg/local_object_storage/localstore/filter_test.go +++ b/pkg/local_object_storage/localstore/filter_test.go @@ -1,18 +1,12 @@ package localstore import ( - "context" "errors" "testing" "github.com/stretchr/testify/require" ) -func TestSkippingFilterFunc(t *testing.T) { - res := SkippingFilterFunc(context.TODO(), &ObjectMeta{}) - require.Equal(t, CodePass, res.Code()) -} - func TestFilterResult(t *testing.T) { var ( r *FilterResult diff --git a/pkg/local_object_storage/localstore/get.go b/pkg/local_object_storage/localstore/get.go deleted file mode 100644 index 4e4090f48..000000000 --- a/pkg/local_object_storage/localstore/get.go +++ /dev/null @@ -1,30 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/pkg/errors" -) - -func (l *localstore) Get(key refs.Address) (*Object, error) { - var ( - err error - k, v []byte - o = new(Object) - ) - - k, err = key.Hash() - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on key.Marshal") - } - - v, err = l.blobBucket.Get(k) - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on blobBucket.Get") - } - - if err = o.Unmarshal(v); err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on Object.Unmarshal") - } - - return o, nil -} diff --git a/pkg/local_object_storage/localstore/has.go b/pkg/local_object_storage/localstore/has.go deleted file mode 100644 index 831e77def..000000000 --- a/pkg/local_object_storage/localstore/has.go +++ /dev/null @@ -1,20 +0,0 @@ -package localstore - -import ( - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/pkg/errors" -) - -func (l *localstore) Has(key refs.Address) (bool, error) { - var ( - err error - k []byte - ) - - k, err = key.Hash() - if err != nil { - return false, errors.Wrap(err, "localstore.Has failed on key.Marshal") - } - - return l.metaBucket.Has(k) && l.blobBucket.Has(k), nil -} diff --git a/pkg/local_object_storage/localstore/interface.go b/pkg/local_object_storage/localstore/interface.go deleted file mode 100644 index 236c8952c..000000000 --- a/pkg/local_object_storage/localstore/interface.go +++ /dev/null @@ -1,102 +0,0 @@ -package localstore - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type ( - // Localstore is an interface of local object storage. - Localstore interface { - Put(context.Context, *Object) error - Get(Address) (*Object, error) - Del(Address) error - Meta(Address) (*ObjectMeta, error) - Iterator - Has(Address) (bool, error) - ObjectsCount() (uint64, error) - - object.PositionReader - Size() int64 - } - - // MetaHandler is a function that handles ObjectMeta. - MetaHandler func(*ObjectMeta) bool - - // Iterator is an interface of the iterator over local object storage. - Iterator interface { - Iterate(FilterPipeline, MetaHandler) error - } - - // ListItem is an ObjectMeta wrapper. - ListItem struct { - ObjectMeta - } - - // Params groups the parameters of - // local object storage constructor. - Params struct { - BlobBucket bucket.Bucket - MetaBucket bucket.Bucket - Logger *zap.Logger - Collector metrics2.Collector - } - - localstore struct { - metaBucket bucket.Bucket - blobBucket bucket.Bucket - - log *zap.Logger - col metrics2.Collector - } -) - -// ErrOutOfRange is returned when requested object payload range is -// out of object payload bounds. -var ErrOutOfRange = errors.New("range is out of payload bounds") - -// ErrEmptyMetaHandler is returned by functions that expect -// a non-nil MetaHandler, but received nil. -var ErrEmptyMetaHandler = errors.New("meta handler is nil") - -var errNilLogger = errors.New("logger is nil") - -var errNilCollector = errors.New("metrics collector is nil") - -// New is a local object storage constructor. -func New(p Params) (Localstore, error) { - switch { - case p.MetaBucket == nil: - return nil, errors.New("meta bucket is nil") - case p.BlobBucket == nil: - return nil, errors.New("blob bucket is nil") - case p.Logger == nil: - return nil, errNilLogger - case p.Collector == nil: - return nil, errNilCollector - } - - return &localstore{ - metaBucket: p.MetaBucket, - blobBucket: p.BlobBucket, - log: p.Logger, - col: p.Collector, - }, nil -} - -func (l localstore) Size() int64 { return l.blobBucket.Size() } - -// TODO: implement less costly method of counting. -func (l localstore) ObjectsCount() (uint64, error) { - items, err := l.metaBucket.List() - if err != nil { - return 0, err - } - - return uint64(len(items)), nil -} diff --git a/pkg/local_object_storage/localstore/list.go b/pkg/local_object_storage/localstore/list.go deleted file mode 100644 index c4e1ec62c..000000000 --- a/pkg/local_object_storage/localstore/list.go +++ /dev/null @@ -1,41 +0,0 @@ -package localstore - -import ( - "context" - - "go.uber.org/zap" -) - -func (l *localstore) Iterate(filter FilterPipeline, handler MetaHandler) error { - if handler == nil { - return ErrEmptyMetaHandler - } else if filter == nil { - filter = NewFilter(&FilterParams{ - Name: "SKIPPING_FILTER", - FilterFunc: SkippingFilterFunc, - }) - } - - return l.metaBucket.Iterate(func(_, v []byte) bool { - meta := new(ObjectMeta) - if err := meta.Unmarshal(v); err != nil { - l.log.Error("unmarshal meta bucket item failure", zap.Error(err)) - } else if filter.Pass(context.TODO(), meta).Code() == CodePass { - return !handler(meta) - } - return true - }) -} - -// ListItems iterates over Iterator with FilterPipeline and returns all passed items. -func ListItems(it Iterator, f FilterPipeline) ([]ListItem, error) { - res := make([]ListItem, 0) - err := it.Iterate(f, func(meta *ObjectMeta) (stop bool) { - res = append(res, ListItem{ - ObjectMeta: *meta, - }) - return - }) - - return res, err -} diff --git a/pkg/local_object_storage/localstore/localstore.pb.go b/pkg/local_object_storage/localstore/localstore.pb.go deleted file mode 100644 index 8700f28ea..000000000 Binary files a/pkg/local_object_storage/localstore/localstore.pb.go and /dev/null differ diff --git a/pkg/local_object_storage/localstore/localstore.proto b/pkg/local_object_storage/localstore/localstore.proto deleted file mode 100644 index b1fd60674..000000000 --- a/pkg/local_object_storage/localstore/localstore.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; -option go_package = "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"; - -package localstore; - -import "object/types.proto"; -import "github.com/gogo/protobuf/gogoproto/gogo.proto"; - -message ObjectMeta { - object.Object Object = 1; - bytes PayloadHash = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "Hash"]; - uint64 PayloadSize = 3; - uint64 StoreEpoch = 4; -} diff --git a/pkg/local_object_storage/localstore/localstore_test.go b/pkg/local_object_storage/localstore/localstore_test.go deleted file mode 100644 index 761e2739c..000000000 --- a/pkg/local_object_storage/localstore/localstore_test.go +++ /dev/null @@ -1,418 +0,0 @@ -package localstore - -import ( - "context" - "sync" - "testing" - - "github.com/google/uuid" - "github.com/nspcc-dev/neofs-api-go/container" - "github.com/nspcc-dev/neofs-api-go/hash" - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/test" - meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -type ( - fakeCollector struct { - sync.Mutex - items map[refs.CID]uint64 - } -) - -func (f *fakeCollector) Start(_ context.Context) { panic("implement me") } -func (f *fakeCollector) UpdateSpaceUsage() { panic("implement me") } -func (f *fakeCollector) SetIterator(_ meta2.Iterator) { panic("implement me") } -func (f *fakeCollector) SetCounter(counter metrics2.ObjectCounter) { panic("implement me") } - -func (f *fakeCollector) UpdateContainer(cid refs.CID, size uint64, op metrics2.SpaceOp) { - f.Lock() - defer f.Unlock() - - switch op { - case metrics2.AddSpace: - f.items[cid] += size - case metrics2.RemSpace: - if val, ok := f.items[cid]; !ok || val < size { - return - } - - f.items[cid] -= size - default: - return - } -} - -func newCollector() metrics2.Collector { - return &fakeCollector{ - items: make(map[refs.CID]uint64), - } -} - -func testObject(t *testing.T) *Object { - var ( - uid refs.UUID - cid CID - ) - - t.Run("Prepare object", func(t *testing.T) { - cnr, err := container.NewTestContainer() - require.NoError(t, err) - - cid, err = cnr.ID() - require.NoError(t, err) - - id, err := uuid.NewRandom() - uid = refs.UUID(id) - require.NoError(t, err) - }) - - obj := &Object{ - SystemHeader: object.SystemHeader{ - Version: 1, - ID: uid, - CID: cid, - OwnerID: refs.OwnerID([refs.OwnerIDSize]byte{}), // TODO: avoid hardcode - }, - Headers: []Header{ - { - Value: &object.Header_UserHeader{ - UserHeader: &object.UserHeader{ - Key: "Profession", - Value: "Developer", - }, - }, - }, - { - Value: &object.Header_UserHeader{ - UserHeader: &object.UserHeader{ - Key: "Language", - Value: "GO", - }, - }, - }, - }, - } - - return obj -} - -func newLocalstore(t *testing.T) Localstore { - ls, err := New(Params{ - BlobBucket: test.Bucket(), - MetaBucket: test.Bucket(), - Logger: zap.L(), - Collector: newCollector(), - }) - require.NoError(t, err) - - return ls -} - -func TestNew(t *testing.T) { - t.Run("New localstore", func(t *testing.T) { - var err error - - _, err = New(Params{}) - require.Error(t, err) - - _, err = New(Params{ - BlobBucket: test.Bucket(), - MetaBucket: test.Bucket(), - Logger: zap.L(), - Collector: newCollector(), - }) - require.NoError(t, err) - }) -} - -func TestLocalstore_Del(t *testing.T) { - t.Run("Del method", func(t *testing.T) { - var ( - err error - ls Localstore - obj *Object - ) - - ls = newLocalstore(t) - - obj = testObject(t) - obj.SetPayload([]byte("Hello, world")) - - k := *obj.Address() - - store, ok := ls.(*localstore) - require.True(t, ok) - require.NotNil(t, store) - - metric, ok := store.col.(*fakeCollector) - require.True(t, ok) - require.NotNil(t, metric) - - err = ls.Put(context.Background(), obj) - require.NoError(t, err) - require.NotEmpty(t, obj.Payload) - require.Contains(t, metric.items, obj.SystemHeader.CID) - require.Equal(t, obj.SystemHeader.PayloadLength, metric.items[obj.SystemHeader.CID]) - - err = ls.Del(k) - require.NoError(t, err) - require.Contains(t, metric.items, obj.SystemHeader.CID) - require.Equal(t, uint64(0), metric.items[obj.SystemHeader.CID]) - - _, err = ls.Get(k) - require.Error(t, err) - }) -} - -func TestLocalstore_Get(t *testing.T) { - t.Run("Get method (default)", func(t *testing.T) { - var ( - err error - ls Localstore - obj *Object - ) - - ls = newLocalstore(t) - - obj = testObject(t) - - err = ls.Put(context.Background(), obj) - require.NoError(t, err) - - k := *obj.Address() - - o, err := ls.Get(k) - require.NoError(t, err) - require.Equal(t, obj, o) - }) -} - -func TestLocalstore_Put(t *testing.T) { - t.Run("Put method", func(t *testing.T) { - var ( - err error - ls Localstore - obj *Object - ) - - ls = newLocalstore(t) - store, ok := ls.(*localstore) - require.True(t, ok) - require.NotNil(t, store) - - metric, ok := store.col.(*fakeCollector) - require.True(t, ok) - require.NotNil(t, metric) - - obj = testObject(t) - - err = ls.Put(context.Background(), obj) - require.NoError(t, err) - require.Contains(t, metric.items, obj.SystemHeader.CID) - require.Equal(t, obj.SystemHeader.PayloadLength, metric.items[obj.SystemHeader.CID]) - - o, err := ls.Get(*obj.Address()) - require.NoError(t, err) - require.Equal(t, obj, o) - }) -} - -func TestLocalstore_List(t *testing.T) { - t.Run("List method (no filters)", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - items, err := ListItems(ls, nil) - require.NoError(t, err) - require.Len(t, items, objCount) - - for i := range items { - require.Contains(t, objs, *items[i].Object) - } - }) - - t.Run("List method ('bad' filter)", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - items, err := ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: ContainerFilterFunc([]CID{}), - })) - require.NoError(t, err) - require.Len(t, items, 0) - }) - - t.Run("List method (filter by cid)", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - cidVals := []CID{objs[0].SystemHeader.CID} - - items, err := ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: ContainerFilterFunc(cidVals), - })) - require.NoError(t, err) - require.Len(t, items, 1) - - for i := range items { - require.Contains(t, objs, *items[i].Object) - } - }) - - t.Run("Filter stored earlier", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - epoch uint64 = 100 - list []ListItem - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - ctx := context.WithValue(context.Background(), StoreEpochValue, epoch) - - for i := range objs { - err = ls.Put(ctx, &objs[i]) - require.NoError(t, err) - } - - list, err = ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: StoredEarlierThanFilterFunc(epoch - 1), - })) - require.NoError(t, err) - require.Empty(t, list) - - list, err = ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: StoredEarlierThanFilterFunc(epoch), - })) - require.NoError(t, err) - require.Empty(t, list) - - list, err = ListItems(ls, NewFilter(&FilterParams{ - FilterFunc: StoredEarlierThanFilterFunc(epoch + 1), - })) - require.NoError(t, err) - require.Len(t, list, objCount) - }) - - t.Run("Filter with complex filter", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - for i := range objs { - err = ls.Put(context.Background(), &objs[i]) - require.NoError(t, err) - } - - cidVals := []CID{objs[0].SystemHeader.CID} - - mainF, err := AllPassIncludingFilter("TEST_FILTER", &FilterParams{ - Name: "CID_LIST", - FilterFunc: ContainerFilterFunc(cidVals), - }) - - items, err := ListItems(ls, mainF) - require.NoError(t, err) - require.Len(t, items, 1) - }) - - t.Run("Meta info", func(t *testing.T) { - var ( - err error - ls Localstore - objCount = 10 - objs = make([]Object, objCount) - epoch uint64 = 100 - ) - - for i := range objs { - objs[i] = *testObject(t) - } - - ls = newLocalstore(t) - - ctx := context.WithValue(context.Background(), StoreEpochValue, epoch) - - for i := range objs { - err = ls.Put(ctx, &objs[i]) - require.NoError(t, err) - - meta, err := ls.Meta(*objs[i].Address()) - require.NoError(t, err) - - noPayload := objs[i] - noPayload.Payload = nil - - require.Equal(t, *meta.Object, noPayload) - require.Equal(t, meta.PayloadHash, hash.Sum(objs[i].Payload)) - require.Equal(t, meta.PayloadSize, uint64(len(objs[i].Payload))) - require.Equal(t, epoch, meta.StoreEpoch) - } - }) -} diff --git a/pkg/local_object_storage/localstore/meta.go b/pkg/local_object_storage/localstore/meta.go index ba1acd14b..e270f84eb 100644 --- a/pkg/local_object_storage/localstore/meta.go +++ b/pkg/local_object_storage/localstore/meta.go @@ -1,52 +1,82 @@ package localstore import ( - "context" + "encoding/binary" + "io" - "github.com/nspcc-dev/neofs-api-go/hash" - "github.com/nspcc-dev/neofs-api-go/refs" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/pkg/errors" ) -// StoreEpochValue is a context key of object storing epoch number. -const StoreEpochValue = "store epoch" +// ObjectMeta represents meta information about +// the object that is stored in meta storage. +type ObjectMeta struct { + head *object.Object -func (l *localstore) Meta(key refs.Address) (*ObjectMeta, error) { - var ( - err error - meta ObjectMeta - k, v []byte - ) - - k, err = key.Hash() - if err != nil { - return nil, errors.Wrap(err, "Localstore Meta failed on key.Marshal") - } - - v, err = l.metaBucket.Get(k) - if err != nil { - return nil, errors.Wrap(err, "Localstore Meta failed on metaBucket.Get") - } - - if err := meta.Unmarshal(v); err != nil { - return nil, errors.Wrap(err, "Localstore Metafailed on ObjectMeta.Unmarshal") - } - - return &meta, nil + savedAtEpoch uint64 } -func metaFromObject(ctx context.Context, obj *Object) *ObjectMeta { - meta := new(ObjectMeta) - o := *obj - meta.Object = &o - meta.Object.Payload = nil - meta.PayloadSize = uint64(len(obj.Payload)) - meta.PayloadHash = hash.Sum(obj.Payload) +// SavedAtEpoch returns the number of epoch +// at which the object was saved locally. +func (m *ObjectMeta) SavedAtEpoch() uint64 { + if m != nil { + return m.savedAtEpoch + } - storeEpoch, ok := ctx.Value(StoreEpochValue).(uint64) - if ok { - meta.StoreEpoch = storeEpoch + return 0 +} + +// Head returns the object w/o payload. +func (m *ObjectMeta) Head() *object.Object { + if m != nil { + return m.head + } + + return nil +} + +// AddressFromMeta extracts the Address from object meta. +func AddressFromMeta(m *ObjectMeta) *object.Address { + return m.Head().Address() +} + +func metaFromObject(o *object.Object) *ObjectMeta { + // FIXME: remove hard-code + meta := new(ObjectMeta) + meta.savedAtEpoch = 10 + meta.head = &object.Object{ + Object: o.CutPayload(), } return meta } + +func metaToBytes(m *ObjectMeta) ([]byte, error) { + data := make([]byte, 8) + + binary.BigEndian.PutUint64(data, m.savedAtEpoch) + + addrBytes, err := m.head.MarshalStableV2() + if err != nil { + return nil, err + } + + return append(data, addrBytes...), nil +} + +func metaFromBytes(data []byte) (*ObjectMeta, error) { + if len(data) < 8 { + return nil, io.ErrUnexpectedEOF + } + + obj, err := object.FromBytes(data[8:]) + if err != nil { + return nil, errors.Wrap(err, "could not get object address from bytes") + } + + meta := new(ObjectMeta) + meta.savedAtEpoch = binary.BigEndian.Uint64(data) + meta.head = obj + + return meta, nil +} diff --git a/pkg/local_object_storage/localstore/methods.go b/pkg/local_object_storage/localstore/methods.go new file mode 100644 index 000000000..39521abbd --- /dev/null +++ b/pkg/local_object_storage/localstore/methods.go @@ -0,0 +1,107 @@ +package localstore + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func (s *Storage) Put(obj *object.Object) error { + addrBytes, err := obj.Address().MarshalStableV2() + if err != nil { + return errors.Wrap(err, "could not marshal object address") + } + + objBytes, err := obj.MarshalStableV2() + if err != nil { + return errors.Wrap(err, "could not marshal the object") + } + + metaBytes, err := metaToBytes(metaFromObject(obj)) + if err != nil { + return errors.Wrap(err, "could not marshal object meta") + } + + if err := s.blobBucket.Set(addrBytes, objBytes); err != nil { + return errors.Wrap(err, "could no save object in BLOB storage") + } + + if err := s.metaBucket.Set(addrBytes, metaBytes); err != nil { + return errors.Wrap(err, "could not save object in meta storage") + } + + return nil +} + +func (s *Storage) Delete(addr *object.Address) error { + addrBytes, err := addr.MarshalStableV2() + if err != nil { + return errors.Wrap(err, "could not marshal object address") + } + + if err := s.blobBucket.Del(addrBytes); err != nil { + s.log.Warn("could not remove object from BLOB storage", + zap.Error(err), + ) + } + + if err := s.metaBucket.Del(addrBytes); err != nil { + return errors.Wrap(err, "could not remove object from meta storage") + } + + return nil +} + +func (s *Storage) Get(addr *object.Address) (*object.Object, error) { + addrBytes, err := addr.MarshalStableV2() + if err != nil { + return nil, errors.Wrap(err, "could not marshal object address") + } + + objBytes, err := s.blobBucket.Get(addrBytes) + if err != nil { + return nil, errors.Wrap(err, "could not get object from BLOB storage") + } + + return object.FromBytes(objBytes) +} + +func (s *Storage) Head(addr *object.Address) (*ObjectMeta, error) { + addrBytes, err := addr.MarshalStableV2() + if err != nil { + return nil, errors.Wrap(err, "could not marshal object address") + } + + metaBytes, err := s.metaBucket.Get(addrBytes) + if err != nil { + return nil, errors.Wrap(err, "could not get object from meta storage") + } + + return metaFromBytes(metaBytes) +} + +func (s *Storage) Iterate(filter FilterPipeline, handler func(*ObjectMeta) bool) error { + if filter == nil { + filter = NewFilter(&FilterParams{ + Name: "SKIPPING_FILTER", + FilterFunc: func(context.Context, *ObjectMeta) *FilterResult { + return ResultPass() + }, + }) + } + + return s.metaBucket.Iterate(func(_, v []byte) bool { + meta, err := metaFromBytes(v) + if err != nil { + s.log.Error("unmarshal meta bucket item failure", + zap.Error(err), + ) + } else if filter.Pass(context.TODO(), meta).Code() == CodePass { + return !handler(meta) + } + + return true + }) +} diff --git a/pkg/local_object_storage/localstore/put.go b/pkg/local_object_storage/localstore/put.go deleted file mode 100644 index bc18475d8..000000000 --- a/pkg/local_object_storage/localstore/put.go +++ /dev/null @@ -1,47 +0,0 @@ -package localstore - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/refs" - metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics" - "github.com/pkg/errors" -) - -func (l *localstore) Put(ctx context.Context, obj *Object) error { - var ( - oa refs.Address - k, v []byte - err error - ) - - oa = *obj.Address() - k, err = oa.Hash() - - if err != nil { - return errors.Wrap(err, "Localstore Put failed on StorageKey.marshal") - } - - if v, err = obj.Marshal(); err != nil { - return errors.Wrap(err, "Localstore Put failed on blobValue") - } - - if err = l.blobBucket.Set(k, v); err != nil { - return errors.Wrap(err, "Localstore Put failed on BlobBucket.Set") - } - - if v, err = metaFromObject(ctx, obj).Marshal(); err != nil { - return errors.Wrap(err, "Localstore Put failed on metaValue") - } - - if err = l.metaBucket.Set(k, v); err != nil { - return errors.Wrap(err, "Localstore Put failed on MetaBucket.Set") - } - - l.col.UpdateContainer( - obj.SystemHeader.CID, - obj.SystemHeader.PayloadLength, - metrics2.AddSpace) - - return nil -} diff --git a/pkg/local_object_storage/localstore/range.go b/pkg/local_object_storage/localstore/range.go deleted file mode 100644 index 05e43f531..000000000 --- a/pkg/local_object_storage/localstore/range.go +++ /dev/null @@ -1,36 +0,0 @@ -package localstore - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/pkg/errors" -) - -func (l *localstore) PRead(ctx context.Context, key Address, rng object.Range) ([]byte, error) { - var ( - err error - k, v []byte - obj Object - ) - - k, err = key.Hash() - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on key.Marshal") - } - - v, err = l.blobBucket.Get(k) - if err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on blobBucket.Get") - } - - if err := obj.Unmarshal(v); err != nil { - return nil, errors.Wrap(err, "Localstore Get failed on object.Unmarshal") - } - - if rng.Offset+rng.Length > uint64(len(obj.Payload)) { - return nil, ErrOutOfRange - } - - return obj.Payload[rng.Offset : rng.Offset+rng.Length], nil -} diff --git a/pkg/local_object_storage/localstore/storage.go b/pkg/local_object_storage/localstore/storage.go new file mode 100644 index 000000000..284ac3851 --- /dev/null +++ b/pkg/local_object_storage/localstore/storage.go @@ -0,0 +1,53 @@ +package localstore + +import ( + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Storage represents NeoFS local object storage. +type Storage struct { + log *logger.Logger + + metaBucket bucket.Bucket + + blobBucket bucket.Bucket +} + +// Option is an option of Storage constructor. +type Option func(*cfg) + +type cfg struct { + logger *logger.Logger +} + +func defaultCfg() *cfg { + return &cfg{ + logger: zap.L(), + } +} + +// New is a local object storage constructor. +func New(blob, meta bucket.Bucket, opts ...Option) *Storage { + cfg := defaultCfg() + + for i := range opts { + opts[i](cfg) + } + + return &Storage{ + metaBucket: meta, + blobBucket: blob, + log: cfg.logger, + } +} + +// WithLogger returns Storage option of used logger. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + if l != nil { + c.logger = l + } + } +}