frostfs-node/pkg/local_object_storage/metabase/db.go

352 lines
8.7 KiB
Go
Raw Permalink Normal View History

package meta
import (
"bytes"
"encoding/binary"
"encoding/hex"
"io/fs"
"os"
"strconv"
"strings"
"sync"
"time"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/mr-tron/base58"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
type matcher struct {
matchSlow func(string, []byte, string) bool
matchBucket func(*bbolt.Bucket, string, string, func([]byte, []byte) error) error
}
// EpochState is an interface that provides access to the
// current epoch number.
type EpochState interface {
// CurrentEpoch must return current epoch height.
CurrentEpoch() uint64
}
// DB represents local metabase of storage node.
type DB struct {
*cfg
modeMtx sync.RWMutex
mode mode.Mode
matchers map[object.SearchMatchType]matcher
boltDB *bbolt.DB
initialized bool
}
// Option is an option of DB constructor.
type Option func(*cfg)
type cfg struct {
boltOptions *bbolt.Options // optional
boltBatchSize int
boltBatchDelay time.Duration
info Info
log *logger.Logger
epochState EpochState
}
func defaultCfg() *cfg {
return &cfg{
info: Info{
Permission: os.ModePerm, // 0777
},
boltBatchDelay: bbolt.DefaultMaxBatchDelay,
boltBatchSize: bbolt.DefaultMaxBatchSize,
log: &logger.Logger{Logger: zap.L()},
}
}
// New creates and returns new Metabase instance.
func New(opts ...Option) *DB {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
if c.epochState == nil {
panic("metabase: epoch state is not specified")
}
return &DB{
cfg: c,
matchers: map[object.SearchMatchType]matcher{
object.MatchUnknown: {
matchSlow: unknownMatcher,
matchBucket: unknownMatcherBucket,
},
object.MatchStringEqual: {
matchSlow: stringEqualMatcher,
matchBucket: stringEqualMatcherBucket,
},
object.MatchStringNotEqual: {
matchSlow: stringNotEqualMatcher,
matchBucket: stringNotEqualMatcherBucket,
},
object.MatchCommonPrefix: {
matchSlow: stringCommonPrefixMatcher,
matchBucket: stringCommonPrefixMatcherBucket,
},
},
}
}
func stringifyValue(key string, objVal []byte) string {
switch key {
default:
return string(objVal)
case v2object.FilterHeaderObjectID, v2object.FilterHeaderContainerID, v2object.FilterHeaderParent:
return base58.Encode(objVal)
case v2object.FilterHeaderPayloadHash, v2object.FilterHeaderHomomorphicHash:
return hex.EncodeToString(objVal)
case v2object.FilterHeaderCreationEpoch, v2object.FilterHeaderPayloadLength:
return strconv.FormatUint(binary.LittleEndian.Uint64(objVal), 10)
}
}
// fromHexChar converts a hex character into its value and a success flag.
func fromHexChar(c byte) (byte, bool) {
switch {
case '0' <= c && c <= '9':
return c - '0', true
case 'a' <= c && c <= 'f':
return c - 'a' + 10, true
case 'A' <= c && c <= 'F':
return c - 'A' + 10, true
}
return 0, false
}
// destringifyValue is the reverse operation for stringify value.
// The last return value returns true if the filter CAN match any value.
// The second return value is true iff prefix is true and the filter value is considered
// a hex-encoded string. In this case only the first (highest) bits of the last byte should be checked.
func destringifyValue(key, value string, prefix bool) ([]byte, bool, bool) {
switch key {
default:
return []byte(value), false, true
case v2object.FilterHeaderObjectID, v2object.FilterHeaderContainerID, v2object.FilterHeaderParent:
v, err := base58.Decode(value)
return v, false, err == nil
case v2object.FilterHeaderPayloadHash, v2object.FilterHeaderHomomorphicHash:
v, err := hex.DecodeString(value)
if err != nil {
if !prefix || len(value)%2 == 0 {
return v, false, false
}
// To match the old behaviour we need to process odd length hex strings, such as 'abc'
last, ok := fromHexChar(value[len(value)-1])
if !ok {
return v, false, false
}
v := make([]byte, hex.DecodedLen(len(value)-1)+1)
_, err := hex.Decode(v, []byte(value[:len(value)-1]))
if err != nil {
return nil, false, false
}
v[len(v)-1] = last
return v, true, true
}
return v, false, err == nil
case v2object.FilterHeaderCreationEpoch, v2object.FilterHeaderPayloadLength:
u, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return nil, false, false
}
raw := make([]byte, 8)
binary.LittleEndian.PutUint64(raw, u)
return raw, false, true
}
}
func stringEqualMatcher(key string, objVal []byte, filterVal string) bool {
return stringifyValue(key, objVal) == filterVal
}
func stringEqualMatcherBucket(b *bbolt.Bucket, fKey string, fValue string, f func([]byte, []byte) error) error {
// Ignore the second return value because we check for strict equality.
val, _, ok := destringifyValue(fKey, fValue, false)
if !ok {
return nil
}
if data := b.Get(val); data != nil {
return f(val, data)
}
if b.Bucket(val) != nil {
return f(val, nil)
}
return nil
}
func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool {
return stringifyValue(key, objVal) != filterVal
}
func stringNotEqualMatcherBucket(b *bbolt.Bucket, fKey string, fValue string, f func([]byte, []byte) error) error {
// Ignore the second return value because we check for strict inequality.
val, _, ok := destringifyValue(fKey, fValue, false)
return b.ForEach(func(k, v []byte) error {
if !ok || !bytes.Equal(val, k) {
return f(k, v)
}
return nil
})
}
func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool {
return strings.HasPrefix(stringifyValue(key, objVal), filterVal)
}
func stringCommonPrefixMatcherBucket(b *bbolt.Bucket, fKey string, fVal string, f func([]byte, []byte) error) error {
val, checkLast, ok := destringifyValue(fKey, fVal, true)
if !ok {
return nil
}
prefix := val
if checkLast {
prefix = val[:len(val)-1]
}
if len(val) == 0 {
// empty common prefix, all the objects
// satisfy that filter
return b.ForEach(f)
}
c := b.Cursor()
for k, v := c.Seek(val); bytes.HasPrefix(k, prefix); k, v = c.Next() {
if checkLast && (len(k) == len(prefix) || k[len(prefix)]>>4 != val[len(val)-1]) {
// If the last byte doesn't match, this means the prefix does no longer match,
// so we need to break here.
break
}
if err := f(k, v); err != nil {
return err
}
}
return nil
}
func unknownMatcher(_ string, _ []byte, _ string) bool {
return false
}
func unknownMatcherBucket(_ *bbolt.Bucket, _ string, _ string, _ func([]byte, []byte) error) error {
return nil
}
// bucketKeyHelper returns byte representation of val that is used as a key
// in boltDB. Useful for getting filter values from unique and list indexes.
func bucketKeyHelper(hdr string, val string) []byte {
switch hdr {
case v2object.FilterHeaderParent:
v, err := base58.Decode(val)
if err != nil {
return nil
}
return v
case v2object.FilterHeaderPayloadHash:
v, err := hex.DecodeString(val)
if err != nil {
return nil
}
return v
case v2object.FilterHeaderSplitID:
s := object.NewSplitID()
err := s.Parse(val)
if err != nil {
return nil
}
return s.ToV2()
default:
return []byte(val)
}
}
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (db *DB) SetLogger(l *logger.Logger) {
db.log = l
}
// WithLogger returns option to set logger of DB.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
// WithBoltDBOptions returns option to specify BoltDB options.
func WithBoltDBOptions(opts *bbolt.Options) Option {
return func(c *cfg) {
c.boltOptions = opts
}
}
// WithPath returns option to set system path to Metabase.
func WithPath(path string) Option {
return func(c *cfg) {
c.info.Path = path
}
}
// WithPermissions returns option to specify permission bits
// of Metabase system path.
func WithPermissions(perm fs.FileMode) Option {
return func(c *cfg) {
c.info.Permission = perm
}
}
// WithMaxBatchSize returns option to specify maximum concurrent operations
// to be processed in a single transactions.
// This option is missing from `bbolt.Options` but is set right after DB is open.
func WithMaxBatchSize(s int) Option {
return func(c *cfg) {
if s != 0 {
c.boltBatchSize = s
}
}
}
// WithMaxBatchDelay returns option to specify maximum time to wait before
// the batch of concurrent transactions is processed.
// This option is missing from `bbolt.Options` but is set right after DB is open.
func WithMaxBatchDelay(d time.Duration) Option {
return func(c *cfg) {
if d != 0 {
c.boltBatchDelay = d
}
}
}
// WithEpochState return option to specify a source of current epoch height.
func WithEpochState(s EpochState) Option {
return func(c *cfg) {
c.epochState = s
}
}