forked from TrueCloudLab/frostfs-node
[#645] blobstor: Add simple blobtree impl
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
306f12e6c5
commit
82a30c0775
17 changed files with 1133 additions and 10 deletions
80
pkg/local_object_storage/blobstor/blobtree/blobtree.go
Normal file
80
pkg/local_object_storage/blobstor/blobtree/blobtree.go
Normal file
|
@ -0,0 +1,80 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
var _ common.Storage = &BlobTree{}
|
||||
|
||||
type BlobTree struct {
|
||||
cfg cfg
|
||||
dirLock *utilSync.KeyLocker[string]
|
||||
fileLock *utilSync.KeyLocker[string]
|
||||
compressor *compression.Config
|
||||
dispatcher *rootDispatcher
|
||||
suffix atomic.Uint64
|
||||
}
|
||||
|
||||
func New(opts ...Option) *BlobTree {
|
||||
b := &BlobTree{
|
||||
cfg: cfg{
|
||||
targetFileSizeBytes: 4 * 1024 * 1024,
|
||||
rootPath: "./",
|
||||
depth: 3,
|
||||
permissions: 0700,
|
||||
initWorkersCount: 1000,
|
||||
},
|
||||
dirLock: utilSync.NewKeyLocker[string](),
|
||||
fileLock: utilSync.NewKeyLocker[string](),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&b.cfg)
|
||||
}
|
||||
|
||||
b.dispatcher = newRootDispatcher()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *BlobTree) getDirectoryPath(addr oid.Address) string {
|
||||
sAddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
|
||||
var sb strings.Builder
|
||||
size := int(1+b.cfg.depth*(directoryLength+1)) + len(b.cfg.rootPath) // /path + slash + (character + slash for every level)
|
||||
sb.Grow(size)
|
||||
sb.WriteString(b.cfg.rootPath)
|
||||
|
||||
for i := uint64(0); i < b.cfg.depth; i++ {
|
||||
sb.WriteRune(filepath.Separator)
|
||||
sb.WriteString(sAddr[:directoryLength])
|
||||
sAddr = sAddr[directoryLength:]
|
||||
}
|
||||
|
||||
sb.WriteRune(filepath.Separator)
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func (b *BlobTree) createDir(dir string) error {
|
||||
b.dirLock.Lock(dir)
|
||||
defer b.dirLock.Unlock(dir)
|
||||
|
||||
if err := util.MkdirAllX(dir, b.cfg.permissions); err != nil {
|
||||
if errors.Is(err, syscall.ENOSPC) {
|
||||
err = common.ErrNoSpace
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
14
pkg/local_object_storage/blobstor/blobtree/config.go
Normal file
14
pkg/local_object_storage/blobstor/blobtree/config.go
Normal file
|
@ -0,0 +1,14 @@
|
|||
package blobtree
|
||||
|
||||
import "io/fs"
|
||||
|
||||
var directoryLength uint64 = 1
|
||||
|
||||
type cfg struct {
|
||||
rootPath string
|
||||
depth uint64
|
||||
targetFileSizeBytes uint64
|
||||
permissions fs.FileMode
|
||||
readOnly bool
|
||||
initWorkersCount int
|
||||
}
|
191
pkg/local_object_storage/blobstor/blobtree/content.go
Normal file
191
pkg/local_object_storage/blobstor/blobtree/content.go
Normal file
|
@ -0,0 +1,191 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultVersion = 0
|
||||
|
||||
sizeOfVersion = 1
|
||||
sizeOfCount = 8
|
||||
sizeOfDataLength = 8
|
||||
sizeOfContainerID = sha256.Size
|
||||
sizeOfObjectID = sha256.Size
|
||||
)
|
||||
|
||||
var (
|
||||
errFileToSmall = errors.New("invalid file content: not enough bytes to read count of records")
|
||||
errInvalidFileContentVersion = errors.New("invalid file content: not enough bytes to read record version")
|
||||
errInvalidFileContentContainerID = errors.New("invalid file content: not enough bytes to read container ID")
|
||||
errInvalidFileContentObjectID = errors.New("invalid file content: not enough bytes to read object ID")
|
||||
errInvalidFileContentLength = errors.New("invalid file content: not enough bytes to read data length")
|
||||
errInvalidFileContentData = errors.New("invalid file content: not enough bytes to read data")
|
||||
)
|
||||
|
||||
type objectData struct {
|
||||
Version byte
|
||||
Address oid.Address
|
||||
Data []byte
|
||||
}
|
||||
|
||||
func (b *BlobTree) readFileContent(path string) ([]objectData, error) {
|
||||
rawData, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return []objectData{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return b.unmarshalSlice(rawData)
|
||||
}
|
||||
|
||||
func (b *BlobTree) unmarshalSlice(data []byte) ([]objectData, error) {
|
||||
if len(data) < sizeOfCount {
|
||||
return nil, errFileToSmall
|
||||
}
|
||||
count := binary.LittleEndian.Uint64(data[:8])
|
||||
result := make([]objectData, 0, count)
|
||||
|
||||
data = data[sizeOfCount:]
|
||||
var idx uint64
|
||||
for idx = 0; idx < count; idx++ {
|
||||
record, read, err := b.unmarshalRecord(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, record)
|
||||
data = data[read:]
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) unmarshalRecord(data []byte) (objectData, uint64, error) {
|
||||
if len(data) < sizeOfVersion {
|
||||
return objectData{}, 0, errInvalidFileContentVersion
|
||||
}
|
||||
var result objectData
|
||||
var read uint64
|
||||
result.Version = data[0]
|
||||
if result.Version != defaultVersion {
|
||||
return objectData{}, 0, fmt.Errorf("invalid file content: unknown version %d", result.Version)
|
||||
}
|
||||
read += sizeOfVersion
|
||||
|
||||
if len(data[read:]) < sizeOfContainerID {
|
||||
return objectData{}, 0, errInvalidFileContentContainerID
|
||||
}
|
||||
var contID cid.ID
|
||||
if err := contID.Decode(data[read : read+sizeOfContainerID]); err != nil {
|
||||
return objectData{}, 0, fmt.Errorf("invalid file content: failed to read container ID: %w", err)
|
||||
}
|
||||
read += sizeOfContainerID
|
||||
|
||||
if len(data[read:]) < sizeOfObjectID {
|
||||
return objectData{}, 0, errInvalidFileContentObjectID
|
||||
}
|
||||
var objID oid.ID
|
||||
if err := objID.Decode(data[read : read+sizeOfObjectID]); err != nil {
|
||||
return objectData{}, 0, fmt.Errorf("invalid file content: failed to read object ID: %w", err)
|
||||
}
|
||||
read += sizeOfObjectID
|
||||
|
||||
result.Address.SetContainer(contID)
|
||||
result.Address.SetObject(objID)
|
||||
|
||||
if len(data[read:]) < sizeOfDataLength {
|
||||
return objectData{}, 0, errInvalidFileContentLength
|
||||
}
|
||||
dataLength := binary.LittleEndian.Uint64(data[read : read+sizeOfDataLength])
|
||||
read += sizeOfDataLength
|
||||
|
||||
if uint64(len(data[read:])) < dataLength {
|
||||
return objectData{}, 0, errInvalidFileContentData
|
||||
}
|
||||
result.Data = make([]byte, dataLength)
|
||||
copy(result.Data, data[read:read+dataLength])
|
||||
read += dataLength
|
||||
|
||||
return result, read, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) saveContentToFile(records []objectData, path string) (uint64, error) {
|
||||
data, err := b.marshalSlice(records)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(len(data)), b.writeFile(path, data)
|
||||
}
|
||||
|
||||
func (b *BlobTree) writeFile(p string, data []byte) error {
|
||||
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL|os.O_SYNC, b.cfg.permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Write(data)
|
||||
if err1 := f.Close(); err1 != nil && err == nil {
|
||||
err = err1
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *BlobTree) marshalSlice(records []objectData) ([]byte, error) {
|
||||
buf := make([]byte, b.estimateSize(records))
|
||||
result := buf
|
||||
binary.LittleEndian.PutUint64(buf, uint64(len(records)))
|
||||
buf = buf[sizeOfCount:]
|
||||
for _, record := range records {
|
||||
written := b.marshalRecord(record, buf)
|
||||
buf = buf[written:]
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) marshalRecord(record objectData, dst []byte) uint64 {
|
||||
var written uint64
|
||||
|
||||
dst[0] = record.Version
|
||||
dst = dst[sizeOfVersion:]
|
||||
written += sizeOfVersion
|
||||
|
||||
record.Address.Container().Encode(dst)
|
||||
dst = dst[sizeOfContainerID:]
|
||||
written += sizeOfContainerID
|
||||
|
||||
record.Address.Object().Encode(dst)
|
||||
dst = dst[sizeOfObjectID:]
|
||||
written += sizeOfObjectID
|
||||
|
||||
binary.LittleEndian.PutUint64(dst, uint64(len(record.Data)))
|
||||
dst = dst[sizeOfDataLength:]
|
||||
written += sizeOfDataLength
|
||||
|
||||
copy(dst, record.Data)
|
||||
written += uint64(len(record.Data))
|
||||
|
||||
return written
|
||||
}
|
||||
|
||||
func (b *BlobTree) estimateSize(records []objectData) uint64 {
|
||||
var result uint64
|
||||
result += sizeOfCount
|
||||
for _, record := range records {
|
||||
result += (sizeOfVersion + sizeOfContainerID + sizeOfObjectID + sizeOfDataLength)
|
||||
result += uint64(len(record.Data))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (b *BlobTree) getFilePath(dir string, idx uint64) string {
|
||||
return filepath.Join(dir, strconv.FormatUint(idx, 16))
|
||||
}
|
98
pkg/local_object_storage/blobstor/blobtree/control.go
Normal file
98
pkg/local_object_storage/blobstor/blobtree/control.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var Type = "blobtree"
|
||||
|
||||
func (b *BlobTree) Open(readOnly bool) error {
|
||||
b.cfg.readOnly = readOnly
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) Init() error {
|
||||
if err := b.createDir(b.cfg.rootPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var eg errgroup.Group
|
||||
eg.SetLimit(b.cfg.initWorkersCount)
|
||||
eg.Go(func() error {
|
||||
return b.initDir(&eg, b.cfg.rootPath, 0)
|
||||
})
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (b *BlobTree) initDir(eg *errgroup.Group, dir string, depth uint64) error {
|
||||
entities, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entity := range entities {
|
||||
if depth < b.cfg.depth && entity.IsDir() {
|
||||
eg.Go(func() error {
|
||||
return b.initDir(eg, filepath.Join(dir, entity.Name()), depth+1)
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if depth != b.cfg.depth {
|
||||
continue
|
||||
}
|
||||
|
||||
if b.isTempFile(entity.Name()) {
|
||||
if err = os.Remove(filepath.Join(dir, entity.Name())); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
idx, err := b.parseIdx(entity.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
b.dispatcher.Init(dir, idx)
|
||||
|
||||
stat, err := os.Stat(filepath.Join(dir, entity.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stat.Size() < int64(b.cfg.targetFileSizeBytes) {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) isTempFile(name string) bool {
|
||||
return strings.Contains(name, tempFileSymbols)
|
||||
}
|
||||
|
||||
func (b *BlobTree) parseIdx(name string) (uint64, error) {
|
||||
return strconv.ParseUint(name, 16, 64)
|
||||
}
|
||||
|
||||
func (b *BlobTree) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) Type() string { return Type }
|
||||
func (b *BlobTree) Path() string { return b.cfg.rootPath }
|
||||
|
||||
func (b *BlobTree) SetCompressor(cc *compression.Config) {
|
||||
b.compressor = cc
|
||||
}
|
||||
|
||||
func (b *BlobTree) Compressor() *compression.Config {
|
||||
return b.compressor
|
||||
}
|
||||
|
||||
func (b *BlobTree) SetReportErrorFunc(_ func(string, error)) {}
|
||||
func (b *BlobTree) SetParentID(_ string) {}
|
124
pkg/local_object_storage/blobstor/blobtree/delete.go
Normal file
124
pkg/local_object_storage/blobstor/blobtree/delete.go
Normal file
|
@ -0,0 +1,124 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Delete(_ context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
if b.cfg.readOnly {
|
||||
return common.DeleteRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
if len(prm.StorageID) == storageIDLength {
|
||||
return b.deleteFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID))
|
||||
}
|
||||
return b.findAndDelete(prm.Address)
|
||||
}
|
||||
|
||||
func (b *BlobTree) deleteFromIdx(addr oid.Address, idx uint64) (common.DeleteRes, error) {
|
||||
dir := b.getDirectoryPath(addr)
|
||||
path := b.getFilePath(dir, idx)
|
||||
|
||||
b.fileLock.Lock(path)
|
||||
defer b.fileLock.Unlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
deleteIdx := -1
|
||||
for i := range records {
|
||||
if records[i].Address.Equals(addr) {
|
||||
deleteIdx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if deleteIdx == -1 {
|
||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
if len(records) == 1 {
|
||||
err = os.Remove(path)
|
||||
if err == nil {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
// decrease files metric
|
||||
}
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
records = append(records[:idx], records[idx+1:]...)
|
||||
size, err := b.writeToTmpAndRename(records, path)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
if size < b.cfg.targetFileSizeBytes {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
}
|
||||
return common.DeleteRes{}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) findAndDelete(addr oid.Address) (common.DeleteRes, error) {
|
||||
dir := b.getDirectoryPath(addr)
|
||||
idx, err := b.findFileIdx(dir, addr)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
return b.deleteFromIdx(addr, idx)
|
||||
}
|
||||
|
||||
func (b *BlobTree) findFileIdx(dir string, addr oid.Address) (uint64, error) {
|
||||
entities, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return 0, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
for _, entity := range entities {
|
||||
if entity.IsDir() {
|
||||
continue
|
||||
}
|
||||
if b.isTempFile(entity.Name()) {
|
||||
continue
|
||||
}
|
||||
idx, err := b.parseIdx(entity.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
path := b.getFilePath(dir, idx)
|
||||
contains, err := b.fileContainsObject(path, addr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if contains {
|
||||
return idx, nil
|
||||
}
|
||||
}
|
||||
return 0, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *BlobTree) fileContainsObject(path string, addr oid.Address) (bool, error) {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for i := range records {
|
||||
if records[i].Address.Equals(addr) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
94
pkg/local_object_storage/blobstor/blobtree/dispatcher.go
Normal file
94
pkg/local_object_storage/blobstor/blobtree/dispatcher.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type rootDispatcher struct {
|
||||
dispatchers map[string]*dirDispatcher
|
||||
guard sync.Mutex
|
||||
}
|
||||
|
||||
func newRootDispatcher() *rootDispatcher {
|
||||
return &rootDispatcher{
|
||||
dispatchers: make(map[string]*dirDispatcher),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) GetIdxForWrite(dir string) uint64 {
|
||||
return d.getDirDispatcher(dir).GetIdxForWrite()
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) ReturnIdx(dir string, idx uint64) {
|
||||
d.getDirDispatcher(dir).ReturnIdx(idx)
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) Init(dir string, idx uint64) {
|
||||
d.getDirDispatcher(dir).Init(idx)
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) getDirDispatcher(dir string) *dirDispatcher {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
if result, ok := d.dispatchers[dir]; ok {
|
||||
return result
|
||||
}
|
||||
|
||||
result := newDirDispatcher(dir)
|
||||
d.dispatchers[dir] = result
|
||||
return result
|
||||
}
|
||||
|
||||
type dirDispatcher struct {
|
||||
dir string
|
||||
guard sync.Mutex
|
||||
indicies map[uint64]struct{}
|
||||
nextIndex uint64
|
||||
}
|
||||
|
||||
func newDirDispatcher(dir string) *dirDispatcher {
|
||||
return &dirDispatcher{
|
||||
dir: dir,
|
||||
indicies: make(map[uint64]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dirDispatcher) GetIdxForWrite() uint64 {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
var result uint64
|
||||
var found bool
|
||||
|
||||
for idx := range d.indicies {
|
||||
result = idx
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
if found {
|
||||
delete(d.indicies, result)
|
||||
return result
|
||||
}
|
||||
|
||||
result = d.nextIndex
|
||||
d.nextIndex++
|
||||
return result
|
||||
}
|
||||
|
||||
func (d *dirDispatcher) ReturnIdx(idx uint64) {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
d.indicies[idx] = struct{}{}
|
||||
}
|
||||
|
||||
func (d *dirDispatcher) Init(idx uint64) {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
if d.nextIndex <= idx {
|
||||
d.nextIndex = idx + 1
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDispatcher(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newRootDispatcher()
|
||||
idx := d.GetIdxForWrite("/dir1")
|
||||
require.Equal(t, uint64(0), idx)
|
||||
d.ReturnIdx("/dir1", idx)
|
||||
|
||||
idx = d.GetIdxForWrite("/dir1")
|
||||
require.Equal(t, uint64(0), idx)
|
||||
|
||||
idx = d.GetIdxForWrite("/dir1")
|
||||
require.Equal(t, uint64(1), idx)
|
||||
|
||||
d.Init("/dir2", 5)
|
||||
idx = d.GetIdxForWrite("/dir2")
|
||||
require.Equal(t, uint64(6), idx)
|
||||
|
||||
idx = d.GetIdxForWrite("/dir2")
|
||||
require.Equal(t, uint64(7), idx)
|
||||
}
|
54
pkg/local_object_storage/blobstor/blobtree/exists.go
Normal file
54
pkg/local_object_storage/blobstor/blobtree/exists.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Exists(_ context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
if len(prm.StorageID) == storageIDLength {
|
||||
return b.existsFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID))
|
||||
}
|
||||
return b.findAndCheck(prm.Address)
|
||||
}
|
||||
|
||||
func (b *BlobTree) existsFromIdx(addr oid.Address, idx uint64) (common.ExistsRes, error) {
|
||||
dir := b.getDirectoryPath(addr)
|
||||
path := b.getFilePath(dir, idx)
|
||||
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
|
||||
for i := range records {
|
||||
if records[i].Address.Equals(addr) {
|
||||
return common.ExistsRes{
|
||||
Exists: true,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return common.ExistsRes{}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) findAndCheck(addr oid.Address) (common.ExistsRes, error) {
|
||||
dir := b.getDirectoryPath(addr)
|
||||
_, err := b.findFileIdx(dir, addr)
|
||||
if err != nil {
|
||||
var notFound *apistatus.ObjectNotFound
|
||||
if errors.As(err, ¬Found) {
|
||||
return common.ExistsRes{}, nil
|
||||
}
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
return common.ExistsRes{Exists: true}, nil
|
||||
}
|
39
pkg/local_object_storage/blobstor/blobtree/generic_test.go
Normal file
39
pkg/local_object_storage/blobstor/blobtree/generic_test.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
newTreeFromPath := func(path string) common.Storage {
|
||||
return New(
|
||||
WithPath(path),
|
||||
WithDepth(2))
|
||||
}
|
||||
|
||||
newTree := func(t *testing.T) common.Storage {
|
||||
return newTreeFromPath(t.TempDir())
|
||||
}
|
||||
|
||||
blobstortest.TestAll(t, newTree, 2048, 16*1024)
|
||||
|
||||
t.Run("info", func(t *testing.T) {
|
||||
path := t.TempDir()
|
||||
blobstortest.TestInfo(t, func(*testing.T) common.Storage {
|
||||
return newTreeFromPath(path)
|
||||
}, Type, path)
|
||||
})
|
||||
}
|
||||
|
||||
func TestControl(t *testing.T) {
|
||||
newTree := func(t *testing.T) common.Storage {
|
||||
return New(
|
||||
WithPath(t.TempDir()),
|
||||
WithDepth(2))
|
||||
}
|
||||
|
||||
blobstortest.TestControl(t, newTree, 2048, 2048)
|
||||
}
|
107
pkg/local_object_storage/blobstor/blobtree/get.go
Normal file
107
pkg/local_object_storage/blobstor/blobtree/get.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Get(_ context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
if len(prm.StorageID) == storageIDLength {
|
||||
return b.getFromIdx(prm.Address, binary.LittleEndian.Uint64(prm.StorageID))
|
||||
}
|
||||
return b.findAndGet(prm.Address)
|
||||
}
|
||||
|
||||
func (b *BlobTree) getFromIdx(addr oid.Address, idx uint64) (common.GetRes, error) {
|
||||
dir := b.getDirectoryPath(addr)
|
||||
path := b.getFilePath(dir, idx)
|
||||
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
for _, record := range records {
|
||||
if record.Address.Equals(addr) {
|
||||
return b.unmarshalGetRes(record)
|
||||
}
|
||||
}
|
||||
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *BlobTree) unmarshalGetRes(record objectData) (common.GetRes, error) {
|
||||
data, err := b.compressor.Decompress(record.Data)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) findAndGet(addr oid.Address) (common.GetRes, error) {
|
||||
dir := b.getDirectoryPath(addr)
|
||||
entities, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
for _, entity := range entities {
|
||||
if entity.IsDir() {
|
||||
continue
|
||||
}
|
||||
if b.isTempFile(entity.Name()) {
|
||||
continue
|
||||
}
|
||||
idx, err := b.parseIdx(entity.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
path := b.getFilePath(dir, idx)
|
||||
res, err := b.tryReadObject(path, addr)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
if res.Object != nil {
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *BlobTree) tryReadObject(path string, addr oid.Address) (common.GetRes, error) {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
for _, record := range records {
|
||||
if record.Address.Equals(addr) {
|
||||
res, err := b.unmarshalGetRes(record)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
return common.GetRes{}, nil
|
||||
}
|
28
pkg/local_object_storage/blobstor/blobtree/get_range.go
Normal file
28
pkg/local_object_storage/blobstor/blobtree/get_range.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
res, err := b.Get(ctx, common.GetPrm{Address: prm.Address, StorageID: prm.StorageID})
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
}
|
||||
|
||||
payload := res.Object.Payload()
|
||||
from := prm.Range.GetOffset()
|
||||
to := from + prm.Range.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
|
||||
}
|
||||
|
||||
return common.GetRangeRes{
|
||||
Data: payload[from:to],
|
||||
}, nil
|
||||
}
|
96
pkg/local_object_storage/blobstor/blobtree/iterate.go
Normal file
96
pkg/local_object_storage/blobstor/blobtree/iterate.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
return common.IterateRes{}, b.iterateDir(b.cfg.rootPath, 0, prm)
|
||||
}
|
||||
|
||||
func (b *BlobTree) iterateDir(dir string, depth uint64, prm common.IteratePrm) error {
|
||||
entities, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for _, entity := range entities {
|
||||
if depth < b.cfg.depth && entity.IsDir() {
|
||||
err := b.iterateDir(filepath.Join(dir, entity.Name()), depth+1, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if depth != b.cfg.depth {
|
||||
continue
|
||||
}
|
||||
if b.isTempFile(entity.Name()) {
|
||||
continue
|
||||
}
|
||||
idx, err := b.parseIdx(entity.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
path := b.getFilePath(dir, idx)
|
||||
err = b.iterateRecords(idx, path, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) iterateRecords(idx uint64, path string, prm common.IteratePrm) error {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for _, record := range records {
|
||||
if prm.LazyHandler != nil {
|
||||
if err = prm.LazyHandler(record.Address, func() ([]byte, error) {
|
||||
return record.Data, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
record.Data, err = b.compressor.Decompress(record.Data)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
if prm.ErrorHandler != nil {
|
||||
return prm.ErrorHandler(record.Address, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
storageID := make([]byte, storageIDLength)
|
||||
binary.LittleEndian.PutUint64(storageID, idx)
|
||||
err = prm.Handler(common.IterationElement{
|
||||
Address: record.Address,
|
||||
ObjectData: record.Data,
|
||||
StorageID: storageID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
29
pkg/local_object_storage/blobstor/blobtree/option.go
Normal file
29
pkg/local_object_storage/blobstor/blobtree/option.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package blobtree
|
||||
|
||||
import "io/fs"
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
func WithPath(path string) Option {
|
||||
return func(c *cfg) {
|
||||
c.rootPath = path
|
||||
}
|
||||
}
|
||||
|
||||
func WithDepth(depth uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.depth = depth
|
||||
}
|
||||
}
|
||||
|
||||
func WithPerm(p fs.FileMode) Option {
|
||||
return func(c *cfg) {
|
||||
c.permissions = p
|
||||
}
|
||||
}
|
||||
|
||||
func WithTargetSize(size uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.targetFileSizeBytes = size
|
||||
}
|
||||
}
|
88
pkg/local_object_storage/blobstor/blobtree/put.go
Normal file
88
pkg/local_object_storage/blobstor/blobtree/put.go
Normal file
|
@ -0,0 +1,88 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
)
|
||||
|
||||
const (
|
||||
tempFileSymbols = "###"
|
||||
storageIDLength = 8
|
||||
)
|
||||
|
||||
func (b *BlobTree) Put(_ context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
if b.cfg.readOnly {
|
||||
return common.PutRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
dir := b.getDirectoryPath(prm.Address)
|
||||
|
||||
if err := b.createDir(dir); err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
if !prm.DontCompress {
|
||||
prm.RawData = b.compressor.Compress(prm.RawData)
|
||||
}
|
||||
|
||||
idx, err := b.saveToFile(prm, dir)
|
||||
if err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
storageID := make([]byte, storageIDLength)
|
||||
binary.LittleEndian.PutUint64(storageID, idx)
|
||||
return common.PutRes{StorageID: storageID}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) saveToFile(prm common.PutPrm, dir string) (uint64, error) {
|
||||
returnIdx := true
|
||||
idx := b.dispatcher.GetIdxForWrite(dir)
|
||||
path := b.getFilePath(dir, idx)
|
||||
|
||||
b.fileLock.Lock(path)
|
||||
defer b.fileLock.Unlock(path)
|
||||
|
||||
defer func() {
|
||||
if returnIdx {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
}
|
||||
}()
|
||||
|
||||
currentContent, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
var newRecord objectData
|
||||
newRecord.Address = prm.Address
|
||||
newRecord.Data = prm.RawData
|
||||
|
||||
size, err := b.writeToTmpAndRename(append(currentContent, newRecord), path)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
returnIdx = size < b.cfg.targetFileSizeBytes
|
||||
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) writeToTmpAndRename(records []objectData, path string) (uint64, error) {
|
||||
tmpFile := path + tempFileSymbols + strconv.FormatUint(b.suffix.Add(1), 16)
|
||||
|
||||
size, err := b.saveContentToFile(records, tmpFile)
|
||||
if err != nil {
|
||||
_ = os.Remove(tmpFile)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpFile, path); err != nil {
|
||||
_ = os.Remove(tmpFile)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
|
||||
|
@ -81,6 +82,15 @@ var storages = []storage{
|
|||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "blobtree",
|
||||
create: func(dir string) common.Storage {
|
||||
return blobtree.New(
|
||||
blobtree.WithDepth(2),
|
||||
blobtree.WithPath(dir),
|
||||
)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func BenchmarkSubstorageReadPerf(b *testing.B) {
|
||||
|
|
|
@ -3,8 +3,8 @@ package sync
|
|||
import "sync"
|
||||
|
||||
type locker struct {
|
||||
mtx sync.Mutex
|
||||
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
||||
mtx sync.RWMutex
|
||||
userCount int // not protected by mtx, must used outer mutex to update concurrently
|
||||
}
|
||||
|
||||
type KeyLocker[K comparable] struct {
|
||||
|
@ -19,26 +19,50 @@ func NewKeyLocker[K comparable]() *KeyLocker[K] {
|
|||
}
|
||||
|
||||
func (l *KeyLocker[K]) Lock(key K) {
|
||||
l.lock(key, false)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) RLock(key K) {
|
||||
l.lock(key, true)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) lock(key K, read bool) {
|
||||
l.lockersMtx.Lock()
|
||||
|
||||
if locker, found := l.lockers[key]; found {
|
||||
locker.waiters++
|
||||
locker.userCount++
|
||||
l.lockersMtx.Unlock()
|
||||
|
||||
locker.mtx.Lock()
|
||||
if read {
|
||||
locker.mtx.RLock()
|
||||
} else {
|
||||
locker.mtx.Lock()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
locker := &locker{
|
||||
waiters: 1,
|
||||
userCount: 1,
|
||||
}
|
||||
if read {
|
||||
locker.mtx.RLock()
|
||||
} else {
|
||||
locker.mtx.Lock()
|
||||
}
|
||||
locker.mtx.Lock()
|
||||
|
||||
l.lockers[key] = locker
|
||||
l.lockersMtx.Unlock()
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) Unlock(key K) {
|
||||
l.unlock(key, false)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) RUnlock(key K) {
|
||||
l.unlock(key, true)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) unlock(key K, read bool) {
|
||||
l.lockersMtx.Lock()
|
||||
defer l.lockersMtx.Unlock()
|
||||
|
||||
|
@ -47,10 +71,14 @@ func (l *KeyLocker[K]) Unlock(key K) {
|
|||
return
|
||||
}
|
||||
|
||||
if locker.waiters == 1 {
|
||||
if locker.userCount == 1 {
|
||||
delete(l.lockers, key)
|
||||
}
|
||||
locker.waiters--
|
||||
locker.userCount--
|
||||
|
||||
locker.mtx.Unlock()
|
||||
if read {
|
||||
locker.mtx.RUnlock()
|
||||
} else {
|
||||
locker.mtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func TestKeyLocker(t *testing.T) {
|
||||
func TestKeyLockerWrite(t *testing.T) {
|
||||
taken := false
|
||||
eg, _ := errgroup.WithContext(context.Background())
|
||||
keyLocker := NewKeyLocker[int]()
|
||||
|
@ -30,3 +30,17 @@ func TestKeyLocker(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
||||
func TestKeyLockerRead(t *testing.T) {
|
||||
eg, _ := errgroup.WithContext(context.Background())
|
||||
keyLocker := NewKeyLocker[int]()
|
||||
for i := 0; i < 100; i++ {
|
||||
eg.Go(func() error {
|
||||
keyLocker.RLock(0)
|
||||
defer keyLocker.RUnlock(0)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue