diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index e57decb6..6b8e0e99 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -181,7 +181,7 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, } } -func (exec execCtx) remoteClient(info clientcore.NodeInfo) (remoteStorage, bool) { +func (exec execCtx) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) { rs, err := exec.svc.remoteStorageConstructor.Get(info) if err != nil { exec.status = statusUndefined diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 15a14ac1..d6ff0088 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -2,6 +2,7 @@ package getsvc import ( "context" + "crypto/ecdsa" "crypto/rand" "errors" "fmt" @@ -117,8 +118,15 @@ func newTestClient() *testClient { } } -func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { - v, ok := c.results[exec.address().EncodeToString()] +func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) { + c.results[addr.EncodeToString()] = struct { + obj *objectSDK.Object + err error + }{obj: obj, err: err} +} + +func (c *testClient) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) { + v, ok := c.results[address.EncodeToString()] if !ok { var errNotFound apistatus.ObjectNotFound @@ -129,14 +137,23 @@ func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.Node return nil, v.err } - return cutToRange(v.obj, exec.ctxRange()), nil + return v.obj, nil } -func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) { - c.results[addr.EncodeToString()] = struct { - obj *objectSDK.Object - err error - }{obj: obj, err: err} +func (c *testClient) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) { + return c.Get(ctx, address, requestParams) +} + +func (c *testClient) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) { + obj, err := c.Get(ctx, address, requestParams) + if err != nil { + return nil, err + } + return cutToRange(obj, rng), nil +} + +func (c *testClient) ForwardRequest(ctx context.Context, info client.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) { + return nil, fmt.Errorf("not implemented") } func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) { @@ -249,6 +266,13 @@ func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) return &writePayloadError{} } +type testKeyStorage struct { +} + +func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) { + return &ecdsa.PrivateKey{}, nil +} + func TestGetLocalOnly(t *testing.T) { ctx := context.Background() @@ -526,6 +550,7 @@ func TestGetRemoteSmall(t *testing.T) { }, epochSource: testEpochReceiver(curEpoch), remoteStorageConstructor: c, + keyStore: &testKeyStorage{}, } } @@ -1673,6 +1698,7 @@ func TestGetFromPastEpoch(t *testing.T) { as[1][1]: c22, }, }, + keyStore: &testKeyStorage{}, } w := NewSimpleObjectWriter() diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 56be476f..d4c5d7a6 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -18,12 +18,12 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool exec.log.Debug(logs.ProcessingNode) - client, ok := exec.remoteClient(info) + rs, ok := exec.getRemoteStorage(info) if !ok { return true } - obj, err := client.GetObject(ctx, exec, info) + obj, err := exec.getRemote(ctx, rs, info) var errSplitInfo *objectSDK.SplitInfoError var errRemoved *apistatus.ObjectAlreadyRemoved @@ -64,3 +64,37 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool return exec.status != statusUndefined } + +func (exec *execCtx) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) { + if exec.isForwardingEnabled() { + return rs.ForwardRequest(ctx, info, exec.prm.forwarder) + } + + key, err := exec.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: exec.curProcEpoch, + TTL: exec.prm.common.TTL(), + PrivateKey: key, + SessionToken: exec.prm.common.SessionToken(), + BearerToken: exec.prm.common.BearerToken(), + XHeaders: exec.prm.common.XHeaders(), + IsRaw: exec.isRaw(), + } + + if exec.headOnly() { + return rs.Head(ctx, exec.address(), prm) + } + // we don't specify payload writer because we accumulate + // the object locally (even huge). + if rng := exec.ctxRange(); rng != nil { + // Current spec allows other storage node to deny access, + // fallback to GET here. + return rs.Range(ctx, exec.address(), rng, prm) + } + + return rs.Get(ctx, exec.address(), prm) +} diff --git a/pkg/services/object/get/types.go b/pkg/services/object/get/types.go index 47f27c0f..800d3799 100644 --- a/pkg/services/object/get/types.go +++ b/pkg/services/object/get/types.go @@ -5,17 +5,17 @@ import ( "crypto/ecdsa" "errors" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" ) type epochSource interface { @@ -37,18 +37,18 @@ type localStorageEngine interface { } type clientConstructor interface { - Get(client.NodeInfo) (client.MultiAddressClient, error) + Get(coreclient.NodeInfo) (coreclient.MultiAddressClient, error) } type remoteStorageConstructor interface { - Get(client.NodeInfo) (remoteStorage, error) + Get(coreclient.NodeInfo) (remoteStorage, error) } type multiclientRemoteStorageConstructor struct { clientConstructor clientConstructor } -func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteStorage, error) { +func (c *multiclientRemoteStorageConstructor) Get(info coreclient.NodeInfo) (remoteStorage, error) { clt, err := c.clientConstructor.Get(info) if err != nil { return nil, err @@ -59,10 +59,6 @@ func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteS }, nil } -type remoteStorage interface { - GetObject(context.Context, *execCtx, client.NodeInfo) (*objectSDK.Object, error) -} - type localStorage interface { Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) @@ -111,48 +107,45 @@ func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*obj return r.Object(), nil } +type RemoteRequestParams struct { + Epoch uint64 + TTL uint32 + PrivateKey *ecdsa.PrivateKey + SessionToken *session.Object + BearerToken *bearer.Token + XHeaders []string + IsRaw bool +} + +type remoteStorage interface { + Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) + Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) + Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) + + ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) +} + type multiaddressRemoteStorage struct { client coreclient.MultiAddressClient } -func (s *multiaddressRemoteStorage) GetObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) { - if exec.isForwardingEnabled() { - return exec.prm.forwarder(ctx, info, s.client) - } - - key, err := exec.key() - if err != nil { - return nil, err - } - - if exec.headOnly() { - return s.getHeadOnly(ctx, exec, key) - } - // we don't specify payload writer because we accumulate - // the object locally (even huge). - if rng := exec.ctxRange(); rng != nil { - // Current spec allows other storage node to deny access, - // fallback to GET here. - return s.getRange(ctx, exec, key, rng) - } - - return s.get(ctx, exec, key) +func (s *multiaddressRemoteStorage) ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) { + return forwarder(ctx, info, s.client) } -func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) { +func (s *multiaddressRemoteStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) { var prm internalclient.PayloadRangePrm prm.SetClient(s.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetTTL(requestParams.TTL) + prm.SetNetmapEpoch(requestParams.Epoch) + prm.SetAddress(address) + prm.SetPrivateKey(requestParams.PrivateKey) + prm.SetSessionToken(requestParams.SessionToken) + prm.SetBearerToken(requestParams.BearerToken) + prm.SetXHeaders(requestParams.XHeaders) prm.SetRange(rng) - - if exec.isRaw() { + if requestParams.IsRaw { prm.SetRawFlag() } @@ -160,7 +153,7 @@ func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, if err != nil { var errAccessDenied *apistatus.ObjectAccessDenied if errors.As(err, &errAccessDenied) { - obj, err := s.get(ctx, exec, key) + obj, err := s.Get(ctx, address, requestParams) if err != nil { return nil, err } @@ -173,27 +166,27 @@ func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, return nil, new(apistatus.ObjectOutOfRange) } - return payloadOnlyObject(payload[from:to]), nil + return s.payloadOnlyObject(payload[from:to]), nil } return nil, err } - return payloadOnlyObject(res.PayloadRange()), nil + return s.payloadOnlyObject(res.PayloadRange()), nil } -func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { +func (s *multiaddressRemoteStorage) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) { var prm internalclient.HeadObjectPrm prm.SetClient(s.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetTTL(requestParams.TTL) + prm.SetNetmapEpoch(requestParams.Epoch) + prm.SetAddress(address) + prm.SetPrivateKey(requestParams.PrivateKey) + prm.SetSessionToken(requestParams.SessionToken) + prm.SetBearerToken(requestParams.BearerToken) + prm.SetXHeaders(requestParams.XHeaders) - if exec.isRaw() { + if requestParams.IsRaw { prm.SetRawFlag() } @@ -205,19 +198,19 @@ func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execC return res.Header(), nil } -func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { +func (s *multiaddressRemoteStorage) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) { var prm internalclient.GetObjectPrm prm.SetClient(s.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetTTL(requestParams.TTL) + prm.SetNetmapEpoch(requestParams.Epoch) + prm.SetAddress(address) + prm.SetPrivateKey(requestParams.PrivateKey) + prm.SetSessionToken(requestParams.SessionToken) + prm.SetBearerToken(requestParams.BearerToken) + prm.SetXHeaders(requestParams.XHeaders) - if exec.isRaw() { + if requestParams.IsRaw { prm.SetRawFlag() } @@ -228,3 +221,10 @@ func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key return res.Object(), nil } + +func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK.Object { + obj := objectSDK.New() + obj.SetPayload(payload) + + return obj +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 3038d0d5..2e01809d 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -60,13 +60,6 @@ func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error { return w.headWriter.WriteHeader(ctx, o) } -func payloadOnlyObject(payload []byte) *object.Object { - obj := object.New() - obj.SetPayload(payload) - - return obj -} - func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error { _, err := h.hash.Write(p) return err