frostfs-node/pkg/services/object/put/service.go
Leonard Lyubich 358e3ed8c4 [#645] *: Change the locality condition of the node from the placement
Some software components regulate the way of working with placement arrays
when a local node enters it. In the previous implementation, the locality
criterion was the correspondence between the announced network address
(group) and the address with which the node was configured. However, by
design, network addresses are not unique identifiers of storage nodes in the
system.

Change comparisons by network addresses to comparisons by keys in all
packages with the logic described above. Implement `netmap.AnnouncedKeys`
interface on `cfg` type in the storage node application.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-09-07 09:53:18 +03:00

155 lines
2.8 KiB
Go

package putsvc
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
// MaxSizeSource is queried for the upper bound on the payload size of
// a single physically stored object.
type MaxSizeSource interface {
	// MaxObjectSize reports the maximum payload size of an object
	// that is physically stored in the system.
	//
	// Implementations must return 0 when the value cannot be obtained.
	MaxObjectSize() uint64
}
// Service hands out Streamer values through Put. It embeds the
// configuration assembled by NewService, so every cfg field is
// reachable directly on the Service.
type Service struct {
	*cfg
}
// Option adjusts the configuration of a Service that is being built.
// NewService applies the options in the order they are passed.
type Option func(c *cfg)
// ClientConstructor supplies a client.Client for a group of network
// addresses.
type ClientConstructor interface {
	// Get returns the client for the given address group, or an error
	// if no client can be provided.
	Get(group network.AddressGroup) (client.Client, error)
}
// cfg collects the dependencies and settings of Service. An instance
// starts out as defaultCfg() and is then modified by the Option values
// given to NewService.
type cfg struct {
	// keyStorage is assigned by WithKeyStorage.
	keyStorage *objutil.KeyStorage

	// maxSizeSrc is assigned by WithMaxSizeSource.
	maxSizeSrc MaxSizeSource

	// localStore is assigned by WithLocalStorage.
	localStore *engine.StorageEngine

	// cnrSrc is assigned by WithContainerSource.
	cnrSrc container.Source

	// netMapSrc is assigned by WithNetworkMapSource.
	netMapSrc netmap.Source

	// workerPool is assigned by WithWorkerPool; defaultCfg initializes
	// it with a new util.SyncWorkerPool.
	workerPool util.WorkerPool

	// netmapKeys is assigned by WithNetmapKeys.
	netmapKeys netmap.AnnouncedKeys

	// fmtValidator is created by NewService from fmtValidatorOpts once
	// all options have been applied.
	fmtValidator *object.FormatValidator

	// fmtValidatorOpts is populated by WithFormatValidatorOpts and
	// WithNetworkState.
	fmtValidatorOpts []object.FormatValidatorOption

	// networkState is assigned by WithNetworkState.
	networkState netmap.State

	// clientConstructor is assigned by WithClientConstructor.
	clientConstructor ClientConstructor

	// log is assigned by WithLogger; defaultCfg initializes it with
	// zap.L().
	log *logger.Logger
}
// defaultCfg returns the base configuration used by NewService before
// any Option is applied: a new util.SyncWorkerPool as the worker pool
// and zap.L() as the logger. All other fields keep their zero values.
func defaultCfg() *cfg {
	base := new(cfg)
	base.workerPool = new(util.SyncWorkerPool)
	base.log = zap.L()

	return base
}
// NewService builds a Service from the default configuration with the
// given options applied in order.
//
// The format validator is constructed only after every option has run,
// so it sees all validator options that the options registered.
func NewService(opts ...Option) *Service {
	conf := defaultCfg()

	for _, apply := range opts {
		apply(conf)
	}

	conf.fmtValidator = object.NewFormatValidator(conf.fmtValidatorOpts...)

	svc := new(Service)
	svc.cfg = conf

	return svc
}
// Put returns a new Streamer that shares the service configuration and
// is bound to ctx. The returned error is always nil.
func (p *Service) Put(ctx context.Context) (*Streamer, error) {
	streamer := new(Streamer)
	streamer.cfg = p.cfg
	streamer.ctx = ctx

	return streamer, nil
}
// WithKeyStorage returns an option that sets the key storage of the
// Service.
func WithKeyStorage(v *objutil.KeyStorage) Option {
	setKeyStorage := func(target *cfg) {
		target.keyStorage = v
	}

	return setKeyStorage
}
// WithMaxSizeSource returns an option that sets the source of the
// maximum object payload size.
func WithMaxSizeSource(v MaxSizeSource) Option {
	setMaxSizeSource := func(target *cfg) {
		target.maxSizeSrc = v
	}

	return setMaxSizeSource
}
// WithLocalStorage returns an option that sets the local storage
// engine of the Service.
func WithLocalStorage(v *engine.StorageEngine) Option {
	setLocalStorage := func(target *cfg) {
		target.localStore = v
	}

	return setLocalStorage
}
// WithContainerSource returns an option that sets the container source
// of the Service.
func WithContainerSource(v container.Source) Option {
	setContainerSource := func(target *cfg) {
		target.cnrSrc = v
	}

	return setContainerSource
}
// WithNetworkMapSource returns an option that sets the network map
// source of the Service.
func WithNetworkMapSource(v netmap.Source) Option {
	setNetMapSource := func(target *cfg) {
		target.netMapSrc = v
	}

	return setNetMapSource
}
// WithWorkerPool returns an option that replaces the default worker
// pool of the Service.
func WithWorkerPool(v util.WorkerPool) Option {
	setWorkerPool := func(target *cfg) {
		target.workerPool = v
	}

	return setWorkerPool
}
// WithNetmapKeys returns an option that sets the announced keys
// component of the Service.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
	setNetmapKeys := func(target *cfg) {
		target.netmapKeys = v
	}

	return setNetmapKeys
}
// WithFormatValidatorOpts returns an option that adds v to the options
// used by NewService to construct the object format validator.
//
// The options are appended rather than assigned. Assigning would throw
// away validator options registered by options applied earlier (for
// example the one added by WithNetworkState), making the result depend
// on option order. It would also keep a reference to the caller's
// slice, which a later append could write into.
func WithFormatValidatorOpts(v ...object.FormatValidatorOption) Option {
	return func(c *cfg) {
		c.fmtValidatorOpts = append(c.fmtValidatorOpts, v...)
	}
}
// WithNetworkState returns an option that sets the network state of
// the Service and also registers it with the format validator by
// appending object.WithNetState(v) to the validator options.
func WithNetworkState(v netmap.State) Option {
	setNetworkState := func(target *cfg) {
		target.networkState = v

		netStateOpt := object.WithNetState(v)
		target.fmtValidatorOpts = append(target.fmtValidatorOpts, netStateOpt)
	}

	return setNetworkState
}
// WithClientConstructor returns an option that sets the constructor
// used by the Service to obtain clients.
func WithClientConstructor(v ClientConstructor) Option {
	setClientConstructor := func(target *cfg) {
		target.clientConstructor = v
	}

	return setClientConstructor
}
// WithLogger returns an option that replaces the default logger of the
// Service.
func WithLogger(l *logger.Logger) Option {
	setLogger := func(target *cfg) {
		target.log = l
	}

	return setLogger
}