Refactor get service #193

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT-3610_getsvc into master 2023-04-05 14:38:49 +00:00
14 changed files with 623 additions and 477 deletions

View file

@ -10,7 +10,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) assemble() { func (exec *execCtx) assemble(ctx context.Context) {
if !exec.canAssemble() { if !exec.canAssemble() {
exec.log.Debug("can not assemble the object") exec.log.Debug("can not assemble the object")
return return
@ -49,7 +49,7 @@ func (exec *execCtx) assemble() {
zap.Uint64("range_length", exec.ctxRange().GetLength()), zap.Uint64("range_length", exec.ctxRange().GetLength()),
) )
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter) obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
if err != nil { if err != nil {
exec.log.Warn("failed to assemble splitted object", exec.log.Warn("failed to assemble splitted object",
zap.Error(err), zap.Error(err),
@ -107,8 +107,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w) prm.SetHeaderWriter(w)
//nolint: contextcheck err := exec.svc.Head(ctx, prm)
err := exec.svc.Head(exec.context(), prm)
if err != nil { if err != nil {
return nil, err return nil, err
@ -128,8 +127,7 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
p.addr.SetContainer(exec.containerID()) p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id) p.addr.SetObject(id)
//nolint: contextcheck statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
if statusError.err != nil { if statusError.err != nil {
return nil, statusError.err return nil, statusError.err

View file

@ -7,7 +7,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeOnContainer() { func (exec *execCtx) executeOnContainer(ctx context.Context) {
if exec.isLocal() { if exec.isLocal() {
exec.log.Debug("return result directly") exec.log.Debug("return result directly")
return return
@ -26,7 +26,7 @@ func (exec *execCtx) executeOnContainer() {
} }
for { for {
if exec.processCurrentEpoch() { if exec.processCurrentEpoch(ctx) {
break break
} }
@ -42,7 +42,7 @@ func (exec *execCtx) executeOnContainer() {
} }
} }
func (exec *execCtx) processCurrentEpoch() bool { func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
exec.log.Debug("process epoch", exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
@ -52,7 +52,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
return true return true
} }
ctx, cancel := context.WithCancel(exec.context()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
exec.status = statusUndefined exec.status = statusUndefined

View file

@ -19,12 +19,9 @@ type statusError struct {
err error err error
} }
// nolint: containedctx
type execCtx struct { type execCtx struct {
svc *Service svc *Service
ctx context.Context
prm RangePrm prm RangePrm
statusError statusError
@ -80,10 +77,6 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
)} )}
} }
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool { func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly() return exec.prm.common.LocalOnly()
} }
@ -217,13 +210,13 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
} }
} }
func (exec *execCtx) writeCollectedHeader() bool { func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
if exec.ctxRange() != nil { if exec.ctxRange() != nil {
return true return true
} }
err := exec.prm.objWriter.WriteHeader( err := exec.prm.objWriter.WriteHeader(
exec.context(), ctx,
exec.collectedObject.CutPayload(), exec.collectedObject.CutPayload(),
) )
@ -243,12 +236,12 @@ func (exec *execCtx) writeCollectedHeader() bool {
return exec.status == statusOK return exec.status == statusOK
} }
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if exec.headOnly() { if exec.headOnly() {
return true return true
} }
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload()) err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch { switch {
default: default:
@ -266,9 +259,9 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
return err == nil return err == nil
} }
func (exec *execCtx) writeCollectedObject() { func (exec *execCtx) writeCollectedObject(ctx context.Context) {
if ok := exec.writeCollectedHeader(); ok { if ok := exec.writeCollectedHeader(ctx); ok {
exec.writeObjectPayload(exec.collectedObject) exec.writeObjectPayload(ctx, exec.collectedObject)
} }
} }

View file

@ -65,7 +65,6 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
exec := &execCtx{ exec := &execCtx{
svc: s, svc: s,
ctx: ctx,
prm: RangePrm{ prm: RangePrm{
commonPrm: prm, commonPrm: prm,
}, },
@ -78,22 +77,21 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
exec.setLogger(s.log) exec.setLogger(s.log)
//nolint: contextcheck exec.execute(ctx)
exec.execute()
return exec.statusError return exec.statusError
} }
func (exec *execCtx) execute() { func (exec *execCtx) execute(ctx context.Context) {
exec.log.Debug("serving request...") exec.log.Debug("serving request...")
// perform local operation // perform local operation
exec.executeLocal() exec.executeLocal(ctx)
exec.analyzeStatus(true) exec.analyzeStatus(ctx, true)
} }
func (exec *execCtx) analyzeStatus(execCnr bool) { func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result // analyze local result
switch exec.status { switch exec.status {
case statusOK: case statusOK:
@ -102,7 +100,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
exec.log.Debug("requested object was marked as removed") exec.log.Debug("requested object was marked as removed")
case statusVIRTUAL: case statusVIRTUAL:
exec.log.Debug("requested object is virtual") exec.log.Debug("requested object is virtual")
exec.assemble() exec.assemble(ctx)
case statusOutOfRange: case statusOutOfRange:
exec.log.Debug("requested range is out of object bounds") exec.log.Debug("requested range is out of object bounds")
default: default:
@ -111,8 +109,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
) )
if execCnr { if execCnr {
exec.executeOnContainer() exec.executeOnContainer(ctx)
exec.analyzeStatus(false) exec.analyzeStatus(ctx, false)
} }
} }
} }

View file

@ -117,7 +117,7 @@ func newTestClient() *testClient {
} }
} }
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().EncodeToString()] v, ok := c.results[exec.address().EncodeToString()]
if !ok { if !ok {
var errNotFound apistatus.ObjectNotFound var errNotFound apistatus.ObjectNotFound

View file

@ -1,6 +1,7 @@
package getsvc package getsvc
import ( import (
"context"
"errors" "errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -8,7 +9,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeLocal() { func (exec *execCtx) executeLocal(ctx context.Context) {
var err error var err error
exec.collectedObject, err = exec.svc.localStorage.get(exec) exec.collectedObject, err = exec.svc.localStorage.get(exec)
@ -28,7 +29,7 @@ func (exec *execCtx) executeLocal() {
case err == nil: case err == nil:
exec.status = statusOK exec.status = statusOK
exec.err = nil exec.err = nil
exec.writeCollectedObject() exec.writeCollectedObject(ctx)
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
exec.status = statusINHUMED exec.status = statusINHUMED
exec.err = errRemoved exec.err = errRemoved

View file

@ -59,7 +59,7 @@ type RangeHashPrm struct {
salt []byte salt []byte
} }
type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error) type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
// HeadPrm groups parameters of Head service call. // HeadPrm groups parameters of Head service call.
type HeadPrm struct { type HeadPrm struct {

View file

@ -18,7 +18,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
return true return true
} }
obj, err := client.getObject(exec, info) obj, err := client.getObject(ctx, exec, info)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
@ -43,8 +43,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
// has already been streamed to the requesting party // has already been streamed to the requesting party
if obj != nil { if obj != nil {
exec.collectedObject = obj exec.collectedObject = obj
//nolint: contextcheck exec.writeCollectedObject(ctx)
exec.writeCollectedObject()
} }
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
exec.status = statusINHUMED exec.status = statusINHUMED

View file

@ -1,6 +1,8 @@
package getsvc package getsvc
import ( import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@ -22,7 +24,7 @@ type Service struct {
type Option func(*cfg) type Option func(*cfg)
type getClient interface { type getClient interface {
getObject(*execCtx, client.NodeInfo) (*object.Object, error) getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
} }
type cfg struct { type cfg struct {

View file

@ -87,10 +87,9 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
}, nil }, nil
} }
// nolint: funlen func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() { if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client) return exec.prm.forwarder(ctx, info, c.client)
} }
key, err := exec.key() key, err := exec.key()
@ -99,83 +98,91 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
} }
if exec.headOnly() { if exec.headOnly() {
var prm internalclient.HeadObjectPrm return c.getHeadOnly(ctx, exec, key)
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.HeadObject(prm)
if err != nil {
return nil, err
}
return res.Header(), nil
} }
// we don't specify payload writer because we accumulate // we don't specify payload writer because we accumulate
// the object locally (even huge). // the object locally (even huge).
if rng := exec.ctxRange(); rng != nil { if rng := exec.ctxRange(); rng != nil {
var prm internalclient.PayloadRangePrm // Current spec allows other storage node to deny access,
// fallback to GET here.
prm.SetContext(exec.context()) return c.getRange(ctx, exec, key, rng)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
// Current spec allows other storage node to deny access,
// fallback to GET here.
obj, err := c.get(exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
} }
return c.get(exec, key) return c.get(ctx, exec, key)
} }
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.HeadObjectPrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.HeadObject(prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm var prm internalclient.GetObjectPrm
prm.SetContext(exec.context()) prm.SetContext(ctx)
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)

View file

@ -0,0 +1,168 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRequestForwarder struct {
OnceResign *sync.Once
OnceHeaderSending *sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRequest
Stream *streamObjectWriter
}
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
getStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, c, getStream, pubkey)
}
func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error {
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
var err error
f.OnceHeaderSending.Do(func() {
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
})
if err != nil {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
return nil
}
func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) {
var getStream *rpc.GetResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
return getStream, nil
}
func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error {
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return errWrongMessageSeq
}
headWas = true
if err := f.writeHeader(ctx, v); err != nil {
return err
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -0,0 +1,134 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRangeRequestForwarder struct {
OnceResign *sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRangeRequest
Stream *streamObjectRangeWriter
}
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
rangeStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, rangeStream, c, pubkey)
}
func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("could not verify %T: %w", resp, err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) {
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
rangeStream, e = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
}
return rangeStream, nil
}
func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error {
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
err := rangeStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -0,0 +1,175 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type headRequestForwarder struct {
Request *objectV2.HeadRequest
Response *objectV2.HeadResponse
OnceResign *sync.Once
ObjectAddr oid.Address
Key *ecdsa.PrivateKey
}
func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
headResp, err := f.sendHeadRequest(ctx, addr, c)
if err != nil {
return nil, err
}
if err := f.verifyResponse(headResp, pubkey); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
return nil, err
}
case *objectV2.HeaderWithSignature:
if hdr, idSig, err = f.getHeaderAndSignature(v); err != nil {
return nil, err
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(f.ObjectAddr.Object())
return obj, nil
}
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
if !f.Request.GetBody().GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
hdr := new(objectV2.Header)
hdr.SetPayloadLength(sh.GetPayloadLength())
hdr.SetVersion(sh.GetVersion())
hdr.SetOwnerID(sh.GetOwnerID())
hdr.SetObjectType(sh.GetObjectType())
hdr.SetCreationEpoch(sh.GetCreationEpoch())
hdr.SetPayloadHash(sh.GetPayloadHash())
hdr.SetHomomorphicHash(sh.GetHomomorphicHash())
return hdr, nil
}
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
if f.Request.GetBody().GetMainOnly() {
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
if hdrWithSig == nil {
return nil, nil, errors.New("nil object part")
}
hdr := hdrWithSig.GetHeader()
idSig := hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, nil, errors.New("missing signature")
}
binID, err := f.ObjectAddr.Object().Marshal()
if err != nil {
return nil, nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, nil, errors.New("invalid object ID signature")
}
return hdr, idSig, nil
}
func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*objectV2.HeadResponse, error) {
var headResp *objectV2.HeadResponse
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
headResp, e = rpc.HeadObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
return headResp, nil
}
func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(f.Response.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}

View file

@ -6,25 +6,18 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
"io"
"sync" "sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
@ -33,7 +26,6 @@ import (
var errWrongMessageSeq = errors.New("incorrect message sequence") var errWrongMessageSeq = errors.New("incorrect message sequence")
// nolint: funlen, gocognit
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
body := req.GetBody() body := req.GetBody()
@ -49,8 +41,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
return nil, fmt.Errorf("invalid object address: %w", err) return nil, fmt.Errorf("invalid object address: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -66,141 +56,26 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
p.SetObjectWriter(streamWrapper) p.SetObjectWriter(streamWrapper)
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
var onceHeaderSending sync.Once forwarder := &getRequestForwarder{
var globalProgress int OnceResign: &sync.Once{},
OnceHeaderSending: &sync.Once{},
GlobalProgress: 0,
Key: key,
Request: req,
Stream: streamWrapper,
}
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
var err error
key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.GetObject implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var getStream *rpc.GetResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return nil, io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return nil, fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return nil, errWrongMessageSeq
}
headWas = true
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
onceHeaderSending.Do(func() {
err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj))
})
if err != nil {
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return nil, errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
}))
} }
return p, nil return p, nil
} }
// nolint: funlen, gocognit
func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) { func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) {
body := req.GetBody() body := req.GetBody()
@ -216,8 +91,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
return nil, fmt.Errorf("invalid object address: %w", err) return nil, fmt.Errorf("invalid object address: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -239,104 +112,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
} }
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once
var globalProgress int
key, err := s.keyStorage.GetKey(nil) key, err := s.keyStorage.GetKey(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { forwarder := &getRangeRequestForwarder{
var err error OnceResign: &sync.Once{},
GlobalProgress: 0,
Key: key,
Request: req,
Stream: streamWrapper,
}
// once compose and resign forwarding request p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.ObjectPayloadRangeData implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
}
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
err := rangeStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return nil, fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
}))
} }
return p, nil return p, nil
@ -426,7 +215,6 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
return nil return nil
} }
// nolint: funlen
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody() body := req.GetBody()
@ -442,8 +230,6 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
return nil, fmt.Errorf("invalid object address: %w", err) return nil, fmt.Errorf("invalid object address: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -463,135 +249,20 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
return p, nil return p, nil
} }
var onceResign sync.Once key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { forwarder := &headRequestForwarder{
var err error Request: req,
Response: resp,
OnceResign: &sync.Once{},
ObjectAddr: objAddr,
Key: key,
}
key, err := s.keyStorage.GetKey(nil) p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
if err != nil {
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.GetObjectHeader implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// send Head request
var headResp *objectV2.HeadResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if !body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
h := v
hdr = new(objectV2.Header)
hdr.SetPayloadLength(h.GetPayloadLength())
hdr.SetVersion(h.GetVersion())
hdr.SetOwnerID(h.GetOwnerID())
hdr.SetObjectType(h.GetObjectType())
hdr.SetCreationEpoch(h.GetCreationEpoch())
hdr.SetPayloadHash(h.GetPayloadHash())
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
case *objectV2.HeaderWithSignature:
if body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
hdrWithSig := v
if hdrWithSig == nil {
return nil, errors.New("nil object part")
}
hdr = hdrWithSig.GetHeader()
idSig = hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, errors.New("missing signature")
}
binID, err := objAddr.Object().Marshal()
if err != nil {
return nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, errors.New("invalid object ID signature")
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(objAddr.Object())
// convert the object
return obj, nil
}))
return p, nil return p, nil
} }
@ -659,8 +330,8 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
return sh return sh
} }
func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder { func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
var ( var (
firstErr error firstErr error
res *object.Object res *object.Object
@ -681,7 +352,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
// would be nice to log otherwise // would be nice to log otherwise
}() }()
res, err = f(addr, c, key) res, err = f(ctx, addr, c, key)
return return
}) })