diff --git a/pkg/local_object_storage/bucket/bucket.go b/pkg/local_object_storage/bucket/bucket.go deleted file mode 100644 index 612e89919..000000000 --- a/pkg/local_object_storage/bucket/bucket.go +++ /dev/null @@ -1,41 +0,0 @@ -package bucket - -import ( - "errors" -) - -// FilterHandler where you receive key/val in your closure. -type FilterHandler func(key, val []byte) bool - -// BucketItem used in filter. -type BucketItem struct { - Key []byte - Val []byte -} - -// Bucket is sub-store interface. -type Bucket interface { - Get(key []byte) ([]byte, error) - Set(key, value []byte) error - Del(key []byte) error - Has(key []byte) bool - Size() int64 - List() ([][]byte, error) - Iterate(FilterHandler) error - // Steam can be implemented by badger.Stream, but not for now - // Stream(ctx context.Context, key []byte, cb func(io.ReadWriter) error) error - Close() error -} - -var ( - // ErrNilFilterHandler when FilterHandler is empty - ErrNilFilterHandler = errors.New("handler can't be nil") - - // ErrNotFound is returned by key-value storage methods - // that could not find element by key. - ErrNotFound = errors.New("key not found") -) - -// ErrIteratingAborted is returned by storage iterator -// after iteration has been interrupted. -var ErrIteratingAborted = errors.New("iteration aborted") diff --git a/pkg/local_object_storage/bucket/fsbucket/bucket.go b/pkg/local_object_storage/bucket/fsbucket/bucket.go deleted file mode 100644 index ea33a7933..000000000 --- a/pkg/local_object_storage/bucket/fsbucket/bucket.go +++ /dev/null @@ -1,100 +0,0 @@ -package fsbucket - -import ( - "os" - - "github.com/mr-tron/base58" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - "github.com/pkg/errors" - "github.com/spf13/viper" - "go.uber.org/atomic" -) - -type ( - Bucket struct { - dir string - perm os.FileMode - } - - treeBucket struct { - dir string - perm os.FileMode - - depth int - prefixLength int - sz *atomic.Int64 - } -) - -const Name = "filesystem" - -const ( - defaultDirectory = "fsbucket" - defaultPermissions = 0755 - defaultDepth = 2 - defaultPrefixLen = 2 -) - -var errShortKey = errors.New("key is too short for tree fs bucket") - -func stringifyKey(key []byte) string { - return base58.Encode(key) -} - -func decodeKey(key string) []byte { - k, err := base58.Decode(key) - if err != nil { - panic(err) // it can fail only for not base58 strings - } - - return k -} - -// NewBucket creates new file system bucket instance. -func NewBucket(prefix string, v *viper.Viper) (bucket.Bucket, error) { - prefix = prefix + "." + Name - var ( - dir string - perm os.FileMode - - prefixLen int - depth int - ) - - if dir = v.GetString(prefix + ".directory"); dir == "" { - dir = defaultDirectory - } - - if perm = os.FileMode(v.GetInt(prefix + ".permissions")); perm == 0 { - perm = defaultPermissions - } - - if depth = v.GetInt(prefix + ".depth"); depth <= 0 { - depth = defaultDepth - } - - if prefixLen = v.GetInt(prefix + ".prefix_len"); prefixLen <= 0 { - prefixLen = defaultPrefixLen - } - - if err := os.MkdirAll(dir, perm); err != nil { - return nil, errors.Wrapf(err, "could not create bucket %s", Name) - } - - if v.GetBool(prefix + ".tree_enabled") { - b := &treeBucket{ - dir: dir, - perm: perm, - depth: depth, - prefixLength: prefixLen, - } - b.sz = atomic.NewInt64(b.size()) - - return b, nil - } - - return &Bucket{ - dir: dir, - perm: perm, - }, nil -} diff --git a/pkg/local_object_storage/bucket/fsbucket/methods.go b/pkg/local_object_storage/bucket/fsbucket/methods.go deleted file mode 100644 index 1dd2aea92..000000000 --- a/pkg/local_object_storage/bucket/fsbucket/methods.go +++ /dev/null @@ -1,107 +0,0 @@ -package fsbucket - -import ( - "io/ioutil" - "os" - "path" - "path/filepath" - - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" -) - -// Get value by key. -func (b *Bucket) Get(key []byte) ([]byte, error) { - p := path.Join(b.dir, stringifyKey(key)) - if _, err := os.Stat(p); os.IsNotExist(err) { - return nil, bucket.ErrNotFound - } - - return ioutil.ReadFile(p) -} - -// Set value by key. -func (b *Bucket) Set(key, value []byte) error { - p := path.Join(b.dir, stringifyKey(key)) - - return ioutil.WriteFile(p, value, b.perm) -} - -// Del value by key. -func (b *Bucket) Del(key []byte) error { - p := path.Join(b.dir, stringifyKey(key)) - if _, err := os.Stat(p); os.IsNotExist(err) { - return bucket.ErrNotFound - } - - return os.Remove(p) -} - -// Has checks key exists. -func (b *Bucket) Has(key []byte) bool { - p := path.Join(b.dir, stringifyKey(key)) - _, err := os.Stat(p) - - return err == nil -} - -func listing(root string, fn func(path string, info os.FileInfo) error) error { - return filepath.Walk(root, func(p string, info os.FileInfo, err error) error { - if err != nil || info.IsDir() { - return err - } - - if fn == nil { - return nil - } - - return fn(p, info) - }) -} - -// Size of bucket. -func (b *Bucket) Size() (size int64) { - err := listing(b.dir, func(_ string, info os.FileInfo) error { - size += info.Size() - return nil - }) - - if err != nil { - size = 0 - } - - return -} - -// List all bucket items. -func (b *Bucket) List() ([][]byte, error) { - buckets := make([][]byte, 0) - - err := listing(b.dir, func(p string, info os.FileInfo) error { - buckets = append(buckets, decodeKey(info.Name())) - return nil - }) - - return buckets, err -} - -// Filter bucket items by closure. -func (b *Bucket) Iterate(handler bucket.FilterHandler) error { - return listing(b.dir, func(p string, info os.FileInfo) error { - key := decodeKey(info.Name()) - val, err := ioutil.ReadFile(p) - if err != nil { - return err - } - - if !handler(key, val) { - return bucket.ErrIteratingAborted - } - - return nil - }) -} - -// Close bucket (just empty). -func (b *Bucket) Close() error { - return os.RemoveAll(b.dir) -} diff --git a/pkg/local_object_storage/bucket/fsbucket/queue.go b/pkg/local_object_storage/bucket/fsbucket/queue.go deleted file mode 100644 index e2b036162..000000000 --- a/pkg/local_object_storage/bucket/fsbucket/queue.go +++ /dev/null @@ -1,44 +0,0 @@ -package fsbucket - -import "sync" - -type ( - queue struct { - *sync.RWMutex - buf []elem - } - - elem struct { - depth int - prefix string - path string - } -) - -func newQueue(n int) *queue { - return &queue{ - RWMutex: new(sync.RWMutex), - buf: make([]elem, 0, n), - } -} - -func (q *queue) Len() int { - return len(q.buf) -} - -func (q *queue) Push(s elem) { - q.Lock() - q.buf = append(q.buf, s) - q.Unlock() -} - -func (q *queue) Pop() (s elem) { - q.Lock() - if len(q.buf) > 0 { - s = q.buf[0] - q.buf = q.buf[1:] - } - q.Unlock() - - return -} diff --git a/pkg/local_object_storage/bucket/fsbucket/treemethods.go b/pkg/local_object_storage/bucket/fsbucket/treemethods.go deleted file mode 100644 index b427e0c72..000000000 --- a/pkg/local_object_storage/bucket/fsbucket/treemethods.go +++ /dev/null @@ -1,261 +0,0 @@ -package fsbucket - -import ( - "encoding/hex" - "io/ioutil" - "os" - "path" - "strings" - - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" -) - -const queueCap = 1000 - -func stringifyHexKey(key []byte) string { - return hex.EncodeToString(key) -} - -func decodeHexKey(key string) ([]byte, error) { - k, err := hex.DecodeString(key) - if err != nil { - return nil, err - } - - return k, nil -} - -// treePath returns slice of the dir names that contain the path -// and filename, e.g. 0xabcdef => []string{"ab", "cd"}, "abcdef". -// In case of errors - return nil slice. -func (b *treeBucket) treePath(key []byte) ([]string, string) { - filename := stringifyHexKey(key) - if len(filename) <= b.prefixLength*b.depth { - return nil, filename - } - - filepath := filename - dirs := make([]string, 0, b.depth) - - for i := 0; i < b.depth; i++ { - dirs = append(dirs, filepath[:b.prefixLength]) - filepath = filepath[b.prefixLength:] - } - - return dirs, filename -} - -// Get value by key. -func (b *treeBucket) Get(key []byte) ([]byte, error) { - dirPaths, filename := b.treePath(key) - if dirPaths == nil { - return nil, errShortKey - } - - p := path.Join(b.dir, path.Join(dirPaths...), filename) - - if _, err := os.Stat(p); os.IsNotExist(err) { - return nil, bucket.ErrNotFound - } - - return ioutil.ReadFile(p) -} - -// Set value by key. -func (b *treeBucket) Set(key, value []byte) error { - dirPaths, filename := b.treePath(key) - if dirPaths == nil { - return errShortKey - } - - var ( - dirPath = path.Join(dirPaths...) - p = path.Join(b.dir, dirPath, filename) - ) - - if err := os.MkdirAll(path.Join(b.dir, dirPath), b.perm); err != nil { - return err - } - - err := ioutil.WriteFile(p, value, b.perm) - if err == nil { - b.sz.Add(int64(len(value))) - } - - return err -} - -// Del value by key. -func (b *treeBucket) Del(key []byte) error { - dirPaths, filename := b.treePath(key) - if dirPaths == nil { - return errShortKey - } - - var ( - err error - fi os.FileInfo - p = path.Join(b.dir, path.Join(dirPaths...), filename) - ) - - if fi, err = os.Stat(p); os.IsNotExist(err) { - return bucket.ErrNotFound - } else if err = os.Remove(p); err == nil { - b.sz.Sub(fi.Size()) - } - - return err -} - -// Has checks if key exists. -func (b *treeBucket) Has(key []byte) bool { - dirPaths, filename := b.treePath(key) - if dirPaths == nil { - return false - } - - p := path.Join(b.dir, path.Join(dirPaths...), filename) - - _, err := os.Stat(p) - - return err == nil -} - -// There might be two implementation of listing method: simple with `filepath.Walk()` -// or more complex implementation with path checks, BFS etc. `filepath.Walk()` might -// be slow in large dirs due to sorting operations and non controllable depth. -func (b *treeBucket) listing(root string, fn func(path string, info os.FileInfo) error) error { - // todo: DFS might be better since it won't store many files in queue. - // todo: queue length can be specified as a parameter - q := newQueue(queueCap) - q.Push(elem{path: root}) - - for q.Len() > 0 { - e := q.Pop() - - s, err := os.Lstat(e.path) - if err != nil { - // might be better to log and ignore - return err - } - - // check if it is correct file - if !s.IsDir() { - // we accept files that located in excepted depth and have correct prefix - // e.g. file 'abcdef0123' => /ab/cd/abcdef0123 - if e.depth == b.depth+1 && strings.HasPrefix(s.Name(), e.prefix) { - err = fn(e.path, s) - if err != nil { - // might be better to log and ignore - return err - } - } - - continue - } - - // ignore dirs with inappropriate length or depth - if e.depth > b.depth || (e.depth > 0 && len(s.Name()) > b.prefixLength) { - continue - } - - files, err := readDirNames(e.path) - if err != nil { - // might be better to log and ignore - return err - } - - for i := range files { - // add prefix of all dirs in path except root dir - var prefix string - if e.depth > 0 { - prefix = e.prefix + s.Name() - } - - q.Push(elem{ - depth: e.depth + 1, - prefix: prefix, - path: path.Join(e.path, files[i]), - }) - } - } - - return nil -} - -// Size returns the size of the bucket in bytes. -func (b *treeBucket) Size() int64 { - return b.sz.Load() -} - -func (b *treeBucket) size() (size int64) { - err := b.listing(b.dir, func(_ string, info os.FileInfo) error { - size += info.Size() - return nil - }) - - if err != nil { - size = 0 - } - - return -} - -// List all bucket items. -func (b *treeBucket) List() ([][]byte, error) { - buckets := make([][]byte, 0) - - err := b.listing(b.dir, func(p string, info os.FileInfo) error { - key, err := decodeHexKey(info.Name()) - if err != nil { - return err - } - buckets = append(buckets, key) - return nil - }) - - return buckets, err -} - -// Filter bucket items by closure. -func (b *treeBucket) Iterate(handler bucket.FilterHandler) error { - return b.listing(b.dir, func(p string, info os.FileInfo) error { - val, err := ioutil.ReadFile(p) - if err != nil { - return err - } - - key, err := decodeHexKey(info.Name()) - if err != nil { - return err - } - - if !handler(key, val) { - return bucket.ErrIteratingAborted - } - - return nil - }) -} - -// Close bucket (remove all available data). -func (b *treeBucket) Close() error { - return os.RemoveAll(b.dir) -} - -// readDirNames copies `filepath.readDirNames()` without sorting the output. -func readDirNames(dirname string) ([]string, error) { - f, err := os.Open(dirname) - if err != nil { - return nil, err - } - - names, err := f.Readdirnames(-1) - if err != nil { - return nil, err - } - - f.Close() - - return names, nil -} diff --git a/pkg/local_object_storage/bucket/fsbucket/treemethods_test.go b/pkg/local_object_storage/bucket/fsbucket/treemethods_test.go deleted file mode 100644 index 402fcf00d..000000000 --- a/pkg/local_object_storage/bucket/fsbucket/treemethods_test.go +++ /dev/null @@ -1,323 +0,0 @@ -package fsbucket - -import ( - "crypto/rand" - "crypto/sha256" - "encoding/hex" - "io/ioutil" - "os" - "path" - "path/filepath" - "strings" - "testing" - - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" -) - -func prepareTree(badFiles bool) (string, error) { - name := make([]byte, 32) - root, err := ioutil.TempDir("", "treeBucket_test") - if err != nil { - return "", err - } - - // paths must contain strings with hex ascii symbols - paths := [][]string{ - {root, "abcd"}, - {root, "abcd", "cdef"}, - {root, "abcd", "cd01"}, - {root, "0123", "2345"}, - {root, "0123", "2345", "4567"}, - } - - dirs := make([]string, len(paths)) - - for i := range paths { - dirs[i] = path.Join(paths[i]...) - - err = os.MkdirAll(dirs[i], 0700) - if err != nil { - return "", err - } - - // create couple correct files - for j := 0; j < 2; j++ { - _, err := rand.Read(name) - if err != nil { - return "", err - } - - filePrefix := new(strings.Builder) - for k := 1; k < len(paths[i]); k++ { - filePrefix.WriteString(paths[i][k]) - } - filePrefix.WriteString(hex.EncodeToString(name)) - - file, err := os.OpenFile(path.Join(dirs[i], filePrefix.String()), os.O_CREATE, 0700) - if err != nil { - return "", err - } - file.Close() - } - - if !badFiles { - continue - } - - // create one bad file - _, err := rand.Read(name) - if err != nil { - return "", err - } - - file, err := os.OpenFile(path.Join(dirs[i], "fff"+hex.EncodeToString(name)), os.O_CREATE, 0700) - if err != nil { - return "", err - } - file.Close() - } - - return root, nil -} - -func TestTreebucket_List(t *testing.T) { - root, err := prepareTree(true) - require.NoError(t, err) - defer os.RemoveAll(root) - - b := treeBucket{ - dir: root, - perm: 0700, - depth: 1, - prefixLength: 4, - } - results, err := b.List() - require.NoError(t, err) - require.Len(t, results, 2) - - b.depth = 2 - results, err = b.List() - require.NoError(t, err) - require.Len(t, results, 6) - - b.depth = 3 - results, err = b.List() - require.NoError(t, err) - require.Len(t, results, 2) - - b.depth = 4 - results, err = b.List() - require.NoError(t, err) - require.Len(t, results, 0) -} - -func TestTreebucket(t *testing.T) { - root, err := prepareTree(true) - require.NoError(t, err) - defer os.RemoveAll(root) - - b := treeBucket{ - dir: root, - perm: 0700, - depth: 2, - prefixLength: 4, - sz: atomic.NewInt64(0), - } - - results, err := b.List() - require.NoError(t, err) - require.Len(t, results, 6) - - t.Run("Get", func(t *testing.T) { - for i := range results { - _, err = b.Get(results[i]) - require.NoError(t, err) - } - _, err = b.Get([]byte("Hello world!")) - require.Error(t, err) - }) - - t.Run("Has", func(t *testing.T) { - for i := range results { - require.True(t, b.Has(results[i])) - } - require.False(t, b.Has([]byte("Unknown key"))) - }) - - t.Run("Set", func(t *testing.T) { - keyHash := sha256.Sum256([]byte("Set this key")) - key := keyHash[:] - value := make([]byte, 32) - rand.Read(value) - - // set sha256 key - err := b.Set(key, value) - require.NoError(t, err) - - require.True(t, b.Has(key)) - data, err := b.Get(key) - require.NoError(t, err) - require.Equal(t, data, value) - - filename := hex.EncodeToString(key) - _, err = os.Lstat(path.Join(root, filename[:4], filename[4:8], filename)) - require.NoError(t, err) - - // set key that cannot be placed in the required dir depth - key, err = hex.DecodeString("abcdef") - require.NoError(t, err) - - err = b.Set(key, value) - require.Error(t, err) - }) - - t.Run("Delete", func(t *testing.T) { - keyHash := sha256.Sum256([]byte("Delete this key")) - key := keyHash[:] - value := make([]byte, 32) - rand.Read(value) - - err := b.Set(key, value) - require.NoError(t, err) - - // delete sha256 key - err = b.Del(key) - require.NoError(t, err) - - _, err = b.Get(key) - require.Error(t, err) - filename := hex.EncodeToString(key) - _, err = os.Lstat(path.Join(root, filename[:4], filename[4:8], filename)) - require.Error(t, err) - }) -} - -func TestTreebucket_Close(t *testing.T) { - root, err := prepareTree(true) - require.NoError(t, err) - defer os.RemoveAll(root) - - b := treeBucket{ - dir: root, - perm: 0700, - depth: 2, - prefixLength: 4, - } - err = b.Close() - require.NoError(t, err) - - _, err = os.Lstat(root) - require.Error(t, err) -} - -func TestTreebucket_Size(t *testing.T) { - root, err := prepareTree(true) - require.NoError(t, err) - defer os.RemoveAll(root) - - var size int64 = 1024 - key := []byte("Set this key") - value := make([]byte, size) - rand.Read(value) - - b := treeBucket{ - dir: root, - perm: 0700, - depth: 2, - prefixLength: 4, - sz: atomic.NewInt64(0), - } - - err = b.Set(key, value) - require.NoError(t, err) - require.Equal(t, size, b.Size()) -} - -func BenchmarkTreebucket_List(b *testing.B) { - root, err := prepareTree(false) - defer os.RemoveAll(root) - if err != nil { - b.Error(err) - } - - treeFSBucket := &treeBucket{ - dir: root, - perm: 0755, - depth: 2, - prefixLength: 4, - } - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _, err := treeFSBucket.List() - if err != nil { - b.Error(err) - } - } -} - -func BenchmarkFilewalkBucket_List(b *testing.B) { - root, err := prepareTree(false) - defer os.RemoveAll(root) - if err != nil { - b.Error(err) - } - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - buckets := make([]bucket.BucketItem, 0) - - filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil || info.IsDir() { - return nil - } - - val, err := ioutil.ReadFile(path) - if err != nil { - return err - } - - key, err := decodeHexKey(info.Name()) - if err != nil { - return err - } - - buckets = append(buckets, bucket.BucketItem{ - Key: key, - Val: val, - }) - - return nil - }) - } -} - -func BenchmarkTreeBucket_Size(b *testing.B) { - root, err := prepareTree(false) - defer os.RemoveAll(root) - if err != nil { - b.Error(err) - } - - treeFSBucket := &treeBucket{ - dir: root, - perm: 0755, - depth: 2, - prefixLength: 4, - } - - treeFSBucket.sz = atomic.NewInt64(treeFSBucket.size()) - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = treeFSBucket.Size() - } -}