forked from TrueCloudLab/frostfs-node
[#521] *: use stdlib errors
package
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
43e575cec2
commit
71b87155ef
171 changed files with 825 additions and 674 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
acl "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
|
||||
|
@ -23,7 +24,6 @@ import (
|
|||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
|
||||
eaclV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl/v2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type (
|
||||
|
|
|
@ -3,6 +3,7 @@ package acl
|
|||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
acl "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
|
||||
|
@ -15,7 +16,6 @@ import (
|
|||
v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||
core "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -52,7 +52,7 @@ func (c SenderClassifier) Classify(
|
|||
cid *container.ID,
|
||||
cnr *container.Container) (role acl.Role, isIR bool, key []byte, err error) {
|
||||
if cid == nil {
|
||||
return 0, false, nil, errors.Wrap(ErrMalformedRequest, "container id is not set")
|
||||
return 0, false, nil, fmt.Errorf("%w: container id is not set", ErrMalformedRequest)
|
||||
}
|
||||
|
||||
ownerID, ownerKey, err := requestOwner(req)
|
||||
|
@ -95,7 +95,7 @@ func (c SenderClassifier) Classify(
|
|||
|
||||
func requestOwner(req metaWithToken) (*owner.ID, *ecdsa.PublicKey, error) {
|
||||
if req.vheader == nil {
|
||||
return nil, nil, errors.Wrap(ErrMalformedRequest, "nil verification header")
|
||||
return nil, nil, fmt.Errorf("%w: nil verification header", ErrMalformedRequest)
|
||||
}
|
||||
|
||||
// if session token is presented, use it as truth source
|
||||
|
@ -107,13 +107,13 @@ func requestOwner(req metaWithToken) (*owner.ID, *ecdsa.PublicKey, error) {
|
|||
// otherwise get original body signature
|
||||
bodySignature := originalBodySignature(req.vheader)
|
||||
if bodySignature == nil {
|
||||
return nil, nil, errors.Wrap(ErrMalformedRequest, "nil at body signature")
|
||||
return nil, nil, fmt.Errorf("%w: nil at body signature", ErrMalformedRequest)
|
||||
}
|
||||
|
||||
key := crypto.UnmarshalPublicKey(bodySignature.Key())
|
||||
neo3wallet, err := owner.NEO3WalletFromPublicKey(key)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "can't create neo3 wallet")
|
||||
return nil, nil, fmt.Errorf("can't create neo3 wallet: %w", err)
|
||||
}
|
||||
|
||||
// form user from public key
|
||||
|
@ -202,7 +202,7 @@ func ownerFromToken(token *session.SessionToken) (*owner.ID, *ecdsa.PublicKey, e
|
|||
tokenSignature := token.GetSignature()
|
||||
return tokenSignature.GetKey(), tokenSignature.GetSign()
|
||||
}); err != nil {
|
||||
return nil, nil, errors.Wrap(ErrMalformedRequest, "invalid session token signature")
|
||||
return nil, nil, fmt.Errorf("%w: invalid session token signature", ErrMalformedRequest)
|
||||
}
|
||||
|
||||
// 2. Then check if session token owner issued the session token
|
||||
|
@ -211,7 +211,7 @@ func ownerFromToken(token *session.SessionToken) (*owner.ID, *ecdsa.PublicKey, e
|
|||
|
||||
if !isOwnerFromKey(tokenOwner, tokenIssuerKey) {
|
||||
// todo: in this case we can issue all owner keys from neofs.id and check once again
|
||||
return nil, nil, errors.Wrap(ErrMalformedRequest, "invalid session token owner")
|
||||
return nil, nil, fmt.Errorf("%w: invalid session token owner", ErrMalformedRequest)
|
||||
}
|
||||
|
||||
return tokenOwner, tokenIssuerKey, nil
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
@ -16,7 +17,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
|
|
@ -2,11 +2,11 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
|
|
@ -2,13 +2,13 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Service implements Get operation of Object service v2.
|
||||
|
|
|
@ -3,6 +3,8 @@ package getsvc
|
|||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"sync"
|
||||
|
@ -22,7 +24,6 @@ import (
|
|||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/tzhash/tz"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
@ -79,7 +80,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
// open stream
|
||||
stream, err := rpc.GetObject(c.Raw(), req, rpcclient.WithContext(stream.Context()))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "stream opening failed")
|
||||
return nil, fmt.Errorf("stream opening failed: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -93,7 +94,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !headWas {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
@ -101,17 +102,17 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
default:
|
||||
return nil, errors.Errorf("unexpected object part %T", v)
|
||||
return nil, fmt.Errorf("unexpected object part %T", v)
|
||||
case *objectV2.GetObjectPartInit:
|
||||
if headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
|
@ -201,7 +202,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
// open stream
|
||||
stream, err := rpc.GetObjectRange(c.Raw(), req, rpcclient.WithContext(stream.Context()))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Get payload range stream")
|
||||
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||
}
|
||||
|
||||
payload := make([]byte, 0, body.GetRange().GetLength())
|
||||
|
@ -212,21 +213,21 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return nil, errors.Errorf("unexpected range type %T", v)
|
||||
return nil, fmt.Errorf("unexpected range type %T", v)
|
||||
case *objectV2.GetRangePartChunk:
|
||||
payload = append(payload, v.GetChunk()...)
|
||||
case *objectV2.SplitInfo:
|
||||
|
@ -279,7 +280,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
|||
|
||||
switch t := body.GetType(); t {
|
||||
default:
|
||||
return nil, errors.Errorf("unknown checksum type %v", t)
|
||||
return nil, fmt.Errorf("unknown checksum type %v", t)
|
||||
case refs.SHA256:
|
||||
p.SetHashGenerator(func() hash.Hash {
|
||||
return sha256.New()
|
||||
|
@ -364,12 +365,12 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
// send Head request
|
||||
resp, err := rpc.HeadObject(c.Raw(), req, rpcclient.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "sending the request failed")
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -379,10 +380,10 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
|
||||
switch v := resp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, errors.Errorf("unexpected header type %T", v)
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
case *objectV2.ShortHeader:
|
||||
if !body.GetMainOnly() {
|
||||
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
}
|
||||
|
@ -399,7 +400,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||
case *objectV2.HeaderWithSignature:
|
||||
if body.GetMainOnly() {
|
||||
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
}
|
||||
|
@ -420,7 +421,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
return idSig.GetKey(), idSig.GetSign()
|
||||
},
|
||||
); err != nil {
|
||||
return nil, errors.Wrap(err, "incorrect object header signature")
|
||||
return nil, fmt.Errorf("incorrect object header signature: %w", err)
|
||||
}
|
||||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
|
|
|
@ -2,13 +2,14 @@ package headsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ClientConstructor interface {
|
||||
|
@ -62,7 +63,7 @@ func (p *RemoteHeadPrm) WithObjectAddress(v *objectSDK.Address) *RemoteHeadPrm {
|
|||
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Object, error) {
|
||||
key, err := h.keyStorage.GetKey(prm.commonHeadPrm.common.SessionToken())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive private key", h)
|
||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||
}
|
||||
|
||||
addr, err := prm.node.HostAddrString()
|
||||
|
@ -72,7 +73,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
|||
|
||||
c, err := h.clientCache.Get(addr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr)
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, addr, err)
|
||||
}
|
||||
|
||||
p := new(client.ObjectHeaderParams).
|
||||
|
@ -90,7 +91,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
|||
client.WithKey(key),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not head object in %s", h, addr)
|
||||
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, addr, err)
|
||||
}
|
||||
|
||||
return object.NewFromSDK(hdr), nil
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
|
@ -10,7 +12,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type distributedTarget struct {
|
||||
|
@ -48,7 +49,7 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not create object placement traverser", t)
|
||||
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
|
||||
}
|
||||
|
||||
sz := 0
|
||||
|
@ -66,7 +67,7 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
t.obj.SetPayload(payload)
|
||||
|
||||
if err := t.fmt.ValidateContent(t.obj.Object()); err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not validate payload content", t)
|
||||
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
|
||||
}
|
||||
|
||||
loop:
|
||||
|
@ -90,12 +91,12 @@ loop:
|
|||
|
||||
if err := target.WriteHeader(t.obj); err != nil {
|
||||
svcutil.LogServiceError(t.log, "PUT", addr,
|
||||
errors.Wrap(err, "could not write header"))
|
||||
fmt.Errorf("could not write header: %w", err))
|
||||
|
||||
return
|
||||
} else if _, err := target.Close(); err != nil {
|
||||
svcutil.LogServiceError(t.log, "PUT", addr,
|
||||
errors.Wrap(err, "could not close object stream"))
|
||||
fmt.Errorf("could not close object stream: %w", err))
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type localTarget struct {
|
||||
|
@ -31,7 +32,7 @@ func (t *localTarget) Write(p []byte) (n int, err error) {
|
|||
|
||||
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||
if err := engine.Put(t.storage, t.obj.Object()); err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not put object to local storage", t)
|
||||
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
||||
}
|
||||
|
||||
return new(transformer.AccessIdentifiers).
|
||||
|
|
|
@ -2,13 +2,13 @@ package putsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type remoteTarget struct {
|
||||
|
@ -51,7 +51,7 @@ func (t *remoteTarget) WriteHeader(obj *object.RawObject) error {
|
|||
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||
key, err := t.keyStorage.GetKey(t.commonPrm.SessionToken())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive private key", t)
|
||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
|
||||
}
|
||||
|
||||
addr, err := t.addr.HostAddrString()
|
||||
|
@ -61,7 +61,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
|
||||
c, err := t.clientConstructor.Get(addr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr)
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, addr, err)
|
||||
}
|
||||
|
||||
id, err := c.PutObject(t.ctx, new(client.PutObjectParams).
|
||||
|
@ -75,7 +75,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
)...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not put object to %s", t, addr)
|
||||
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, addr, err)
|
||||
}
|
||||
|
||||
return new(transformer.AccessIdentifiers).
|
||||
|
@ -118,9 +118,9 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
|||
}
|
||||
|
||||
if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not send object header", s)
|
||||
return fmt.Errorf("(%T) could not send object header: %w", s, err)
|
||||
} else if _, err := t.Close(); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not send object", s)
|
||||
return fmt.Errorf("(%T) could not send object: %w", s, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -2,13 +2,14 @@ package putsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Streamer struct {
|
||||
|
@ -26,13 +27,13 @@ var errInitRecall = errors.New("init recall")
|
|||
func (p *Streamer) Init(prm *PutInitPrm) error {
|
||||
// initialize destination target
|
||||
if err := p.initTarget(prm); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not initialize object target", p)
|
||||
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
|
||||
}
|
||||
|
||||
return errors.Wrapf(
|
||||
p.target.WriteHeader(prm.hdr),
|
||||
"(%T) could not write header to target", p,
|
||||
)
|
||||
if err := p.target.WriteHeader(prm.hdr); err != nil {
|
||||
return fmt.Errorf("(%T) could not write header to target: %w", p, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||
|
@ -43,7 +44,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
|||
|
||||
// prepare needed put parameters
|
||||
if err := p.preparePrm(prm); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not prepare put parameters", p)
|
||||
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
|
||||
}
|
||||
|
||||
if prm.hdr.Signature() != nil {
|
||||
|
@ -63,12 +64,12 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
|||
// get private token from local storage
|
||||
sessionKey, err := p.keyStorage.GetKey(sToken)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not receive session key", p)
|
||||
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
||||
}
|
||||
|
||||
maxSz := p.maxSizeSrc.MaxObjectSize()
|
||||
if maxSz == 0 {
|
||||
return errors.Errorf("(%T) could not obtain max object size parameter", p)
|
||||
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
|
||||
}
|
||||
|
||||
p.target = transformer.NewPayloadSizeLimiter(
|
||||
|
@ -92,13 +93,13 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
|||
// get latest network map
|
||||
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not get latest network map", p)
|
||||
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
||||
}
|
||||
|
||||
// get container to store the object
|
||||
cnr, err := p.cnrSrc.Get(prm.hdr.ContainerID())
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not get container by ID", p)
|
||||
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
||||
}
|
||||
|
||||
// add common options
|
||||
|
@ -158,7 +159,7 @@ func (p *Streamer) SendChunk(prm *PutChunkPrm) error {
|
|||
|
||||
_, err := p.target.Write(prm.chunk)
|
||||
|
||||
return errors.Wrapf(err, "(%T) could not write payload chunk to target", p)
|
||||
return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
|
||||
}
|
||||
|
||||
func (p *Streamer) Close() (*PutResponse, error) {
|
||||
|
@ -168,7 +169,7 @@ func (p *Streamer) Close() (*PutResponse, error) {
|
|||
|
||||
ids, err := p.target.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not close object target", p)
|
||||
return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
|
||||
}
|
||||
|
||||
id := ids.ParentID()
|
||||
|
|
|
@ -2,10 +2,10 @@ package putsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Service implements Put operation of Object service v2.
|
||||
|
@ -37,7 +37,7 @@ func NewService(opts ...Option) *Service {
|
|||
func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) {
|
||||
stream, err := s.svc.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not open object put stream", s)
|
||||
return nil, fmt.Errorf("(%T) could not open object put stream: %w", s, err)
|
||||
}
|
||||
|
||||
return &streamer{
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type streamer struct {
|
||||
|
@ -21,14 +22,14 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
|
|||
}
|
||||
|
||||
if err = s.stream.Init(initPrm); err != nil {
|
||||
err = errors.Wrapf(err, "(%T) could not init object put stream", s)
|
||||
err = fmt.Errorf("(%T) could not init object put stream: %w", s, err)
|
||||
}
|
||||
case *object.PutObjectPartChunk:
|
||||
if err = s.stream.SendChunk(toChunkPrm(v)); err != nil {
|
||||
err = errors.Wrapf(err, "(%T) could not send payload chunk", s)
|
||||
err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
|
||||
}
|
||||
default:
|
||||
err = errors.Errorf("(%T) invalid object put stream part type %T", s, v)
|
||||
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -37,7 +38,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
|
|||
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
|
||||
resp, err := s.stream.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not object put stream", s)
|
||||
return nil, fmt.Errorf("(%T) could not object put stream: %w", s, err)
|
||||
}
|
||||
|
||||
return fromPutResponse(resp), nil
|
||||
|
|
|
@ -3,13 +3,13 @@ package putsvc
|
|||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"hash"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/nspcc-dev/tzhash/tz"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type validatingTarget struct {
|
||||
|
@ -26,7 +26,7 @@ func (t *validatingTarget) WriteHeader(obj *object.RawObject) error {
|
|||
cs := obj.PayloadChecksum()
|
||||
switch typ := cs.Type(); typ {
|
||||
default:
|
||||
return errors.Errorf("(%T) unsupported payload checksum type %v", t, typ)
|
||||
return fmt.Errorf("(%T) unsupported payload checksum type %v", t, typ)
|
||||
case pkg.ChecksumSHA256:
|
||||
t.hash = sha256.New()
|
||||
case pkg.ChecksumTZ:
|
||||
|
@ -36,7 +36,7 @@ func (t *validatingTarget) WriteHeader(obj *object.RawObject) error {
|
|||
t.checksum = cs.Sum()
|
||||
|
||||
if err := t.fmt.Validate(obj.Object()); err != nil {
|
||||
return errors.Wrapf(err, "(%T) coult not validate object format", t)
|
||||
return fmt.Errorf("(%T) coult not validate object format: %w", t, err)
|
||||
}
|
||||
|
||||
return t.nextTarget.WriteHeader(obj)
|
||||
|
@ -53,7 +53,7 @@ func (t *validatingTarget) Write(p []byte) (n int, err error) {
|
|||
|
||||
func (t *validatingTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||
if !bytes.Equal(t.hash.Sum(nil), t.checksum) {
|
||||
return nil, errors.Errorf("(%T) incorrect payload checksum", t)
|
||||
return nil, fmt.Errorf("(%T) incorrect payload checksum", t)
|
||||
}
|
||||
|
||||
return t.nextTarget.Close()
|
||||
|
|
|
@ -2,11 +2,11 @@ package object
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ResponseService struct {
|
||||
|
@ -66,7 +66,7 @@ func (s *putStreamResponser) Send(req *object.PutRequest) error {
|
|||
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
|
||||
r, err := s.stream.CloseAndRecv()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive response", s)
|
||||
return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
|
||||
}
|
||||
|
||||
return r.(*object.PutResponse), nil
|
||||
|
@ -75,7 +75,7 @@ func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
|
|||
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||
stream, err := s.svc.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Put object streamer")
|
||||
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||
}
|
||||
|
||||
return &putStreamResponser{
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
@ -15,7 +16,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package searchsvc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
|
@ -16,7 +18,6 @@ import (
|
|||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) {
|
||||
|
@ -80,16 +81,16 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
|
|||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
}
|
||||
|
||||
chunk := resp.GetBody().GetIDList()
|
||||
|
|
|
@ -3,10 +3,10 @@ package object
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/util"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type SignService struct {
|
||||
|
@ -74,7 +74,7 @@ func (s *putStreamSigner) Send(req *object.PutRequest) error {
|
|||
func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
|
||||
r, err := s.stream.CloseAndRecv()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not receive response")
|
||||
return nil, fmt.Errorf("could not receive response: %w", err)
|
||||
}
|
||||
|
||||
return r.(*object.PutResponse), nil
|
||||
|
@ -83,7 +83,7 @@ func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
|
|||
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||
stream, err := s.svc.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Put object streamer")
|
||||
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||
}
|
||||
|
||||
return &putStreamSigner{
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
netmapSDK "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type localPlacement struct {
|
||||
|
@ -44,7 +45,7 @@ func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placem
|
|||
func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) {
|
||||
vs, err := p.builder.BuildPlacement(addr, policy)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not build object placement", p)
|
||||
return nil, fmt.Errorf("(%T) could not build object placement: %w", p, err)
|
||||
}
|
||||
|
||||
for i := range vs {
|
||||
|
@ -61,7 +62,7 @@ func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.
|
|||
}
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("(%T) local node is outside of object placement", p)
|
||||
return nil, fmt.Errorf("(%T) local node is outside of object placement", p)
|
||||
}
|
||||
|
||||
// NewRemotePlacementBuilder creates, initializes and returns placement builder that
|
||||
|
@ -76,7 +77,7 @@ func NewRemotePlacementBuilder(b placement.Builder, s network.LocalAddressSource
|
|||
func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) {
|
||||
vs, err := p.builder.BuildPlacement(addr, policy)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not build object placement", p)
|
||||
return nil, fmt.Errorf("(%T) could not build object placement: %w", p, err)
|
||||
}
|
||||
|
||||
for i := range vs {
|
||||
|
@ -122,13 +123,13 @@ func (g *TraverserGenerator) GenerateTraverser(addr *object.Address, epoch uint6
|
|||
// get network map by epoch
|
||||
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not get network map #%d", epoch)
|
||||
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
||||
}
|
||||
|
||||
// get container related container
|
||||
cnr, err := g.cnrSrc.Get(addr.ContainerID())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get container")
|
||||
return nil, fmt.Errorf("could not get container: %w", err)
|
||||
}
|
||||
|
||||
// allocate placement traverser options
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue