forked from TrueCloudLab/frostfs-node
184 lines
4.7 KiB
Go
184 lines
4.7 KiB
Go
package writer
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
|
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
)
|
|
|
|
// MaxSizeSource provides the upper bound on the payload size of a single
// physically stored object.
type MaxSizeSource interface {
	// MaxObjectSize returns maximum payload size
	// of physically stored object in system.
	//
	// Must return 0 if value can not be obtained.
	MaxObjectSize() uint64
}
|
|
|
|
type ClientConstructor interface {
|
|
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
|
}
|
|
|
|
// InnerRing gives access to the keys of the Inner Ring.
type InnerRing interface {
	// InnerRingKeys returns the keys as raw byte slices, or an error
	// if they could not be obtained.
	// NOTE(review): presumably these are binary-encoded public keys of
	// Inner Ring nodes — confirm against implementations.
	InnerRingKeys() ([][]byte, error)
}
|
|
|
|
// FormatValidatorConfig exposes settings consumed by object format
// validation.
type FormatValidatorConfig interface {
	// VerifySessionTokenIssuer reports whether the session token issuer
	// verification setting is enabled.
	VerifySessionTokenIssuer() bool
}
|
|
|
|
// Config represents a set of static parameters that are established during
|
|
// the initialization phase of all services.
|
|
type Config struct {
|
|
KeyStorage *objutil.KeyStorage
|
|
|
|
MaxSizeSrc MaxSizeSource
|
|
|
|
LocalStore ObjectStorage
|
|
|
|
ContainerSource container.Source
|
|
|
|
NetmapSource netmap.Source
|
|
|
|
RemotePool, LocalPool util.WorkerPool
|
|
|
|
NetmapKeys netmap.AnnouncedKeys
|
|
|
|
FormatValidator *object.FormatValidator
|
|
|
|
NetworkState netmap.State
|
|
|
|
ClientConstructor ClientConstructor
|
|
|
|
Logger *logger.Logger
|
|
|
|
VerifySessionTokenIssuer bool
|
|
}
|
|
|
|
// Option is a functional option that mutates a Config in place.
type Option func(*Config)
|
|
|
|
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
|
return func(c *Config) {
|
|
c.RemotePool, c.LocalPool = remote, local
|
|
}
|
|
}
|
|
|
|
func WithLogger(l *logger.Logger) Option {
|
|
return func(c *Config) {
|
|
c.Logger = l
|
|
}
|
|
}
|
|
|
|
func WithVerifySessionTokenIssuer(v bool) Option {
|
|
return func(c *Config) {
|
|
c.VerifySessionTokenIssuer = v
|
|
}
|
|
}
|
|
|
|
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
|
|
if c.NetmapKeys.IsLocalKey(pub) {
|
|
return c.LocalPool, true
|
|
}
|
|
return c.RemotePool, false
|
|
}
|
|
|
|
type Params struct {
|
|
Config *Config
|
|
|
|
Common *objutil.CommonPrm
|
|
|
|
Header *objectSDK.Object
|
|
|
|
Container containerSDK.Container
|
|
|
|
TraverseOpts []placement.Option
|
|
|
|
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
|
|
|
SignRequestPrivateKey *ecdsa.PrivateKey
|
|
}
|
|
|
|
func New(prm *Params) transformer.ObjectWriter {
|
|
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
|
|
return newECWriter(prm)
|
|
}
|
|
return newDefaultObjectWriter(prm, false)
|
|
}
|
|
|
|
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
|
|
var relay func(context.Context, NodeDescriptor) error
|
|
if prm.Relay != nil {
|
|
relay = func(ctx context.Context, node NodeDescriptor) error {
|
|
var info client.NodeInfo
|
|
|
|
client.NodeInfoFromNetmapElement(&info, node.Info)
|
|
|
|
c, err := prm.Config.ClientConstructor.Get(info)
|
|
if err != nil {
|
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
|
}
|
|
|
|
return prm.Relay(ctx, info, c)
|
|
}
|
|
}
|
|
|
|
var resetSuccessAfterOnBroadcast bool
|
|
traverseOpts := prm.TraverseOpts
|
|
if forECPlacement && !prm.Common.LocalOnly() {
|
|
// save non-regular and linking object to EC container.
|
|
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
|
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
|
|
resetSuccessAfterOnBroadcast = true
|
|
}
|
|
|
|
return &distributedWriter{
|
|
cfg: prm.Config,
|
|
placementOpts: traverseOpts,
|
|
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
|
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
|
|
if node.Local {
|
|
return LocalTarget{
|
|
Storage: prm.Config.LocalStore,
|
|
Container: prm.Container,
|
|
}
|
|
}
|
|
|
|
rt := &remoteWriter{
|
|
privateKey: prm.SignRequestPrivateKey,
|
|
commonPrm: prm.Common,
|
|
clientConstructor: prm.Config.ClientConstructor,
|
|
}
|
|
|
|
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
|
|
|
|
return rt
|
|
},
|
|
relay: relay,
|
|
}
|
|
}
|
|
|
|
func newECWriter(prm *Params) transformer.ObjectWriter {
|
|
return &objectWriterDispatcher{
|
|
ecWriter: &ECWriter{
|
|
Config: prm.Config,
|
|
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
|
Container: prm.Container,
|
|
Key: prm.SignRequestPrivateKey,
|
|
CommonPrm: prm.Common,
|
|
Relay: prm.Relay,
|
|
},
|
|
repWriter: newDefaultObjectWriter(prm, true),
|
|
}
|
|
}
|