From 05f39639754d59c3200f07a56dc6966e2f859874 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 22 Sep 2020 18:04:08 +0300 Subject: [PATCH] [#38] service/object: Implement simplified object Head service Implement Head service w/o linking object processing and restoration from split-chain. Signed-off-by: Leonard Lyubich --- pkg/services/object/head/distributed.go | 138 ++++++++++++++++++++++++ pkg/services/object/head/local.go | 24 +++++ pkg/services/object/head/prm.go | 35 ++++++ pkg/services/object/head/remote.go | 46 ++++++++ pkg/services/object/head/res.go | 13 +++ pkg/services/object/head/service.go | 92 ++++++++++++++++ pkg/services/object/head/util.go | 27 +++++ pkg/services/object/head/v2/service.go | 50 +++++++++ pkg/services/object/head/v2/util.go | 62 +++++++++++ 9 files changed, 487 insertions(+) create mode 100644 pkg/services/object/head/distributed.go create mode 100644 pkg/services/object/head/local.go create mode 100644 pkg/services/object/head/prm.go create mode 100644 pkg/services/object/head/remote.go create mode 100644 pkg/services/object/head/res.go create mode 100644 pkg/services/object/head/service.go create mode 100644 pkg/services/object/head/util.go create mode 100644 pkg/services/object/head/v2/service.go create mode 100644 pkg/services/object/head/v2/util.go diff --git a/pkg/services/object/head/distributed.go b/pkg/services/object/head/distributed.go new file mode 100644 index 000000000..0e241fe5c --- /dev/null +++ b/pkg/services/object/head/distributed.go @@ -0,0 +1,138 @@ +package headsvc + +import ( + "context" + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/pkg/errors" +) + +type distributedHeader struct { + *cfg + + w *onceHeaderWriter + + traverser *placement.Traverser +} + +func (h *distributedHeader) head(ctx context.Context, prm *Prm) (*Response, error) { + if err := h.prepare(ctx, prm); err != nil { + return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h) + } + + return h.finish(ctx, prm) +} + +func (h *distributedHeader) prepare(ctx context.Context, prm *Prm) error { + var err error + + // get latest network map + nm, err := netmap.GetLatestNetworkMap(h.netMapSrc) + if err != nil { + return errors.Wrapf(err, "(%T) could not get latest network map", h) + } + + // get container to store the object + cnr, err := h.cnrSrc.Get(prm.addr.GetContainerID()) + if err != nil { + return errors.Wrapf(err, "(%T) could not get container by ID", h) + } + + // allocate placement traverser options + traverseOpts := make([]placement.Option, 0, 4) + + // add common options + traverseOpts = append(traverseOpts, + // set processing container + placement.ForContainer(cnr), + // set success count (1st incoming header) + placement.SuccessAfter(1), + ) + + // create placement builder from network map + builder := placement.NewNetworkMapBuilder(nm) + + if prm.local { + // use local-only placement builder + builder = util.NewLocalPlacement(placement.NewNetworkMapBuilder(nm), h.localAddrSrc) + } + + // set placement builder + traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) + + // build placement traverser + if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { + return errors.Wrapf(err, "(%T) could not build placement traverser", h) + } + + return nil +} + +func (h *distributedHeader) finish(ctx context.Context, prm *Prm) (*Response, error) { + resp := new(Response) + + h.w = &onceHeaderWriter{ + once: new(sync.Once), + traverser: h.traverser, + resp: resp, + } + + ctx, h.w.cancel = context.WithCancel(ctx) + +loop: + for { + addrs := h.traverser.Next() + if len(addrs) == 0 { + break + } + + wg := new(sync.WaitGroup) + + for i := range addrs { + wg.Add(1) + + addr := addrs[i] + + if err := h.workerPool.Submit(func() { + defer wg.Done() + + var header interface { + head(context.Context, *Prm, func(*object.Object)) error + } + + if network.IsLocalAddress(h.localAddrSrc, addr) { + header = &localHeader{ + storage: h.localStore, + } + } else { + header = &remoteHeader{ + key: h.key, + node: addr, + } + } + + if err := header.head(ctx, prm, h.w.write); err != nil { + // TODO: log error + return + } + }); err != nil { + wg.Done() + // TODO: log error + break loop + } + } + + wg.Wait() + } + + if !h.traverser.Success() { + return nil, errors.Errorf("(%T) incomplete object Head operation", h) + } + + return resp, nil +} diff --git a/pkg/services/object/head/local.go b/pkg/services/object/head/local.go new file mode 100644 index 000000000..1cec2c1c4 --- /dev/null +++ b/pkg/services/object/head/local.go @@ -0,0 +1,24 @@ +package headsvc + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/pkg/errors" +) + +type localHeader struct { + storage *localstore.Storage +} + +func (h *localHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error { + m, err := h.storage.Head(prm.addr) + if err != nil { + return errors.Wrapf(err, "(%T) could not get header from local storage", h) + } + + handler(m.Head()) + + return nil +} diff --git a/pkg/services/object/head/prm.go b/pkg/services/object/head/prm.go new file mode 100644 index 000000000..c45467d0b --- /dev/null +++ b/pkg/services/object/head/prm.go @@ -0,0 +1,35 @@ +package headsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type Prm struct { + local, short bool + + addr *object.Address +} + +func (p *Prm) OnlyLocal(v bool) *Prm { + if p != nil { + p.local = v + } + + return p +} + +func (p *Prm) Short(v bool) *Prm { + if p != nil { + p.short = v + } + + return p +} + +func (p *Prm) WithAddress(v *object.Address) *Prm { + if p != nil { + p.addr = v + } + + return p +} diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go new file mode 100644 index 000000000..705db3d9e --- /dev/null +++ b/pkg/services/object/head/remote.go @@ -0,0 +1,46 @@ +package headsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/pkg/errors" +) + +type remoteHeader struct { + key *ecdsa.PrivateKey + + node *network.Address +} + +func (h *remoteHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error { + addr := h.node.NetAddr() + + c, err := client.New(h.key, + client.WithAddress(addr), + ) + if err != nil { + return errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) + } + + p := new(client.ObjectHeaderParams). + WithAddress(prm.addr) + + if prm.short { + p = p.WithMainFields() + } + + hdr, err := c.GetObjectHeader(ctx, p, + client.WithTTL(1), // FIXME: use constant + ) + if err != nil { + return errors.Wrapf(err, "(%T) could not head object in %s", h, addr) + } + + handler(object.NewFromSDK(hdr)) + + return nil +} diff --git a/pkg/services/object/head/res.go b/pkg/services/object/head/res.go new file mode 100644 index 000000000..b64144f5e --- /dev/null +++ b/pkg/services/object/head/res.go @@ -0,0 +1,13 @@ +package headsvc + +import ( + "github.com/nspcc-dev/neofs-node/pkg/core/object" +) + +type Response struct { + hdr *object.Object +} + +func (r *Response) Header() *object.Object { + return r.hdr +} diff --git a/pkg/services/object/head/service.go b/pkg/services/object/head/service.go new file mode 100644 index 000000000..3076a3fb3 --- /dev/null +++ b/pkg/services/object/head/service.go @@ -0,0 +1,92 @@ +package headsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/util" +) + +type Service struct { + *cfg +} + +type Option func(*cfg) + +type cfg struct { + key *ecdsa.PrivateKey + + localStore *localstore.Storage + + cnrSrc container.Source + + netMapSrc netmap.Source + + workerPool util.WorkerPool + + localAddrSrc network.LocalAddressSource +} + +func defaultCfg() *cfg { + return &cfg{ + workerPool: new(util.SyncWorkerPool), + } +} + +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func (s *Service) Head(ctx context.Context, prm *Prm) (*Response, error) { + return (&distributedHeader{ + cfg: s.cfg, + }).head(ctx, prm) +} + +func WithKey(v *ecdsa.PrivateKey) Option { + return func(c *cfg) { + c.key = v + } +} + +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.localStore = v + } +} + +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +func WithNetworkMapSource(v netmap.Source) Option { + return func(c *cfg) { + c.netMapSrc = v + } +} + +func WithWorkerPool(v util.WorkerPool) Option { + return func(c *cfg) { + c.workerPool = v + } +} + +func WithLocalAddressSource(v network.LocalAddressSource) Option { + return func(c *cfg) { + c.localAddrSrc = v + } +} diff --git a/pkg/services/object/head/util.go b/pkg/services/object/head/util.go new file mode 100644 index 000000000..002ac52ef --- /dev/null +++ b/pkg/services/object/head/util.go @@ -0,0 +1,27 @@ +package headsvc + +import ( + "context" + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" +) + +type onceHeaderWriter struct { + once *sync.Once + + traverser *placement.Traverser + + resp *Response + + cancel context.CancelFunc +} + +func (w *onceHeaderWriter) write(hdr *object.Object) { + w.once.Do(func() { + w.resp.hdr = hdr + w.traverser.SubmitSuccess() + w.cancel() + }) +} diff --git a/pkg/services/object/head/v2/service.go b/pkg/services/object/head/v2/service.go new file mode 100644 index 000000000..fbb8985a1 --- /dev/null +++ b/pkg/services/object/head/v2/service.go @@ -0,0 +1,50 @@ +package headsvc + +import ( + "context" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + "github.com/pkg/errors" +) + +// Service implements Head operation of Object service v2. +type Service struct { + *cfg +} + +// Option represents Service constructor option. +type Option func(*cfg) + +type cfg struct { + svc *headsvc.Service +} + +// NewService constructs Service instance from provided options. +func NewService(opts ...Option) *Service { + c := new(cfg) + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +// Head calls internal service and returns v2 object header. +func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { + r, err := s.svc.Head(ctx, toPrm(req)) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not get object header", s) + } + + return fromResponse(r, req.GetBody().GetMainOnly()), nil +} + +func WithInternalService(v *headsvc.Service) Option { + return func(c *cfg) { + c.svc = v + } +} diff --git a/pkg/services/object/head/v2/util.go b/pkg/services/object/head/v2/util.go new file mode 100644 index 000000000..fe1c264a3 --- /dev/null +++ b/pkg/services/object/head/v2/util.go @@ -0,0 +1,62 @@ +package headsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" +) + +func toPrm(req *objectV2.HeadRequest) *headsvc.Prm { + body := req.GetBody() + + return new(headsvc.Prm). + WithAddress( + object.NewAddressFromV2(body.GetAddress()), + ). + Short(body.GetMainOnly()). + OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant +} + +func fromResponse(r *headsvc.Response, short bool) *objectV2.HeadResponse { + fn := fullPartFromResponse + if short { + fn = shortPartFromResponse + } + + body := new(objectV2.HeadResponseBody) + body.SetHeaderPart(fn(r)) + + resp := new(objectV2.HeadResponse) + resp.SetBody(body) + + return resp +} + +func fullPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { + obj := r.Header().ToV2() + + hs := new(objectV2.HeaderWithSignature) + hs.SetHeader(obj.GetHeader()) + hs.SetSignature(obj.GetSignature()) + + p := new(objectV2.GetHeaderPartFull) + p.SetHeaderWithSignature(hs) + + return p +} + +func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { + hdr := r.Header().ToV2().GetHeader() + + sh := new(objectV2.ShortHeader) + sh.SetOwnerID(hdr.GetOwnerID()) + sh.SetCreationEpoch(hdr.GetCreationEpoch()) + sh.SetPayloadLength(hdr.GetPayloadLength()) + sh.SetVersion(hdr.GetVersion()) + sh.SetObjectType(hdr.GetObjectType()) + + p := new(objectV2.GetHeaderPartShort) + p.SetShortHeader(sh) + + return p +}