diff --git a/pkg/services/object/head/distributed.go b/pkg/services/object/head/distributed.go deleted file mode 100644 index dbff3eaa3..000000000 --- a/pkg/services/object/head/distributed.go +++ /dev/null @@ -1,143 +0,0 @@ -package headsvc - -import ( - "context" - "sync" - - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/network" - svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/pkg/errors" -) - -type distributedHeader struct { - *cfg - - w *onceHeaderWriter - - traverser *placement.Traverser -} - -func (h *distributedHeader) head(ctx context.Context, prm *Prm) (*Response, error) { - if err := h.prepare(ctx, prm); err != nil { - return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h) - } - - return h.finish(ctx, prm) -} - -func (h *distributedHeader) prepare(ctx context.Context, prm *Prm) error { - var err error - - // get latest network map - nm, err := netmap.GetLatestNetworkMap(h.netMapSrc) - if err != nil { - return errors.Wrapf(err, "(%T) could not get latest network map", h) - } - - // get container to store the object - cnr, err := h.cnrSrc.Get(prm.addr.ContainerID()) - if err != nil { - return errors.Wrapf(err, "(%T) could not get container by ID", h) - } - - // allocate placement traverser options - traverseOpts := make([]placement.Option, 0, 4) - - // add common options - traverseOpts = append(traverseOpts, - // set processing container - placement.ForContainer(cnr), - - // set success count (1st incoming header) - placement.SuccessAfter(1), - - // set identifier of the processing object - placement.ForObject(prm.addr.ObjectID()), - ) - - // create placement builder from network map - builder := placement.NewNetworkMapBuilder(nm) - - if prm.common.LocalOnly() { - // use local-only placement builder - builder = svcutil.NewLocalPlacement(builder, h.localAddrSrc) - } - - // set placement builder - traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) - - // build placement traverser - if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { - return errors.Wrapf(err, "(%T) could not build placement traverser", h) - } - - return nil -} - -func (h *distributedHeader) finish(ctx context.Context, prm *Prm) (*Response, error) { - resp := new(Response) - - h.w = &onceHeaderWriter{ - once: new(sync.Once), - traverser: h.traverser, - resp: resp, - } - - ctx, h.w.cancel = context.WithCancel(ctx) - -loop: - for { - addrs := h.traverser.Next() - if len(addrs) == 0 { - break - } - - wg := new(sync.WaitGroup) - - for i := range addrs { - wg.Add(1) - - addr := addrs[i] - - if err := h.workerPool.Submit(func() { - defer wg.Done() - - if network.IsLocalAddress(h.localAddrSrc, addr) { - if err := h.localHeader.head(ctx, prm, h.w.write); err != nil { - svcutil.LogServiceError(h.log, "HEAD", addr, err) - } - - return - } - - head, err := h.remoteHeader.Head(ctx, &RemoteHeadPrm{ - commonHeadPrm: prm, - node: addr, - }) - if err != nil { - svcutil.LogServiceError(h.log, "HEAD", addr, err) - - return - } - - h.w.write(head) - }); err != nil { - wg.Done() - - svcutil.LogWorkerPoolError(h.log, "HEAD", err) - - break loop - } - } - - wg.Wait() - } - - if !h.traverser.Success() { - return nil, errors.Wrapf(ErrNotFound, "(%T) incomplete object Head operation", h) - } - - return resp, nil -} diff --git a/pkg/services/object/head/local.go b/pkg/services/object/head/local.go deleted file mode 100644 index 1435c7b44..000000000 --- a/pkg/services/object/head/local.go +++ /dev/null @@ -1,24 +0,0 @@ -package headsvc - -import ( - "context" - - "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/pkg/errors" -) - -type localHeader struct { - storage *engine.StorageEngine -} - -func (h *localHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error { - head, err := engine.HeadRaw(h.storage, prm.addr, prm.raw) - if err != nil { - return errors.Wrapf(err, "(%T) could not get header from local storage", h) - } - - handler(head) - - return nil -} diff --git a/pkg/services/object/head/relation.go b/pkg/services/object/head/relation.go deleted file mode 100644 index da7b7510f..000000000 --- a/pkg/services/object/head/relation.go +++ /dev/null @@ -1,44 +0,0 @@ -package headsvc - -import ( - "context" - - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/object" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" -) - -type RelationHeader struct { - srch RelationSearcher - - svc *Service -} - -func NewRelationHeader(srch RelationSearcher, svc *Service) *RelationHeader { - return &RelationHeader{ - srch: srch, - svc: svc, - } -} - -func (h *RelationHeader) HeadRelation(ctx context.Context, addr *objectSDK.Address, prm *objutil.CommonPrm) (*object.Object, error) { - id, err := h.srch.SearchRelation(ctx, addr, prm) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not find relation", h) - } - - a := objectSDK.NewAddress() - a.SetContainerID(addr.ContainerID()) - a.SetObjectID(id) - - r, err := h.svc.Head(ctx, new(Prm). - WithAddress(a). - WithCommonPrm(prm), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive relation header", h) - } - - return r.Header(), nil -} diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index eeeb1cec8..bd7a692b2 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -29,6 +29,8 @@ type RemoteHeadPrm struct { node *network.Address } +var ErrNotFound = errors.New("object header not found") + // NewRemoteHeader creates, initializes and returns new RemoteHeader instance. func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteHeader { return &RemoteHeader{ diff --git a/pkg/services/object/head/res.go b/pkg/services/object/head/res.go deleted file mode 100644 index b64144f5e..000000000 --- a/pkg/services/object/head/res.go +++ /dev/null @@ -1,13 +0,0 @@ -package headsvc - -import ( - "github.com/nspcc-dev/neofs-node/pkg/core/object" -) - -type Response struct { - hdr *object.Object -} - -func (r *Response) Header() *object.Object { - return r.hdr -} diff --git a/pkg/services/object/head/service.go b/pkg/services/object/head/service.go deleted file mode 100644 index 95ee00f68..000000000 --- a/pkg/services/object/head/service.go +++ /dev/null @@ -1,125 +0,0 @@ -package headsvc - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/container" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/network/cache" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/util" - "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type RelationSearcher interface { - SearchRelation(context.Context, *objectSDK.Address, *objutil.CommonPrm) (*objectSDK.ID, error) -} - -type Service struct { - *cfg -} - -type Option func(*cfg) - -type cfg struct { - cnrSrc container.Source - - netMapSrc netmap.Source - - workerPool util.WorkerPool - - localAddrSrc network.LocalAddressSource - - localHeader localHeader - - remoteHeader RemoteHeader - - log *logger.Logger -} - -var ErrNotFound = errors.New("object header not found") - -func defaultCfg() *cfg { - return &cfg{ - workerPool: new(util.SyncWorkerPool), - log: zap.L(), - } -} - -func NewService(opts ...Option) *Service { - c := defaultCfg() - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -func (s *Service) Head(ctx context.Context, prm *Prm) (*Response, error) { - return (&distributedHeader{ - cfg: s.cfg, - }).head(ctx, prm) -} - -func WithKeyStorage(v *objutil.KeyStorage) Option { - return func(c *cfg) { - c.remoteHeader.keyStorage = v - } -} - -func WithLocalStorage(v *engine.StorageEngine) Option { - return func(c *cfg) { - c.localHeader.storage = v - } -} - -func WithContainerSource(v container.Source) Option { - return func(c *cfg) { - c.cnrSrc = v - } -} - -func WithNetworkMapSource(v netmap.Source) Option { - return func(c *cfg) { - c.netMapSrc = v - } -} - -func WithWorkerPool(v util.WorkerPool) Option { - return func(c *cfg) { - c.workerPool = v - } -} - -func WithLocalAddressSource(v network.LocalAddressSource) Option { - return func(c *cfg) { - c.localAddrSrc = v - } -} - -func WithClientCache(v *cache.ClientCache) Option { - return func(c *cfg) { - c.remoteHeader.clientCache = v - } -} - -func WithLogger(l *logger.Logger) Option { - return func(c *cfg) { - c.log = l - } -} - -func WithClientOptions(opts ...client.Option) Option { - return func(c *cfg) { - c.remoteHeader.clientOpts = opts - } -} diff --git a/pkg/services/object/head/util.go b/pkg/services/object/head/util.go deleted file mode 100644 index 002ac52ef..000000000 --- a/pkg/services/object/head/util.go +++ /dev/null @@ -1,27 +0,0 @@ -package headsvc - -import ( - "context" - "sync" - - "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" -) - -type onceHeaderWriter struct { - once *sync.Once - - traverser *placement.Traverser - - resp *Response - - cancel context.CancelFunc -} - -func (w *onceHeaderWriter) write(hdr *object.Object) { - w.once.Do(func() { - w.resp.hdr = hdr - w.traverser.SubmitSuccess() - w.cancel() - }) -} diff --git a/pkg/services/object/head/v2/service.go b/pkg/services/object/head/v2/service.go deleted file mode 100644 index 728a46192..000000000 --- a/pkg/services/object/head/v2/service.go +++ /dev/null @@ -1,57 +0,0 @@ -package headsvc - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/pkg/object" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" - "github.com/pkg/errors" -) - -// Service implements Head operation of Object service v2. -type Service struct { - *cfg -} - -// Option represents Service constructor option. -type Option func(*cfg) - -type cfg struct { - svc *headsvc.Service -} - -// NewService constructs Service instance from provided options. -func NewService(opts ...Option) *Service { - c := new(cfg) - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -// Head calls internal service and returns v2 object header. -func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { - r, err := s.svc.Head(ctx, toPrm(req)) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get object header", s) - } - - var splitErr *object.SplitInfoError - - if errors.As(err, &splitErr) { - return splitInfoResponse(splitErr.SplitInfo()), nil - } - - return fromResponse(r, req.GetBody().GetMainOnly()), nil -} - -func WithInternalService(v *headsvc.Service) Option { - return func(c *cfg) { - c.svc = v - } -} diff --git a/pkg/services/object/head/v2/util.go b/pkg/services/object/head/v2/util.go deleted file mode 100644 index be41a9cfe..000000000 --- a/pkg/services/object/head/v2/util.go +++ /dev/null @@ -1,69 +0,0 @@ -package headsvc - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg/object" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" -) - -func toPrm(req *objectV2.HeadRequest) *headsvc.Prm { - body := req.GetBody() - - return new(headsvc.Prm). - WithAddress( - object.NewAddressFromV2(body.GetAddress()), - ). - Short(body.GetMainOnly()). - WithCommonPrm(util.CommonPrmFromV2(req)). - WithRaw(body.GetRaw()) -} - -func fromResponse(r *headsvc.Response, short bool) *objectV2.HeadResponse { - fn := fullPartFromResponse - if short { - fn = shortPartFromResponse - } - - body := new(objectV2.HeadResponseBody) - body.SetHeaderPart(fn(r)) - - resp := new(objectV2.HeadResponse) - resp.SetBody(body) - - return resp -} - -func fullPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { - obj := r.Header().ToV2() - - hs := new(objectV2.HeaderWithSignature) - hs.SetHeader(obj.GetHeader()) - hs.SetSignature(obj.GetSignature()) - - return hs -} - -func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { - hdr := r.Header().ToV2().GetHeader() - - sh := new(objectV2.ShortHeader) - sh.SetOwnerID(hdr.GetOwnerID()) - sh.SetCreationEpoch(hdr.GetCreationEpoch()) - sh.SetPayloadLength(hdr.GetPayloadLength()) - sh.SetVersion(hdr.GetVersion()) - sh.SetObjectType(hdr.GetObjectType()) - - return sh -} - -func splitInfoResponse(info *object.SplitInfo) *objectV2.HeadResponse { - resp := new(objectV2.HeadResponse) - - body := new(objectV2.HeadResponseBody) - resp.SetBody(body) - - body.SetHeaderPart(info.ToV2()) - - return resp -}