This commit is contained in:
Anton Nikiforov 2024-12-20 14:50:23 +03:00
parent 148d68933b
commit 5abf4e555f
9 changed files with 39 additions and 11 deletions

View file

@ -66,6 +66,7 @@ $(BINS): $(DIRS) dep
@echo "⇒ Build $@" @echo "⇒ Build $@"
CGO_ENABLED=0 \ CGO_ENABLED=0 \
go build -v -trimpath \ go build -v -trimpath \
-gcflags "all=-N -l" \
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \ -ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
-o $@ ./cmd/$(notdir $@) -o $@ ./cmd/$(notdir $@)

View file

@ -12,6 +12,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/experimental"
"google.golang.org/grpc/mem"
) )
const serviceNameControl = "control" const serviceNameControl = "control"
@ -50,7 +52,7 @@ func initControlService(ctx context.Context, c *cfg) {
return return
} }
c.cfgControlService.server = grpc.NewServer() c.cfgControlService.server = grpc.NewServer(experimental.BufferPool(mem.NopBufferPool{}))
c.onShutdown(func() { c.onShutdown(func() {
stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log) stopGRPC(ctx, "FrostFS Control API", c.cfgControlService.server, c.log)

View file

@ -15,6 +15,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/experimental"
"google.golang.org/grpc/mem"
) )
const maxRecvMsgSize = 256 << 20 const maxRecvMsgSize = 256 << 20
@ -37,6 +39,7 @@ func initGRPC(ctx context.Context, c *cfg) {
} }
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint()) c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
serverOpts = append(serverOpts, experimental.BufferPool(mem.NopBufferPool{}))
srv := grpc.NewServer(serverOpts...) srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() { c.onShutdown(func() {
@ -94,6 +97,7 @@ func tryReconnect(ctx context.Context, endpoint string, c *cfg) bool {
} }
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint) c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
serverOpts = append(serverOpts, experimental.BufferPool(mem.NopBufferPool{}))
srv := grpc.NewServer(serverOpts...) srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() { c.onShutdown(func() {

7
go.mod
View file

@ -2,6 +2,11 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.22 go 1.22
replace (
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467 => /home/annikifa/workspace/frostfs-sdk-go/
google.golang.org/grpc v1.66.2 => /home/annikifa/workspace/grpc-go/
)
require ( require (
code.gitea.io/sdk/gitea v0.17.1 code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08 git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08
@ -27,7 +32,7 @@ require (
github.com/klauspost/compress v1.17.4 github.com/klauspost/compress v1.17.4
github.com/mailru/easyjson v0.7.7 github.com/mailru/easyjson v0.7.7
github.com/mr-tron/base58 v1.2.0 github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1 github.com/multiformats/go-multiaddr v0.14.0
github.com/nspcc-dev/neo-go v0.106.3 github.com/nspcc-dev/neo-go v0.106.3
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0 github.com/panjf2000/ants/v2 v2.9.0

BIN
go.sum

Binary file not shown.

View file

@ -33,6 +33,8 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/experimental"
"google.golang.org/grpc/mem"
) )
func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper, func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
@ -346,7 +348,8 @@ func (s *Server) initGRPCServer(ctx context.Context, cfg *viper.Viper, log *logg
controlsrv.WithAllowedKeys(authKeys), controlsrv.WithAllowedKeys(authKeys),
), log, audit) ), log, audit)
grpcControlSrv := grpc.NewServer() opts := []grpc.ServerOption{experimental.BufferPool(mem.NopBufferPool{})}
grpcControlSrv := grpc.NewServer(opts...)
control.RegisterControlServiceServer(grpcControlSrv, controlSvc) control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
s.runners = append(s.runners, func(ch chan<- error) error { s.runners = append(s.runners, func(ch chan<- error) error {

View file

@ -39,9 +39,10 @@ func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWrit
prm.SignRequestPrivateKey = nodeKey prm.SignRequestPrivateKey = nodeKey
} }
wrt, _ := objectwriter.New(prm)
// prepare untrusted-Put object target // prepare untrusted-Put object target
return &validatingPreparedTarget{ return &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)), nextTarget: newInMemoryObjectBuilder(wrt),
fmt: prm.Config.FormatValidator, fmt: prm.Config.FormatValidator,
maxPayloadSz: maxPayloadSz, maxPayloadSz: maxPayloadSz,
@ -98,11 +99,13 @@ func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter
prm.SignRequestPrivateKey = key prm.SignRequestPrivateKey = key
} }
wrt, onClose := objectwriter.New(prm)
return &validatingTarget{ return &validatingTarget{
fmt: prm.Config.FormatValidator, fmt: prm.Config.FormatValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key, Key: key,
NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) }, NextTargetInit: func() transformer.ObjectWriter { return wrt },
OnClose: onClose,
NetworkState: prm.Config.NetworkState, NetworkState: prm.Config.NetworkState,
MaxSize: maxPayloadSz, MaxSize: maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container), WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"golang.org/x/sync/errgroup"
) )
type preparedObjectTarget interface { type preparedObjectTarget interface {
@ -26,6 +27,8 @@ type distributedWriter struct {
relay func(context.Context, NodeDescriptor) error relay func(context.Context, NodeDescriptor) error
resetSuccessAfterOnBroadcast bool resetSuccessAfterOnBroadcast bool
eg errgroup.Group
} }
// Traversal parameters and state of container. // Traversal parameters and state of container.
@ -104,7 +107,10 @@ func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Obje
if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil { if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", t, err) return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
} }
return t.iteratePlacement(ctx) t.eg.Go(func() error {
return t.iteratePlacement(ctx)
})
return nil
} }
func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error { func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error {

View file

@ -110,14 +110,14 @@ type Params struct {
SignRequestPrivateKey *ecdsa.PrivateKey SignRequestPrivateKey *ecdsa.PrivateKey
} }
func New(prm *Params) transformer.ObjectWriter { func New(prm *Params) (transformer.ObjectWriter, func() error) {
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) { if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
return newECWriter(prm) return newECWriter(prm), nil
} }
return newDefaultObjectWriter(prm, false) return newDefaultObjectWriter(prm, false)
} }
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter { func newDefaultObjectWriter(prm *Params, forECPlacement bool) (transformer.ObjectWriter, func() error) {
var relay func(context.Context, NodeDescriptor) error var relay func(context.Context, NodeDescriptor) error
if prm.Relay != nil { if prm.Relay != nil {
relay = func(ctx context.Context, node NodeDescriptor) error { relay = func(ctx context.Context, node NodeDescriptor) error {
@ -143,7 +143,7 @@ func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.Object
resetSuccessAfterOnBroadcast = true resetSuccessAfterOnBroadcast = true
} }
return &distributedWriter{ dw := &distributedWriter{
cfg: prm.Config, cfg: prm.Config,
placementOpts: traverseOpts, placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast, resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
@ -167,9 +167,13 @@ func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.Object
}, },
relay: relay, relay: relay,
} }
return dw, func() error {
return dw.eg.Wait()
}
} }
func newECWriter(prm *Params) transformer.ObjectWriter { func newECWriter(prm *Params) transformer.ObjectWriter {
dw, _ := newDefaultObjectWriter(prm, true)
return &objectWriterDispatcher{ return &objectWriterDispatcher{
ecWriter: &ECWriter{ ecWriter: &ECWriter{
Config: prm.Config, Config: prm.Config,
@ -179,6 +183,6 @@ func newECWriter(prm *Params) transformer.ObjectWriter {
CommonPrm: prm.Common, CommonPrm: prm.Common,
Relay: prm.Relay, Relay: prm.Relay,
}, },
repWriter: newDefaultObjectWriter(prm, true), repWriter: dw,
} }
} }