diff --git a/go.mod b/go.mod index 7d5f0ff3..0b66e86c 100644 --- a/go.mod +++ b/go.mod @@ -4,33 +4,25 @@ go 1.14 require ( bou.ke/monkey v1.0.2 - github.com/fasthttp/router v1.0.2 // indirect - github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.4.2 github.com/google/uuid v1.1.1 - github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 // indirect github.com/mr-tron/base58 v1.1.3 github.com/multiformats/go-multiaddr v0.2.0 github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2 github.com/multiformats/go-multihash v0.0.13 // indirect - github.com/nspcc-dev/hrw v1.0.9 // indirect github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200902121740-5a6dff8c83ba github.com/nspcc-dev/neofs-crypto v0.3.0 - github.com/nspcc-dev/netmap v1.7.0 // indirect - github.com/nspcc-dev/tzhash v1.4.0 // indirect github.com/panjf2000/ants/v2 v2.3.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.6.0 github.com/soheilhy/cmux v0.1.4 - github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.6.1 github.com/valyala/fasthttp v1.9.0 go.etcd.io/bbolt v1.3.4 go.uber.org/atomic v1.5.1 - go.uber.org/dig v1.8.0 // indirect go.uber.org/multierr v1.4.0 // indirect go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect diff --git a/go.sum b/go.sum index 1bc61495..a99aeddd 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/local_object_storage/bucket/test/bucket.go b/pkg/local_object_storage/bucket/test/bucket.go deleted file mode 100644 index 78cdf240..00000000 --- a/pkg/local_object_storage/bucket/test/bucket.go +++ /dev/null @@ -1,144 +0,0 @@ -package test - -import ( - "errors" - "sync" - - "github.com/mr-tron/base58" - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" -) - -type ( - testBucket struct { - sync.RWMutex - items map[string][]byte - } -) - -var ( - errOverflow = errors.New("overflow") - errNotFound = errors.New("not found") -) - -// Bucket constructs test Bucket implementation. -func Bucket() bucket.Bucket { - return &testBucket{ - items: make(map[string][]byte), - } -} - -func (t *testBucket) Get(key []byte) ([]byte, error) { - t.Lock() - defer t.Unlock() - - val, ok := t.items[base58.Encode(key)] - if !ok { - return nil, bucket.ErrNotFound - } - - return val, nil -} - -func (t *testBucket) Set(key, value []byte) error { - t.Lock() - defer t.Unlock() - - t.items[base58.Encode(key)] = value - - return nil -} - -func (t *testBucket) Del(key []byte) error { - t.RLock() - defer t.RUnlock() - - delete(t.items, base58.Encode(key)) - - return nil -} - -func (t *testBucket) Has(key []byte) bool { - t.RLock() - defer t.RUnlock() - - _, ok := t.items[base58.Encode(key)] - - return ok -} - -func (t *testBucket) Size() (res int64) { - t.RLock() - defer t.RUnlock() - - for _, v := range t.items { - res += int64(len(v)) - } - - return -} - -func (t *testBucket) List() ([][]byte, error) { - t.Lock() - defer t.Unlock() - - res := make([][]byte, 0) - - for k := range t.items { - sk, err := base58.Decode(k) - if err != nil { - return nil, err - } - - res = append(res, sk) - } - - return res, nil -} - -func (t *testBucket) Iterate(f bucket.FilterHandler) error { - t.RLock() - defer t.RUnlock() - - for k, v := range t.items { - key, err := base58.Decode(k) - if err != nil { - continue - } - - if !f(key, v) { - return bucket.ErrIteratingAborted - } - } - - return nil -} - -func (t *testBucket) Close() error { - t.Lock() - defer t.Unlock() - - for k := range t.items { - delete(t.items, k) - } - - return nil -} - -func (t *testBucket) PRead(key []byte, rng object.Range) ([]byte, error) { - t.RLock() - defer t.RUnlock() - - k := base58.Encode(key) - - v, ok := t.items[k] - if !ok { - return nil, errNotFound - } - - if rng.Offset+rng.Length > uint64(len(v)) { - return nil, errOverflow - } - - return v[rng.Offset : rng.Offset+rng.Length], nil -} diff --git a/pkg/local_object_storage/meta/iterator.go b/pkg/local_object_storage/meta/iterator.go deleted file mode 100644 index f5d3642f..00000000 --- a/pkg/local_object_storage/meta/iterator.go +++ /dev/null @@ -1,15 +0,0 @@ -package meta - -import ( - "github.com/nspcc-dev/neofs-api-go/object" -) - -type ( - // Iterator is an interface of the iterator over object storage. - Iterator interface { - Iterate(IterateFunc) error - } - - // IterateFunc is a function that checks whether an object matches a specific criterion. - IterateFunc func(*object.Object) error -) diff --git a/pkg/services/metrics/meta.go b/pkg/services/metrics/meta.go deleted file mode 100644 index 86f2a63d..00000000 --- a/pkg/services/metrics/meta.go +++ /dev/null @@ -1,33 +0,0 @@ -package metrics - -import ( - "sync" - - meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta" -) - -type metaWrapper struct { - sync.Mutex - iter meta2.Iterator -} - -func newMetaWrapper() *metaWrapper { - return &metaWrapper{} -} - -func (m *metaWrapper) changeIter(iter meta2.Iterator) { - m.Lock() - m.iter = iter - m.Unlock() -} - -func (m *metaWrapper) Iterate(h meta2.IterateFunc) error { - m.Lock() - defer m.Unlock() - - if m.iter == nil { - return errEmptyMetaStore - } - - return m.iter.Iterate(h) -} diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go deleted file mode 100644 index ed9ce8bf..00000000 --- a/pkg/services/metrics/metrics.go +++ /dev/null @@ -1,175 +0,0 @@ -package metrics - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta" - "go.uber.org/zap" -) - -type ( - // Collector is an interface of the metrics collector. - Collector interface { - Start(ctx context.Context) - UpdateSpaceUsage() - - SetCounter(ObjectCounter) - SetIterator(iter meta2.Iterator) - UpdateContainer(cid refs.CID, size uint64, op SpaceOp) - } - - collector struct { - log *zap.Logger - interval time.Duration - counter *counterWrapper - - sizes *syncStore - metas *metaWrapper - - updateSpaceSize func() - updateObjectCount func() - } - - // Params groups the parameters of metrics collector's constructor. - Params struct { - Options []string - Logger *zap.Logger - Interval time.Duration - MetricsStore bucket.Bucket - } - - // ObjectCounter is an interface of object number storage. - ObjectCounter interface { - ObjectsCount() (uint64, error) - } - - // CounterSetter is an interface of ObjectCounter container. - CounterSetter interface { - SetCounter(ObjectCounter) - } - - counterWrapper struct { - sync.Mutex - counter ObjectCounter - } -) - -var ( - errEmptyCounter = errors.New("empty object counter") - errEmptyLogger = errors.New("empty logger") - errEmptyMetaStore = errors.New("empty meta store") - errEmptyMetricsStore = errors.New("empty metrics store") -) - -const defaultMetricsInterval = 5 * time.Second - -// New constructs metrics collector and returns Collector interface. -func New(p Params) (Collector, error) { - switch { - case p.Logger == nil: - return nil, errEmptyLogger - case p.MetricsStore == nil: - return nil, errEmptyMetricsStore - } - - if p.Interval <= 0 { - p.Interval = defaultMetricsInterval - } - - metas := newMetaWrapper() - sizes := newSyncStore(p.Logger, p.MetricsStore) - - sizes.Load() - - return &collector{ - log: p.Logger, - interval: p.Interval, - counter: new(counterWrapper), - - metas: metas, - sizes: sizes, - - updateSpaceSize: spaceUpdater(sizes), - updateObjectCount: metricsUpdater(p.Options), - }, nil -} - -func (c *counterWrapper) SetCounter(counter ObjectCounter) { - c.Lock() - defer c.Unlock() - - c.counter = counter -} - -func (c *counterWrapper) ObjectsCount() (uint64, error) { - c.Lock() - defer c.Unlock() - - if c.counter == nil { - return 0, errEmptyCounter - } - - return c.counter.ObjectsCount() -} - -func (c *collector) SetCounter(counter ObjectCounter) { - c.counter.SetCounter(counter) -} - -func (c *collector) SetIterator(iter meta2.Iterator) { - c.metas.changeIter(iter) -} - -func (c *collector) UpdateContainer(cid refs.CID, size uint64, op SpaceOp) { - c.sizes.Update(cid, size, op) - c.updateSpaceSize() -} - -func (c *collector) UpdateSpaceUsage() { - sizes := make(map[refs.CID]uint64) - - err := c.metas.Iterate(func(obj *object.Object) error { - if !obj.IsTombstone() { - cid := obj.SystemHeader.CID - sizes[cid] += obj.SystemHeader.PayloadLength - } - - return nil - }) - - if err != nil { - c.log.Error("could not update space metrics", zap.Error(err)) - } - - c.sizes.Reset(sizes) - c.updateSpaceSize() -} - -func (c *collector) Start(ctx context.Context) { - t := time.NewTicker(c.interval) - -loop: - for { - select { - case <-ctx.Done(): - c.log.Warn("stop collecting metrics", zap.Error(ctx.Err())) - break loop - case <-t.C: - count, err := c.counter.ObjectsCount() - if err != nil { - c.log.Warn("get object count failure", zap.Error(err)) - continue loop - } - counter.Store(float64(count)) - c.updateObjectCount() - } - } - - t.Stop() -} diff --git a/pkg/services/metrics/metrics_test.go b/pkg/services/metrics/metrics_test.go deleted file mode 100644 index 2b9d017e..00000000 --- a/pkg/services/metrics/metrics_test.go +++ /dev/null @@ -1,275 +0,0 @@ -package metrics - -import ( - "context" - "encoding/binary" - "sync" - "testing" - "time" - - "github.com/nspcc-dev/neofs-api-go/object" - "github.com/nspcc-dev/neofs-api-go/refs" - meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -type ( - fakeCounter int - fakeIterator string - fakeMetaStore []*object.Object -) - -var ( - _ ObjectCounter = (*fakeCounter)(nil) - _ meta2.Iterator = (*fakeIterator)(nil) -) - -func (f fakeCounter) ObjectsCount() (uint64, error) { - return uint64(f), nil -} - -func (f fakeIterator) Iterate(_ meta2.IterateFunc) error { - if f == "" { - return nil - } - - return errors.New(string(f)) -} - -func (f fakeMetaStore) Iterate(cb meta2.IterateFunc) error { - if cb == nil { - return nil - } - - for i := range f { - if err := cb(f[i]); err != nil { - return err - } - } - - return nil -} - -func TestCollector(t *testing.T) { - buck := &fakeBucket{items: make(map[uint64]int)} - - t.Run("check errors", func(t *testing.T) { - t.Run("empty logger", func(t *testing.T) { - svc, err := New(Params{MetricsStore: buck}) - require.Nil(t, svc) - require.EqualError(t, err, errEmptyLogger.Error()) - }) - - t.Run("empty metrics store", func(t *testing.T) { - svc, err := New(Params{Logger: zap.L()}) - require.Nil(t, svc) - require.EqualError(t, err, errEmptyMetricsStore.Error()) - }) - }) - - svc, err := New(Params{ - Logger: zap.L(), - MetricsStore: buck, - Options: []string{ - "/Location:Europe/Country:Russia/City:Moscow", - "/Some:Another/Key:Value", - }, - }) - - require.NoError(t, err) - require.NotNil(t, svc) - - coll, ok := svc.(*collector) - require.True(t, ok) - require.NotNil(t, coll) - - t.Run("check start", func(t *testing.T) { - coll.interval = time.Second - - t.Run("stop by context", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - wg := new(sync.WaitGroup) - wg.Add(1) - - counter.Store(-1) - - go func() { - svc.Start(ctx) - wg.Done() - }() - - cancel() - wg.Wait() - - require.Equal(t, float64(-1), counter.Load()) - }) - - t.Run("should fail on empty counter", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - wg := new(sync.WaitGroup) - wg.Add(1) - - counter.Store(0) - - go func() { - svc.Start(ctx) - wg.Done() - }() - - time.Sleep(2 * time.Second) - cancel() - wg.Wait() - - require.Equal(t, float64(0), counter.Load()) - }) - - t.Run("should success on fakeCounter", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - wg := new(sync.WaitGroup) - wg.Add(1) - - coll.SetCounter(fakeCounter(8)) - counter.Store(0) - - go func() { - svc.Start(ctx) - wg.Done() - }() - - time.Sleep(2 * time.Second) - cancel() - wg.Wait() - - require.Equal(t, float64(8), counter.Load()) - }) - }) - - t.Run("iterator", func(t *testing.T) { - { - coll.SetIterator(nil) - require.Nil(t, coll.metas.iter) - require.EqualError(t, coll.metas.Iterate(nil), errEmptyMetaStore.Error()) - } - - { - iter := fakeIterator("") - coll.SetIterator(iter) - require.Equal(t, iter, coll.metas.iter) - require.NoError(t, coll.metas.Iterate(nil)) - } - - { - iter := fakeIterator("test") - coll.SetIterator(iter) - require.Equal(t, iter, coll.metas.iter) - require.EqualError(t, coll.metas.Iterate(nil), string(iter)) - } - }) - - t.Run("add-rem space", func(t *testing.T) { - cid := refs.CID{1, 2, 3, 4, 5} - buf := make([]byte, 8) - key := keyFromBytes(cid.Bytes()) - - zero := make([]byte, 8) - size := uint64(100) - - binary.BigEndian.PutUint64(buf, size) - - { - coll.UpdateContainer(cid, size, AddSpace) - require.Len(t, coll.sizes.items, 1) - require.Len(t, buck.items, 1) - require.Contains(t, buck.items, key) - require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: buf}) - } - - { - coll.UpdateContainer(cid, size, RemSpace) - require.Len(t, coll.sizes.items, 1) - require.Len(t, buck.items, 1) - require.Contains(t, buck.items, key) - require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: zero}) - } - - { - coll.UpdateContainer(cid, size, RemSpace) - require.Len(t, coll.sizes.items, 1) - require.Len(t, buck.items, 1) - require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: zero}) - } - }) - - t.Run("add-rem multi thread", func(t *testing.T) { - wg := new(sync.WaitGroup) - wg.Add(10) - - size := uint64(100) - zero := make([]byte, 8) - - // reset - coll.UpdateSpaceUsage() - - for i := 0; i < 10; i++ { - cid := refs.CID{1, 2, 3, 4, byte(i)} - coll.UpdateContainer(cid, size, AddSpace) - - go func() { - coll.UpdateContainer(cid, size, RemSpace) - wg.Done() - }() - } - - wg.Wait() - - require.Len(t, coll.sizes.items, 10) - require.Len(t, buck.items, 10) - - for i := 0; i < 10; i++ { - cid := refs.CID{1, 2, 3, 4, byte(i)} - require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: zero}) - } - }) - - t.Run("reset buckets", func(t *testing.T) { - coll.UpdateSpaceUsage() - require.Len(t, coll.sizes.items, 0) - require.Len(t, buck.items, 0) - }) - - t.Run("reset from metaStore", func(t *testing.T) { - cid := refs.CID{1, 2, 3, 4, 5} - buf := make([]byte, 8) - key := keyFromBytes(cid.Bytes()) - size := uint64(100) - binary.BigEndian.PutUint64(buf, size) - - iter := fakeMetaStore{ - { - SystemHeader: object.SystemHeader{ - PayloadLength: size, - CID: cid, - }, - }, - - { - Headers: []object.Header{ - { - Value: &object.Header_Tombstone{Tombstone: &object.Tombstone{}}, - }, - }, - }, - } - - coll.SetIterator(iter) - - coll.UpdateSpaceUsage() - require.Len(t, coll.sizes.items, 1) - require.Len(t, buck.items, 1) - - require.Contains(t, buck.items, key) - require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: buf}) - }) -} diff --git a/pkg/services/metrics/prometeus.go b/pkg/services/metrics/prometeus.go deleted file mode 100644 index 438e85f5..00000000 --- a/pkg/services/metrics/prometeus.go +++ /dev/null @@ -1,83 +0,0 @@ -package metrics - -import ( - "strings" - - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" -) - -const ( - locationLabel = "location" - countryLabel = "country" - cityLabel = "city" - - containerLabel = "cid" -) - -var ( - objectsCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "neofs", - Name: "count_objects_on_node", - Help: "Number of objects stored on this node", - }, []string{locationLabel, countryLabel, cityLabel}) - - spaceCounter = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "neofs", - Name: "container_space_sizes", - Help: "Space allocated by ContainerID", - }, []string{containerLabel}) - - counter = atomic.NewFloat64(0) -) - -func init() { - prometheus.MustRegister( - objectsCount, - spaceCounter, - ) -} - -func spaceUpdater(m *syncStore) func() { - return func() { - m.mutex.RLock() - for cid := range m.items { - spaceCounter. - With(prometheus.Labels{ - containerLabel: cid.String(), - }). - Set(float64(m.items[cid])) - } - m.mutex.RUnlock() - } -} - -func metricsUpdater(opts []string) func() { - var ( - locationCode string - countryCode string - cityCode string - ) - - for i := range opts { - ss := strings.Split(opts[i], "/") - for j := range ss { - switch s := strings.SplitN(ss[j], ":", 2); strings.ToLower(s[0]) { - case locationLabel: - locationCode = s[1] - case countryLabel: - countryCode = s[1] - case cityLabel: - cityCode = s[1] - } - } - } - - return func() { - objectsCount.With(prometheus.Labels{ - locationLabel: locationCode, - countryLabel: countryCode, - cityLabel: cityCode, - }).Set(counter.Load()) - } -} diff --git a/pkg/services/metrics/store.go b/pkg/services/metrics/store.go deleted file mode 100644 index ef17ec2c..00000000 --- a/pkg/services/metrics/store.go +++ /dev/null @@ -1,122 +0,0 @@ -package metrics - -import ( - "encoding/binary" - "encoding/hex" - "sync" - - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - "go.uber.org/zap" -) - -type ( - syncStore struct { - log *zap.Logger - store bucket.Bucket - mutex sync.RWMutex - items map[refs.CID]uint64 - } - - // SpaceOp is an enumeration of space size operations. - SpaceOp int -) - -const ( - _ SpaceOp = iota - - // AddSpace is a SpaceOp of space size increasing. - AddSpace - - // RemSpace is a SpaceOp of space size decreasing. - RemSpace -) - -func newSyncStore(log *zap.Logger, store bucket.Bucket) *syncStore { - return &syncStore{ - log: log, - store: store, - items: make(map[refs.CID]uint64), - } -} - -func (m *syncStore) Load() { - m.mutex.Lock() - defer m.mutex.Unlock() - - _ = m.store.Iterate(func(key, val []byte) bool { - cid, err := refs.CIDFromBytes(key) - if err != nil { - m.log.Error("could not load space value", zap.Error(err)) - return true - } - - m.items[cid] += binary.BigEndian.Uint64(val) - return true - }) -} - -func (m *syncStore) Reset(items map[refs.CID]uint64) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.items = items - if items == nil { - m.items = make(map[refs.CID]uint64) - } - - keys, err := m.store.List() - if err != nil { - m.log.Error("could not fetch keys space metrics", zap.Error(err)) - return - } - - // cleanup metrics store - for i := range keys { - if err := m.store.Del(keys[i]); err != nil { - cid := hex.EncodeToString(keys[i]) - m.log.Error("could not remove key", - zap.String("cid", cid), - zap.Error(err)) - } - } - - buf := make([]byte, 8) - - for cid := range items { - binary.BigEndian.PutUint64(buf, items[cid]) - - if err := m.store.Set(cid.Bytes(), buf); err != nil { - m.log.Error("could not store space value", - zap.Stringer("cid", cid), - zap.Error(err)) - } - } -} - -func (m *syncStore) Update(cid refs.CID, size uint64, op SpaceOp) { - m.mutex.Lock() - defer m.mutex.Unlock() - - switch op { - case RemSpace: - if m.items[cid] < size { - m.log.Error("space could not be negative") - return - } - - m.items[cid] -= size - case AddSpace: - m.items[cid] += size - default: - m.log.Error("unknown space operation", zap.Int("op", int(op))) - return - } - - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, m.items[cid]) - - if err := m.store.Set(cid.Bytes(), buf); err != nil { - m.log.Error("could not update space size", zap.Int("op", int(op))) - } -} diff --git a/pkg/services/metrics/store_test.go b/pkg/services/metrics/store_test.go deleted file mode 100644 index dd4b760f..00000000 --- a/pkg/services/metrics/store_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package metrics - -import ( - "sync" - "testing" - - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - "github.com/spaolacci/murmur3" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -type ( - fakeKV struct { - key []byte - val []byte - } - - fakeBucket struct { - sync.RWMutex - kv []fakeKV - items map[uint64]int - } -) - -func keyFromBytes(b []byte) uint64 { - return murmur3.Sum64(b) -} - -func (f *fakeBucket) Set(key, value []byte) error { - f.Lock() - defer f.Unlock() - - var ( - id int - ok bool - uid = keyFromBytes(key) - ) - - if id, ok = f.items[uid]; !ok || id >= len(f.kv) { - id = len(f.kv) - f.items[uid] = id - f.kv = append(f.kv, fakeKV{ - key: key, - val: value, - }) - - return nil - } - - f.kv[id] = fakeKV{ - key: key, - val: value, - } - - return nil -} - -func (f *fakeBucket) Del(key []byte) error { - f.Lock() - defer f.Unlock() - - delete(f.items, keyFromBytes(key)) - - return nil -} - -func (f *fakeBucket) List() ([][]byte, error) { - f.RLock() - defer f.RUnlock() - - items := make([][]byte, 0, len(f.items)) - for _, id := range f.items { - // ignore unknown KV - if id >= len(f.kv) { - continue - } - - items = append(items, f.kv[id].key) - } - - return items, nil -} - -func (f *fakeBucket) Iterate(handler bucket.FilterHandler) error { - f.Lock() - defer f.Unlock() - - for _, id := range f.items { - // ignore unknown KV - if id >= len(f.kv) { - continue - } - - kv := f.kv[id] - - if !handler(kv.key, kv.val) { - break - } - } - - return nil -} - -func (f *fakeBucket) Get(_ []byte) ([]byte, error) { panic("implement me") } -func (f *fakeBucket) Has(_ []byte) bool { panic("implement me") } -func (f *fakeBucket) Size() int64 { panic("implement me") } -func (f *fakeBucket) Close() error { panic("implement me") } - -func TestSyncStore(t *testing.T) { - buck := &fakeBucket{items: make(map[uint64]int)} - sizes := newSyncStore(zap.L(), buck) - - for i := 0; i < 10; i++ { - cid := refs.CID{0, 0, 0, byte(i)} - require.NoError(t, buck.Set(cid.Bytes(), []byte{1, 2, 3, 4, 5, 6, 7, byte(i)})) - } - - t.Run("load", func(t *testing.T) { - sizes.Load() - require.Len(t, sizes.items, len(buck.items)) - }) - - t.Run("reset", func(t *testing.T) { - sizes.Reset(nil) - require.Len(t, sizes.items, 0) - }) - - t.Run("update", func(t *testing.T) { - cid := refs.CID{1, 2, 3, 4, 5} - - { // add space - sizes.Update(cid, 8, AddSpace) - val, ok := sizes.items[cid] - require.True(t, ok) - require.Equal(t, uint64(8), val) - } - - { // rem space - sizes.Update(cid, 8, RemSpace) - val, ok := sizes.items[cid] - require.True(t, ok) - require.Zero(t, val) - } - - { // rem space (zero - val) - sizes.Update(cid, 8, RemSpace) - val, ok := sizes.items[cid] - require.True(t, ok) - require.Zero(t, val) - } - }) -} diff --git a/pkg/services/object_manager/verifier/verifier.go b/pkg/services/object_manager/verifier/verifier.go deleted file mode 100644 index 46c83ce7..00000000 --- a/pkg/services/object_manager/verifier/verifier.go +++ /dev/null @@ -1,35 +0,0 @@ -package verifier - -import ( - "bytes" - "context" - - "github.com/nspcc-dev/neofs-api-go/object" -) - -// Verifier is an interface for checking whether an object conforms to a certain criterion. -// Nil error is equivalent to matching the criterion. -type Verifier interface { - Verify(context.Context, *object.Object) error -} - -// MarshalHeaders marshals all object headers which are "higher" than to-th extended header. -func MarshalHeaders(obj *object.Object, to int) ([]byte, error) { - buf := new(bytes.Buffer) - - if sysHdr, err := obj.SystemHeader.Marshal(); err != nil { - return nil, err - } else if _, err := buf.Write(sysHdr); err != nil { - return nil, err - } - - for i := range obj.Headers[:to] { - if header, err := obj.Headers[i].Marshal(); err != nil { - return nil, err - } else if _, err := buf.Write(header); err != nil { - return nil, err - } - } - - return buf.Bytes(), nil -}