frostfs-node/pkg/services/object/common/writer/writer.go

185 lines
4.7 KiB
Go
Raw Normal View History

package writer
import (
"context"
"crypto/ecdsa"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
// Config represents a set of static parameters that are established during
// the initialization phase of all services.
type Config struct {
KeyStorage *objutil.KeyStorage
MaxSizeSrc MaxSizeSource
LocalStore ObjectStorage
ContainerSource container.Source
NetmapSource netmap.Source
RemotePool, LocalPool util.WorkerPool
NetmapKeys netmap.AnnouncedKeys
FormatValidator *object.FormatValidator
NetworkState netmap.State
ClientConstructor ClientConstructor
Logger *logger.Logger
VerifySessionTokenIssuer bool
}
type Option func(*Config)
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *Config) {
c.RemotePool, c.LocalPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *Config) {
c.Logger = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *Config) {
c.VerifySessionTokenIssuer = v
}
}
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
if c.NetmapKeys.IsLocalKey(pub) {
return c.LocalPool, true
}
return c.RemotePool, false
}
type Params struct {
Config *Config
Common *objutil.CommonPrm
Header *objectSDK.Object
Container containerSDK.Container
TraverseOpts []placement.Option
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
SignRequestPrivateKey *ecdsa.PrivateKey
}
func New(prm *Params) transformer.ObjectWriter {
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
return newECWriter(prm)
}
return newDefaultObjectWriter(prm, false)
}
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, NodeDescriptor) error
if prm.Relay != nil {
relay = func(ctx context.Context, node NodeDescriptor) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.Info)
c, err := prm.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return prm.Relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.TraverseOpts
if forECPlacement && !prm.Common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedWriter{
cfg: prm.Config,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
if node.Local {
return LocalTarget{
Storage: prm.Config.LocalStore,
Container: prm.Container,
}
}
rt := &remoteWriter{
privateKey: prm.SignRequestPrivateKey,
commonPrm: prm.Common,
clientConstructor: prm.Config.ClientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
return rt
},
relay: relay,
}
}
func newECWriter(prm *Params) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ECWriter{
Config: prm.Config,
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
Container: prm.Container,
Key: prm.SignRequestPrivateKey,
CommonPrm: prm.Common,
Relay: prm.Relay,
},
repWriter: newDefaultObjectWriter(prm, true),
}
}