frostfs-node/pkg/services/object/put/service.go

132 lines
2.7 KiB
Go
Raw Permalink Normal View History

package putsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type Service struct {
*cfg
}
type Option func(*cfg)
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
type cfg struct {
keyStorage *objutil.KeyStorage
maxSizeSrc MaxSizeSource
localStore ObjectStorage
cnrSrc container.Source
netMapSrc netmap.Source
remotePool, localPool util.WorkerPool
netmapKeys netmap.AnnouncedKeys
fmtValidator *object.FormatValidator
networkState netmap.State
clientConstructor ClientConstructor
log *logger.Logger
verifySessionTokenIssuer bool
}
func NewService(ks *objutil.KeyStorage,
cc ClientConstructor,
ms MaxSizeSource,
os ObjectStorage,
cs container.Source,
ns netmap.Source,
nk netmap.AnnouncedKeys,
nst netmap.State,
ir InnerRing,
opts ...Option) *Service {
c := &cfg{
remotePool: util.NewPseudoWorkerPool(),
localPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()},
keyStorage: ks,
clientConstructor: cc,
maxSizeSrc: ms,
localStore: os,
cnrSrc: cs,
netMapSrc: ns,
netmapKeys: nk,
networkState: nst,
}
for i := range opts {
opts[i](c)
}
c.fmtValidator = object.NewFormatValidator(
object.WithLockSource(os),
object.WithNetState(nst),
object.WithInnerRing(ir),
object.WithNetmapSource(ns),
object.WithContainersSource(cs),
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer),
object.WithLogger(c.log),
)
return &Service{
cfg: c,
}
}
func (p *Service) Put() (*Streamer, error) {
return &Streamer{
cfg: p.cfg,
}, nil
}
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *cfg) {
c.remotePool, c.localPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *cfg) {
c.verifySessionTokenIssuer = v
}
}