[#302] Remove unused FSBucket component

FSBucket became obsolete when storage object engine has
been implemented.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-01-11 19:27:44 +03:00 committed by Alex Vanin
parent b62b19cb5a
commit 1d56e60589
6 changed files with 0 additions and 876 deletions

View file

@ -1,41 +0,0 @@
package bucket
import (
"errors"
)
// FilterHandler where you receive key/val in your closure.
type FilterHandler func(key, val []byte) bool
// BucketItem used in filter.
type BucketItem struct {
Key []byte
Val []byte
}
// Bucket is sub-store interface.
type Bucket interface {
Get(key []byte) ([]byte, error)
Set(key, value []byte) error
Del(key []byte) error
Has(key []byte) bool
Size() int64
List() ([][]byte, error)
Iterate(FilterHandler) error
// Steam can be implemented by badger.Stream, but not for now
// Stream(ctx context.Context, key []byte, cb func(io.ReadWriter) error) error
Close() error
}
var (
// ErrNilFilterHandler when FilterHandler is empty
ErrNilFilterHandler = errors.New("handler can't be nil")
// ErrNotFound is returned by key-value storage methods
// that could not find element by key.
ErrNotFound = errors.New("key not found")
)
// ErrIteratingAborted is returned by storage iterator
// after iteration has been interrupted.
var ErrIteratingAborted = errors.New("iteration aborted")

View file

@ -1,100 +0,0 @@
package fsbucket
import (
"os"
"github.com/mr-tron/base58"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/atomic"
)
type (
Bucket struct {
dir string
perm os.FileMode
}
treeBucket struct {
dir string
perm os.FileMode
depth int
prefixLength int
sz *atomic.Int64
}
)
const Name = "filesystem"
const (
defaultDirectory = "fsbucket"
defaultPermissions = 0755
defaultDepth = 2
defaultPrefixLen = 2
)
var errShortKey = errors.New("key is too short for tree fs bucket")
func stringifyKey(key []byte) string {
return base58.Encode(key)
}
func decodeKey(key string) []byte {
k, err := base58.Decode(key)
if err != nil {
panic(err) // it can fail only for not base58 strings
}
return k
}
// NewBucket creates new file system bucket instance.
func NewBucket(prefix string, v *viper.Viper) (bucket.Bucket, error) {
prefix = prefix + "." + Name
var (
dir string
perm os.FileMode
prefixLen int
depth int
)
if dir = v.GetString(prefix + ".directory"); dir == "" {
dir = defaultDirectory
}
if perm = os.FileMode(v.GetInt(prefix + ".permissions")); perm == 0 {
perm = defaultPermissions
}
if depth = v.GetInt(prefix + ".depth"); depth <= 0 {
depth = defaultDepth
}
if prefixLen = v.GetInt(prefix + ".prefix_len"); prefixLen <= 0 {
prefixLen = defaultPrefixLen
}
if err := os.MkdirAll(dir, perm); err != nil {
return nil, errors.Wrapf(err, "could not create bucket %s", Name)
}
if v.GetBool(prefix + ".tree_enabled") {
b := &treeBucket{
dir: dir,
perm: perm,
depth: depth,
prefixLength: prefixLen,
}
b.sz = atomic.NewInt64(b.size())
return b, nil
}
return &Bucket{
dir: dir,
perm: perm,
}, nil
}

View file

@ -1,107 +0,0 @@
package fsbucket
import (
"io/ioutil"
"os"
"path"
"path/filepath"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
)
// Get value by key.
func (b *Bucket) Get(key []byte) ([]byte, error) {
p := path.Join(b.dir, stringifyKey(key))
if _, err := os.Stat(p); os.IsNotExist(err) {
return nil, bucket.ErrNotFound
}
return ioutil.ReadFile(p)
}
// Set value by key.
func (b *Bucket) Set(key, value []byte) error {
p := path.Join(b.dir, stringifyKey(key))
return ioutil.WriteFile(p, value, b.perm)
}
// Del value by key.
func (b *Bucket) Del(key []byte) error {
p := path.Join(b.dir, stringifyKey(key))
if _, err := os.Stat(p); os.IsNotExist(err) {
return bucket.ErrNotFound
}
return os.Remove(p)
}
// Has checks key exists.
func (b *Bucket) Has(key []byte) bool {
p := path.Join(b.dir, stringifyKey(key))
_, err := os.Stat(p)
return err == nil
}
func listing(root string, fn func(path string, info os.FileInfo) error) error {
return filepath.Walk(root, func(p string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return err
}
if fn == nil {
return nil
}
return fn(p, info)
})
}
// Size of bucket.
func (b *Bucket) Size() (size int64) {
err := listing(b.dir, func(_ string, info os.FileInfo) error {
size += info.Size()
return nil
})
if err != nil {
size = 0
}
return
}
// List all bucket items.
func (b *Bucket) List() ([][]byte, error) {
buckets := make([][]byte, 0)
err := listing(b.dir, func(p string, info os.FileInfo) error {
buckets = append(buckets, decodeKey(info.Name()))
return nil
})
return buckets, err
}
// Filter bucket items by closure.
func (b *Bucket) Iterate(handler bucket.FilterHandler) error {
return listing(b.dir, func(p string, info os.FileInfo) error {
key := decodeKey(info.Name())
val, err := ioutil.ReadFile(p)
if err != nil {
return err
}
if !handler(key, val) {
return bucket.ErrIteratingAborted
}
return nil
})
}
// Close bucket (just empty).
func (b *Bucket) Close() error {
return os.RemoveAll(b.dir)
}

View file

@ -1,44 +0,0 @@
package fsbucket
import "sync"
type (
queue struct {
*sync.RWMutex
buf []elem
}
elem struct {
depth int
prefix string
path string
}
)
func newQueue(n int) *queue {
return &queue{
RWMutex: new(sync.RWMutex),
buf: make([]elem, 0, n),
}
}
func (q *queue) Len() int {
return len(q.buf)
}
func (q *queue) Push(s elem) {
q.Lock()
q.buf = append(q.buf, s)
q.Unlock()
}
func (q *queue) Pop() (s elem) {
q.Lock()
if len(q.buf) > 0 {
s = q.buf[0]
q.buf = q.buf[1:]
}
q.Unlock()
return
}

View file

@ -1,261 +0,0 @@
package fsbucket
import (
"encoding/hex"
"io/ioutil"
"os"
"path"
"strings"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
)
const queueCap = 1000
func stringifyHexKey(key []byte) string {
return hex.EncodeToString(key)
}
func decodeHexKey(key string) ([]byte, error) {
k, err := hex.DecodeString(key)
if err != nil {
return nil, err
}
return k, nil
}
// treePath returns slice of the dir names that contain the path
// and filename, e.g. 0xabcdef => []string{"ab", "cd"}, "abcdef".
// In case of errors - return nil slice.
func (b *treeBucket) treePath(key []byte) ([]string, string) {
filename := stringifyHexKey(key)
if len(filename) <= b.prefixLength*b.depth {
return nil, filename
}
filepath := filename
dirs := make([]string, 0, b.depth)
for i := 0; i < b.depth; i++ {
dirs = append(dirs, filepath[:b.prefixLength])
filepath = filepath[b.prefixLength:]
}
return dirs, filename
}
// Get value by key.
func (b *treeBucket) Get(key []byte) ([]byte, error) {
dirPaths, filename := b.treePath(key)
if dirPaths == nil {
return nil, errShortKey
}
p := path.Join(b.dir, path.Join(dirPaths...), filename)
if _, err := os.Stat(p); os.IsNotExist(err) {
return nil, bucket.ErrNotFound
}
return ioutil.ReadFile(p)
}
// Set value by key.
func (b *treeBucket) Set(key, value []byte) error {
dirPaths, filename := b.treePath(key)
if dirPaths == nil {
return errShortKey
}
var (
dirPath = path.Join(dirPaths...)
p = path.Join(b.dir, dirPath, filename)
)
if err := os.MkdirAll(path.Join(b.dir, dirPath), b.perm); err != nil {
return err
}
err := ioutil.WriteFile(p, value, b.perm)
if err == nil {
b.sz.Add(int64(len(value)))
}
return err
}
// Del value by key.
func (b *treeBucket) Del(key []byte) error {
dirPaths, filename := b.treePath(key)
if dirPaths == nil {
return errShortKey
}
var (
err error
fi os.FileInfo
p = path.Join(b.dir, path.Join(dirPaths...), filename)
)
if fi, err = os.Stat(p); os.IsNotExist(err) {
return bucket.ErrNotFound
} else if err = os.Remove(p); err == nil {
b.sz.Sub(fi.Size())
}
return err
}
// Has checks if key exists.
func (b *treeBucket) Has(key []byte) bool {
dirPaths, filename := b.treePath(key)
if dirPaths == nil {
return false
}
p := path.Join(b.dir, path.Join(dirPaths...), filename)
_, err := os.Stat(p)
return err == nil
}
// There might be two implementation of listing method: simple with `filepath.Walk()`
// or more complex implementation with path checks, BFS etc. `filepath.Walk()` might
// be slow in large dirs due to sorting operations and non controllable depth.
func (b *treeBucket) listing(root string, fn func(path string, info os.FileInfo) error) error {
// todo: DFS might be better since it won't store many files in queue.
// todo: queue length can be specified as a parameter
q := newQueue(queueCap)
q.Push(elem{path: root})
for q.Len() > 0 {
e := q.Pop()
s, err := os.Lstat(e.path)
if err != nil {
// might be better to log and ignore
return err
}
// check if it is correct file
if !s.IsDir() {
// we accept files that located in excepted depth and have correct prefix
// e.g. file 'abcdef0123' => /ab/cd/abcdef0123
if e.depth == b.depth+1 && strings.HasPrefix(s.Name(), e.prefix) {
err = fn(e.path, s)
if err != nil {
// might be better to log and ignore
return err
}
}
continue
}
// ignore dirs with inappropriate length or depth
if e.depth > b.depth || (e.depth > 0 && len(s.Name()) > b.prefixLength) {
continue
}
files, err := readDirNames(e.path)
if err != nil {
// might be better to log and ignore
return err
}
for i := range files {
// add prefix of all dirs in path except root dir
var prefix string
if e.depth > 0 {
prefix = e.prefix + s.Name()
}
q.Push(elem{
depth: e.depth + 1,
prefix: prefix,
path: path.Join(e.path, files[i]),
})
}
}
return nil
}
// Size returns the size of the bucket in bytes.
func (b *treeBucket) Size() int64 {
return b.sz.Load()
}
func (b *treeBucket) size() (size int64) {
err := b.listing(b.dir, func(_ string, info os.FileInfo) error {
size += info.Size()
return nil
})
if err != nil {
size = 0
}
return
}
// List all bucket items.
func (b *treeBucket) List() ([][]byte, error) {
buckets := make([][]byte, 0)
err := b.listing(b.dir, func(p string, info os.FileInfo) error {
key, err := decodeHexKey(info.Name())
if err != nil {
return err
}
buckets = append(buckets, key)
return nil
})
return buckets, err
}
// Filter bucket items by closure.
func (b *treeBucket) Iterate(handler bucket.FilterHandler) error {
return b.listing(b.dir, func(p string, info os.FileInfo) error {
val, err := ioutil.ReadFile(p)
if err != nil {
return err
}
key, err := decodeHexKey(info.Name())
if err != nil {
return err
}
if !handler(key, val) {
return bucket.ErrIteratingAborted
}
return nil
})
}
// Close bucket (remove all available data).
func (b *treeBucket) Close() error {
return os.RemoveAll(b.dir)
}
// readDirNames copies `filepath.readDirNames()` without sorting the output.
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
names, err := f.Readdirnames(-1)
if err != nil {
return nil, err
}
f.Close()
return names, nil
}

View file

@ -1,323 +0,0 @@
package fsbucket
import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"testing"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
func prepareTree(badFiles bool) (string, error) {
name := make([]byte, 32)
root, err := ioutil.TempDir("", "treeBucket_test")
if err != nil {
return "", err
}
// paths must contain strings with hex ascii symbols
paths := [][]string{
{root, "abcd"},
{root, "abcd", "cdef"},
{root, "abcd", "cd01"},
{root, "0123", "2345"},
{root, "0123", "2345", "4567"},
}
dirs := make([]string, len(paths))
for i := range paths {
dirs[i] = path.Join(paths[i]...)
err = os.MkdirAll(dirs[i], 0700)
if err != nil {
return "", err
}
// create couple correct files
for j := 0; j < 2; j++ {
_, err := rand.Read(name)
if err != nil {
return "", err
}
filePrefix := new(strings.Builder)
for k := 1; k < len(paths[i]); k++ {
filePrefix.WriteString(paths[i][k])
}
filePrefix.WriteString(hex.EncodeToString(name))
file, err := os.OpenFile(path.Join(dirs[i], filePrefix.String()), os.O_CREATE, 0700)
if err != nil {
return "", err
}
file.Close()
}
if !badFiles {
continue
}
// create one bad file
_, err := rand.Read(name)
if err != nil {
return "", err
}
file, err := os.OpenFile(path.Join(dirs[i], "fff"+hex.EncodeToString(name)), os.O_CREATE, 0700)
if err != nil {
return "", err
}
file.Close()
}
return root, nil
}
func TestTreebucket_List(t *testing.T) {
root, err := prepareTree(true)
require.NoError(t, err)
defer os.RemoveAll(root)
b := treeBucket{
dir: root,
perm: 0700,
depth: 1,
prefixLength: 4,
}
results, err := b.List()
require.NoError(t, err)
require.Len(t, results, 2)
b.depth = 2
results, err = b.List()
require.NoError(t, err)
require.Len(t, results, 6)
b.depth = 3
results, err = b.List()
require.NoError(t, err)
require.Len(t, results, 2)
b.depth = 4
results, err = b.List()
require.NoError(t, err)
require.Len(t, results, 0)
}
func TestTreebucket(t *testing.T) {
root, err := prepareTree(true)
require.NoError(t, err)
defer os.RemoveAll(root)
b := treeBucket{
dir: root,
perm: 0700,
depth: 2,
prefixLength: 4,
sz: atomic.NewInt64(0),
}
results, err := b.List()
require.NoError(t, err)
require.Len(t, results, 6)
t.Run("Get", func(t *testing.T) {
for i := range results {
_, err = b.Get(results[i])
require.NoError(t, err)
}
_, err = b.Get([]byte("Hello world!"))
require.Error(t, err)
})
t.Run("Has", func(t *testing.T) {
for i := range results {
require.True(t, b.Has(results[i]))
}
require.False(t, b.Has([]byte("Unknown key")))
})
t.Run("Set", func(t *testing.T) {
keyHash := sha256.Sum256([]byte("Set this key"))
key := keyHash[:]
value := make([]byte, 32)
rand.Read(value)
// set sha256 key
err := b.Set(key, value)
require.NoError(t, err)
require.True(t, b.Has(key))
data, err := b.Get(key)
require.NoError(t, err)
require.Equal(t, data, value)
filename := hex.EncodeToString(key)
_, err = os.Lstat(path.Join(root, filename[:4], filename[4:8], filename))
require.NoError(t, err)
// set key that cannot be placed in the required dir depth
key, err = hex.DecodeString("abcdef")
require.NoError(t, err)
err = b.Set(key, value)
require.Error(t, err)
})
t.Run("Delete", func(t *testing.T) {
keyHash := sha256.Sum256([]byte("Delete this key"))
key := keyHash[:]
value := make([]byte, 32)
rand.Read(value)
err := b.Set(key, value)
require.NoError(t, err)
// delete sha256 key
err = b.Del(key)
require.NoError(t, err)
_, err = b.Get(key)
require.Error(t, err)
filename := hex.EncodeToString(key)
_, err = os.Lstat(path.Join(root, filename[:4], filename[4:8], filename))
require.Error(t, err)
})
}
func TestTreebucket_Close(t *testing.T) {
root, err := prepareTree(true)
require.NoError(t, err)
defer os.RemoveAll(root)
b := treeBucket{
dir: root,
perm: 0700,
depth: 2,
prefixLength: 4,
}
err = b.Close()
require.NoError(t, err)
_, err = os.Lstat(root)
require.Error(t, err)
}
func TestTreebucket_Size(t *testing.T) {
root, err := prepareTree(true)
require.NoError(t, err)
defer os.RemoveAll(root)
var size int64 = 1024
key := []byte("Set this key")
value := make([]byte, size)
rand.Read(value)
b := treeBucket{
dir: root,
perm: 0700,
depth: 2,
prefixLength: 4,
sz: atomic.NewInt64(0),
}
err = b.Set(key, value)
require.NoError(t, err)
require.Equal(t, size, b.Size())
}
func BenchmarkTreebucket_List(b *testing.B) {
root, err := prepareTree(false)
defer os.RemoveAll(root)
if err != nil {
b.Error(err)
}
treeFSBucket := &treeBucket{
dir: root,
perm: 0755,
depth: 2,
prefixLength: 4,
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := treeFSBucket.List()
if err != nil {
b.Error(err)
}
}
}
func BenchmarkFilewalkBucket_List(b *testing.B) {
root, err := prepareTree(false)
defer os.RemoveAll(root)
if err != nil {
b.Error(err)
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
buckets := make([]bucket.BucketItem, 0)
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return nil
}
val, err := ioutil.ReadFile(path)
if err != nil {
return err
}
key, err := decodeHexKey(info.Name())
if err != nil {
return err
}
buckets = append(buckets, bucket.BucketItem{
Key: key,
Val: val,
})
return nil
})
}
}
func BenchmarkTreeBucket_Size(b *testing.B) {
root, err := prepareTree(false)
defer os.RemoveAll(root)
if err != nil {
b.Error(err)
}
treeFSBucket := &treeBucket{
dir: root,
perm: 0755,
depth: 2,
prefixLength: 4,
}
treeFSBucket.sz = atomic.NewInt64(treeFSBucket.size())
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = treeFSBucket.Size()
}
}