Refactor get service #193

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT-3610_getsvc into master 2023-04-05 14:38:49 +00:00
14 changed files with 623 additions and 477 deletions

View file

@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) assemble() {
func (exec *execCtx) assemble(ctx context.Context) {

Can we use more meaningful commit message for this? Like a sentence you use in the body.

Can we use more meaningful commit message for this? Like a sentence you use in the body.

And same for the last commit, we have 2 commits with identical message doing pretty different things.

And same for the last commit, we have 2 commits with identical message doing pretty different things.

Fixed

Fixed
if !exec.canAssemble() {
exec.log.Debug("can not assemble the object")
return
@ -49,7 +49,7 @@ func (exec *execCtx) assemble() {
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter)
obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
if err != nil {
exec.log.Warn("failed to assemble splitted object",
zap.Error(err),
@ -107,8 +107,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
//nolint: contextcheck
err := exec.svc.Head(exec.context(), prm)
err := exec.svc.Head(ctx, prm)
if err != nil {
return nil, err
@ -128,8 +127,7 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
//nolint: contextcheck
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
if statusError.err != nil {
return nil, statusError.err

View file

@ -7,7 +7,7 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) executeOnContainer() {
func (exec *execCtx) executeOnContainer(ctx context.Context) {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
@ -26,7 +26,7 @@ func (exec *execCtx) executeOnContainer() {
}
for {
if exec.processCurrentEpoch() {
if exec.processCurrentEpoch(ctx) {
break
}
@ -42,7 +42,7 @@ func (exec *execCtx) executeOnContainer() {
}
}
func (exec *execCtx) processCurrentEpoch() bool {
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
)
@ -52,7 +52,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
return true
}
ctx, cancel := context.WithCancel(exec.context())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
exec.status = statusUndefined

View file

@ -19,12 +19,9 @@ type statusError struct {
err error
}
// nolint: containedctx
type execCtx struct {
svc *Service
ctx context.Context
prm RangePrm
statusError
@ -80,10 +77,6 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
)}
}
func (exec execCtx) context() context.Context {
return exec.ctx
fyrchik marked this conversation as resolved Outdated

Do we still need the field in the struct?

Do we still need the field in the struct?

No, we dont. It's dropped.

No, we dont. It's dropped.
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
@ -217,13 +210,13 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
}
}
func (exec *execCtx) writeCollectedHeader() bool {
func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
if exec.ctxRange() != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
exec.context(),
ctx,
exec.collectedObject.CutPayload(),
)
@ -243,12 +236,12 @@ func (exec *execCtx) writeCollectedHeader() bool {
return exec.status == statusOK
}
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if exec.headOnly() {
return true
}
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch {
default:
@ -266,9 +259,9 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
return err == nil
}
func (exec *execCtx) writeCollectedObject() {
if ok := exec.writeCollectedHeader(); ok {
exec.writeObjectPayload(exec.collectedObject)
func (exec *execCtx) writeCollectedObject(ctx context.Context) {
if ok := exec.writeCollectedHeader(ctx); ok {
exec.writeObjectPayload(ctx, exec.collectedObject)
}
}

View file

@ -65,7 +65,6 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
exec := &execCtx{
svc: s,
ctx: ctx,
prm: RangePrm{
commonPrm: prm,
},
@ -78,22 +77,21 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
exec.setLogger(s.log)
//nolint: contextcheck
exec.execute()
exec.execute(ctx)
return exec.statusError
}
func (exec *execCtx) execute() {
func (exec *execCtx) execute(ctx context.Context) {
exec.log.Debug("serving request...")
// perform local operation
exec.executeLocal()
exec.executeLocal(ctx)
exec.analyzeStatus(true)
exec.analyzeStatus(ctx, true)
}
func (exec *execCtx) analyzeStatus(execCnr bool) {
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result
switch exec.status {
case statusOK:
@ -102,7 +100,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
exec.log.Debug("requested object was marked as removed")
case statusVIRTUAL:
exec.log.Debug("requested object is virtual")
exec.assemble()
exec.assemble(ctx)
case statusOutOfRange:
exec.log.Debug("requested range is out of object bounds")
default:
@ -111,8 +109,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
)
if execCnr {
exec.executeOnContainer()
exec.analyzeStatus(false)
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
}
}
}

View file

@ -117,7 +117,7 @@ func newTestClient() *testClient {
}
}
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().EncodeToString()]
if !ok {
var errNotFound apistatus.ObjectNotFound

View file

@ -1,6 +1,7 @@
package getsvc
import (
"context"
"errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -8,7 +9,7 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) executeLocal() {
func (exec *execCtx) executeLocal(ctx context.Context) {
var err error
exec.collectedObject, err = exec.svc.localStorage.get(exec)
@ -28,7 +29,7 @@ func (exec *execCtx) executeLocal() {
case err == nil:
exec.status = statusOK
exec.err = nil
exec.writeCollectedObject()
exec.writeCollectedObject(ctx)
case errors.As(err, &errRemoved):
exec.status = statusINHUMED
exec.err = errRemoved

View file

@ -59,7 +59,7 @@ type RangeHashPrm struct {
salt []byte
}
type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
// HeadPrm groups parameters of Head service call.
type HeadPrm struct {

View file

@ -18,7 +18,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
return true
}
obj, err := client.getObject(exec, info)
obj, err := client.getObject(ctx, exec, info)
var errSplitInfo *objectSDK.SplitInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
@ -43,8 +43,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
// has already been streamed to the requesting party
if obj != nil {
exec.collectedObject = obj
//nolint: contextcheck
exec.writeCollectedObject()
exec.writeCollectedObject(ctx)
}
case errors.As(err, &errRemoved):
exec.status = statusINHUMED

View file

@ -1,6 +1,8 @@
package getsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@ -22,7 +24,7 @@ type Service struct {
type Option func(*cfg)
type getClient interface {
getObject(*execCtx, client.NodeInfo) (*object.Object, error)
getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
}
type cfg struct {

View file

@ -87,10 +87,9 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
}, nil
}
// nolint: funlen
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client)
return exec.prm.forwarder(ctx, info, c.client)
}
key, err := exec.key()
@ -99,35 +98,23 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
}
if exec.headOnly() {
var prm internalclient.HeadObjectPrm
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.HeadObject(prm)
if err != nil {
return nil, err
}
return res.Header(), nil
return c.getHeadOnly(ctx, exec, key)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return c.getRange(ctx, exec, key, rng)
}
return c.get(ctx, exec, key)
}
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetContext(exec.context())
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
@ -146,9 +133,7 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
// Current spec allows other storage node to deny access,
// fallback to GET here.
obj, err := c.get(exec, key)
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
@ -169,13 +154,35 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
return payloadOnlyObject(res.PayloadRange()), nil
}
return c.get(exec, key)
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.HeadObjectPrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
res, err := internalclient.HeadObject(prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm
prm.SetContext(exec.context())
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)

View file

@ -0,0 +1,168 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRequestForwarder struct {
OnceResign *sync.Once
OnceHeaderSending *sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRequest
Stream *streamObjectWriter
}
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {

so not used second return value?

so not used second return value?

forwardRequestToNode is used as an argument for func groupAddressRequestForwarder(...) function. groupAddressRequestForwarder requires two values to be returned. It is possible to use a lambda function, but I don't like it.

Do you think that with a lambda and one return value it will be easier to understand?

```forwardRequestToNode``` is used as an argument for ```func groupAddressRequestForwarder(...)``` function. ```groupAddressRequestForwarder``` requires two values to be returned. It is possible to use a lambda function, but I don't like it. Do you think that with a lambda and one return value it will be easier to understand?

Do you think that with a lambda and one return value it will be easier to understand?

no, do not like lambdas usually as more complex thing for a reader.

is used as an argument for func groupAddressRequestForwarder(...) function

originally, comment was about that: seems like anything that is ...forwarder... should not return objects. if groupAddressRequestForwarder is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that

> Do you think that with a lambda and one return value it will be easier to understand? no, do not like lambdas usually as more complex thing for a reader. > is used as an argument for func groupAddressRequestForwarder(...) function originally, comment was about that: seems like anything that is `...forwarder...` should not return objects. if `groupAddressRequestForwarder` is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that

HEAD request requires return result from forwarder

HEAD request requires return result from forwarder
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
getStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, c, getStream, pubkey)
}
func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error {
// verify response key
carpawell marked this conversation as resolved Outdated

can we just return f.readStream err if no details are added? (personally, i would add some details via fmt.Errorf)

related to more than just that line

can we just return `f.readStream` err if no details are added? (personally, i would add some details via `fmt.Errorf`) related to more than just that line

Fixed.

Fixed.
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error {
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
var err error
f.OnceHeaderSending.Do(func() {
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
})
if err != nil {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
return nil
}
func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) {
var getStream *rpc.GetResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
return getStream, nil
}
func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error {
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return errWrongMessageSeq
}
headWas = true
if err := f.writeHeader(ctx, v); err != nil {
return err
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -0,0 +1,134 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type getRangeRequestForwarder struct {
OnceResign *sync.Once
GlobalProgress int
Key *ecdsa.PrivateKey
Request *objectV2.GetRangeRequest
Stream *streamObjectRangeWriter
}
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
rangeStream, err := f.openStream(ctx, addr, c)
if err != nil {
return nil, err
}
return nil, f.readStream(ctx, rangeStream, c, pubkey)
}
func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("could not verify %T: %w", resp, err)
}
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}
func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) {
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
rangeStream, e = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
}
return rangeStream, nil
}
func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error {
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
err := rangeStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
return err
}
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
f.GlobalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return object.NewSplitInfoError(si)
}
}
return nil
}

View file

@ -0,0 +1,175 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type headRequestForwarder struct {
Request *objectV2.HeadRequest
Response *objectV2.HeadResponse
OnceResign *sync.Once
ObjectAddr oid.Address
Key *ecdsa.PrivateKey
}
func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
f.OnceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(f.Request.GetMetaHeader())
writeCurrentVersion(metaHdr)
f.Request.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(f.Key, f.Request)
})
if err != nil {
return nil, err
}
headResp, err := f.sendHeadRequest(ctx, addr, c)
if err != nil {
return nil, err
}
if err := f.verifyResponse(headResp, pubkey); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
return nil, err
}
case *objectV2.HeaderWithSignature:
if hdr, idSig, err = f.getHeaderAndSignature(v); err != nil {
return nil, err
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(f.ObjectAddr.Object())
return obj, nil
}
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
if !f.Request.GetBody().GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
hdr := new(objectV2.Header)
hdr.SetPayloadLength(sh.GetPayloadLength())
hdr.SetVersion(sh.GetVersion())
hdr.SetOwnerID(sh.GetOwnerID())
hdr.SetObjectType(sh.GetObjectType())
hdr.SetCreationEpoch(sh.GetCreationEpoch())
hdr.SetPayloadHash(sh.GetPayloadHash())
hdr.SetHomomorphicHash(sh.GetHomomorphicHash())
return hdr, nil
}
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
if f.Request.GetBody().GetMainOnly() {
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
if hdrWithSig == nil {
return nil, nil, errors.New("nil object part")
}
hdr := hdrWithSig.GetHeader()
idSig := hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, nil, errors.New("missing signature")
}
binID, err := f.ObjectAddr.Object().Marshal()
if err != nil {
return nil, nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, nil, errors.New("invalid object ID signature")
}
return hdr, idSig, nil
}
func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*objectV2.HeadResponse, error) {
var headResp *objectV2.HeadResponse
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
var e error
headResp, e = rpc.HeadObject(cli, f.Request, rpcclient.WithContext(ctx))
return e
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
return headResp, nil
}
func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, pubkey []byte) error {
// verify response key
if err := internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
}
if err := checkStatus(f.Response.GetMetaHeader().GetStatus()); err != nil {
return err
}
return nil
}

View file

@ -6,25 +6,18 @@ import (
"errors"
"fmt"
"hash"
"io"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
@ -33,7 +26,6 @@ import (
var errWrongMessageSeq = errors.New("incorrect message sequence")
// nolint: funlen, gocognit
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
body := req.GetBody()
@ -49,8 +41,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
return nil, fmt.Errorf("invalid object address: %w", err)
}
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
@ -66,141 +56,26 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
p.SetObjectWriter(streamWrapper)
if !commonPrm.LocalOnly() {
var onceResign sync.Once
var onceHeaderSending sync.Once
var globalProgress int
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil)
if err != nil {

Is there any reason you don't just use non-pointer field in struct and omit the initialization?

Is there any reason you don't just use non-pointer field in struct and omit the initialization?

This is the easiest way to meet this requirement: Values containing the types defined in this package should not be copied. (https://pkg.go.dev/sync)

This is the easiest way to meet this requirement: *Values containing the types defined in this package should not be copied.* (https://pkg.go.dev/sync)

but forwarder is already a pointer? i am ok with that, just want to come to some approach that will be commonly adopted in that repo (the other code do not use pointer inside a pointer i guess)

but `forwarder` is already a pointer? i am ok with that, just want to come to some approach that will be commonly adopted in that repo (the other code do not use pointer inside a pointer i guess)

the other code do not use pointer inside a pointer i guess

But you should always think how holder of sync.Mutex (RWMutex, Once...) will be used.

> the other code do not use pointer inside a pointer i guess But you should always think how holder of ```sync.Mutex (RWMutex, Once...)``` will be used.
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
forwarder := &getRequestForwarder{
OnceResign: &sync.Once{},
OnceHeaderSending: &sync.Once{},
GlobalProgress: 0,
Key: key,
Request: req,
Stream: streamWrapper,
}
// code below is copy-pasted from c.GetObject implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var getStream *rpc.GetResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
}
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
// receive message from server stream
err := getStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
if !headWas {
return nil, io.ErrUnexpectedEOF
}
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return nil, fmt.Errorf("unexpected object part %T", v)
case *objectV2.GetObjectPartInit:
if headWas {
return nil, errWrongMessageSeq
}
headWas = true
obj := new(objectV2.Object)
obj.SetObjectID(v.GetObjectID())
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
onceHeaderSending.Do(func() {
err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj))
})
if err != nil {
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
case *objectV2.GetObjectPartChunk:
if !headWas {
return nil, errWrongMessageSeq
}
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
}))
p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
}
return p, nil
}
// nolint: funlen, gocognit
func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) {
body := req.GetBody()
@ -216,8 +91,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
return nil, fmt.Errorf("invalid object address: %w", err)
}
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
@ -239,104 +112,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
}
if !commonPrm.LocalOnly() {
var onceResign sync.Once
var globalProgress int
key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
forwarder := &getRangeRequestForwarder{
OnceResign: &sync.Once{},
GlobalProgress: 0,
Key: key,
Request: req,
Stream: streamWrapper,
}
// code below is copy-pasted from c.ObjectPayloadRangeData implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// open stream
var rangeStream *rpc.ObjectRangeResponseReader
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context()))
return err
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
}
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
err := rangeStream.Read(resp)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
internalclient.ReportError(c, err)
return nil, fmt.Errorf("reading the response failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return nil, fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
}
return nil, nil
}))
p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
}
return p, nil
@ -426,7 +215,6 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
return nil
}
// nolint: funlen
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody()
@ -442,8 +230,6 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
return nil, fmt.Errorf("invalid object address: %w", err)
}
meta := req.GetMetaHeader()
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
@ -463,135 +249,20 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
return p, nil
}
var onceResign sync.Once
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil)
fyrchik marked this conversation as resolved Outdated

@fyrchik @carpawell Is it an error that key is requested at every head and get request? For get range request the key is requested once.
Fixed it in the last commit.

@fyrchik @carpawell Is it an error that ```key``` is requested at every head and get request? For get range request the key is requested once. Fixed it in the last commit.

Not an error, it should return the same key each time used.

Not an error, it should return the same key each time used.

Thx, then it can be requested once.

Thx, then it can be requested once.
if err != nil {
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
forwarder := &headRequestForwarder{
Request: req,
Response: resp,
OnceResign: &sync.Once{},
ObjectAddr: objAddr,
Key: key,
}
// code below is copy-pasted from c.GetObjectHeader implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// send Head request
var headResp *objectV2.HeadResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if !body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
h := v
hdr = new(objectV2.Header)
hdr.SetPayloadLength(h.GetPayloadLength())
hdr.SetVersion(h.GetVersion())
hdr.SetOwnerID(h.GetOwnerID())
hdr.SetObjectType(h.GetObjectType())
hdr.SetCreationEpoch(h.GetCreationEpoch())
hdr.SetPayloadHash(h.GetPayloadHash())
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
case *objectV2.HeaderWithSignature:
if body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
hdrWithSig := v
if hdrWithSig == nil {
return nil, errors.New("nil object part")
}
hdr = hdrWithSig.GetHeader()
idSig = hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, errors.New("missing signature")
}
binID, err := objAddr.Object().Marshal()
if err != nil {
return nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, errors.New("invalid object ID signature")
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(objAddr.Object())
// convert the object
return obj, nil
}))
p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
return p, nil
}
@ -659,8 +330,8 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
return sh
}
func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
var (
firstErr error
res *object.Object
@ -681,7 +352,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
// would be nice to log otherwise
}()
res, err = f(addr, c, key)
res, err = f(ctx, addr, c, key)
return
})