diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index cffb4830..32d90709 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -249,6 +249,7 @@ func initObjectService(c *cfg) { sPutV2 := putsvcV2.NewService( putsvcV2.WithInternalService(sPut), + putsvcV2.WithKeyStorage(keyStorage), ) sSearch := searchsvc.New( diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go index 2f83db19..38309671 100644 --- a/pkg/services/object/put/v2/service.go +++ b/pkg/services/object/put/v2/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) // Service implements Put operation of Object service v2. @@ -17,7 +18,8 @@ type Service struct { type Option func(*cfg) type cfg struct { - svc *putsvc.Service + svc *putsvc.Service + keyStorage *util.KeyStorage } // NewService constructs Service instance from provided options. @@ -41,7 +43,8 @@ func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) { } return &streamer{ - stream: stream, + stream: stream, + keyStorage: s.keyStorage, }, nil } @@ -50,3 +53,9 @@ func WithInternalService(v *putsvc.Service) Option { c.svc = v } } + +func WithKeyStorage(ks *util.KeyStorage) Option { + return func(c *cfg) { + c.keyStorage = ks + } +} diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index ef75ebc0..e09122e1 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -4,14 +4,18 @@ import ( "fmt" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/session" "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/rpc" + sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) type streamer struct { stream *putsvc.Streamer + keyStorage *util.KeyStorage saveChunks bool init *object.PutRequest chunks []*object.PutRequest @@ -47,7 +51,23 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v) } - return + if err != nil || !s.saveChunks { + return + } + + metaHdr := new(sessionV2.RequestMetaHeader) + meta := req.GetMetaHeader() + st := session.NewTokenFromV2(meta.GetSessionToken()) + + metaHdr.SetTTL(meta.GetTTL() - 1) + metaHdr.SetOrigin(meta) + req.SetMetaHeader(metaHdr) + + key, err := s.keyStorage.GetKey(st) + if err != nil { + return err + } + return signature.SignServiceMessage(key, req) } func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {