[#9999] putSvc: Drop unused
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
43c8f82911
commit
9cc57d0e4b
3 changed files with 0 additions and 117 deletions
|
@ -1,10 +1,6 @@
|
||||||
package putsvc
|
package putsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
@ -19,10 +15,6 @@ type PutInitPrm struct {
|
||||||
cnr containerSDK.Container
|
cnr containerSDK.Container
|
||||||
|
|
||||||
traverseOpts []placement.Option
|
traverseOpts []placement.Option
|
||||||
|
|
||||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
|
||||||
|
|
||||||
privateKey *ecdsa.PrivateKey
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type PutChunkPrm struct {
|
type PutChunkPrm struct {
|
||||||
|
@ -53,14 +45,6 @@ func (p *PutInitPrm) WithCopyNumbers(v []uint32) *PutInitPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PutInitPrm) WithRelay(f func(context.Context, client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
|
|
||||||
if p != nil {
|
|
||||||
p.relay = f
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.chunk = v
|
p.chunk = v
|
||||||
|
@ -68,11 +52,3 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm {
|
|
||||||
if p != nil {
|
|
||||||
p.privateKey = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
|
@ -5,27 +5,18 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
|
||||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
||||||
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type streamer struct {
|
type streamer struct {
|
||||||
stream *putsvc.Streamer
|
stream *putsvc.Streamer
|
||||||
keyStorage *util.KeyStorage
|
keyStorage *util.KeyStorage
|
||||||
saveChunks bool
|
saveChunks bool
|
||||||
init *object.PutRequest
|
|
||||||
chunks []*object.PutRequest
|
chunks []*object.PutRequest
|
||||||
|
|
||||||
*sizes // only for relay streams
|
*sizes // only for relay streams
|
||||||
|
@ -66,8 +57,6 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
|
||||||
if s.payloadSz > maxSz {
|
if s.payloadSz > maxSz {
|
||||||
return target.ErrExceedingMaxSize
|
return target.ErrExceedingMaxSize
|
||||||
}
|
}
|
||||||
|
|
||||||
s.init = req
|
|
||||||
}
|
}
|
||||||
case *object.PutObjectPartChunk:
|
case *object.PutObjectPartChunk:
|
||||||
if s.saveChunks {
|
if s.saveChunks {
|
||||||
|
@ -129,84 +118,3 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error
|
||||||
|
|
||||||
return fromPutResponse(resp), nil
|
return fromPutResponse(resp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamer) relayRequest(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) error {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.relayRequest")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
// open stream
|
|
||||||
resp := new(object.PutResponse)
|
|
||||||
|
|
||||||
key := info.PublicKey()
|
|
||||||
|
|
||||||
var firstErr error
|
|
||||||
|
|
||||||
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.iterateAddress",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("address", addr.String()),
|
|
||||||
))
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
stop = err == nil
|
|
||||||
|
|
||||||
if stop || firstErr == nil {
|
|
||||||
firstErr = err
|
|
||||||
}
|
|
||||||
|
|
||||||
// would be nice to log otherwise
|
|
||||||
}()
|
|
||||||
|
|
||||||
var stream *rpc.PutRequestWriter
|
|
||||||
|
|
||||||
err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error {
|
|
||||||
stream, err = rpc.PutObject(cli, resp, rawclient.WithContext(ctx))
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("stream opening failed: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// send init part
|
|
||||||
err = stream.Write(s.init)
|
|
||||||
if err != nil {
|
|
||||||
internalclient.ReportError(c, err)
|
|
||||||
err = fmt.Errorf("sending the initial message to stream failed: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range s.chunks {
|
|
||||||
if err = stream.Write(s.chunks[i]); err != nil {
|
|
||||||
internalclient.ReportError(c, err)
|
|
||||||
err = fmt.Errorf("sending the chunk %d failed: %w", i, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// close object stream and receive response from remote node
|
|
||||||
err = stream.Close()
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("closing the stream failed: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify response key
|
|
||||||
if err = internal.VerifyResponseKeyV2(key, resp); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify response structure
|
|
||||||
err = signature.VerifyServiceMessage(resp)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("response verification failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
})
|
|
||||||
|
|
||||||
return firstErr
|
|
||||||
}
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
|
||||||
WithObject(
|
WithObject(
|
||||||
objectSDK.NewFromV2(oV2),
|
objectSDK.NewFromV2(oV2),
|
||||||
).
|
).
|
||||||
WithRelay(s.relayRequest).
|
|
||||||
WithCommonPrm(commonPrm).
|
WithCommonPrm(commonPrm).
|
||||||
WithCopyNumbers(part.GetCopiesNumber()), nil
|
WithCopyNumbers(part.GetCopiesNumber()), nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue