forked from TrueCloudLab/frostfs-node
Remove broken packages
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
92f448f303
commit
03b170237f
11 changed files with 0 additions and 1044 deletions
8
go.mod
8
go.mod
|
@ -4,33 +4,25 @@ go 1.14
|
|||
|
||||
require (
|
||||
bou.ke/monkey v1.0.2
|
||||
github.com/fasthttp/router v1.0.2 // indirect
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 // indirect
|
||||
github.com/mr-tron/base58 v1.1.3
|
||||
github.com/multiformats/go-multiaddr v0.2.0
|
||||
github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2
|
||||
github.com/multiformats/go-multihash v0.0.13 // indirect
|
||||
github.com/nspcc-dev/hrw v1.0.9 // indirect
|
||||
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78
|
||||
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200902121740-5a6dff8c83ba
|
||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
||||
github.com/nspcc-dev/netmap v1.7.0 // indirect
|
||||
github.com/nspcc-dev/tzhash v1.4.0 // indirect
|
||||
github.com/panjf2000/ants/v2 v2.3.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.6.0
|
||||
github.com/soheilhy/cmux v0.1.4
|
||||
github.com/spaolacci/murmur3 v1.1.0
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/spf13/viper v1.7.0
|
||||
github.com/stretchr/testify v1.6.1
|
||||
github.com/valyala/fasthttp v1.9.0
|
||||
go.etcd.io/bbolt v1.3.4
|
||||
go.uber.org/atomic v1.5.1
|
||||
go.uber.org/dig v1.8.0 // indirect
|
||||
go.uber.org/multierr v1.4.0 // indirect
|
||||
go.uber.org/zap v1.13.0
|
||||
golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -1,144 +0,0 @@
|
|||
package test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
)
|
||||
|
||||
type (
|
||||
testBucket struct {
|
||||
sync.RWMutex
|
||||
items map[string][]byte
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
errOverflow = errors.New("overflow")
|
||||
errNotFound = errors.New("not found")
|
||||
)
|
||||
|
||||
// Bucket constructs test Bucket implementation.
|
||||
func Bucket() bucket.Bucket {
|
||||
return &testBucket{
|
||||
items: make(map[string][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *testBucket) Get(key []byte) ([]byte, error) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
val, ok := t.items[base58.Encode(key)]
|
||||
if !ok {
|
||||
return nil, bucket.ErrNotFound
|
||||
}
|
||||
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func (t *testBucket) Set(key, value []byte) error {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
t.items[base58.Encode(key)] = value
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testBucket) Del(key []byte) error {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
delete(t.items, base58.Encode(key))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testBucket) Has(key []byte) bool {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
_, ok := t.items[base58.Encode(key)]
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func (t *testBucket) Size() (res int64) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
for _, v := range t.items {
|
||||
res += int64(len(v))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (t *testBucket) List() ([][]byte, error) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
res := make([][]byte, 0)
|
||||
|
||||
for k := range t.items {
|
||||
sk, err := base58.Decode(k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res = append(res, sk)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (t *testBucket) Iterate(f bucket.FilterHandler) error {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
for k, v := range t.items {
|
||||
key, err := base58.Decode(k)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !f(key, v) {
|
||||
return bucket.ErrIteratingAborted
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testBucket) Close() error {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
for k := range t.items {
|
||||
delete(t.items, k)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testBucket) PRead(key []byte, rng object.Range) ([]byte, error) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
k := base58.Encode(key)
|
||||
|
||||
v, ok := t.items[k]
|
||||
if !ok {
|
||||
return nil, errNotFound
|
||||
}
|
||||
|
||||
if rng.Offset+rng.Length > uint64(len(v)) {
|
||||
return nil, errOverflow
|
||||
}
|
||||
|
||||
return v[rng.Offset : rng.Offset+rng.Length], nil
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
)
|
||||
|
||||
type (
|
||||
// Iterator is an interface of the iterator over object storage.
|
||||
Iterator interface {
|
||||
Iterate(IterateFunc) error
|
||||
}
|
||||
|
||||
// IterateFunc is a function that checks whether an object matches a specific criterion.
|
||||
IterateFunc func(*object.Object) error
|
||||
)
|
|
@ -1,33 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta"
|
||||
)
|
||||
|
||||
type metaWrapper struct {
|
||||
sync.Mutex
|
||||
iter meta2.Iterator
|
||||
}
|
||||
|
||||
func newMetaWrapper() *metaWrapper {
|
||||
return &metaWrapper{}
|
||||
}
|
||||
|
||||
func (m *metaWrapper) changeIter(iter meta2.Iterator) {
|
||||
m.Lock()
|
||||
m.iter = iter
|
||||
m.Unlock()
|
||||
}
|
||||
|
||||
func (m *metaWrapper) Iterate(h meta2.IterateFunc) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if m.iter == nil {
|
||||
return errEmptyMetaStore
|
||||
}
|
||||
|
||||
return m.iter.Iterate(h)
|
||||
}
|
|
@ -1,175 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// Collector is an interface of the metrics collector.
|
||||
Collector interface {
|
||||
Start(ctx context.Context)
|
||||
UpdateSpaceUsage()
|
||||
|
||||
SetCounter(ObjectCounter)
|
||||
SetIterator(iter meta2.Iterator)
|
||||
UpdateContainer(cid refs.CID, size uint64, op SpaceOp)
|
||||
}
|
||||
|
||||
collector struct {
|
||||
log *zap.Logger
|
||||
interval time.Duration
|
||||
counter *counterWrapper
|
||||
|
||||
sizes *syncStore
|
||||
metas *metaWrapper
|
||||
|
||||
updateSpaceSize func()
|
||||
updateObjectCount func()
|
||||
}
|
||||
|
||||
// Params groups the parameters of metrics collector's constructor.
|
||||
Params struct {
|
||||
Options []string
|
||||
Logger *zap.Logger
|
||||
Interval time.Duration
|
||||
MetricsStore bucket.Bucket
|
||||
}
|
||||
|
||||
// ObjectCounter is an interface of object number storage.
|
||||
ObjectCounter interface {
|
||||
ObjectsCount() (uint64, error)
|
||||
}
|
||||
|
||||
// CounterSetter is an interface of ObjectCounter container.
|
||||
CounterSetter interface {
|
||||
SetCounter(ObjectCounter)
|
||||
}
|
||||
|
||||
counterWrapper struct {
|
||||
sync.Mutex
|
||||
counter ObjectCounter
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
errEmptyCounter = errors.New("empty object counter")
|
||||
errEmptyLogger = errors.New("empty logger")
|
||||
errEmptyMetaStore = errors.New("empty meta store")
|
||||
errEmptyMetricsStore = errors.New("empty metrics store")
|
||||
)
|
||||
|
||||
const defaultMetricsInterval = 5 * time.Second
|
||||
|
||||
// New constructs metrics collector and returns Collector interface.
|
||||
func New(p Params) (Collector, error) {
|
||||
switch {
|
||||
case p.Logger == nil:
|
||||
return nil, errEmptyLogger
|
||||
case p.MetricsStore == nil:
|
||||
return nil, errEmptyMetricsStore
|
||||
}
|
||||
|
||||
if p.Interval <= 0 {
|
||||
p.Interval = defaultMetricsInterval
|
||||
}
|
||||
|
||||
metas := newMetaWrapper()
|
||||
sizes := newSyncStore(p.Logger, p.MetricsStore)
|
||||
|
||||
sizes.Load()
|
||||
|
||||
return &collector{
|
||||
log: p.Logger,
|
||||
interval: p.Interval,
|
||||
counter: new(counterWrapper),
|
||||
|
||||
metas: metas,
|
||||
sizes: sizes,
|
||||
|
||||
updateSpaceSize: spaceUpdater(sizes),
|
||||
updateObjectCount: metricsUpdater(p.Options),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *counterWrapper) SetCounter(counter ObjectCounter) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
c.counter = counter
|
||||
}
|
||||
|
||||
func (c *counterWrapper) ObjectsCount() (uint64, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if c.counter == nil {
|
||||
return 0, errEmptyCounter
|
||||
}
|
||||
|
||||
return c.counter.ObjectsCount()
|
||||
}
|
||||
|
||||
func (c *collector) SetCounter(counter ObjectCounter) {
|
||||
c.counter.SetCounter(counter)
|
||||
}
|
||||
|
||||
func (c *collector) SetIterator(iter meta2.Iterator) {
|
||||
c.metas.changeIter(iter)
|
||||
}
|
||||
|
||||
func (c *collector) UpdateContainer(cid refs.CID, size uint64, op SpaceOp) {
|
||||
c.sizes.Update(cid, size, op)
|
||||
c.updateSpaceSize()
|
||||
}
|
||||
|
||||
func (c *collector) UpdateSpaceUsage() {
|
||||
sizes := make(map[refs.CID]uint64)
|
||||
|
||||
err := c.metas.Iterate(func(obj *object.Object) error {
|
||||
if !obj.IsTombstone() {
|
||||
cid := obj.SystemHeader.CID
|
||||
sizes[cid] += obj.SystemHeader.PayloadLength
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
c.log.Error("could not update space metrics", zap.Error(err))
|
||||
}
|
||||
|
||||
c.sizes.Reset(sizes)
|
||||
c.updateSpaceSize()
|
||||
}
|
||||
|
||||
func (c *collector) Start(ctx context.Context) {
|
||||
t := time.NewTicker(c.interval)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
c.log.Warn("stop collecting metrics", zap.Error(ctx.Err()))
|
||||
break loop
|
||||
case <-t.C:
|
||||
count, err := c.counter.ObjectsCount()
|
||||
if err != nil {
|
||||
c.log.Warn("get object count failure", zap.Error(err))
|
||||
continue loop
|
||||
}
|
||||
counter.Store(float64(count))
|
||||
c.updateObjectCount()
|
||||
}
|
||||
}
|
||||
|
||||
t.Stop()
|
||||
}
|
|
@ -1,275 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
fakeCounter int
|
||||
fakeIterator string
|
||||
fakeMetaStore []*object.Object
|
||||
)
|
||||
|
||||
var (
|
||||
_ ObjectCounter = (*fakeCounter)(nil)
|
||||
_ meta2.Iterator = (*fakeIterator)(nil)
|
||||
)
|
||||
|
||||
func (f fakeCounter) ObjectsCount() (uint64, error) {
|
||||
return uint64(f), nil
|
||||
}
|
||||
|
||||
func (f fakeIterator) Iterate(_ meta2.IterateFunc) error {
|
||||
if f == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New(string(f))
|
||||
}
|
||||
|
||||
func (f fakeMetaStore) Iterate(cb meta2.IterateFunc) error {
|
||||
if cb == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := range f {
|
||||
if err := cb(f[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCollector(t *testing.T) {
|
||||
buck := &fakeBucket{items: make(map[uint64]int)}
|
||||
|
||||
t.Run("check errors", func(t *testing.T) {
|
||||
t.Run("empty logger", func(t *testing.T) {
|
||||
svc, err := New(Params{MetricsStore: buck})
|
||||
require.Nil(t, svc)
|
||||
require.EqualError(t, err, errEmptyLogger.Error())
|
||||
})
|
||||
|
||||
t.Run("empty metrics store", func(t *testing.T) {
|
||||
svc, err := New(Params{Logger: zap.L()})
|
||||
require.Nil(t, svc)
|
||||
require.EqualError(t, err, errEmptyMetricsStore.Error())
|
||||
})
|
||||
})
|
||||
|
||||
svc, err := New(Params{
|
||||
Logger: zap.L(),
|
||||
MetricsStore: buck,
|
||||
Options: []string{
|
||||
"/Location:Europe/Country:Russia/City:Moscow",
|
||||
"/Some:Another/Key:Value",
|
||||
},
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, svc)
|
||||
|
||||
coll, ok := svc.(*collector)
|
||||
require.True(t, ok)
|
||||
require.NotNil(t, coll)
|
||||
|
||||
t.Run("check start", func(t *testing.T) {
|
||||
coll.interval = time.Second
|
||||
|
||||
t.Run("stop by context", func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(1)
|
||||
|
||||
counter.Store(-1)
|
||||
|
||||
go func() {
|
||||
svc.Start(ctx)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, float64(-1), counter.Load())
|
||||
})
|
||||
|
||||
t.Run("should fail on empty counter", func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(1)
|
||||
|
||||
counter.Store(0)
|
||||
|
||||
go func() {
|
||||
svc.Start(ctx)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, float64(0), counter.Load())
|
||||
})
|
||||
|
||||
t.Run("should success on fakeCounter", func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(1)
|
||||
|
||||
coll.SetCounter(fakeCounter(8))
|
||||
counter.Store(0)
|
||||
|
||||
go func() {
|
||||
svc.Start(ctx)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
require.Equal(t, float64(8), counter.Load())
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("iterator", func(t *testing.T) {
|
||||
{
|
||||
coll.SetIterator(nil)
|
||||
require.Nil(t, coll.metas.iter)
|
||||
require.EqualError(t, coll.metas.Iterate(nil), errEmptyMetaStore.Error())
|
||||
}
|
||||
|
||||
{
|
||||
iter := fakeIterator("")
|
||||
coll.SetIterator(iter)
|
||||
require.Equal(t, iter, coll.metas.iter)
|
||||
require.NoError(t, coll.metas.Iterate(nil))
|
||||
}
|
||||
|
||||
{
|
||||
iter := fakeIterator("test")
|
||||
coll.SetIterator(iter)
|
||||
require.Equal(t, iter, coll.metas.iter)
|
||||
require.EqualError(t, coll.metas.Iterate(nil), string(iter))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("add-rem space", func(t *testing.T) {
|
||||
cid := refs.CID{1, 2, 3, 4, 5}
|
||||
buf := make([]byte, 8)
|
||||
key := keyFromBytes(cid.Bytes())
|
||||
|
||||
zero := make([]byte, 8)
|
||||
size := uint64(100)
|
||||
|
||||
binary.BigEndian.PutUint64(buf, size)
|
||||
|
||||
{
|
||||
coll.UpdateContainer(cid, size, AddSpace)
|
||||
require.Len(t, coll.sizes.items, 1)
|
||||
require.Len(t, buck.items, 1)
|
||||
require.Contains(t, buck.items, key)
|
||||
require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: buf})
|
||||
}
|
||||
|
||||
{
|
||||
coll.UpdateContainer(cid, size, RemSpace)
|
||||
require.Len(t, coll.sizes.items, 1)
|
||||
require.Len(t, buck.items, 1)
|
||||
require.Contains(t, buck.items, key)
|
||||
require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: zero})
|
||||
}
|
||||
|
||||
{
|
||||
coll.UpdateContainer(cid, size, RemSpace)
|
||||
require.Len(t, coll.sizes.items, 1)
|
||||
require.Len(t, buck.items, 1)
|
||||
require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: zero})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("add-rem multi thread", func(t *testing.T) {
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(10)
|
||||
|
||||
size := uint64(100)
|
||||
zero := make([]byte, 8)
|
||||
|
||||
// reset
|
||||
coll.UpdateSpaceUsage()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
cid := refs.CID{1, 2, 3, 4, byte(i)}
|
||||
coll.UpdateContainer(cid, size, AddSpace)
|
||||
|
||||
go func() {
|
||||
coll.UpdateContainer(cid, size, RemSpace)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
require.Len(t, coll.sizes.items, 10)
|
||||
require.Len(t, buck.items, 10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
cid := refs.CID{1, 2, 3, 4, byte(i)}
|
||||
require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: zero})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("reset buckets", func(t *testing.T) {
|
||||
coll.UpdateSpaceUsage()
|
||||
require.Len(t, coll.sizes.items, 0)
|
||||
require.Len(t, buck.items, 0)
|
||||
})
|
||||
|
||||
t.Run("reset from metaStore", func(t *testing.T) {
|
||||
cid := refs.CID{1, 2, 3, 4, 5}
|
||||
buf := make([]byte, 8)
|
||||
key := keyFromBytes(cid.Bytes())
|
||||
size := uint64(100)
|
||||
binary.BigEndian.PutUint64(buf, size)
|
||||
|
||||
iter := fakeMetaStore{
|
||||
{
|
||||
SystemHeader: object.SystemHeader{
|
||||
PayloadLength: size,
|
||||
CID: cid,
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
Headers: []object.Header{
|
||||
{
|
||||
Value: &object.Header_Tombstone{Tombstone: &object.Tombstone{}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
coll.SetIterator(iter)
|
||||
|
||||
coll.UpdateSpaceUsage()
|
||||
require.Len(t, coll.sizes.items, 1)
|
||||
require.Len(t, buck.items, 1)
|
||||
|
||||
require.Contains(t, buck.items, key)
|
||||
require.Contains(t, buck.kv, fakeKV{key: cid.Bytes(), val: buf})
|
||||
})
|
||||
}
|
|
@ -1,83 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
const (
|
||||
locationLabel = "location"
|
||||
countryLabel = "country"
|
||||
cityLabel = "city"
|
||||
|
||||
containerLabel = "cid"
|
||||
)
|
||||
|
||||
var (
|
||||
objectsCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "neofs",
|
||||
Name: "count_objects_on_node",
|
||||
Help: "Number of objects stored on this node",
|
||||
}, []string{locationLabel, countryLabel, cityLabel})
|
||||
|
||||
spaceCounter = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "neofs",
|
||||
Name: "container_space_sizes",
|
||||
Help: "Space allocated by ContainerID",
|
||||
}, []string{containerLabel})
|
||||
|
||||
counter = atomic.NewFloat64(0)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(
|
||||
objectsCount,
|
||||
spaceCounter,
|
||||
)
|
||||
}
|
||||
|
||||
func spaceUpdater(m *syncStore) func() {
|
||||
return func() {
|
||||
m.mutex.RLock()
|
||||
for cid := range m.items {
|
||||
spaceCounter.
|
||||
With(prometheus.Labels{
|
||||
containerLabel: cid.String(),
|
||||
}).
|
||||
Set(float64(m.items[cid]))
|
||||
}
|
||||
m.mutex.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func metricsUpdater(opts []string) func() {
|
||||
var (
|
||||
locationCode string
|
||||
countryCode string
|
||||
cityCode string
|
||||
)
|
||||
|
||||
for i := range opts {
|
||||
ss := strings.Split(opts[i], "/")
|
||||
for j := range ss {
|
||||
switch s := strings.SplitN(ss[j], ":", 2); strings.ToLower(s[0]) {
|
||||
case locationLabel:
|
||||
locationCode = s[1]
|
||||
case countryLabel:
|
||||
countryCode = s[1]
|
||||
case cityLabel:
|
||||
cityCode = s[1]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return func() {
|
||||
objectsCount.With(prometheus.Labels{
|
||||
locationLabel: locationCode,
|
||||
countryLabel: countryCode,
|
||||
cityLabel: cityCode,
|
||||
}).Set(counter.Load())
|
||||
}
|
||||
}
|
|
@ -1,122 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
syncStore struct {
|
||||
log *zap.Logger
|
||||
store bucket.Bucket
|
||||
mutex sync.RWMutex
|
||||
items map[refs.CID]uint64
|
||||
}
|
||||
|
||||
// SpaceOp is an enumeration of space size operations.
|
||||
SpaceOp int
|
||||
)
|
||||
|
||||
const (
|
||||
_ SpaceOp = iota
|
||||
|
||||
// AddSpace is a SpaceOp of space size increasing.
|
||||
AddSpace
|
||||
|
||||
// RemSpace is a SpaceOp of space size decreasing.
|
||||
RemSpace
|
||||
)
|
||||
|
||||
func newSyncStore(log *zap.Logger, store bucket.Bucket) *syncStore {
|
||||
return &syncStore{
|
||||
log: log,
|
||||
store: store,
|
||||
items: make(map[refs.CID]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *syncStore) Load() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
_ = m.store.Iterate(func(key, val []byte) bool {
|
||||
cid, err := refs.CIDFromBytes(key)
|
||||
if err != nil {
|
||||
m.log.Error("could not load space value", zap.Error(err))
|
||||
return true
|
||||
}
|
||||
|
||||
m.items[cid] += binary.BigEndian.Uint64(val)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (m *syncStore) Reset(items map[refs.CID]uint64) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.items = items
|
||||
if items == nil {
|
||||
m.items = make(map[refs.CID]uint64)
|
||||
}
|
||||
|
||||
keys, err := m.store.List()
|
||||
if err != nil {
|
||||
m.log.Error("could not fetch keys space metrics", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// cleanup metrics store
|
||||
for i := range keys {
|
||||
if err := m.store.Del(keys[i]); err != nil {
|
||||
cid := hex.EncodeToString(keys[i])
|
||||
m.log.Error("could not remove key",
|
||||
zap.String("cid", cid),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
buf := make([]byte, 8)
|
||||
|
||||
for cid := range items {
|
||||
binary.BigEndian.PutUint64(buf, items[cid])
|
||||
|
||||
if err := m.store.Set(cid.Bytes(), buf); err != nil {
|
||||
m.log.Error("could not store space value",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *syncStore) Update(cid refs.CID, size uint64, op SpaceOp) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
switch op {
|
||||
case RemSpace:
|
||||
if m.items[cid] < size {
|
||||
m.log.Error("space could not be negative")
|
||||
return
|
||||
}
|
||||
|
||||
m.items[cid] -= size
|
||||
case AddSpace:
|
||||
m.items[cid] += size
|
||||
default:
|
||||
m.log.Error("unknown space operation", zap.Int("op", int(op)))
|
||||
return
|
||||
}
|
||||
|
||||
buf := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf, m.items[cid])
|
||||
|
||||
if err := m.store.Set(cid.Bytes(), buf); err != nil {
|
||||
m.log.Error("could not update space size", zap.Int("op", int(op)))
|
||||
}
|
||||
}
|
|
@ -1,154 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
"github.com/spaolacci/murmur3"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
fakeKV struct {
|
||||
key []byte
|
||||
val []byte
|
||||
}
|
||||
|
||||
fakeBucket struct {
|
||||
sync.RWMutex
|
||||
kv []fakeKV
|
||||
items map[uint64]int
|
||||
}
|
||||
)
|
||||
|
||||
func keyFromBytes(b []byte) uint64 {
|
||||
return murmur3.Sum64(b)
|
||||
}
|
||||
|
||||
func (f *fakeBucket) Set(key, value []byte) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
var (
|
||||
id int
|
||||
ok bool
|
||||
uid = keyFromBytes(key)
|
||||
)
|
||||
|
||||
if id, ok = f.items[uid]; !ok || id >= len(f.kv) {
|
||||
id = len(f.kv)
|
||||
f.items[uid] = id
|
||||
f.kv = append(f.kv, fakeKV{
|
||||
key: key,
|
||||
val: value,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
f.kv[id] = fakeKV{
|
||||
key: key,
|
||||
val: value,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeBucket) Del(key []byte) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
delete(f.items, keyFromBytes(key))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeBucket) List() ([][]byte, error) {
|
||||
f.RLock()
|
||||
defer f.RUnlock()
|
||||
|
||||
items := make([][]byte, 0, len(f.items))
|
||||
for _, id := range f.items {
|
||||
// ignore unknown KV
|
||||
if id >= len(f.kv) {
|
||||
continue
|
||||
}
|
||||
|
||||
items = append(items, f.kv[id].key)
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (f *fakeBucket) Iterate(handler bucket.FilterHandler) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
for _, id := range f.items {
|
||||
// ignore unknown KV
|
||||
if id >= len(f.kv) {
|
||||
continue
|
||||
}
|
||||
|
||||
kv := f.kv[id]
|
||||
|
||||
if !handler(kv.key, kv.val) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeBucket) Get(_ []byte) ([]byte, error) { panic("implement me") }
|
||||
func (f *fakeBucket) Has(_ []byte) bool { panic("implement me") }
|
||||
func (f *fakeBucket) Size() int64 { panic("implement me") }
|
||||
func (f *fakeBucket) Close() error { panic("implement me") }
|
||||
|
||||
func TestSyncStore(t *testing.T) {
|
||||
buck := &fakeBucket{items: make(map[uint64]int)}
|
||||
sizes := newSyncStore(zap.L(), buck)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
cid := refs.CID{0, 0, 0, byte(i)}
|
||||
require.NoError(t, buck.Set(cid.Bytes(), []byte{1, 2, 3, 4, 5, 6, 7, byte(i)}))
|
||||
}
|
||||
|
||||
t.Run("load", func(t *testing.T) {
|
||||
sizes.Load()
|
||||
require.Len(t, sizes.items, len(buck.items))
|
||||
})
|
||||
|
||||
t.Run("reset", func(t *testing.T) {
|
||||
sizes.Reset(nil)
|
||||
require.Len(t, sizes.items, 0)
|
||||
})
|
||||
|
||||
t.Run("update", func(t *testing.T) {
|
||||
cid := refs.CID{1, 2, 3, 4, 5}
|
||||
|
||||
{ // add space
|
||||
sizes.Update(cid, 8, AddSpace)
|
||||
val, ok := sizes.items[cid]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, uint64(8), val)
|
||||
}
|
||||
|
||||
{ // rem space
|
||||
sizes.Update(cid, 8, RemSpace)
|
||||
val, ok := sizes.items[cid]
|
||||
require.True(t, ok)
|
||||
require.Zero(t, val)
|
||||
}
|
||||
|
||||
{ // rem space (zero - val)
|
||||
sizes.Update(cid, 8, RemSpace)
|
||||
val, ok := sizes.items[cid]
|
||||
require.True(t, ok)
|
||||
require.Zero(t, val)
|
||||
}
|
||||
})
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
package verifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
)
|
||||
|
||||
// Verifier is an interface for checking whether an object conforms to a certain criterion.
|
||||
// Nil error is equivalent to matching the criterion.
|
||||
type Verifier interface {
|
||||
Verify(context.Context, *object.Object) error
|
||||
}
|
||||
|
||||
// MarshalHeaders marshals all object headers which are "higher" than to-th extended header.
|
||||
func MarshalHeaders(obj *object.Object, to int) ([]byte, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
if sysHdr, err := obj.SystemHeader.Marshal(); err != nil {
|
||||
return nil, err
|
||||
} else if _, err := buf.Write(sysHdr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range obj.Headers[:to] {
|
||||
if header, err := obj.Headers[i].Marshal(); err != nil {
|
||||
return nil, err
|
||||
} else if _, err := buf.Write(header); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
Loading…
Reference in a new issue