Refactor getsvc #277

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:object-3606 into master 2023-04-28 14:03:13 +00:00
10 changed files with 132 additions and 64 deletions
Showing only changes of commit 8de72f144d - Show all commits

View file

@ -70,7 +70,7 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
select {
case <-ctx.Done():
r.log.Debug(logs.InterruptPlacementIterationByContext,
zap.String("error", ctx.Err().Error()),
zap.Error(ctx.Err()),
)
return true

View file

@ -107,7 +107,7 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
default:
exec.log.Debug(logs.OperationFinishedWithError,
zap.String("error", exec.err.Error()),
zap.Error(exec.err),
)
if execCnr {

View file

@ -30,9 +30,7 @@ func (r *request) executeLocal(ctx context.Context) {
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetLocalGetFailed,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetLocalGetFailed, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil

View file

@ -36,9 +36,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.status = statusUndefined
r.err = errNotFound
r.log.Debug(logs.GetRemoteCallFailed,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil

View file

@ -125,9 +125,7 @@ func (r *request) initEpoch() bool {
r.status = statusUndefined
r.err = err
r.log.Debug(logs.CouldNotGetCurrentEpochNumber,
zap.String("error", err.Error()),
)
r.log.Debug(logs.CouldNotGetCurrentEpochNumber, zap.Error(err))
return false
case err == nil:
@ -146,9 +144,7 @@ func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, boo
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
return nil, false
case err == nil:
@ -185,9 +181,7 @@ func (r *request) writeCollectedHeader(ctx context.Context) bool {
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotWriteHeader,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetCouldNotWriteHeader, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil
@ -208,9 +202,7 @@ func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object)
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotWritePayloadChunk,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetCouldNotWritePayloadChunk, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil

View file

@ -0,0 +1,92 @@
package getsvc
import (
"errors"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
)
var (
errMissingObjAddress = errors.New("missing object address")
errWrongMessageSeq = errors.New("incorrect message sequence")
errNilObjectPart = errors.New("nil object part")
errMissingSignature = errors.New("missing signature")
errInvalidObjectIDSign = errors.New("invalid object ID signature")
errWrongHeaderPartTypeExpShortRecvWithSignature = fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
errWrongHeaderPartTypeExpWithSignRecvShort = fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
)
func errInvalidObjAddress(err error) error {
return fmt.Errorf("invalid object address: %w", err)
}
func errRequestParamsValidation(err error) error {
return fmt.Errorf("request params validation: %w", err)
}
func errFetchingSessionKey(err error) error {
return fmt.Errorf("fetching session key: %w", err)
}
func errUnknownChechsumType(t refs.ChecksumType) error {
return fmt.Errorf("unknown checksum type %v", t)
}
func errResponseVerificationFailed(err error) error {
return fmt.Errorf("response verification failed: %w", err)
}
func errCouldNotWriteObjHeader(err error) error {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
func errStreamOpenningFailed(err error) error {
return fmt.Errorf("stream opening failed: %w", err)
}
func errReadingResponseFailed(err error) error {
return fmt.Errorf("reading the response failed: %w", err)
}
func errUnexpectedObjectPart(v objectV2.GetObjectPart) error {
return fmt.Errorf("unexpected object part %T", v)
}
func errCouldNotWriteObjChunk(forwarder string, err error) error {
return fmt.Errorf("could not write object chunk in %s forwarder: %w", forwarder, err)
}
func errCouldNotVerifyRangeResponse(resp *objectV2.GetRangeResponse, err error) error {
return fmt.Errorf("could not verify %T: %w", resp, err)
}
func errCouldNotCreateGetRangeStream(err error) error {
return fmt.Errorf("could not create Get payload range stream: %w", err)
}
func errUnexpectedRangePart(v objectV2.GetRangePart) error {
return fmt.Errorf("unexpected range type %T", v)
}
func errUnexpectedHeaderPart(v objectV2.GetHeaderPart) error {
return fmt.Errorf("unexpected header type %T", v)
}
func errMarshalID(err error) error {
return fmt.Errorf("marshal ID: %w", err)
}
func errCantReadSignature(err error) error {
return fmt.Errorf("can't read signature: %w", err)
}
func errSendingRequestFailed(err error) error {
return fmt.Errorf("sending the request failed: %w", err)
}

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
@ -71,7 +70,7 @@ func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
return errResponseVerificationFailed(err)
}
return checkStatus(resp.GetMetaHeader().GetStatus())
@ -89,7 +88,7 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
})
if err != nil {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
return errCouldNotWriteObjHeader(err)
}
return nil
}
@ -102,7 +101,7 @@ func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Addre
return e
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
return nil, errStreamOpenningFailed(err)
}
return getStream, nil
}
@ -127,7 +126,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
return errReadingResponseFailed(err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
@ -136,7 +135,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return fmt.Errorf("unexpected object part %T", v)
return errUnexpectedObjectPart(v)
case *objectV2.GetObjectPartInit:
if headWas {
return errWrongMessageSeq
@ -159,7 +158,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
return errCouldNotWriteObjChunk("Get", err)
}
localProgress += len(origChunk)

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
@ -73,7 +72,7 @@ func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeRespons
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("could not verify %T: %w", resp, err)
return errCouldNotVerifyRangeResponse(resp, err)
}
return checkStatus(resp.GetMetaHeader().GetStatus())
@ -88,7 +87,7 @@ func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.
return e
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
return nil, errCouldNotCreateGetRangeStream(err)
}
return rangeStream, nil
}
@ -105,7 +104,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
return errReadingResponseFailed(err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
@ -114,7 +113,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return fmt.Errorf("unexpected range type %T", v)
return errUnexpectedRangePart(v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
@ -125,7 +124,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
return errCouldNotWriteObjChunk("GetRange", err)
}
localProgress += len(origChunk)

View file

@ -3,8 +3,6 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
@ -74,7 +72,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
return nil, errUnexpectedHeaderPart(v)
case *objectV2.ShortHeader:
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
return nil, err
@ -100,9 +98,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
if !f.Request.GetBody().GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
return nil, errWrongHeaderPartTypeExpShortRecvWithSignature
}
hdr := new(objectV2.Header)
@ -118,35 +114,32 @@ func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
if f.Request.GetBody().GetMainOnly() {
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
return nil, nil, errWrongHeaderPartTypeExpWithSignRecvShort
}
if hdrWithSig == nil {
return nil, nil, errors.New("nil object part")
return nil, nil, errNilObjectPart
}
hdr := hdrWithSig.GetHeader()
idSig := hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, nil, errors.New("missing signature")
return nil, nil, errMissingSignature
}
binID, err := f.ObjectAddr.Object().Marshal()
if err != nil {
return nil, nil, fmt.Errorf("marshal ID: %w", err)
return nil, nil, errMarshalID(err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, nil, fmt.Errorf("can't read signature: %w", err)
return nil, nil, errCantReadSignature(err)
}
if !sig.Verify(binID) {
return nil, nil, errors.New("invalid object ID signature")
return nil, nil, errInvalidObjectIDSign
}
return hdr, idSig, nil
@ -160,7 +153,7 @@ func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network
return e
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
return nil, errSendingRequestFailed(err)
}
return headResp, nil
}
@ -173,7 +166,7 @@ func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, p
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
return errResponseVerificationFailed(err)
}
return checkStatus(f.Response.GetMetaHeader().GetStatus())

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"errors"
"fmt"
"hash"
"sync"
@ -24,21 +23,19 @@ import (
"git.frostfs.info/TrueCloudLab/tzhash/tz"
)
var errWrongMessageSeq = errors.New("incorrect message sequence")
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
body := req.GetBody()
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var addr oid.Address
err := addr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)
@ -81,14 +78,14 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var addr oid.Address
err := addr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)
@ -108,7 +105,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
err = p.Validate()
if err != nil {
return nil, fmt.Errorf("request params validation: %w", err)
return nil, errRequestParamsValidation(err)
}
if !commonPrm.LocalOnly() {
@ -136,14 +133,14 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var addr oid.Address
err := addr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)
@ -167,7 +164,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
}
if err != nil {
return nil, fmt.Errorf("fetching session key: %w", err)
return nil, errFetchingSessionKey(err)
}
p.WithCachedSignerKey(signerKey)
@ -185,7 +182,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
switch t := body.GetType(); t {
default:
return nil, fmt.Errorf("unknown checksum type %v", t)
return nil, errUnknownChechsumType(t)
case refs.SHA256:
p.SetHashGenerator(func() hash.Hash {
return sha256.New()
@ -220,14 +217,14 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var objAddr oid.Address
err := objAddr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)