diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 0c8f12b45..9f2dfd971 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -1,10 +1,6 @@ package putsvc import ( - "context" - "crypto/ecdsa" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" @@ -19,10 +15,6 @@ type PutInitPrm struct { cnr containerSDK.Container traverseOpts []placement.Option - - relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error - - privateKey *ecdsa.PrivateKey } type PutChunkPrm struct { @@ -53,14 +45,6 @@ func (p *PutInitPrm) WithCopyNumbers(v []uint32) *PutInitPrm { return p } -func (p *PutInitPrm) WithRelay(f func(context.Context, client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm { - if p != nil { - p.relay = f - } - - return p -} - func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { if p != nil { p.chunk = v @@ -68,11 +52,3 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { return p } - -func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm { - if p != nil { - p.privateKey = v - } - - return p -} diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 5bf15b4cd..6aa3a3315 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -5,27 +5,18 @@ import ( "fmt" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" - rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" - internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) type streamer struct { stream *putsvc.Streamer keyStorage *util.KeyStorage saveChunks bool - init *object.PutRequest chunks []*object.PutRequest *sizes // only for relay streams @@ -66,8 +57,6 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) if s.payloadSz > maxSz { return target.ErrExceedingMaxSize } - - s.init = req } case *object.PutObjectPartChunk: if s.saveChunks { @@ -129,84 +118,3 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error return fromPutResponse(resp), nil } - -func (s *streamer) relayRequest(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) error { - ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.relayRequest") - defer span.End() - - // open stream - resp := new(object.PutResponse) - - key := info.PublicKey() - - var firstErr error - - info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) { - ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.iterateAddress", - trace.WithAttributes( - attribute.String("address", addr.String()), - )) - defer span.End() - - var err error - - defer func() { - stop = err == nil - - if stop || firstErr == nil { - firstErr = err - } - - // would be nice to log otherwise - }() - - var stream *rpc.PutRequestWriter - - err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error { - stream, err = rpc.PutObject(cli, resp, rawclient.WithContext(ctx)) - return err - }) - if err != nil { - err = fmt.Errorf("stream opening failed: %w", err) - return - } - - // send init part - err = stream.Write(s.init) - if err != nil { - internalclient.ReportError(c, err) - err = fmt.Errorf("sending the initial message to stream failed: %w", err) - return - } - - for i := range s.chunks { - if err = stream.Write(s.chunks[i]); err != nil { - internalclient.ReportError(c, err) - err = fmt.Errorf("sending the chunk %d failed: %w", i, err) - return - } - } - - // close object stream and receive response from remote node - err = stream.Close() - if err != nil { - err = fmt.Errorf("closing the stream failed: %w", err) - return - } - - // verify response key - if err = internal.VerifyResponseKeyV2(key, resp); err != nil { - return - } - - // verify response structure - err = signature.VerifyServiceMessage(resp) - if err != nil { - err = fmt.Errorf("response verification failed: %w", err) - } - - return - }) - - return firstErr -} diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go index a157a9542..59d4f9d90 100644 --- a/pkg/services/object/put/v2/util.go +++ b/pkg/services/object/put/v2/util.go @@ -23,7 +23,6 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put WithObject( objectSDK.NewFromV2(oV2), ). - WithRelay(s.relayRequest). WithCommonPrm(commonPrm). WithCopyNumbers(part.GetCopiesNumber()), nil }