diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go new file mode 100644 index 0000000000..9faa8f7fb7 --- /dev/null +++ b/pkg/services/object/put/distributed.go @@ -0,0 +1,106 @@ +package putsvc + +import ( + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/pkg/errors" +) + +type distributedTarget struct { + traverseOpts []placement.Option + + workerPool util.WorkerPool + + obj *object.RawObject + + chunks [][]byte + + nodeTargetInitializer func(*network.Address) transformer.ObjectTarget +} + +var errIncompletePut = errors.New("incomplete object put") + +func (t *distributedTarget) WriteHeader(obj *object.RawObject) error { + t.obj = obj + + return nil +} + +func (t *distributedTarget) Write(p []byte) (n int, err error) { + t.chunks = append(t.chunks, p) + + return len(p), nil +} + +func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { + traverser, err := placement.NewTraverser( + append(t.traverseOpts, placement.ForObject(t.obj.GetID()))..., + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not create object placement traverser", t) + } + + sz := 0 + + for i := range t.chunks { + sz += len(t.chunks[i]) + } + + payload := make([]byte, 0, sz) + + for i := range t.chunks { + payload = append(payload, t.chunks[i]...) + } + + t.obj.SetPayload(payload) + +loop: + for { + addrs := traverser.Next() + if len(addrs) == 0 { + break + } + + wg := new(sync.WaitGroup) + + for i := range addrs { + wg.Add(1) + + addr := addrs[i] + + if err := t.workerPool.Submit(func() { + defer wg.Done() + + target := t.nodeTargetInitializer(addr) + + if err := target.WriteHeader(t.obj); err != nil { + // TODO: log error + return + } else if _, err := target.Close(); err != nil { + // TODO: log error + return + } + + traverser.SubmitSuccess() + }); err != nil { + wg.Done() + // TODO: log error + break loop + } + } + + wg.Wait() + } + + if !traverser.Success() { + return nil, errIncompletePut + } + + return new(transformer.AccessIdentifiers). + WithSelfID(t.obj.GetID()), nil +} diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go new file mode 100644 index 0000000000..efa7ffb536 --- /dev/null +++ b/pkg/services/object/put/local.go @@ -0,0 +1,72 @@ +package putsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" + "github.com/pkg/errors" +) + +type localPlacement struct { + builder placement.Builder + + localAddrSrc network.LocalAddressSource +} + +func (p *localPlacement) BuildPlacement(addr *objectSDK.Address, policy *netmap.PlacementPolicy) ([]netmap.Nodes, error) { + vs, err := p.builder.BuildPlacement(addr, policy) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not build object placement", p) + } + + for i := range vs { + for j := range vs[i] { + addr, err := network.AddressFromString(vs[i][j].NetworkAddress()) + if err != nil { + // TODO: log error + continue + } + + if network.IsLocalAddress(p.localAddrSrc, addr) { + return []netmap.Nodes{{vs[i][j]}}, nil + } + } + } + + return nil, errors.Errorf("(%T) local node is outside of object placement", p) +} + +type localTarget struct { + storage *localstore.Storage + + obj *object.RawObject + + payload []byte +} + +func (t *localTarget) WriteHeader(obj *object.RawObject) error { + t.obj = obj + + t.payload = make([]byte, 0, obj.GetPayloadSize()) + + return nil +} + +func (t *localTarget) Write(p []byte) (n int, err error) { + t.payload = append(t.payload, p...) + + return len(p), nil +} + +func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) { + if err := t.storage.Put(t.obj.Object()); err != nil { + return nil, errors.Wrapf(err, "(%T) could not put object to local storage", t) + } + + return new(transformer.AccessIdentifiers). + WithSelfID(t.obj.GetID()), nil +} diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go new file mode 100644 index 0000000000..5fe78e4bba --- /dev/null +++ b/pkg/services/object/put/prm.go @@ -0,0 +1,53 @@ +package putsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" +) + +type PutInitPrm struct { + local bool + + hdr *object.RawObject + + token *token.SessionToken + + traverseOpts []placement.Option +} + +type PutChunkPrm struct { + chunk []byte +} + +func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm { + if p != nil { + p.hdr = v + } + + return p +} + +func (p *PutInitPrm) WithSession(v *token.SessionToken) *PutInitPrm { + if p != nil { + p.token = v + } + + return p +} + +func (p *PutInitPrm) OnlyLocal(v bool) *PutInitPrm { + if p != nil { + p.local = v + } + + return p +} + +func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { + if p != nil { + p.chunk = v + } + + return p +} diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go new file mode 100644 index 0000000000..ebd550cfe6 --- /dev/null +++ b/pkg/services/object/put/remote.go @@ -0,0 +1,54 @@ +package putsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" + "github.com/pkg/errors" +) + +type remoteTarget struct { + transformer.ObjectTarget + + ctx context.Context + + key *ecdsa.PrivateKey + + addr *network.Address + + obj *object.Object +} + +func (t *remoteTarget) WriteHeader(obj *object.RawObject) error { + t.obj = obj.Object() + + return nil +} + +func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { + addr := t.addr.NetAddr() + + c, err := client.New(t.key, + client.WithAddress(addr), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr) + } + + id, err := c.PutObject(t.ctx, new(client.PutObjectParams). + WithObject( + t.obj.SDK(), + ), + client.WithTTL(1), // FIXME: use constant + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not put object to %s", t, addr) + } + + return new(transformer.AccessIdentifiers). + WithSelfID(id), nil +} diff --git a/pkg/services/object/put/res.go b/pkg/services/object/put/res.go new file mode 100644 index 0000000000..3ac432687b --- /dev/null +++ b/pkg/services/object/put/res.go @@ -0,0 +1,13 @@ +package putsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type PutResponse struct { + id *object.ID +} + +func (r *PutResponse) ObjectID() *object.ID { + return r.id +} diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go new file mode 100644 index 0000000000..f26044d4d4 --- /dev/null +++ b/pkg/services/object/put/service.go @@ -0,0 +1,114 @@ +package putsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" + "github.com/nspcc-dev/neofs-node/pkg/util" +) + +type MaxSizeSource interface { + MaxObjectSize() uint64 +} + +type Service struct { + *cfg +} + +type Option func(*cfg) + +type cfg struct { + key *ecdsa.PrivateKey + + maxSizeSrc MaxSizeSource + + tokenStore *storage.TokenStore + + localStore *localstore.Storage + + cnrSrc container.Source + + netMapSrc netmap.Source + + workerPool util.WorkerPool + + localAddrSrc network.LocalAddressSource +} + +func defaultCfg() *cfg { + return &cfg{ + workerPool: new(util.SyncWorkerPool), + } +} + +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func (p *Service) Put(ctx context.Context) (*Streamer, error) { + return &Streamer{ + cfg: p.cfg, + ctx: ctx, + }, nil +} + +func WithKey(v *ecdsa.PrivateKey) Option { + return func(c *cfg) { + c.key = v + } +} + +func WithMaxSizeSource(v MaxSizeSource) Option { + return func(c *cfg) { + c.maxSizeSrc = v + } +} + +func WithTokenStorage(v *storage.TokenStore) Option { + return func(c *cfg) { + c.tokenStore = v + } +} + +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.localStore = v + } +} + +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +func WithNetworkMapSource(v netmap.Source) Option { + return func(c *cfg) { + c.netMapSrc = v + } +} + +func WithWorkerPool(v util.WorkerPool) Option { + return func(c *cfg) { + c.workerPool = v + } +} + +func WithLocalAddressSource(v network.LocalAddressSource) Option { + return func(c *cfg) { + c.localAddrSrc = v + } +} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go new file mode 100644 index 0000000000..55e5a2c90d --- /dev/null +++ b/pkg/services/object/put/streamer.go @@ -0,0 +1,170 @@ +package putsvc + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" + "github.com/pkg/errors" +) + +type Streamer struct { + *cfg + + ctx context.Context + + target transformer.ObjectTarget +} + +var errNotInit = errors.New("stream not initialized") + +var errInitRecall = errors.New("init recall") + +var errPrivateTokenNotFound = errors.New("private token not found") + +func (p *Streamer) Init(prm *PutInitPrm) error { + // initialize destination target + if err := p.initTarget(prm); err != nil { + return errors.Wrapf(err, "(%T) could not initialize object target", p) + } + + return errors.Wrapf( + p.target.WriteHeader(prm.hdr), + "(%T) could not write header to target", p, + ) +} + +func (p *Streamer) initTarget(prm *PutInitPrm) error { + // prevent re-calling + if p.target != nil { + return errInitRecall + } + + // prepare needed put parameters + if err := p.preparePrm(prm); err != nil { + return errors.Wrapf(err, "(%T) could not prepare put parameters", p) + } + + if prm.token == nil { + // prepare untrusted-Put object target + p.target = p.newCommonTarget(prm) + + return nil + } + + // prepare trusted-Put object target + + // get private token from local storage + pToken := p.tokenStore.Get(prm.token.OwnerID(), prm.token.ID()) + if pToken == nil { + return errPrivateTokenNotFound + } + + p.target = transformer.NewPayloadSizeLimiter( + p.maxSizeSrc.MaxObjectSize(), + func() transformer.ObjectTarget { + return transformer.NewFormatTarget(pToken.SessionKey(), p.newCommonTarget(prm)) + }, + ) + + return nil +} + +func (p *Streamer) preparePrm(prm *PutInitPrm) error { + var err error + + // get latest network map + nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) + if err != nil { + return errors.Wrapf(err, "(%T) could not get latest network map", p) + } + + // get container to store the object + cnr, err := p.cnrSrc.Get(prm.hdr.GetContainerID()) + if err != nil { + return errors.Wrapf(err, "(%T) could not get container by ID", p) + } + + // allocate placement traverser options + prm.traverseOpts = make([]placement.Option, 0, 4) + + // add common options + prm.traverseOpts = append(prm.traverseOpts, + // set processing container + placement.ForContainer(cnr), + + // set identifier of the processing object + placement.ForObject(prm.hdr.GetID()), + ) + + // create placement builder from network map + builder := placement.NewNetworkMapBuilder(nm) + + if prm.local { + // restrict success count to 1 stored copy (to local storage) + prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1)) + + // use local-only placement builder + builder = &localPlacement{ + builder: placement.NewNetworkMapBuilder(nm), + localAddrSrc: p.localAddrSrc, + } + } + + // set placement builder + prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder)) + + return nil +} + +func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { + return &distributedTarget{ + traverseOpts: prm.traverseOpts, + workerPool: p.workerPool, + nodeTargetInitializer: func(addr *network.Address) transformer.ObjectTarget { + if network.IsLocalAddress(p.localAddrSrc, addr) { + return &localTarget{ + storage: p.localStore, + } + } else { + return &remoteTarget{ + ctx: p.ctx, + key: p.key, + addr: addr, + } + } + }, + } +} + +func (p *Streamer) SendChunk(prm *PutChunkPrm) error { + if p.target == nil { + return errNotInit + } + + _, err := p.target.Write(prm.chunk) + + return errors.Wrapf(err, "(%T) could not write payload chunk to target", p) +} + +func (p *Streamer) Close() (*PutResponse, error) { + if p.target == nil { + return nil, errNotInit + } + + ids, err := p.target.Close() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not close object target", p) + } + + id := ids.ParentID() + if id == nil { + id = ids.SelfID() + } + + return &PutResponse{ + id: id, + }, nil +} diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go new file mode 100644 index 0000000000..d8d80cc08a --- /dev/null +++ b/pkg/services/object/put/v2/service.go @@ -0,0 +1,52 @@ +package putsvc + +import ( + "context" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/pkg/errors" +) + +// Service implements Put operation of Object service v2. +type Service struct { + *cfg +} + +// Option represents Service constructor option. +type Option func(*cfg) + +type cfg struct { + svc *putsvc.Service +} + +// NewService constructs Service instance from provided options. +func NewService(opts ...Option) *Service { + c := new(cfg) + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +// Put calls internal service and returns v2 object streamer. +func (s *Service) Put(ctx context.Context) (objectV2.PutObjectStreamer, error) { + stream, err := s.svc.Put(ctx) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not open object put stream", s) + } + + return &streamer{ + stream: stream, + }, nil +} + +func WithInternalService(v *putsvc.Service) Option { + return func(c *cfg) { + c.svc = v + } +} diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go new file mode 100644 index 0000000000..61e14cdc76 --- /dev/null +++ b/pkg/services/object/put/v2/streamer.go @@ -0,0 +1,37 @@ +package putsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/object" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/pkg/errors" +) + +type streamer struct { + stream *putsvc.Streamer +} + +func (s *streamer) Send(req *object.PutRequest) (err error) { + switch v := req.GetBody().GetObjectPart().(type) { + case *object.PutObjectPartInit: + if err = s.stream.Init(toInitPrm(v, req.GetMetaHeader().GetSessionToken(), req.GetMetaHeader().GetTTL())); err != nil { + err = errors.Wrapf(err, "(%T) could not init object put stream", s) + } + case *object.PutObjectPartChunk: + if err = s.stream.SendChunk(toChunkPrm(v)); err != nil { + err = errors.Wrapf(err, "(%T) could not send payload chunk", s) + } + default: + err = errors.Errorf("(%T) invalid object put stream part type %T", s, v) + } + + return +} + +func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { + resp, err := s.stream.Close() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not object put stream", s) + } + + return fromPutResponse(resp), nil +} diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go new file mode 100644 index 0000000000..8f4d21ff76 --- /dev/null +++ b/pkg/services/object/put/v2/util.go @@ -0,0 +1,40 @@ +package putsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/token" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" +) + +func toInitPrm(req *objectV2.PutObjectPartInit, t *session.SessionToken, ttl uint32) *putsvc.PutInitPrm { + oV2 := new(objectV2.Object) + oV2.SetObjectID(req.GetObjectID()) + oV2.SetSignature(req.GetSignature()) + oV2.SetHeader(req.GetHeader()) + + return new(putsvc.PutInitPrm). + WithObject( + object.NewRawFromV2(oV2), + ). + WithSession( + token.NewSessionTokenFromV2(t), + ). + OnlyLocal(ttl == 1) // FIXME: use constant +} + +func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm { + return new(putsvc.PutChunkPrm). + WithChunk(req.GetChunk()) +} + +func fromPutResponse(r *putsvc.PutResponse) *objectV2.PutResponse { + body := new(objectV2.PutResponseBody) + body.SetObjectID(r.ObjectID().ToV2()) + + resp := new(objectV2.PutResponse) + resp.SetBody(body) + + return resp +}