[#33] service/object: Implement object Put distributed service

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-09-21 17:31:31 +03:00 committed by Alex Vanin
parent dcfb6a6b3a
commit 57f8d3745d
10 changed files with 711 additions and 0 deletions

View file

@ -0,0 +1,106 @@
package putsvc
import (
"sync"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/pkg/errors"
)
type distributedTarget struct {
traverseOpts []placement.Option
workerPool util.WorkerPool
obj *object.RawObject
chunks [][]byte
nodeTargetInitializer func(*network.Address) transformer.ObjectTarget
}
var errIncompletePut = errors.New("incomplete object put")
func (t *distributedTarget) WriteHeader(obj *object.RawObject) error {
t.obj = obj
return nil
}
func (t *distributedTarget) Write(p []byte) (n int, err error) {
t.chunks = append(t.chunks, p)
return len(p), nil
}
func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
traverser, err := placement.NewTraverser(
append(t.traverseOpts, placement.ForObject(t.obj.GetID()))...,
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not create object placement traverser", t)
}
sz := 0
for i := range t.chunks {
sz += len(t.chunks[i])
}
payload := make([]byte, 0, sz)
for i := range t.chunks {
payload = append(payload, t.chunks[i]...)
}
t.obj.SetPayload(payload)
loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
break
}
wg := new(sync.WaitGroup)
for i := range addrs {
wg.Add(1)
addr := addrs[i]
if err := t.workerPool.Submit(func() {
defer wg.Done()
target := t.nodeTargetInitializer(addr)
if err := target.WriteHeader(t.obj); err != nil {
// TODO: log error
return
} else if _, err := target.Close(); err != nil {
// TODO: log error
return
}
traverser.SubmitSuccess()
}); err != nil {
wg.Done()
// TODO: log error
break loop
}
}
wg.Wait()
}
if !traverser.Success() {
return nil, errIncompletePut
}
return new(transformer.AccessIdentifiers).
WithSelfID(t.obj.GetID()), nil
}

View file

@ -0,0 +1,72 @@
package putsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
type localPlacement struct {
builder placement.Builder
localAddrSrc network.LocalAddressSource
}
func (p *localPlacement) BuildPlacement(addr *objectSDK.Address, policy *netmap.PlacementPolicy) ([]netmap.Nodes, error) {
vs, err := p.builder.BuildPlacement(addr, policy)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not build object placement", p)
}
for i := range vs {
for j := range vs[i] {
addr, err := network.AddressFromString(vs[i][j].NetworkAddress())
if err != nil {
// TODO: log error
continue
}
if network.IsLocalAddress(p.localAddrSrc, addr) {
return []netmap.Nodes{{vs[i][j]}}, nil
}
}
}
return nil, errors.Errorf("(%T) local node is outside of object placement", p)
}
type localTarget struct {
storage *localstore.Storage
obj *object.RawObject
payload []byte
}
func (t *localTarget) WriteHeader(obj *object.RawObject) error {
t.obj = obj
t.payload = make([]byte, 0, obj.GetPayloadSize())
return nil
}
func (t *localTarget) Write(p []byte) (n int, err error) {
t.payload = append(t.payload, p...)
return len(p), nil
}
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
if err := t.storage.Put(t.obj.Object()); err != nil {
return nil, errors.Wrapf(err, "(%T) could not put object to local storage", t)
}
return new(transformer.AccessIdentifiers).
WithSelfID(t.obj.GetID()), nil
}

View file

@ -0,0 +1,53 @@
package putsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
)
type PutInitPrm struct {
local bool
hdr *object.RawObject
token *token.SessionToken
traverseOpts []placement.Option
}
type PutChunkPrm struct {
chunk []byte
}
func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm {
if p != nil {
p.hdr = v
}
return p
}
func (p *PutInitPrm) WithSession(v *token.SessionToken) *PutInitPrm {
if p != nil {
p.token = v
}
return p
}
func (p *PutInitPrm) OnlyLocal(v bool) *PutInitPrm {
if p != nil {
p.local = v
}
return p
}
func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
if p != nil {
p.chunk = v
}
return p
}

View file

@ -0,0 +1,54 @@
package putsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
type remoteTarget struct {
transformer.ObjectTarget
ctx context.Context
key *ecdsa.PrivateKey
addr *network.Address
obj *object.Object
}
func (t *remoteTarget) WriteHeader(obj *object.RawObject) error {
t.obj = obj.Object()
return nil
}
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
addr := t.addr.NetAddr()
c, err := client.New(t.key,
client.WithAddress(addr),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr)
}
id, err := c.PutObject(t.ctx, new(client.PutObjectParams).
WithObject(
t.obj.SDK(),
),
client.WithTTL(1), // FIXME: use constant
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not put object to %s", t, addr)
}
return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
}

View file

@ -0,0 +1,13 @@
package putsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
type PutResponse struct {
id *object.ID
}
func (r *PutResponse) ObjectID() *object.ID {
return r.id
}

View file

@ -0,0 +1,114 @@
package putsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
"github.com/nspcc-dev/neofs-node/pkg/util"
)
type MaxSizeSource interface {
MaxObjectSize() uint64
}
type Service struct {
*cfg
}
type Option func(*cfg)
type cfg struct {
key *ecdsa.PrivateKey
maxSizeSrc MaxSizeSource
tokenStore *storage.TokenStore
localStore *localstore.Storage
cnrSrc container.Source
netMapSrc netmap.Source
workerPool util.WorkerPool
localAddrSrc network.LocalAddressSource
}
func defaultCfg() *cfg {
return &cfg{
workerPool: new(util.SyncWorkerPool),
}
}
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func (p *Service) Put(ctx context.Context) (*Streamer, error) {
return &Streamer{
cfg: p.cfg,
ctx: ctx,
}, nil
}
func WithKey(v *ecdsa.PrivateKey) Option {
return func(c *cfg) {
c.key = v
}
}
func WithMaxSizeSource(v MaxSizeSource) Option {
return func(c *cfg) {
c.maxSizeSrc = v
}
}
func WithTokenStorage(v *storage.TokenStore) Option {
return func(c *cfg) {
c.tokenStore = v
}
}
func WithLocalStorage(v *localstore.Storage) Option {
return func(c *cfg) {
c.localStore = v
}
}
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
func WithNetworkMapSource(v netmap.Source) Option {
return func(c *cfg) {
c.netMapSrc = v
}
}
func WithWorkerPool(v util.WorkerPool) Option {
return func(c *cfg) {
c.workerPool = v
}
}
func WithLocalAddressSource(v network.LocalAddressSource) Option {
return func(c *cfg) {
c.localAddrSrc = v
}
}

View file

@ -0,0 +1,170 @@
package putsvc
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
type Streamer struct {
*cfg
ctx context.Context
target transformer.ObjectTarget
}
var errNotInit = errors.New("stream not initialized")
var errInitRecall = errors.New("init recall")
var errPrivateTokenNotFound = errors.New("private token not found")
func (p *Streamer) Init(prm *PutInitPrm) error {
// initialize destination target
if err := p.initTarget(prm); err != nil {
return errors.Wrapf(err, "(%T) could not initialize object target", p)
}
return errors.Wrapf(
p.target.WriteHeader(prm.hdr),
"(%T) could not write header to target", p,
)
}
func (p *Streamer) initTarget(prm *PutInitPrm) error {
// prevent re-calling
if p.target != nil {
return errInitRecall
}
// prepare needed put parameters
if err := p.preparePrm(prm); err != nil {
return errors.Wrapf(err, "(%T) could not prepare put parameters", p)
}
if prm.token == nil {
// prepare untrusted-Put object target
p.target = p.newCommonTarget(prm)
return nil
}
// prepare trusted-Put object target
// get private token from local storage
pToken := p.tokenStore.Get(prm.token.OwnerID(), prm.token.ID())
if pToken == nil {
return errPrivateTokenNotFound
}
p.target = transformer.NewPayloadSizeLimiter(
p.maxSizeSrc.MaxObjectSize(),
func() transformer.ObjectTarget {
return transformer.NewFormatTarget(pToken.SessionKey(), p.newCommonTarget(prm))
},
)
return nil
}
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return errors.Wrapf(err, "(%T) could not get latest network map", p)
}
// get container to store the object
cnr, err := p.cnrSrc.Get(prm.hdr.GetContainerID())
if err != nil {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
prm.traverseOpts = make([]placement.Option, 0, 4)
// add common options
prm.traverseOpts = append(prm.traverseOpts,
// set processing container
placement.ForContainer(cnr),
// set identifier of the processing object
placement.ForObject(prm.hdr.GetID()),
)
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.local {
// restrict success count to 1 stored copy (to local storage)
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = &localPlacement{
builder: placement.NewNetworkMapBuilder(nm),
localAddrSrc: p.localAddrSrc,
}
}
// set placement builder
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
return nil
}
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
return &distributedTarget{
traverseOpts: prm.traverseOpts,
workerPool: p.workerPool,
nodeTargetInitializer: func(addr *network.Address) transformer.ObjectTarget {
if network.IsLocalAddress(p.localAddrSrc, addr) {
return &localTarget{
storage: p.localStore,
}
} else {
return &remoteTarget{
ctx: p.ctx,
key: p.key,
addr: addr,
}
}
},
}
}
func (p *Streamer) SendChunk(prm *PutChunkPrm) error {
if p.target == nil {
return errNotInit
}
_, err := p.target.Write(prm.chunk)
return errors.Wrapf(err, "(%T) could not write payload chunk to target", p)
}
func (p *Streamer) Close() (*PutResponse, error) {
if p.target == nil {
return nil, errNotInit
}
ids, err := p.target.Close()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not close object target", p)
}
id := ids.ParentID()
if id == nil {
id = ids.SelfID()
}
return &PutResponse{
id: id,
}, nil
}

View file

@ -0,0 +1,52 @@
package putsvc
import (
"context"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/pkg/errors"
)
// Service implements Put operation of Object service v2.
type Service struct {
*cfg
}
// Option represents Service constructor option.
type Option func(*cfg)
type cfg struct {
svc *putsvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
// Put calls internal service and returns v2 object streamer.
func (s *Service) Put(ctx context.Context) (objectV2.PutObjectStreamer, error) {
stream, err := s.svc.Put(ctx)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not open object put stream", s)
}
return &streamer{
stream: stream,
}, nil
}
func WithInternalService(v *putsvc.Service) Option {
return func(c *cfg) {
c.svc = v
}
}

View file

@ -0,0 +1,37 @@
package putsvc
import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/pkg/errors"
)
type streamer struct {
stream *putsvc.Streamer
}
func (s *streamer) Send(req *object.PutRequest) (err error) {
switch v := req.GetBody().GetObjectPart().(type) {
case *object.PutObjectPartInit:
if err = s.stream.Init(toInitPrm(v, req.GetMetaHeader().GetSessionToken(), req.GetMetaHeader().GetTTL())); err != nil {
err = errors.Wrapf(err, "(%T) could not init object put stream", s)
}
case *object.PutObjectPartChunk:
if err = s.stream.SendChunk(toChunkPrm(v)); err != nil {
err = errors.Wrapf(err, "(%T) could not send payload chunk", s)
}
default:
err = errors.Errorf("(%T) invalid object put stream part type %T", s, v)
}
return
}
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
resp, err := s.stream.Close()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not object put stream", s)
}
return fromPutResponse(resp), nil
}

View file

@ -0,0 +1,40 @@
package putsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/token"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
)
func toInitPrm(req *objectV2.PutObjectPartInit, t *session.SessionToken, ttl uint32) *putsvc.PutInitPrm {
oV2 := new(objectV2.Object)
oV2.SetObjectID(req.GetObjectID())
oV2.SetSignature(req.GetSignature())
oV2.SetHeader(req.GetHeader())
return new(putsvc.PutInitPrm).
WithObject(
object.NewRawFromV2(oV2),
).
WithSession(
token.NewSessionTokenFromV2(t),
).
OnlyLocal(ttl == 1) // FIXME: use constant
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
return new(putsvc.PutChunkPrm).
WithChunk(req.GetChunk())
}
func fromPutResponse(r *putsvc.PutResponse) *objectV2.PutResponse {
body := new(objectV2.PutResponseBody)
body.SetObjectID(r.ObjectID().ToV2())
resp := new(objectV2.PutResponse)
resp.SetBody(body)
return resp
}