forked from TrueCloudLab/frostfs-node
[#501] object/put: reduce TTL of the relayed request
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
3468491224
commit
b8a7c11e57
3 changed files with 33 additions and 3 deletions
|
@ -249,6 +249,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sPutV2 := putsvcV2.NewService(
|
sPutV2 := putsvcV2.NewService(
|
||||||
putsvcV2.WithInternalService(sPut),
|
putsvcV2.WithInternalService(sPut),
|
||||||
|
putsvcV2.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
|
||||||
sSearch := searchsvc.New(
|
sSearch := searchsvc.New(
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Service implements Put operation of Object service v2.
|
// Service implements Put operation of Object service v2.
|
||||||
|
@ -17,7 +18,8 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
svc *putsvc.Service
|
svc *putsvc.Service
|
||||||
|
keyStorage *util.KeyStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
// NewService constructs Service instance from provided options.
|
||||||
|
@ -41,7 +43,8 @@ func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &streamer{
|
return &streamer{
|
||||||
stream: stream,
|
stream: stream,
|
||||||
|
keyStorage: s.keyStorage,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,3 +53,9 @@ func WithInternalService(v *putsvc.Service) Option {
|
||||||
c.svc = v
|
c.svc = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithKeyStorage(ks *util.KeyStorage) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.keyStorage = ks
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -4,14 +4,18 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
||||||
|
sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type streamer struct {
|
type streamer struct {
|
||||||
stream *putsvc.Streamer
|
stream *putsvc.Streamer
|
||||||
|
keyStorage *util.KeyStorage
|
||||||
saveChunks bool
|
saveChunks bool
|
||||||
init *object.PutRequest
|
init *object.PutRequest
|
||||||
chunks []*object.PutRequest
|
chunks []*object.PutRequest
|
||||||
|
@ -47,7 +51,23 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
|
||||||
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
if err != nil || !s.saveChunks {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
metaHdr := new(sessionV2.RequestMetaHeader)
|
||||||
|
meta := req.GetMetaHeader()
|
||||||
|
st := session.NewTokenFromV2(meta.GetSessionToken())
|
||||||
|
|
||||||
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||||
|
metaHdr.SetOrigin(meta)
|
||||||
|
req.SetMetaHeader(metaHdr)
|
||||||
|
|
||||||
|
key, err := s.keyStorage.GetKey(st)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return signature.SignServiceMessage(key, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
|
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
|
||||||
|
|
Loading…
Reference in a new issue