forked from TrueCloudLab/frostfs-node
184 lines
4.7 KiB
Go
184 lines
4.7 KiB
Go
|
package writer
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/ecdsa"
|
||
|
"fmt"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||
|
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||
|
)
|
||
|
|
||
|
// MaxSizeSource reports the size limit applied to stored objects.
type MaxSizeSource interface {
	// MaxObjectSize returns the maximum payload size of an object that is
	// physically stored in the system.
	//
	// A return value of 0 means the limit could not be obtained.
	MaxObjectSize() uint64
}
|
||
|
|
||
|
type ClientConstructor interface {
|
||
|
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||
|
}
|
||
|
|
||
|
// InnerRing gives access to the keys of the inner ring.
type InnerRing interface {
	// InnerRingKeys returns the inner ring keys as raw byte slices
	// (presumably serialized public keys; confirm against implementations).
	InnerRingKeys() (keys [][]byte, err error)
}
|
||
|
|
||
|
// FormatValidatorConfig exposes settings consulted by the object format
// validator.
type FormatValidatorConfig interface {
	// VerifySessionTokenIssuer reports whether the issuer of a session token
	// should be verified.
	VerifySessionTokenIssuer() bool
}
|
||
|
|
||
|
// Config represents a set of static parameters that are established during
|
||
|
// the initialization phase of all services.
|
||
|
type Config struct {
|
||
|
KeyStorage *objutil.KeyStorage
|
||
|
|
||
|
MaxSizeSrc MaxSizeSource
|
||
|
|
||
|
LocalStore ObjectStorage
|
||
|
|
||
|
ContainerSource container.Source
|
||
|
|
||
|
NetmapSource netmap.Source
|
||
|
|
||
|
RemotePool, LocalPool util.WorkerPool
|
||
|
|
||
|
NetmapKeys netmap.AnnouncedKeys
|
||
|
|
||
|
FormatValidator *object.FormatValidator
|
||
|
|
||
|
NetworkState netmap.State
|
||
|
|
||
|
ClientConstructor ClientConstructor
|
||
|
|
||
|
Logger *logger.Logger
|
||
|
|
||
|
VerifySessionTokenIssuer bool
|
||
|
}
|
||
|
|
||
|
// Option is a function that adjusts a Config in place.
type Option func(*Config)
|
||
|
|
||
|
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||
|
return func(c *Config) {
|
||
|
c.RemotePool, c.LocalPool = remote, local
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithLogger(l *logger.Logger) Option {
|
||
|
return func(c *Config) {
|
||
|
c.Logger = l
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithVerifySessionTokenIssuer(v bool) Option {
|
||
|
return func(c *Config) {
|
||
|
c.VerifySessionTokenIssuer = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
|
||
|
if c.NetmapKeys.IsLocalKey(pub) {
|
||
|
return c.LocalPool, true
|
||
|
}
|
||
|
return c.RemotePool, false
|
||
|
}
|
||
|
|
||
|
type Params struct {
|
||
|
Config *Config
|
||
|
|
||
|
Common *objutil.CommonPrm
|
||
|
|
||
|
Header *objectSDK.Object
|
||
|
|
||
|
Container containerSDK.Container
|
||
|
|
||
|
TraverseOpts []placement.Option
|
||
|
|
||
|
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||
|
|
||
|
SignRequestPrivateKey *ecdsa.PrivateKey
|
||
|
}
|
||
|
|
||
|
func New(prm *Params) transformer.ObjectWriter {
|
||
|
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
|
||
|
return newECWriter(prm)
|
||
|
}
|
||
|
return newDefaultObjectWriter(prm, false)
|
||
|
}
|
||
|
|
||
|
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
|
||
|
var relay func(context.Context, NodeDescriptor) error
|
||
|
if prm.Relay != nil {
|
||
|
relay = func(ctx context.Context, node NodeDescriptor) error {
|
||
|
var info client.NodeInfo
|
||
|
|
||
|
client.NodeInfoFromNetmapElement(&info, node.Info)
|
||
|
|
||
|
c, err := prm.Config.ClientConstructor.Get(info)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||
|
}
|
||
|
|
||
|
return prm.Relay(ctx, info, c)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var resetSuccessAfterOnBroadcast bool
|
||
|
traverseOpts := prm.TraverseOpts
|
||
|
if forECPlacement && !prm.Common.LocalOnly() {
|
||
|
// save non-regular and linking object to EC container.
|
||
|
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||
|
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
|
||
|
resetSuccessAfterOnBroadcast = true
|
||
|
}
|
||
|
|
||
|
return &distributedWriter{
|
||
|
cfg: prm.Config,
|
||
|
placementOpts: traverseOpts,
|
||
|
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||
|
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
|
||
|
if node.Local {
|
||
|
return LocalTarget{
|
||
|
Storage: prm.Config.LocalStore,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
rt := &remoteWriter{
|
||
|
privateKey: prm.SignRequestPrivateKey,
|
||
|
commonPrm: prm.Common,
|
||
|
clientConstructor: prm.Config.ClientConstructor,
|
||
|
}
|
||
|
|
||
|
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
|
||
|
|
||
|
return rt
|
||
|
},
|
||
|
relay: relay,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func newECWriter(prm *Params) transformer.ObjectWriter {
|
||
|
return &objectWriterDispatcher{
|
||
|
ecWriter: &ECWriter{
|
||
|
Config: prm.Config,
|
||
|
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||
|
Container: prm.Container,
|
||
|
Key: prm.SignRequestPrivateKey,
|
||
|
CommonPrm: prm.Common,
|
||
|
Relay: prm.Relay,
|
||
|
},
|
||
|
repWriter: newDefaultObjectWriter(prm, true),
|
||
|
}
|
||
|
}
|