Refactor get service #193
|
@ -10,7 +10,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) assemble() {
|
||||
func (exec *execCtx) assemble(ctx context.Context) {
|
||||
|
||||
if !exec.canAssemble() {
|
||||
exec.log.Debug("can not assemble the object")
|
||||
return
|
||||
|
@ -49,7 +49,7 @@ func (exec *execCtx) assemble() {
|
|||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||
)
|
||||
|
||||
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter)
|
||||
obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
|
||||
if err != nil {
|
||||
exec.log.Warn("failed to assemble splitted object",
|
||||
zap.Error(err),
|
||||
|
@ -107,8 +107,7 @@ func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Obje
|
|||
|
||||
w := NewSimpleObjectWriter()
|
||||
prm.SetHeaderWriter(w)
|
||||
//nolint: contextcheck
|
||||
err := exec.svc.Head(exec.context(), prm)
|
||||
err := exec.svc.Head(ctx, prm)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -128,8 +127,7 @@ func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Ra
|
|||
p.addr.SetContainer(exec.containerID())
|
||||
p.addr.SetObject(id)
|
||||
|
||||
//nolint: contextcheck
|
||||
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
|
||||
statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
|
||||
|
||||
if statusError.err != nil {
|
||||
return nil, statusError.err
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) executeOnContainer() {
|
||||
func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
||||
if exec.isLocal() {
|
||||
exec.log.Debug("return result directly")
|
||||
return
|
||||
|
@ -26,7 +26,7 @@ func (exec *execCtx) executeOnContainer() {
|
|||
}
|
||||
|
||||
for {
|
||||
if exec.processCurrentEpoch() {
|
||||
if exec.processCurrentEpoch(ctx) {
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,7 @@ func (exec *execCtx) executeOnContainer() {
|
|||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) processCurrentEpoch() bool {
|
||||
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||
exec.log.Debug("process epoch",
|
||||
zap.Uint64("number", exec.curProcEpoch),
|
||||
)
|
||||
|
@ -52,7 +52,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(exec.context())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
exec.status = statusUndefined
|
||||
|
|
|
@ -19,12 +19,9 @@ type statusError struct {
|
|||
err error
|
||||
}
|
||||
|
||||
// nolint: containedctx
|
||||
type execCtx struct {
|
||||
svc *Service
|
||||
|
||||
ctx context.Context
|
||||
|
||||
prm RangePrm
|
||||
|
||||
statusError
|
||||
|
@ -80,10 +77,6 @@ func (exec *execCtx) setLogger(l *logger.Logger) {
|
|||
)}
|
||||
}
|
||||
|
||||
func (exec execCtx) context() context.Context {
|
||||
return exec.ctx
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Do we still need the field in the struct? Do we still need the field in the struct?
dstepanov-yadro
commented
No, we dont. It's dropped. No, we dont. It's dropped.
|
||||
}
|
||||
|
||||
func (exec execCtx) isLocal() bool {
|
||||
return exec.prm.common.LocalOnly()
|
||||
}
|
||||
|
@ -217,13 +210,13 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeCollectedHeader() bool {
|
||||
func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
|
||||
if exec.ctxRange() != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
err := exec.prm.objWriter.WriteHeader(
|
||||
exec.context(),
|
||||
ctx,
|
||||
exec.collectedObject.CutPayload(),
|
||||
)
|
||||
|
||||
|
@ -243,12 +236,12 @@ func (exec *execCtx) writeCollectedHeader() bool {
|
|||
return exec.status == statusOK
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
|
||||
func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
|
||||
if exec.headOnly() {
|
||||
return true
|
||||
}
|
||||
|
||||
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
|
||||
err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
@ -266,9 +259,9 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
|
|||
return err == nil
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeCollectedObject() {
|
||||
if ok := exec.writeCollectedHeader(); ok {
|
||||
exec.writeObjectPayload(exec.collectedObject)
|
||||
func (exec *execCtx) writeCollectedObject(ctx context.Context) {
|
||||
if ok := exec.writeCollectedHeader(ctx); ok {
|
||||
exec.writeObjectPayload(ctx, exec.collectedObject)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -65,7 +65,6 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
|
|||
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
|
||||
exec := &execCtx{
|
||||
svc: s,
|
||||
ctx: ctx,
|
||||
prm: RangePrm{
|
||||
commonPrm: prm,
|
||||
},
|
||||
|
@ -78,22 +77,21 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
|
|||
|
||||
exec.setLogger(s.log)
|
||||
|
||||
//nolint: contextcheck
|
||||
exec.execute()
|
||||
exec.execute(ctx)
|
||||
|
||||
return exec.statusError
|
||||
}
|
||||
|
||||
func (exec *execCtx) execute() {
|
||||
func (exec *execCtx) execute(ctx context.Context) {
|
||||
exec.log.Debug("serving request...")
|
||||
|
||||
// perform local operation
|
||||
exec.executeLocal()
|
||||
exec.executeLocal(ctx)
|
||||
|
||||
exec.analyzeStatus(true)
|
||||
exec.analyzeStatus(ctx, true)
|
||||
}
|
||||
|
||||
func (exec *execCtx) analyzeStatus(execCnr bool) {
|
||||
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||
// analyze local result
|
||||
switch exec.status {
|
||||
case statusOK:
|
||||
|
@ -102,7 +100,7 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
|
|||
exec.log.Debug("requested object was marked as removed")
|
||||
case statusVIRTUAL:
|
||||
exec.log.Debug("requested object is virtual")
|
||||
exec.assemble()
|
||||
exec.assemble(ctx)
|
||||
case statusOutOfRange:
|
||||
exec.log.Debug("requested range is out of object bounds")
|
||||
default:
|
||||
|
@ -111,8 +109,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
|
|||
)
|
||||
|
||||
if execCnr {
|
||||
exec.executeOnContainer()
|
||||
exec.analyzeStatus(false)
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ func newTestClient() *testClient {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||
func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||
v, ok := c.results[exec.address().EncodeToString()]
|
||||
if !ok {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -8,7 +9,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) executeLocal() {
|
||||
func (exec *execCtx) executeLocal(ctx context.Context) {
|
||||
var err error
|
||||
|
||||
exec.collectedObject, err = exec.svc.localStorage.get(exec)
|
||||
|
@ -28,7 +29,7 @@ func (exec *execCtx) executeLocal() {
|
|||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
exec.writeCollectedObject()
|
||||
exec.writeCollectedObject(ctx)
|
||||
case errors.As(err, &errRemoved):
|
||||
exec.status = statusINHUMED
|
||||
exec.err = errRemoved
|
||||
|
|
|
@ -59,7 +59,7 @@ type RangeHashPrm struct {
|
|||
salt []byte
|
||||
}
|
||||
|
||||
type RequestForwarder func(coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
|
||||
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
|
||||
|
||||
// HeadPrm groups parameters of Head service call.
|
||||
type HeadPrm struct {
|
||||
|
|
|
@ -18,7 +18,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
|
|||
return true
|
||||
}
|
||||
|
||||
obj, err := client.getObject(exec, info)
|
||||
obj, err := client.getObject(ctx, exec, info)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
|
@ -43,8 +43,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
|
|||
// has already been streamed to the requesting party
|
||||
if obj != nil {
|
||||
exec.collectedObject = obj
|
||||
//nolint: contextcheck
|
||||
exec.writeCollectedObject()
|
||||
exec.writeCollectedObject(ctx)
|
||||
}
|
||||
case errors.As(err, &errRemoved):
|
||||
exec.status = statusINHUMED
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
|
@ -22,7 +24,7 @@ type Service struct {
|
|||
type Option func(*cfg)
|
||||
|
||||
type getClient interface {
|
||||
getObject(*execCtx, client.NodeInfo) (*object.Object, error)
|
||||
getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
|
|
@ -87,10 +87,9 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
||||
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
||||
if exec.isForwardingEnabled() {
|
||||
return exec.prm.forwarder(info, c.client)
|
||||
return exec.prm.forwarder(ctx, info, c.client)
|
||||
}
|
||||
|
||||
key, err := exec.key()
|
||||
|
@ -99,9 +98,66 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
|||
}
|
||||
|
||||
if exec.headOnly() {
|
||||
return c.getHeadOnly(ctx, exec, key)
|
||||
}
|
||||
// we don't specify payload writer because we accumulate
|
||||
// the object locally (even huge).
|
||||
if rng := exec.ctxRange(); rng != nil {
|
||||
// Current spec allows other storage node to deny access,
|
||||
// fallback to GET here.
|
||||
return c.getRange(ctx, exec, key, rng)
|
||||
}
|
||||
|
||||
return c.get(ctx, exec, key)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
|
||||
var prm internalclient.PayloadRangePrm
|
||||
|
||||
prm.SetContext(ctx)
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
prm.SetRange(rng)
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.PayloadRange(prm)
|
||||
if err != nil {
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
obj, err := c.get(ctx, exec, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
from := rng.GetOffset()
|
||||
to := from + rng.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return nil, new(apistatus.ObjectOutOfRange)
|
||||
}
|
||||
|
||||
return payloadOnlyObject(payload[from:to]), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return payloadOnlyObject(res.PayloadRange()), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.HeadObjectPrm
|
||||
|
||||
prm.SetContext(exec.context())
|
||||
prm.SetContext(ctx)
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
|
@ -121,61 +177,12 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
|||
}
|
||||
|
||||
return res.Header(), nil
|
||||
}
|
||||
// we don't specify payload writer because we accumulate
|
||||
// the object locally (even huge).
|
||||
if rng := exec.ctxRange(); rng != nil {
|
||||
var prm internalclient.PayloadRangePrm
|
||||
|
||||
prm.SetContext(exec.context())
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
prm.SetRange(rng)
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.PayloadRange(prm)
|
||||
if err != nil {
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
// Current spec allows other storage node to deny access,
|
||||
// fallback to GET here.
|
||||
obj, err := c.get(exec, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
from := rng.GetOffset()
|
||||
to := from + rng.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return nil, new(apistatus.ObjectOutOfRange)
|
||||
}
|
||||
|
||||
return payloadOnlyObject(payload[from:to]), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return payloadOnlyObject(res.PayloadRange()), nil
|
||||
}
|
||||
|
||||
return c.get(exec, key)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.GetObjectPrm
|
||||
|
||||
prm.SetContext(exec.context())
|
||||
prm.SetContext(ctx)
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
|
|
168
pkg/services/object/get/v2/get_forwarder.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
type getRequestForwarder struct {
|
||||
OnceResign *sync.Once
|
||||
OnceHeaderSending *sync.Once
|
||||
GlobalProgress int
|
||||
Key *ecdsa.PrivateKey
|
||||
Request *objectV2.GetRequest
|
||||
Stream *streamObjectWriter
|
||||
}
|
||||
|
||||
func (f *getRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
carpawell
commented
so not used second return value? so not used second return value?
dstepanov-yadro
commented
Do you think that with a lambda and one return value it will be easier to understand? ```forwardRequestToNode``` is used as an argument for ```func groupAddressRequestForwarder(...)``` function. ```groupAddressRequestForwarder``` requires two values to be returned. It is possible to use a lambda function, but I don't like it.
Do you think that with a lambda and one return value it will be easier to understand?
carpawell
commented
no, do not like lambdas usually as more complex thing for a reader.
originally, comment was about that: seems like anything that is > Do you think that with a lambda and one return value it will be easier to understand?
no, do not like lambdas usually as more complex thing for a reader.
> is used as an argument for func groupAddressRequestForwarder(...) function
originally, comment was about that: seems like anything that is `...forwarder...` should not return objects. if `groupAddressRequestForwarder` is used somewhere else AND that is not a complete refactor and we plan to reorganize it later, im ok with that
dstepanov-yadro
commented
HEAD request requires return result from forwarder HEAD request requires return result from forwarder
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
f.OnceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(f.Request.GetMetaHeader())
|
||||
writeCurrentVersion(metaHdr)
|
||||
f.Request.SetMetaHeader(metaHdr)
|
||||
err = signature.SignServiceMessage(f.Key, f.Request)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
getStream, err := f.openStream(ctx, addr, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, f.readStream(ctx, c, getStream, pubkey)
|
||||
}
|
||||
|
||||
func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey []byte) error {
|
||||
// verify response key
|
||||
carpawell marked this conversation as resolved
Outdated
carpawell
commented
can we just return related to more than just that line can we just return `f.readStream` err if no details are added? (personally, i would add some details via `fmt.Errorf`)
related to more than just that line
dstepanov-yadro
commented
Fixed. Fixed.
|
||||
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetObjectPartInit) error {
|
||||
obj := new(objectV2.Object)
|
||||
|
||||
obj.SetObjectID(v.GetObjectID())
|
||||
obj.SetSignature(v.GetSignature())
|
||||
obj.SetHeader(v.GetHeader())
|
||||
|
||||
var err error
|
||||
f.OnceHeaderSending.Do(func() {
|
||||
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.GetResponseReader, error) {
|
||||
var getStream *rpc.GetResponseReader
|
||||
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
var e error
|
||||
getStream, e = rpc.GetObject(cli, f.Request, rpcclient.WithContext(ctx))
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stream opening failed: %w", err)
|
||||
}
|
||||
return getStream, nil
|
||||
}
|
||||
|
||||
func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddressClient, getStream *rpc.GetResponseReader, pubkey []byte) error {
|
||||
var (
|
||||
headWas bool
|
||||
resp = new(objectV2.GetResponse)
|
||||
localProgress int
|
||||
)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := getStream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !headWas {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
internalclient.ReportError(c, err)
|
||||
return fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
if err := f.verifyResponse(resp, pubkey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
default:
|
||||
return fmt.Errorf("unexpected object part %T", v)
|
||||
case *objectV2.GetObjectPartInit:
|
||||
if headWas {
|
||||
return errWrongMessageSeq
|
||||
}
|
||||
headWas = true
|
||||
if err := f.writeHeader(ctx, v); err != nil {
|
||||
return err
|
||||
}
|
||||
case *objectV2.GetObjectPartChunk:
|
||||
if !headWas {
|
||||
return errWrongMessageSeq
|
||||
}
|
||||
|
||||
origChunk := v.GetChunk()
|
||||
|
||||
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
|
||||
if len(chunk) == 0 {
|
||||
localProgress += len(origChunk)
|
||||
continue
|
||||
}
|
||||
|
||||
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
||||
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
|
||||
}
|
||||
|
||||
localProgress += len(origChunk)
|
||||
f.GlobalProgress += len(chunk)
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
return object.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
134
pkg/services/object/get/v2/get_range_forwarder.go
Normal file
|
@ -0,0 +1,134 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
type getRangeRequestForwarder struct {
|
||||
OnceResign *sync.Once
|
||||
GlobalProgress int
|
||||
Key *ecdsa.PrivateKey
|
||||
Request *objectV2.GetRangeRequest
|
||||
Stream *streamObjectRangeWriter
|
||||
}
|
||||
|
||||
func (f *getRangeRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
f.OnceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(f.Request.GetMetaHeader())
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
f.Request.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(f.Key, f.Request)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rangeStream, err := f.openStream(ctx, addr, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, f.readStream(ctx, rangeStream, c, pubkey)
|
||||
}
|
||||
|
||||
func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeResponse, pubkey []byte) error {
|
||||
// verify response key
|
||||
if err := internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
}
|
||||
|
||||
if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*rpc.ObjectRangeResponseReader, error) {
|
||||
// open stream
|
||||
var rangeStream *rpc.ObjectRangeResponseReader
|
||||
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
var e error
|
||||
rangeStream, e = rpc.GetObjectRange(cli, f.Request, rpcclient.WithContext(ctx))
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||
}
|
||||
return rangeStream, nil
|
||||
}
|
||||
|
||||
func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *rpc.ObjectRangeResponseReader, c client.MultiAddressClient, pubkey []byte) error {
|
||||
resp := new(objectV2.GetRangeResponse)
|
||||
var localProgress int
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := rangeStream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
internalclient.ReportError(c, err)
|
||||
return fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
if err := f.verifyResponse(resp, pubkey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return fmt.Errorf("unexpected range type %T", v)
|
||||
case *objectV2.GetRangePartChunk:
|
||||
origChunk := v.GetChunk()
|
||||
|
||||
chunk := chunkToSend(f.GlobalProgress, localProgress, origChunk)
|
||||
if len(chunk) == 0 {
|
||||
localProgress += len(origChunk)
|
||||
continue
|
||||
}
|
||||
|
||||
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
||||
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
|
||||
}
|
||||
|
||||
localProgress += len(origChunk)
|
||||
f.GlobalProgress += len(chunk)
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
return object.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
175
pkg/services/object/get/v2/head_forwarder.go
Normal file
|
@ -0,0 +1,175 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type headRequestForwarder struct {
|
||||
Request *objectV2.HeadRequest
|
||||
Response *objectV2.HeadResponse
|
||||
OnceResign *sync.Once
|
||||
ObjectAddr oid.Address
|
||||
Key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
f.OnceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(f.Request.GetMetaHeader().GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(f.Request.GetMetaHeader())
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
f.Request.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(f.Key, f.Request)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
headResp, err := f.sendHeadRequest(ctx, addr, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := f.verifyResponse(headResp, pubkey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
hdr *objectV2.Header
|
||||
idSig *refs.Signature
|
||||
)
|
||||
|
||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
case *objectV2.ShortHeader:
|
||||
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case *objectV2.HeaderWithSignature:
|
||||
if hdr, idSig, err = f.getHeaderAndSignature(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
objv2.SetHeader(hdr)
|
||||
objv2.SetSignature(idSig)
|
||||
|
||||
obj := object.NewFromV2(objv2)
|
||||
obj.SetID(f.ObjectAddr.Object())
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
|
||||
if !f.Request.GetBody().GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
hdr := new(objectV2.Header)
|
||||
hdr.SetPayloadLength(sh.GetPayloadLength())
|
||||
hdr.SetVersion(sh.GetVersion())
|
||||
hdr.SetOwnerID(sh.GetOwnerID())
|
||||
hdr.SetObjectType(sh.GetObjectType())
|
||||
hdr.SetCreationEpoch(sh.GetCreationEpoch())
|
||||
hdr.SetPayloadHash(sh.GetPayloadHash())
|
||||
hdr.SetHomomorphicHash(sh.GetHomomorphicHash())
|
||||
return hdr, nil
|
||||
}
|
||||
|
||||
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
|
||||
if f.Request.GetBody().GetMainOnly() {
|
||||
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
if hdrWithSig == nil {
|
||||
return nil, nil, errors.New("nil object part")
|
||||
}
|
||||
|
||||
hdr := hdrWithSig.GetHeader()
|
||||
idSig := hdrWithSig.GetSignature()
|
||||
|
||||
if idSig == nil {
|
||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
||||
return nil, nil, errors.New("missing signature")
|
||||
}
|
||||
|
||||
binID, err := f.ObjectAddr.Object().Marshal()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("marshal ID: %w", err)
|
||||
}
|
||||
|
||||
var sig frostfscrypto.Signature
|
||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||
return nil, nil, fmt.Errorf("can't read signature: %w", err)
|
||||
}
|
||||
|
||||
if !sig.Verify(binID) {
|
||||
return nil, nil, errors.New("invalid object ID signature")
|
||||
}
|
||||
|
||||
return hdr, idSig, nil
|
||||
}
|
||||
|
||||
func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network.Address, c client.MultiAddressClient) (*objectV2.HeadResponse, error) {
|
||||
var headResp *objectV2.HeadResponse
|
||||
err := c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
var e error
|
||||
headResp, e = rpc.HeadObject(cli, f.Request, rpcclient.WithContext(ctx))
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
return headResp, nil
|
||||
}
|
||||
|
||||
func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, pubkey []byte) error {
|
||||
// verify response key
|
||||
if err := internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||
return fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
if err := checkStatus(f.Response.GetMetaHeader().GetStatus()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -6,25 +6,18 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
|
@ -33,7 +26,6 @@ import (
|
|||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
|
||||
body := req.GetBody()
|
||||
|
||||
|
@ -49,8 +41,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
}
|
||||
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -66,141 +56,26 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
p.SetObjectWriter(streamWrapper)
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
var onceResign sync.Once
|
||||
|
||||
var onceHeaderSending sync.Once
|
||||
var globalProgress int
|
||||
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
key, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
fyrchik
commented
Is there any reason you don't just use non-pointer field in struct and omit the initialization? Is there any reason you don't just use non-pointer field in struct and omit the initialization?
dstepanov-yadro
commented
This is the easiest way to meet this requirement: Values containing the types defined in this package should not be copied. (https://pkg.go.dev/sync) This is the easiest way to meet this requirement: *Values containing the types defined in this package should not be copied.* (https://pkg.go.dev/sync)
carpawell
commented
but but `forwarder` is already a pointer? i am ok with that, just want to come to some approach that will be commonly adopted in that repo (the other code do not use pointer inside a pointer i guess)
dstepanov-yadro
commented
But you should always think how holder of > the other code do not use pointer inside a pointer i guess
But you should always think how holder of ```sync.Mutex (RWMutex, Once...)``` will be used.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
forwarder := &getRequestForwarder{
|
||||
OnceResign: &sync.Once{},
|
||||
OnceHeaderSending: &sync.Once{},
|
||||
GlobalProgress: 0,
|
||||
Key: key,
|
||||
Request: req,
|
||||
Stream: streamWrapper,
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.GetObject implementation,
|
||||
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
||||
|
||||
// open stream
|
||||
var getStream *rpc.GetResponseReader
|
||||
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(stream.Context()))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stream opening failed: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
headWas bool
|
||||
resp = new(objectV2.GetResponse)
|
||||
localProgress int
|
||||
)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := getStream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !headWas {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
internalclient.ReportError(c, err)
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response key
|
||||
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected object part %T", v)
|
||||
case *objectV2.GetObjectPartInit:
|
||||
if headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
headWas = true
|
||||
|
||||
obj := new(objectV2.Object)
|
||||
|
||||
obj.SetObjectID(v.GetObjectID())
|
||||
obj.SetSignature(v.GetSignature())
|
||||
obj.SetHeader(v.GetHeader())
|
||||
|
||||
onceHeaderSending.Do(func() {
|
||||
err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj))
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
|
||||
}
|
||||
case *objectV2.GetObjectPartChunk:
|
||||
if !headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
origChunk := v.GetChunk()
|
||||
|
||||
chunk := chunkToSend(globalProgress, localProgress, origChunk)
|
||||
if len(chunk) == 0 {
|
||||
localProgress += len(origChunk)
|
||||
continue
|
||||
}
|
||||
|
||||
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
|
||||
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
|
||||
}
|
||||
|
||||
localProgress += len(origChunk)
|
||||
globalProgress += len(chunk)
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}))
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) {
|
||||
body := req.GetBody()
|
||||
|
||||
|
@ -216,8 +91,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
}
|
||||
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -239,104 +112,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
}
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
var onceResign sync.Once
|
||||
var globalProgress int
|
||||
|
||||
key, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
forwarder := &getRangeRequestForwarder{
|
||||
OnceResign: &sync.Once{},
|
||||
GlobalProgress: 0,
|
||||
Key: key,
|
||||
Request: req,
|
||||
Stream: streamWrapper,
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.ObjectPayloadRangeData implementation,
|
||||
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
||||
|
||||
// open stream
|
||||
var rangeStream *rpc.ObjectRangeResponseReader
|
||||
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(stream.Context()))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||
}
|
||||
|
||||
resp := new(objectV2.GetRangeResponse)
|
||||
var localProgress int
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := rangeStream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
internalclient.ReportError(c, err)
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response key
|
||||
if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
}
|
||||
|
||||
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected range type %T", v)
|
||||
case *objectV2.GetRangePartChunk:
|
||||
origChunk := v.GetChunk()
|
||||
|
||||
chunk := chunkToSend(globalProgress, localProgress, origChunk)
|
||||
if len(chunk) == 0 {
|
||||
localProgress += len(origChunk)
|
||||
continue
|
||||
}
|
||||
|
||||
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
|
||||
return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
|
||||
}
|
||||
|
||||
localProgress += len(origChunk)
|
||||
globalProgress += len(chunk)
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}))
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
|
||||
}
|
||||
|
||||
return p, nil
|
||||
|
@ -426,7 +215,6 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
|
||||
body := req.GetBody()
|
||||
|
||||
|
@ -442,8 +230,6 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
}
|
||||
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -463,135 +249,20 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
return p, nil
|
||||
}
|
||||
|
||||
var onceResign sync.Once
|
||||
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
key, err := s.keyStorage.GetKey(nil)
|
||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
@fyrchik @carpawell Is it an error that @fyrchik @carpawell Is it an error that ```key``` is requested at every head and get request? For get range request the key is requested once.
Fixed it in the last commit.
fyrchik
commented
Not an error, it should return the same key each time used. Not an error, it should return the same key each time used.
dstepanov-yadro
commented
Thx, then it can be requested once. Thx, then it can be requested once.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
forwarder := &headRequestForwarder{
|
||||
Request: req,
|
||||
Response: resp,
|
||||
OnceResign: &sync.Once{},
|
||||
ObjectAddr: objAddr,
|
||||
Key: key,
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.GetObjectHeader implementation,
|
||||
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
||||
|
||||
// send Head request
|
||||
var headResp *objectV2.HeadResponse
|
||||
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response key
|
||||
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
hdr *objectV2.Header
|
||||
idSig *refs.Signature
|
||||
)
|
||||
|
||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
case *objectV2.ShortHeader:
|
||||
if !body.GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
h := v
|
||||
|
||||
hdr = new(objectV2.Header)
|
||||
hdr.SetPayloadLength(h.GetPayloadLength())
|
||||
hdr.SetVersion(h.GetVersion())
|
||||
hdr.SetOwnerID(h.GetOwnerID())
|
||||
hdr.SetObjectType(h.GetObjectType())
|
||||
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
||||
hdr.SetPayloadHash(h.GetPayloadHash())
|
||||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||
case *objectV2.HeaderWithSignature:
|
||||
if body.GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
hdrWithSig := v
|
||||
if hdrWithSig == nil {
|
||||
return nil, errors.New("nil object part")
|
||||
}
|
||||
|
||||
hdr = hdrWithSig.GetHeader()
|
||||
idSig = hdrWithSig.GetSignature()
|
||||
|
||||
if idSig == nil {
|
||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
||||
return nil, errors.New("missing signature")
|
||||
}
|
||||
|
||||
binID, err := objAddr.Object().Marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal ID: %w", err)
|
||||
}
|
||||
|
||||
var sig frostfscrypto.Signature
|
||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||
return nil, fmt.Errorf("can't read signature: %w", err)
|
||||
}
|
||||
|
||||
if !sig.Verify(binID) {
|
||||
return nil, errors.New("invalid object ID signature")
|
||||
}
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
objv2.SetHeader(hdr)
|
||||
objv2.SetSignature(idSig)
|
||||
|
||||
obj := object.NewFromV2(objv2)
|
||||
obj.SetID(objAddr.Object())
|
||||
|
||||
// convert the object
|
||||
return obj, nil
|
||||
}))
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(forwarder.forwardRequestToNode))
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
@ -659,8 +330,8 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
|
|||
return sh
|
||||
}
|
||||
|
||||
func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
|
||||
return func(info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
|
||||
func groupAddressRequestForwarder(f func(context.Context, network.Address, client.MultiAddressClient, []byte) (*object.Object, error)) getsvc.RequestForwarder {
|
||||
return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) {
|
||||
var (
|
||||
firstErr error
|
||||
res *object.Object
|
||||
|
@ -681,7 +352,7 @@ func groupAddressRequestForwarder(f func(network.Address, client.MultiAddressCli
|
|||
// would be nice to log otherwise
|
||||
}()
|
||||
|
||||
res, err = f(addr, c, key)
|
||||
res, err = f(ctx, addr, c, key)
|
||||
|
||||
return
|
||||
})
|
||||
|
|
Can we use more meaningful commit message for this? Like a sentence you use in the body.
And same for the last commit, we have 2 commits with identical message doing pretty different things.
Fixed