[#186] cmd/neofs-node: Integrate new storage engine into application

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-11-19 13:58:27 +03:00 committed by Alex Vanin
parent 953387a1e5
commit 4ca1035aa4
3 changed files with 154 additions and 41 deletions

View file

@ -5,6 +5,8 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"net" "net"
"os" "os"
"path"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -16,9 +18,12 @@ import (
"github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/container"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/fsbucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/fsbucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
@ -77,11 +82,6 @@ const (
cfgContainerContract = "container.scripthash" cfgContainerContract = "container.scripthash"
cfgContainerFee = "container.fee" cfgContainerFee = "container.fee"
cfgObjectStorage = "storage.object"
cfgMetaBasePath = "storage.metabase.path"
cfgMetaBasePerm = "storage.metabase.perm"
cfgGCQueueSize = "gc.queuesize" cfgGCQueueSize = "gc.queuesize"
cfgGCQueueTick = "gc.duration.sleep" cfgGCQueueTick = "gc.duration.sleep"
cfgGCTimeout = "gc.duration.timeout" cfgGCTimeout = "gc.duration.timeout"
@ -111,6 +111,21 @@ const (
cfgObjectSearchDialTimeout = "object.search.dial_timeout" cfgObjectSearchDialTimeout = "object.search.dial_timeout"
) )
const (
cfgLocalStorageSection = "storage"
cfgStorageShardSection = "shard"
cfgBlobStorSection = "blobstor"
cfgBlobStorCompress = "compress"
cfgBlobStorShallowDepth = "shallow_depth"
cfgBlobStorTreePath = "path"
cfgBlobStorTreePerm = "perm"
cfgMetaBaseSection = "metabase"
cfgMetaBasePath = "path"
cfgMetaBasePerm = "perm"
)
const ( const (
addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
) )
@ -213,13 +228,17 @@ type cfgObject struct {
cnrStorage container.Source cnrStorage container.Source
metastorage *meta.DB
blobstorage bucket.Bucket
cnrClient *wrapper.Wrapper cnrClient *wrapper.Wrapper
pool cfgObjectRoutines pool cfgObjectRoutines
cfgLocalStorage cfgLocalStorage
}
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
shardOpts [][]shard.Option
} }
type cfgObjectRoutines struct { type cfgObjectRoutines struct {
@ -347,11 +366,6 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3") v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3")
v.SetDefault(cfgNetmapFee, "1") v.SetDefault(cfgNetmapFee, "1")
v.SetDefault(cfgObjectStorage+".type", "inmemory")
v.SetDefault(cfgMetaBasePath, "metabase")
v.SetDefault(cfgMetaBasePerm, 0600)
v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgLogLevel, "info")
v.SetDefault(cfgLogFormat, "console") v.SetDefault(cfgLogFormat, "console")
v.SetDefault(cfgLogTrace, "fatal") v.SetDefault(cfgLogTrace, "fatal")
@ -388,22 +402,99 @@ func (c *cfg) LocalAddress() *network.Address {
} }
func initLocalStorage(c *cfg) { func initLocalStorage(c *cfg) {
var err error initShardOptions(c)
c.cfgObject.blobstorage, err = initBucket(cfgObjectStorage, c) ls := engine.New(
fatalOnErr(err) engine.WithLogger(c.log),
boltDB, err := bbolt.Open(
c.viper.GetString(cfgMetaBasePath),
os.FileMode(c.viper.GetUint32(cfgMetaBasePerm)),
nil,
) )
fatalOnErr(err)
c.cfgObject.metastorage = meta.NewDB( for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts {
meta.FromBoltDB(boltDB), id, err := ls.AddShard(opts...)
meta.WithLogger(c.log), fatalOnErr(err)
)
c.log.Info("shard attached to engine",
zap.Stringer("id", id),
)
}
c.cfgObject.cfgLocalStorage.localStorage = ls
}
func initShardOptions(c *cfg) {
var opts [][]shard.Option
for i := 0; ; i++ {
prefix := configPath(
cfgLocalStorageSection,
cfgStorageShardSection,
strconv.Itoa(i),
)
blobPrefix := configPath(prefix, cfgBlobStorSection)
blobPath := c.viper.GetString(
configPath(blobPrefix, cfgBlobStorTreePath),
)
if blobPath == "" {
break
}
compressObjects := c.viper.GetBool(
configPath(blobPrefix, cfgBlobStorCompress),
)
blobPerm := os.FileMode(c.viper.GetInt(
configPath(blobPrefix, cfgBlobStorTreePerm),
))
shallowDepth := c.viper.GetInt(
configPath(blobPrefix, cfgBlobStorShallowDepth),
)
metaPrefix := configPath(prefix, cfgMetaBaseSection)
metaPath := c.viper.GetString(
configPath(metaPrefix, cfgMetaBasePath),
)
metaPerm := os.FileMode(c.viper.GetUint32(
configPath(metaPrefix, cfgMetaBasePerm),
))
fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm))
boltDB, err := bbolt.Open(metaPath, metaPerm, nil)
fatalOnErr(err)
opts = append(opts, []shard.Option{
shard.WithLogger(c.log),
shard.WithBlobStorOptions(
blobstor.WithTreeRootPath(blobPath),
blobstor.WithCompressObjects(compressObjects, c.log),
blobstor.WithTreeRootPerm(blobPerm),
blobstor.WithShallowDepth(shallowDepth),
),
shard.WithMetaBaseOptions(
meta.WithLogger(c.log),
meta.FromBoltDB(boltDB),
),
})
c.log.Info("storage shard options",
zap.String("BLOB path", blobPath),
zap.Stringer("BLOB permissions", blobPerm),
zap.Bool("BLOB compress", compressObjects),
zap.Int("BLOB shallow depth", shallowDepth),
zap.String("metabase path", metaPath),
zap.Stringer("metabase permissions", metaPerm),
)
}
c.cfgObject.cfgLocalStorage.shardOpts = opts
}
func configPath(sections ...string) string {
return strings.Join(sections, ".")
} }
func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) { func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) {

View file

@ -5,7 +5,6 @@ import (
"log" "log"
"github.com/nspcc-dev/neofs-node/pkg/util/grace" "github.com/nspcc-dev/neofs-node/pkg/util/grace"
"go.uber.org/zap"
) )
func fatalOnErr(err error) { func fatalOnErr(err error) {
@ -58,12 +57,6 @@ func wait(c *cfg) {
} }
func shutdown(c *cfg) { func shutdown(c *cfg) {
if err := c.cfgObject.metastorage.Close(); err != nil {
c.log.Error("could not close metabase",
zap.String("error", err.Error()),
)
}
c.cfgGRPC.server.GracefulStop() c.cfgGRPC.server.GracefulStop()
c.log.Info("gRPC server stopped") c.log.Info("gRPC server stopped")

View file

@ -7,12 +7,13 @@ import (
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
"github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/network/cache"
objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
@ -38,6 +39,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator" "github.com/nspcc-dev/neofs-node/pkg/services/replicator"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -169,11 +171,33 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
return s.rngHash.GetRangeHash(ctx, req) return s.rngHash.GetRangeHash(ctx, req)
} }
func initObjectService(c *cfg) { type localObjectRemover struct {
ls := localstore.New( storage *engine.StorageEngine
c.cfgObject.blobstorage,
c.cfgObject.metastorage, log *logger.Logger
}
func (r *localObjectRemover) Delete(addr *objectSDK.Address) error {
_, err := r.storage.Delete(new(engine.DeletePrm).
WithAddress(addr),
) )
return err
}
func (r *localObjectRemover) DeleteObjects(list ...*objectSDK.Address) {
for _, a := range list {
if err := r.Delete(a); err != nil {
r.log.Error("could not delete object",
zap.Stringer("address", a),
zap.String("error", err.Error()),
)
}
}
}
func initObjectService(c *cfg) {
ls := c.cfgObject.cfgLocalStorage.localStorage
keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore) keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore)
nodeOwner := owner.NewID() nodeOwner := owner.NewID()
@ -184,9 +208,14 @@ func initObjectService(c *cfg) {
clientCache := cache.NewSDKClientCache() clientCache := cache.NewSDKClientCache()
objRemover := &localObjectRemover{
storage: ls,
log: c.log,
}
objGC := gc.New( objGC := gc.New(
gc.WithLogger(c.log), gc.WithLogger(c.log),
gc.WithRemover(ls), gc.WithRemover(objRemover),
gc.WithQueueCapacity(c.viper.GetUint32(cfgGCQueueSize)), gc.WithQueueCapacity(c.viper.GetUint32(cfgGCQueueSize)),
gc.WithSleepInterval(c.viper.GetDuration(cfgGCQueueTick)), gc.WithSleepInterval(c.viper.GetDuration(cfgGCQueueTick)),
gc.WithWorkingInterval(c.viper.GetDuration(cfgGCTimeout)), gc.WithWorkingInterval(c.viper.GetDuration(cfgGCTimeout)),
@ -258,7 +287,7 @@ func initObjectService(c *cfg) {
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
putsvc.WithLocalAddressSource(c), putsvc.WithLocalAddressSource(c),
putsvc.WithFormatValidatorOpts( putsvc.WithFormatValidatorOpts(
objectCore.WithDeleteHandler(c.cfgObject.metastorage), objectCore.WithDeleteHandler(objRemover),
), ),
putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPool(c.cfgObject.pool.put), putsvc.WithWorkerPool(c.cfgObject.pool.put),