From 4ca1035aa4e79413c45762d7b52a37b45e3a8488 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 19 Nov 2020 13:58:27 +0300 Subject: [PATCH] [#186] cmd/neofs-node: Integrate new storage engine into application Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 145 +++++++++++++++++++++++++++++++-------- cmd/neofs-node/main.go | 7 -- cmd/neofs-node/object.go | 43 ++++++++++-- 3 files changed, 154 insertions(+), 41 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 5a884febe..435c525f2 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -5,6 +5,8 @@ import ( "crypto/ecdsa" "net" "os" + "path" + "strconv" "strings" "sync" "time" @@ -16,9 +18,12 @@ import ( "github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/pkg/core/container" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/fsbucket" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" @@ -77,11 +82,6 @@ const ( cfgContainerContract = "container.scripthash" cfgContainerFee = "container.fee" - cfgObjectStorage = "storage.object" - - cfgMetaBasePath = "storage.metabase.path" - cfgMetaBasePerm = "storage.metabase.perm" - cfgGCQueueSize = "gc.queuesize" cfgGCQueueTick = "gc.duration.sleep" cfgGCTimeout = "gc.duration.timeout" @@ -111,6 +111,21 @@ const ( cfgObjectSearchDialTimeout = "object.search.dial_timeout" ) +const ( + cfgLocalStorageSection = "storage" + cfgStorageShardSection = "shard" + + cfgBlobStorSection = "blobstor" + cfgBlobStorCompress = "compress" + cfgBlobStorShallowDepth = "shallow_depth" + cfgBlobStorTreePath = "path" + cfgBlobStorTreePerm = "perm" + + cfgMetaBaseSection = "metabase" + cfgMetaBasePath = "path" + cfgMetaBasePerm = "perm" +) + const ( addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding ) @@ -213,13 +228,17 @@ type cfgObject struct { cnrStorage container.Source - metastorage *meta.DB - - blobstorage bucket.Bucket - cnrClient *wrapper.Wrapper pool cfgObjectRoutines + + cfgLocalStorage cfgLocalStorage +} + +type cfgLocalStorage struct { + localStorage *engine.StorageEngine + + shardOpts [][]shard.Option } type cfgObjectRoutines struct { @@ -347,11 +366,6 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3") v.SetDefault(cfgNetmapFee, "1") - v.SetDefault(cfgObjectStorage+".type", "inmemory") - - v.SetDefault(cfgMetaBasePath, "metabase") - v.SetDefault(cfgMetaBasePerm, 0600) - v.SetDefault(cfgLogLevel, "info") v.SetDefault(cfgLogFormat, "console") v.SetDefault(cfgLogTrace, "fatal") @@ -388,22 +402,99 @@ func (c *cfg) LocalAddress() *network.Address { } func initLocalStorage(c *cfg) { - var err error + initShardOptions(c) - c.cfgObject.blobstorage, err = initBucket(cfgObjectStorage, c) - fatalOnErr(err) - - boltDB, err := bbolt.Open( - c.viper.GetString(cfgMetaBasePath), - os.FileMode(c.viper.GetUint32(cfgMetaBasePerm)), - nil, + ls := engine.New( + engine.WithLogger(c.log), ) - fatalOnErr(err) - c.cfgObject.metastorage = meta.NewDB( - meta.FromBoltDB(boltDB), - meta.WithLogger(c.log), - ) + for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { + id, err := ls.AddShard(opts...) + fatalOnErr(err) + + c.log.Info("shard attached to engine", + zap.Stringer("id", id), + ) + } + + c.cfgObject.cfgLocalStorage.localStorage = ls +} + +func initShardOptions(c *cfg) { + var opts [][]shard.Option + + for i := 0; ; i++ { + prefix := configPath( + cfgLocalStorageSection, + cfgStorageShardSection, + strconv.Itoa(i), + ) + + blobPrefix := configPath(prefix, cfgBlobStorSection) + + blobPath := c.viper.GetString( + configPath(blobPrefix, cfgBlobStorTreePath), + ) + if blobPath == "" { + break + } + + compressObjects := c.viper.GetBool( + configPath(blobPrefix, cfgBlobStorCompress), + ) + + blobPerm := os.FileMode(c.viper.GetInt( + configPath(blobPrefix, cfgBlobStorTreePerm), + )) + + shallowDepth := c.viper.GetInt( + configPath(blobPrefix, cfgBlobStorShallowDepth), + ) + + metaPrefix := configPath(prefix, cfgMetaBaseSection) + + metaPath := c.viper.GetString( + configPath(metaPrefix, cfgMetaBasePath), + ) + + metaPerm := os.FileMode(c.viper.GetUint32( + configPath(metaPrefix, cfgMetaBasePerm), + )) + + fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm)) + + boltDB, err := bbolt.Open(metaPath, metaPerm, nil) + fatalOnErr(err) + + opts = append(opts, []shard.Option{ + shard.WithLogger(c.log), + shard.WithBlobStorOptions( + blobstor.WithTreeRootPath(blobPath), + blobstor.WithCompressObjects(compressObjects, c.log), + blobstor.WithTreeRootPerm(blobPerm), + blobstor.WithShallowDepth(shallowDepth), + ), + shard.WithMetaBaseOptions( + meta.WithLogger(c.log), + meta.FromBoltDB(boltDB), + ), + }) + + c.log.Info("storage shard options", + zap.String("BLOB path", blobPath), + zap.Stringer("BLOB permissions", blobPerm), + zap.Bool("BLOB compress", compressObjects), + zap.Int("BLOB shallow depth", shallowDepth), + zap.String("metabase path", metaPath), + zap.Stringer("metabase permissions", metaPerm), + ) + } + + c.cfgObject.cfgLocalStorage.shardOpts = opts +} + +func configPath(sections ...string) string { + return strings.Join(sections, ".") } func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 21ccff857..bf9f27093 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -5,7 +5,6 @@ import ( "log" "github.com/nspcc-dev/neofs-node/pkg/util/grace" - "go.uber.org/zap" ) func fatalOnErr(err error) { @@ -58,12 +57,6 @@ func wait(c *cfg) { } func shutdown(c *cfg) { - if err := c.cfgObject.metastorage.Close(); err != nil { - c.log.Error("could not close metabase", - zap.String("error", err.Error()), - ) - } - c.cfgGRPC.server.GracefulStop() c.log.Info("gRPC server stopped") diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index e83d83b7b..8afd19c90 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -7,12 +7,13 @@ import ( "github.com/mr-tron/base58" "github.com/nspcc-dev/neofs-api-go/pkg/client" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/network/cache" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" @@ -38,6 +39,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -169,11 +171,33 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe return s.rngHash.GetRangeHash(ctx, req) } -func initObjectService(c *cfg) { - ls := localstore.New( - c.cfgObject.blobstorage, - c.cfgObject.metastorage, +type localObjectRemover struct { + storage *engine.StorageEngine + + log *logger.Logger +} + +func (r *localObjectRemover) Delete(addr *objectSDK.Address) error { + _, err := r.storage.Delete(new(engine.DeletePrm). + WithAddress(addr), ) + + return err +} + +func (r *localObjectRemover) DeleteObjects(list ...*objectSDK.Address) { + for _, a := range list { + if err := r.Delete(a); err != nil { + r.log.Error("could not delete object", + zap.Stringer("address", a), + zap.String("error", err.Error()), + ) + } + } +} + +func initObjectService(c *cfg) { + ls := c.cfgObject.cfgLocalStorage.localStorage keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore) nodeOwner := owner.NewID() @@ -184,9 +208,14 @@ func initObjectService(c *cfg) { clientCache := cache.NewSDKClientCache() + objRemover := &localObjectRemover{ + storage: ls, + log: c.log, + } + objGC := gc.New( gc.WithLogger(c.log), - gc.WithRemover(ls), + gc.WithRemover(objRemover), gc.WithQueueCapacity(c.viper.GetUint32(cfgGCQueueSize)), gc.WithSleepInterval(c.viper.GetDuration(cfgGCQueueTick)), gc.WithWorkingInterval(c.viper.GetDuration(cfgGCTimeout)), @@ -258,7 +287,7 @@ func initObjectService(c *cfg) { putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), putsvc.WithLocalAddressSource(c), putsvc.WithFormatValidatorOpts( - objectCore.WithDeleteHandler(c.cfgObject.metastorage), + objectCore.WithDeleteHandler(objRemover), ), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPool(c.cfgObject.pool.put),