Refactor getsvc #277

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:object-3606 into master 2023-04-28 14:03:13 +00:00
5 changed files with 133 additions and 80 deletions
Showing only changes of commit ed214f5ca1 - Show all commits

View file

@ -181,7 +181,7 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser,
} }
} }
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (remoteStorage, bool) { func (exec execCtx) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
rs, err := exec.svc.remoteStorageConstructor.Get(info) rs, err := exec.svc.remoteStorageConstructor.Get(info)
if err != nil { if err != nil {
exec.status = statusUndefined exec.status = statusUndefined

View file

@ -2,6 +2,7 @@ package getsvc
import ( import (
"context" "context"
"crypto/ecdsa"
"crypto/rand" "crypto/rand"
"errors" "errors"
"fmt" "fmt"
@ -117,8 +118,15 @@ func newTestClient() *testClient {
} }
} }
func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
v, ok := c.results[exec.address().EncodeToString()] c.results[addr.EncodeToString()] = struct {
obj *objectSDK.Object
err error
}{obj: obj, err: err}
}
func (c *testClient) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
v, ok := c.results[address.EncodeToString()]
if !ok { if !ok {
var errNotFound apistatus.ObjectNotFound var errNotFound apistatus.ObjectNotFound
@ -129,14 +137,23 @@ func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.Node
return nil, v.err return nil, v.err
} }
return cutToRange(v.obj, exec.ctxRange()), nil return v.obj, nil
} }
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) { func (c *testClient) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
c.results[addr.EncodeToString()] = struct { return c.Get(ctx, address, requestParams)
obj *objectSDK.Object }
err error
}{obj: obj, err: err} func (c *testClient) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
obj, err := c.Get(ctx, address, requestParams)
if err != nil {
return nil, err
}
return cutToRange(obj, rng), nil
}
func (c *testClient) ForwardRequest(ctx context.Context, info client.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
return nil, fmt.Errorf("not implemented")
} }
func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) { func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
@ -249,6 +266,13 @@ func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte)
return &writePayloadError{} return &writePayloadError{}
} }
type testKeyStorage struct {
}
func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) {
return &ecdsa.PrivateKey{}, nil
}
func TestGetLocalOnly(t *testing.T) { func TestGetLocalOnly(t *testing.T) {
ctx := context.Background() ctx := context.Background()
@ -526,6 +550,7 @@ func TestGetRemoteSmall(t *testing.T) {
}, },
epochSource: testEpochReceiver(curEpoch), epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c, remoteStorageConstructor: c,
keyStore: &testKeyStorage{},
} }
} }
@ -1673,6 +1698,7 @@ func TestGetFromPastEpoch(t *testing.T) {
as[1][1]: c22, as[1][1]: c22,
}, },
}, },
keyStore: &testKeyStorage{},
} }
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()

View file

@ -18,12 +18,12 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
exec.log.Debug(logs.ProcessingNode) exec.log.Debug(logs.ProcessingNode)
client, ok := exec.remoteClient(info) rs, ok := exec.getRemoteStorage(info)
if !ok { if !ok {
return true return true
} }
obj, err := client.GetObject(ctx, exec, info) obj, err := exec.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
@ -64,3 +64,37 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
return exec.status != statusUndefined return exec.status != statusUndefined
} }
func (exec *execCtx) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
if exec.isForwardingEnabled() {
return rs.ForwardRequest(ctx, info, exec.prm.forwarder)
}
key, err := exec.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: exec.curProcEpoch,
TTL: exec.prm.common.TTL(),
PrivateKey: key,
SessionToken: exec.prm.common.SessionToken(),
BearerToken: exec.prm.common.BearerToken(),
XHeaders: exec.prm.common.XHeaders(),
IsRaw: exec.isRaw(),
}
if exec.headOnly() {
return rs.Head(ctx, exec.address(), prm)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return rs.Range(ctx, exec.address(), rng, prm)
}
return rs.Get(ctx, exec.address(), prm)
}

View file

@ -5,17 +5,17 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
) )
type epochSource interface { type epochSource interface {
@ -37,18 +37,18 @@ type localStorageEngine interface {
} }
type clientConstructor interface { type clientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error) Get(coreclient.NodeInfo) (coreclient.MultiAddressClient, error)
} }
type remoteStorageConstructor interface { type remoteStorageConstructor interface {
Get(client.NodeInfo) (remoteStorage, error) Get(coreclient.NodeInfo) (remoteStorage, error)
} }
type multiclientRemoteStorageConstructor struct { type multiclientRemoteStorageConstructor struct {
clientConstructor clientConstructor clientConstructor clientConstructor
} }
func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteStorage, error) { func (c *multiclientRemoteStorageConstructor) Get(info coreclient.NodeInfo) (remoteStorage, error) {
clt, err := c.clientConstructor.Get(info) clt, err := c.clientConstructor.Get(info)
if err != nil { if err != nil {
return nil, err return nil, err
@ -59,10 +59,6 @@ func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteS
}, nil }, nil
} }
type remoteStorage interface {
GetObject(context.Context, *execCtx, client.NodeInfo) (*objectSDK.Object, error)
}
type localStorage interface { type localStorage interface {
Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error)
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error)
@ -111,48 +107,45 @@ func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*obj
return r.Object(), nil return r.Object(), nil
} }
type RemoteRequestParams struct {
Epoch uint64
TTL uint32
PrivateKey *ecdsa.PrivateKey
SessionToken *session.Object
BearerToken *bearer.Token
XHeaders []string
IsRaw bool
}
type remoteStorage interface {
Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error)
ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error)
}
type multiaddressRemoteStorage struct { type multiaddressRemoteStorage struct {
client coreclient.MultiAddressClient client coreclient.MultiAddressClient
} }
func (s *multiaddressRemoteStorage) GetObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) { func (s *multiaddressRemoteStorage) ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
if exec.isForwardingEnabled() { return forwarder(ctx, info, s.client)
return exec.prm.forwarder(ctx, info, s.client)
}
key, err := exec.key()
if err != nil {
return nil, err
}
if exec.headOnly() {
return s.getHeadOnly(ctx, exec, key)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return s.getRange(ctx, exec, key, rng)
}
return s.get(ctx, exec, key)
} }
func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) { func (s *multiaddressRemoteStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
var prm internalclient.PayloadRangePrm var prm internalclient.PayloadRangePrm
prm.SetClient(s.client) prm.SetClient(s.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(requestParams.TTL)
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(requestParams.Epoch)
prm.SetAddress(exec.address()) prm.SetAddress(address)
prm.SetPrivateKey(key) prm.SetPrivateKey(requestParams.PrivateKey)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(requestParams.SessionToken)
prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetBearerToken(requestParams.BearerToken)
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(requestParams.XHeaders)
prm.SetRange(rng) prm.SetRange(rng)
if requestParams.IsRaw {
if exec.isRaw() {
prm.SetRawFlag() prm.SetRawFlag()
} }
@ -160,7 +153,7 @@ func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx,
if err != nil { if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) { if errors.As(err, &errAccessDenied) {
obj, err := s.get(ctx, exec, key) obj, err := s.Get(ctx, address, requestParams)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -173,27 +166,27 @@ func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx,
return nil, new(apistatus.ObjectOutOfRange) return nil, new(apistatus.ObjectOutOfRange)
} }
return payloadOnlyObject(payload[from:to]), nil return s.payloadOnlyObject(payload[from:to]), nil
} }
return nil, err return nil, err
} }
return payloadOnlyObject(res.PayloadRange()), nil return s.payloadOnlyObject(res.PayloadRange()), nil
} }
func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { func (s *multiaddressRemoteStorage) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
var prm internalclient.HeadObjectPrm var prm internalclient.HeadObjectPrm
prm.SetClient(s.client) prm.SetClient(s.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(requestParams.TTL)
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(requestParams.Epoch)
prm.SetAddress(exec.address()) prm.SetAddress(address)
prm.SetPrivateKey(key) prm.SetPrivateKey(requestParams.PrivateKey)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(requestParams.SessionToken)
prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetBearerToken(requestParams.BearerToken)
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(requestParams.XHeaders)
if exec.isRaw() { if requestParams.IsRaw {
prm.SetRawFlag() prm.SetRawFlag()
} }
@ -205,19 +198,19 @@ func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execC
return res.Header(), nil return res.Header(), nil
} }
func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { func (s *multiaddressRemoteStorage) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
var prm internalclient.GetObjectPrm var prm internalclient.GetObjectPrm
prm.SetClient(s.client) prm.SetClient(s.client)
prm.SetTTL(exec.prm.common.TTL()) prm.SetTTL(requestParams.TTL)
prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetNetmapEpoch(requestParams.Epoch)
prm.SetAddress(exec.address()) prm.SetAddress(address)
prm.SetPrivateKey(key) prm.SetPrivateKey(requestParams.PrivateKey)
prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetSessionToken(requestParams.SessionToken)
prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetBearerToken(requestParams.BearerToken)
prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetXHeaders(requestParams.XHeaders)
if exec.isRaw() { if requestParams.IsRaw {
prm.SetRawFlag() prm.SetRawFlag()
} }
@ -228,3 +221,10 @@ func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key
return res.Object(), nil return res.Object(), nil
} }
func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK.Object {
obj := objectSDK.New()
obj.SetPayload(payload)
return obj
}

View file

@ -60,13 +60,6 @@ func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
return w.headWriter.WriteHeader(ctx, o) return w.headWriter.WriteHeader(ctx, o)
} }
func payloadOnlyObject(payload []byte) *object.Object {
obj := object.New()
obj.SetPayload(payload)
return obj
}
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error { func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p) _, err := h.hash.Write(p)
return err return err