frostfs-node/pkg/local_object_storage/metabase/db.go

170 lines
3.9 KiB
Go
Raw Normal View History

package meta
import (
"encoding/binary"
"encoding/hex"
"io/fs"
"os"
"strconv"
"strings"
"time"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// DB represents local metabase of storage node.
type DB struct {
*cfg
matchers map[object.SearchMatchType]func(string, []byte, string) bool
boltDB *bbolt.DB
}
// Option is an option of DB constructor.
type Option func(*cfg)
type cfg struct {
boltOptions *bbolt.Options // optional
boltBatchSize int
boltBatchDelay time.Duration
info Info
log *logger.Logger
}
func defaultCfg() *cfg {
return &cfg{
info: Info{
Permission: os.ModePerm, // 0777
},
boltBatchDelay: bbolt.DefaultMaxBatchDelay,
boltBatchSize: bbolt.DefaultMaxBatchSize,
log: zap.L(),
}
}
// New creates and returns new Metabase instance.
func New(opts ...Option) *DB {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &DB{
cfg: c,
matchers: map[object.SearchMatchType]func(string, []byte, string) bool{
object.MatchUnknown: unknownMatcher,
object.MatchStringEqual: stringEqualMatcher,
object.MatchStringNotEqual: stringNotEqualMatcher,
object.MatchCommonPrefix: stringCommonPrefixMatcher,
},
}
}
func stringifyValue(key string, objVal []byte) string {
switch key {
default:
return string(objVal)
case v2object.FilterHeaderPayloadHash, v2object.FilterHeaderHomomorphicHash:
return hex.EncodeToString(objVal)
case v2object.FilterHeaderCreationEpoch, v2object.FilterHeaderPayloadLength:
return strconv.FormatUint(binary.LittleEndian.Uint64(objVal), 10)
}
}
func stringEqualMatcher(key string, objVal []byte, filterVal string) bool {
return stringifyValue(key, objVal) == filterVal
}
func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool {
return stringifyValue(key, objVal) != filterVal
}
func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool {
return strings.HasPrefix(stringifyValue(key, objVal), filterVal)
}
func unknownMatcher(_ string, _ []byte, _ string) bool {
return false
}
// bucketKeyHelper returns byte representation of val that is used as a key
// in boltDB. Useful for getting filter values from unique and list indexes.
func bucketKeyHelper(hdr string, val string) []byte {
switch hdr {
case v2object.FilterHeaderPayloadHash:
v, err := hex.DecodeString(val)
if err != nil {
return nil
}
return v
case v2object.FilterHeaderSplitID:
s := object.NewSplitID()
err := s.Parse(val)
if err != nil {
return nil
}
return s.ToV2()
default:
return []byte(val)
}
}
// WithLogger returns option to set logger of DB.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
// WithBoltDBOptions returns option to specify BoltDB options.
func WithBoltDBOptions(opts *bbolt.Options) Option {
return func(c *cfg) {
c.boltOptions = opts
}
}
// WithPath returns option to set system path to Metabase.
func WithPath(path string) Option {
return func(c *cfg) {
c.info.Path = path
}
}
// WithPermissions returns option to specify permission bits
// of Metabase system path.
func WithPermissions(perm fs.FileMode) Option {
return func(c *cfg) {
c.info.Permission = perm
}
}
// WithBatchSize returns option to specify maximum concurrent operations
// to be processed in a single transactions.
// This option is missing from `bbolt.Options` but is set right after DB is open.
func WithBatchSize(s int) Option {
return func(c *cfg) {
c.boltBatchSize = s
}
}
// WithBatchDelay returns option to specify maximum time to wait before
// the batch of concurrent transactions is processed.
// This option is missing from `bbolt.Options` but is set right after DB is open.
func WithBatchDelay(d time.Duration) Option {
return func(c *cfg) {
c.boltBatchDelay = d
}
}