Refactor get service #193

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT-3610_getsvc into master 2023-04-05 14:38:49 +00:00
14 changed files with 623 additions and 477 deletions

View file

@ -10,7 +10,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) assemble() { func (exec *execCtx) assemble(ctx context.Context) {

Can we use more meaningful commit message for this? Like a sentence you use in the body.

Can we use more meaningful commit message for this? Like a sentence you use in the body.

And same for the last commit, we have 2 commits with identical message doing pretty different things.

And same for the last commit, we have 2 commits with identical message doing pretty different things.

Fixed

Fixed
if !exec.canAssemble() { if !exec.canAssemble() {
exec.log.Debug("can not assemble the object") exec.log.Debug("can not assemble the object")
return return
@ -49,7 +49,7 @@ func (exec *execCtx) assemble() {
zap.Uint64("range_length", exec.ctxRange().GetLength()), zap.Uint64("range_length", exec.ctxRange().GetLength()),
) )
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter) obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
if err != nil { if err != nil {
exec.log.Warn("failed to assemble splitted object", exec.log.Warn("failed to assemble splitted object",
zap.Error(err), zap.Error(err),
@ -107,8 +107,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w) prm.SetHeaderWriter(w)
//nolint: contextcheck err := exec.svc.Head(ctx, prm)
err := exec.svc.Head(exec.context(), prm)
if err != nil { if err != nil {
return nil, err return nil, err
@ -128,8 +127,7 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
p.addr.SetContainer(exec.containerID()) p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id) p.addr.SetObject(id)
//nolint: contextcheck statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
if statusError.err != nil { if statusError.err != nil {
return nil, statusError.err return nil, statusError.err

View file

@ -7,7 +7,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeOnContainer() { func (exec *execCtx) executeOnContainer(ctx context.Context) {
if exec.isLocal() { if exec.isLocal() {
exec.log.Debug("return result directly") exec.log.Debug("return result directly")
return return
@ -26,7 +26,7 @@ func (exec *execCtx) executeOnContainer() {
} }
for { for {
if exec.processCurrentEpoch() { if exec.processCurrentEpoch(ctx) {
break break
} }
@ -42,7 +42,7 @@ func (exec *execCtx) executeOnContainer() {
} }
} }
func (exec *execCtx) processCurrentEpoch() bool { func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
exec.log.Debug("process epoch", exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
@ -52,7 +52,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
return true return true
} }
ctx, cancel := context.WithCancel(exec.context()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
exec.status = statusUndefined exec.status = statusUndefined

View file

@ -19,12 +19,9 @@ type statusError struct {
err error err error
} }
// nolint: containedctx
type execCtx struct { type execCtx struct {
svc *Service svc *Service
ctx context.Context
prm RangePrm prm RangePrm
statusError statusError
@ -80,10 +77,6 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
)} )}
} }
func (exec execCtx) context() context.Context {
return exec.ctx
fyrchik marked this conversation as resolved Outdated

Do we still need the field in the struct?

Do we still need the field in the struct?

No, we dont. It's dropped.

No, we dont. It's dropped.
}
func (exec execCtx) isLocal() bool { func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly() return exec.prm.common.LocalOnly()
} }
@ -217,13 +210,13 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
} }
} }
func (exec *execCtx) writeCollectedHeader() bool { func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
if exec.ctxRange() != nil { if exec.ctxRange() != nil {
return true return true
} }
err := exec.prm.objWriter.WriteHeader( err := exec.prm.objWriter.WriteHeader(
exec.context(), ctx,
exec.collectedObject.CutPayload(), exec.collectedObject.CutPayload(),
) )
@ -243,12 +236,12 @@ func (exec *execCtx) writeCollectedHeader() bool {
return exec.status == statusOK return exec.status == statusOK
} }
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if exec.headOnly() { if exec.headOnly() {
return true return true
} }
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload()) err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch { switch {
default: default:
@ -266,9 +259,9 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
return err == nil return err == nil
} }
func (exec *execCtx) writeCollectedObject() { func (exec *execCtx) writeCollectedObject(ctx context.Context) {
if ok := exec.writeCollectedHeader(); ok { if ok := exec.writeCollectedHeader(ctx); ok {
exec.writeObjectPayload(exec.collectedObject) exec.writeObjectPayload(ctx, exec.collectedObject)
} }
} }

View file

@ -65,7 +65,6 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
exec := &execCtx{ exec := &execCtx{
svc: s, svc: s,
ctx: ctx,
prm: RangePrm{ prm: RangePrm{
commonPrm: prm, commonPrm: prm,
}, },
@ -78,22 +77,21 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
exec.setLogger(s.log) exec.setLogger(s.log)
//nolint: contextcheck exec.execute(ctx)
exec.execute()
return exec.statusError return exec.statusError
} }
func (exec *execCtx) execute() { func (exec *execCtx) execute(ctx context.Context) {
exec.log.Debug("serving request...") exec.log.Debug("serving request...")
// perform local operation // perform local operation
exec.executeLocal() exec.executeLocal(ctx)
exec.analyzeStatus(true) exec.analyzeStatus(ctx, true)
} }
func (exec *execCtx) analyzeStatus(execCnr bool) { func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result // analyze local result
switch exec.status { switch exec.status {
case statusOK: case statusOK:
@ -102,7 +100,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
exec.log.Debug("requested object was marked as removed") exec.log.Debug("requested object was marked as removed")
case statusVIRTUAL: case statusVIRTUAL:
exec.log.Debug("requested object is virtual") exec.log.Debug("requested object is virtual")
exec.assemble() exec.assemble(ctx)
case statusOutOfRange: case statusOutOfRange:
exec.log.Debug("requested range is out of object bounds") exec.log.Debug("requested range is out of object bounds")
default: default:
@ -111,8 +109,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
) )
if execCnr { if execCnr {
exec.executeOnContainer() exec.executeOnContainer(ctx)
exec.analyzeStatus(false) exec.analyzeStatus(ctx, false)
} }
} }
} }

View file

@ -117,7 +117,7 @@ func newTestClient() *testClient {
} }
} }
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().EncodeToString()] v, ok := c.results[exec.address().EncodeToString()]
if !ok { if !ok {
var errNotFound apistatus.ObjectNotFound var errNotFound apistatus.ObjectNotFound

View file

@ -1,6 +1,7 @@
package getsvc package getsvc
import ( import (
"context"
"errors" "errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -8,7 +9,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) executeLocal() { func (exec *execCtx) executeLocal(ctx context.Context) {
var err error var err error
exec.collectedObject, err = exec.svc.localStorage.get(exec) exec.collectedObject, err = exec.svc.localStorage.get(exec)
@ -28,7 +29,7 @@ func (exec *execCtx) executeLocal() {
case err == nil: case err == nil:
exec.status = statusOK exec.status = statusOK
exec.err = nil exec.err = nil
exec.writeCollectedObject() exec.writeCollectedObject(ctx)
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
exec.status = statusINHUMED exec.status = statusINHUMED
exec.err = errRemoved exec.err = errRemoved

View file

@ -59,7 +59,7 @@ type RangeHashPrm struct {
salt []byte salt []byte
} }
type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error) type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
// HeadPrm groups parameters of Head service call. // HeadPrm groups parameters of Head service call.
type HeadPrm struct { type HeadPrm struct {

View file

@ -18,7 +18,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
return true return true
} }
obj, err := client.getObject(exec, info) obj, err := client.getObject(ctx, exec, info)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
@ -43,8 +43,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
// has already been streamed to the requesting party // has already been streamed to the requesting party
if obj != nil { if obj != nil {
exec.collectedObject = obj exec.collectedObject = obj
//nolint: contextcheck exec.writeCollectedObject(ctx)
exec.writeCollectedObject()
} }
case errors.As(err, &errRemoved): case errors.As(err, &errRemoved):
exec.status = statusINHUMED exec.status = statusINHUMED

View file

@ -1,6 +1,8 @@
package getsvc package getsvc
import ( import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@ -22,7 +24,7 @@ type Service struct {
type Option func(*cfg) type Option func(*cfg)
type getClient interface { type getClient interface {
getObject(*execCtx, client.NodeInfo) (*object.Object, error) getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
} }
type cfg struct { type cfg struct {

View file

@ -87,10 +87,9 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
}, nil }, nil
} }
// nolint: funlen func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() { if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client) return exec.prm.forwarder(ctx, info, c.client)
} }
key, err := exec.key() key, err := exec.key()
@ -99,9 +98,66 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
} }
if exec.headOnly() { if exec.headOnly() {
return c.getHeadOnly(ctx, exec, key)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return c.getRange(ctx, exec, key, rng)
}
return c.get(ctx, exec, key)
}
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.HeadObjectPrm var prm internalclient.HeadObjectPrm
prm.SetContext(exec.context()) prm.SetContext(ctx)
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)
@ -121,61 +177,12 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
} }
return res.Header(), nil return res.Header(), nil
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
var prm internalclient.PayloadRangePrm
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
// Current spec allows other storage node to deny access,
// fallback to GET here.
obj, err := c.get(exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
return c.get(exec, key)
} }
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm var prm internalclient.GetObjectPrm
prm.SetContext(exec.context()) prm.SetContext(ctx)
prm.SetClient(c.client) prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(exec.curProcEpoch)

View file

@ -0,0 +1,168 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRequestForwarder struct {
OnceResign *sync.Once
OnceHeaderSending *sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRequest
Stream *streamObjectWriter
}
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {

so not used second return value?

so not used second return value?

forwardRequestToNode is used as an argument for func groupAddressRequestForwarder(...) function. groupAddressRequestForwarder requires two values to be returned. It is possible to use a lambda function, but I don't like it.

Do you think that with a lambda and one return value it will be easier to understand?

```forwardRequestToNode``` is used as an argument for ```func groupAddressRequestForwarder(...)``` function. ```groupAddressRequestForwarder``` requires two values to be returned. It is possible to use a lambda function, but I don't like it. Do you think that with a lambda and one return value it will be easier to understand?

Do you think that with a lambda and one return value it will be easier to understand?

no, do not like lambdas usually as more complex thing for a reader.

is used as an argument for func groupAddressRequestForwarder(...) function

originally, comment was about that: seems like anything that is ...forwarder... should not return objects. if groupAddressRequestForwarder is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that

> Do you think that with a lambda and one return value it will be easier to understand? no, do not like lambdas usually as more complex thing for a reader. > is used as an argument for func groupAddressRequestForwarder(...) function originally, comment was about that: seems like anything that is `...forwarder...` should not return objects. if `groupAddressRequestForwarder` is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that

HEAD request requires return result from forwarder

HEAD request requires return result from forwarder
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
getStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, c, getStream, pubkey)
}
func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error {
// verify response key
carpawell marked this conversation as resolved Outdated

can we just return f.readStream err if no details are added? (personally, i would add some details via fmt.Errorf)

related to more than just that line

can we just return `f.readStream` err if no details are added? (personally, i would add some details via `fmt.Errorf`) related to more than just that line

Fixed.

Fixed.
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error {
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
var err error
f.OnceHeaderSending.Do(func() {
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
})
if err != nil {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
return nil
}
func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) {
var getStream *rpc.GetResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
return getStream, nil
}
func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error {
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return errWrongMessageSeq
}
headWas = true
if err := f.writeHeader(ctx, v); err != nil {
return err
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -0,0 +1,134 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRangeRequestForwarder struct {
OnceResign *sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRangeRequest
Stream *streamObjectRangeWriter
}
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
rangeStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, rangeStream, c, pubkey)
}
func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("could not verify %T: %w", resp, err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) {
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
rangeStream, e = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
}
return rangeStream, nil
}
func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error {
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
err := rangeStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -0,0 +1,175 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type headRequestForwarder struct {
Request *objectV2.HeadRequest
Response *objectV2.HeadResponse
OnceResign *sync.Once
ObjectAddr oid.Address
Key *ecdsa.PrivateKey
}
func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
headResp, err := f.sendHeadRequest(ctx, addr, c)
if err != nil {
return nil, err
}
if err := f.verifyResponse(headResp, pubkey); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
return nil, err
}
case *objectV2.HeaderWithSignature:
if hdr, idSig, err = f.getHeaderAndSignature(v); err != nil {
return nil, err
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(f.ObjectAddr.Object())
return obj, nil
}
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
if !f.Request.GetBody().GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
hdr := new(objectV2.Header)
hdr.SetPayloadLength(sh.GetPayloadLength())
hdr.SetVersion(sh.GetVersion())
hdr.SetOwnerID(sh.GetOwnerID())
hdr.SetObjectType(sh.GetObjectType())
hdr.SetCreationEpoch(sh.GetCreationEpoch())
hdr.SetPayloadHash(sh.GetPayloadHash())
hdr.SetHomomorphicHash(sh.GetHomomorphicHash())
return hdr, nil
}
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
if f.Request.GetBody().GetMainOnly() {
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
if hdrWithSig == nil {
return nil, nil, errors.New("nil object part")
}
hdr := hdrWithSig.GetHeader()
idSig := hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, nil, errors.New("missing signature")
}
binID, err := f.ObjectAddr.Object().Marshal()
if err != nil {
return nil, nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, nil, errors.New("invalid object ID signature")
}
return hdr, idSig, nil
}
func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*objectV2.HeadResponse, error) {
var headResp *objectV2.HeadResponse
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
headResp, e = rpc.HeadObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
return headResp, nil
}
func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(f.Response.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}

View file

@ -6,25 +6,18 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash" "hash"
"io"
"sync" "sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
@ -33,7 +26,6 @@ import (
var errWrongMessageSeq = errors.New("incorrect message sequence") var errWrongMessageSeq = errors.New("incorrect message sequence")
// nolint: funlen, gocognit
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
body := req.GetBody() body := req.GetBody()
@ -49,8 +41,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
return nil, fmt.Errorf("invalid object address: %w", err) return nil, fmt.Errorf("invalid object address: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -66,141 +56,26 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
p.SetObjectWriter(streamWrapper) p.SetObjectWriter(streamWrapper)
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once
var onceHeaderSending sync.Once
var globalProgress int
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil) key, err := s.keyStorage.GetKey(nil)
if err != nil { if err != nil {

Is there any reason you don't just use non-pointer field in struct and omit the initialization?

Is there any reason you don't just use non-pointer field in struct and omit the initialization?

This is the easiest way to meet this requirement: Values containing the types defined in this package should not be copied. (https://pkg.go.dev/sync)

This is the easiest way to meet this requirement: *Values containing the types defined in this package should not be copied.* (https://pkg.go.dev/sync)

but forwarder is already a pointer? i am ok with that, just want to come to some approach that will be commonly adopted in that repo (the other code do not use pointer inside a pointer i guess)

but `forwarder` is already a pointer? i am ok with that, just want to come to some approach that will be commonly adopted in that repo (the other code do not use pointer inside a pointer i guess)

the other code do not use pointer inside a pointer i guess

But you should always think how holder of sync.Mutex (RWMutex, Once...) will be used.

> the other code do not use pointer inside a pointer i guess But you should always think how holder of ```sync.Mutex (RWMutex, Once...)``` will be used.
return nil, err return nil, err
} }
// once compose and resign forwarding request forwarder := &getRequestForwarder{
onceResign.Do(func() { OnceResign: &sync.Once{},
// compose meta header of the local server OnceHeaderSending: &sync.Once{},
metaHdr := new(session.RequestMetaHeader) GlobalProgress: 0,
metaHdr.SetTTL(meta.GetTTL() - 1) Key: key,
// TODO: #1165 think how to set the other fields Request: req,
metaHdr.SetOrigin(meta) Stream: streamWrapper,
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
} }
// code below is copy-pasted from c.GetObject implementation, p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var getStream *rpc.GetResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return nil, io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return nil, fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return nil, errWrongMessageSeq
}
headWas = true
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
onceHeaderSending.Do(func() {
err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj))
})
if err != nil {
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return nil, errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
}))
} }
return p, nil return p, nil
} }
// nolint: funlen, gocognit
func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) { func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) {
body := req.GetBody() body := req.GetBody()
@ -216,8 +91,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
return nil, fmt.Errorf("invalid object address: %w", err) return nil, fmt.Errorf("invalid object address: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -239,104 +112,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
} }
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once
var globalProgress int
key, err := s.keyStorage.GetKey(nil) key, err := s.keyStorage.GetKey(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { forwarder := &getRangeRequestForwarder{
var err error OnceResign: &sync.Once{},
GlobalProgress: 0,
// once compose and resign forwarding request Key: key,
onceResign.Do(func() { Request: req,
// compose meta header of the local server Stream: streamWrapper,
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
} }
// code below is copy-pasted from c.ObjectPayloadRangeData implementation, p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
}
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
err := rangeStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return nil, fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
}))
} }
return p, nil return p, nil
@ -426,7 +215,6 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
return nil return nil
} }
// nolint: funlen
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody() body := req.GetBody()
@ -442,8 +230,6 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
return nil, fmt.Errorf("invalid object address: %w", err) return nil, fmt.Errorf("invalid object address: %w", err)
} }
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req) commonPrm, err := util.CommonPrmFromV2(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -463,135 +249,20 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
return p, nil return p, nil
} }
var onceResign sync.Once
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil) key, err := s.keyStorage.GetKey(nil)
fyrchik marked this conversation as resolved Outdated

@fyrchik @carpawell Is it an error that key is requested at every head and get request? For get range request the key is requested once.
Fixed it in the last commit.

@fyrchik @carpawell Is it an error that ```key``` is requested at every head and get request? For get range request the key is requested once. Fixed it in the last commit.

Not an error, it should return the same key each time used.

Not an error, it should return the same key each time used.

Thx, then it can be requested once.

Thx, then it can be requested once.
if err != nil { if err != nil {
return nil, err return nil, err
} }
// once compose and resign forwarding request forwarder := &headRequestForwarder{
onceResign.Do(func() { Request: req,
// compose meta header of the local server Response: resp,
metaHdr := new(session.RequestMetaHeader) OnceResign: &sync.Once{},
metaHdr.SetTTL(meta.GetTTL() - 1) ObjectAddr: objAddr,
// TODO: #1165 think how to set the other fields Key: key,
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
} }
// code below is copy-pasted from c.GetObjectHeader implementation, p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
// perhaps it is worth highlighting the utility function in frostfs-api-go
// send Head request
var headResp *objectV2.HeadResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if !body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
h := v
hdr = new(objectV2.Header)
hdr.SetPayloadLength(h.GetPayloadLength())
hdr.SetVersion(h.GetVersion())
hdr.SetOwnerID(h.GetOwnerID())
hdr.SetObjectType(h.GetObjectType())
hdr.SetCreationEpoch(h.GetCreationEpoch())
hdr.SetPayloadHash(h.GetPayloadHash())
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
case *objectV2.HeaderWithSignature:
if body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
hdrWithSig := v
if hdrWithSig == nil {
return nil, errors.New("nil object part")
}
hdr = hdrWithSig.GetHeader()
idSig = hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, errors.New("missing signature")
}
binID, err := objAddr.Object().Marshal()
if err != nil {
return nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, errors.New("invalid object ID signature")
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(objAddr.Object())
// convert the object
return obj, nil
}))
return p, nil return p, nil
} }
@ -659,8 +330,8 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
return sh return sh
} }
func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder { func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
var ( var (
firstErr error firstErr error
res *object.Object res *object.Object
@ -681,7 +352,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
// would be nice to log otherwise // would be nice to log otherwise
}() }()
res, err = f(addr, c, key) res, err = f(ctx, addr, c, key)
return return
}) })