[#xx] services/object: Remove non-blocking pools for Put operation
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
12839e0087
commit
268c8a4f9a
7 changed files with 6 additions and 73 deletions
|
@ -325,7 +325,6 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
|
||||||
c,
|
c,
|
||||||
c.cfgNetmap.state,
|
c.cfgNetmap.state,
|
||||||
irFetcher,
|
irFetcher,
|
||||||
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
|
||||||
objectwriter.WithLogger(c.log),
|
objectwriter.WithLogger(c.log),
|
||||||
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
||||||
)
|
)
|
||||||
|
|
|
@ -79,11 +79,11 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())
|
isLocal := n.cfg.NetmapKeys.IsLocalKey(addr.PublicKey())
|
||||||
|
|
||||||
item := new(bool)
|
item := new(bool)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
if err := workerPool.Submit(func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
|
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
|
||||||
|
@ -95,11 +95,7 @@ func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
||||||
|
|
||||||
traverser.SubmitSuccess()
|
traverser.SubmitSuccess()
|
||||||
*item = true
|
*item = true
|
||||||
}); err != nil {
|
}()
|
||||||
wg.Done()
|
|
||||||
svcutil.LogWorkerPoolError(ctx, n.cfg.Logger, "PUT", err)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark the container node as processed in order to exclude it
|
// Mark the container node as processed in order to exclude it
|
||||||
// in subsequent container broadcast. Note that we don't
|
// in subsequent container broadcast. Note that we don't
|
||||||
|
|
|
@ -149,17 +149,7 @@ func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
||||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
completed := make(chan interface{})
|
err = e.Relay(ctx, info, c)
|
||||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
|
||||||
defer close(completed)
|
|
||||||
err = e.Relay(ctx, info, c)
|
|
||||||
}); poolErr != nil {
|
|
||||||
close(completed)
|
|
||||||
svcutil.LogWorkerPoolError(ctx, e.Config.Logger, "PUT", poolErr)
|
|
||||||
return poolErr
|
|
||||||
}
|
|
||||||
<-completed
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -343,21 +333,11 @@ func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, n
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
var err error
|
|
||||||
localTarget := LocalTarget{
|
localTarget := LocalTarget{
|
||||||
Storage: e.Config.LocalStore,
|
Storage: e.Config.LocalStore,
|
||||||
Container: e.Container,
|
Container: e.Container,
|
||||||
}
|
}
|
||||||
completed := make(chan interface{})
|
return localTarget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||||
if poolErr := e.Config.LocalPool.Submit(func() {
|
|
||||||
defer close(completed)
|
|
||||||
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
|
|
||||||
}); poolErr != nil {
|
|
||||||
close(completed)
|
|
||||||
return poolErr
|
|
||||||
}
|
|
||||||
<-completed
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||||
|
@ -371,15 +351,5 @@ func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, n
|
||||||
nodeInfo: clientNodeInfo,
|
nodeInfo: clientNodeInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
return remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||||
completed := make(chan interface{})
|
|
||||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
|
||||||
defer close(completed)
|
|
||||||
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
|
|
||||||
}); poolErr != nil {
|
|
||||||
close(completed)
|
|
||||||
return poolErr
|
|
||||||
}
|
|
||||||
<-completed
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/panjf2000/ants/v2"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -131,9 +130,6 @@ func TestECWriter(t *testing.T) {
|
||||||
nodeKey, err := keys.NewPrivateKey()
|
nodeKey, err := keys.NewPrivateKey()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
pool, err := ants.NewPool(4, ants.WithNonblocking(true))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
log, err := logger.NewLogger(nil)
|
log, err := logger.NewLogger(nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -141,7 +137,6 @@ func TestECWriter(t *testing.T) {
|
||||||
ecw := ECWriter{
|
ecw := ECWriter{
|
||||||
Config: &Config{
|
Config: &Config{
|
||||||
NetmapKeys: n,
|
NetmapKeys: n,
|
||||||
RemotePool: pool,
|
|
||||||
Logger: log,
|
Logger: log,
|
||||||
ClientConstructor: clientConstructor{vectors: ns},
|
ClientConstructor: clientConstructor{vectors: ns},
|
||||||
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
|
KeyStorage: util.NewKeyStorage(&nodeKey.PrivateKey, nil, nil),
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -52,8 +51,6 @@ type Config struct {
|
||||||
|
|
||||||
NetmapSource netmap.Source
|
NetmapSource netmap.Source
|
||||||
|
|
||||||
RemotePool, LocalPool util.WorkerPool
|
|
||||||
|
|
||||||
NetmapKeys netmap.AnnouncedKeys
|
NetmapKeys netmap.AnnouncedKeys
|
||||||
|
|
||||||
FormatValidator *object.FormatValidator
|
FormatValidator *object.FormatValidator
|
||||||
|
@ -69,12 +66,6 @@ type Config struct {
|
||||||
|
|
||||||
type Option func(*Config)
|
type Option func(*Config)
|
||||||
|
|
||||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
c.RemotePool, c.LocalPool = remote, local
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLogger(l *logger.Logger) Option {
|
func WithLogger(l *logger.Logger) Option {
|
||||||
return func(c *Config) {
|
return func(c *Config) {
|
||||||
c.Logger = l
|
c.Logger = l
|
||||||
|
@ -87,13 +78,6 @@ func WithVerifySessionTokenIssuer(v bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
|
|
||||||
if c.NetmapKeys.IsLocalKey(pub) {
|
|
||||||
return c.LocalPool, true
|
|
||||||
}
|
|
||||||
return c.RemotePool, false
|
|
||||||
}
|
|
||||||
|
|
||||||
type Params struct {
|
type Params struct {
|
||||||
Config *Config
|
Config *Config
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -27,8 +26,6 @@ func NewService(ks *objutil.KeyStorage,
|
||||||
opts ...objectwriter.Option,
|
opts ...objectwriter.Option,
|
||||||
) *Service {
|
) *Service {
|
||||||
c := &objectwriter.Config{
|
c := &objectwriter.Config{
|
||||||
RemotePool: util.NewPseudoWorkerPool(),
|
|
||||||
LocalPool: util.NewPseudoWorkerPool(),
|
|
||||||
Logger: logger.NewLoggerWrapper(zap.L()),
|
Logger: logger.NewLoggerWrapper(zap.L()),
|
||||||
KeyStorage: ks,
|
KeyStorage: ks,
|
||||||
ClientConstructor: cc,
|
ClientConstructor: cc,
|
||||||
|
|
|
@ -17,11 +17,3 @@ func LogServiceError(ctx context.Context, l *logger.Logger, req string, node net
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
|
|
||||||
func LogWorkerPoolError(ctx context.Context, l *logger.Logger, req string, err error) {
|
|
||||||
l.Error(ctx, logs.UtilCouldNotPushTaskToWorkerPool,
|
|
||||||
zap.String("request", req),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue