forked from TrueCloudLab/frostfs-node
[#291] Remove unused code from pkg/services/object/head
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
3260e9263e
commit
a51211eda7
9 changed files with 2 additions and 502 deletions
|
@ -1,143 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type distributedHeader struct {
|
|
||||||
*cfg
|
|
||||||
|
|
||||||
w *onceHeaderWriter
|
|
||||||
|
|
||||||
traverser *placement.Traverser
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *distributedHeader) head(ctx context.Context, prm *Prm) (*Response, error) {
|
|
||||||
if err := h.prepare(ctx, prm); err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return h.finish(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *distributedHeader) prepare(ctx context.Context, prm *Prm) error {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// get latest network map
|
|
||||||
nm, err := netmap.GetLatestNetworkMap(h.netMapSrc)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get latest network map", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
// get container to store the object
|
|
||||||
cnr, err := h.cnrSrc.Get(prm.addr.ContainerID())
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get container by ID", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
// allocate placement traverser options
|
|
||||||
traverseOpts := make([]placement.Option, 0, 4)
|
|
||||||
|
|
||||||
// add common options
|
|
||||||
traverseOpts = append(traverseOpts,
|
|
||||||
// set processing container
|
|
||||||
placement.ForContainer(cnr),
|
|
||||||
|
|
||||||
// set success count (1st incoming header)
|
|
||||||
placement.SuccessAfter(1),
|
|
||||||
|
|
||||||
// set identifier of the processing object
|
|
||||||
placement.ForObject(prm.addr.ObjectID()),
|
|
||||||
)
|
|
||||||
|
|
||||||
// create placement builder from network map
|
|
||||||
builder := placement.NewNetworkMapBuilder(nm)
|
|
||||||
|
|
||||||
if prm.common.LocalOnly() {
|
|
||||||
// use local-only placement builder
|
|
||||||
builder = svcutil.NewLocalPlacement(builder, h.localAddrSrc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set placement builder
|
|
||||||
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
|
|
||||||
|
|
||||||
// build placement traverser
|
|
||||||
if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not build placement traverser", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *distributedHeader) finish(ctx context.Context, prm *Prm) (*Response, error) {
|
|
||||||
resp := new(Response)
|
|
||||||
|
|
||||||
h.w = &onceHeaderWriter{
|
|
||||||
once: new(sync.Once),
|
|
||||||
traverser: h.traverser,
|
|
||||||
resp: resp,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, h.w.cancel = context.WithCancel(ctx)
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
addrs := h.traverser.Next()
|
|
||||||
if len(addrs) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
|
|
||||||
for i := range addrs {
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
addr := addrs[i]
|
|
||||||
|
|
||||||
if err := h.workerPool.Submit(func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
if network.IsLocalAddress(h.localAddrSrc, addr) {
|
|
||||||
if err := h.localHeader.head(ctx, prm, h.w.write); err != nil {
|
|
||||||
svcutil.LogServiceError(h.log, "HEAD", addr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
head, err := h.remoteHeader.Head(ctx, &RemoteHeadPrm{
|
|
||||||
commonHeadPrm: prm,
|
|
||||||
node: addr,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
svcutil.LogServiceError(h.log, "HEAD", addr, err)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
h.w.write(head)
|
|
||||||
}); err != nil {
|
|
||||||
wg.Done()
|
|
||||||
|
|
||||||
svcutil.LogWorkerPoolError(h.log, "HEAD", err)
|
|
||||||
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
if !h.traverser.Success() {
|
|
||||||
return nil, errors.Wrapf(ErrNotFound, "(%T) incomplete object Head operation", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
|
@ -1,24 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type localHeader struct {
|
|
||||||
storage *engine.StorageEngine
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *localHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error {
|
|
||||||
head, err := engine.HeadRaw(h.storage, prm.addr, prm.raw)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get header from local storage", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
handler(head)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,44 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type RelationHeader struct {
|
|
||||||
srch RelationSearcher
|
|
||||||
|
|
||||||
svc *Service
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRelationHeader(srch RelationSearcher, svc *Service) *RelationHeader {
|
|
||||||
return &RelationHeader{
|
|
||||||
srch: srch,
|
|
||||||
svc: svc,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *RelationHeader) HeadRelation(ctx context.Context, addr *objectSDK.Address, prm *objutil.CommonPrm) (*object.Object, error) {
|
|
||||||
id, err := h.srch.SearchRelation(ctx, addr, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not find relation", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
a := objectSDK.NewAddress()
|
|
||||||
a.SetContainerID(addr.ContainerID())
|
|
||||||
a.SetObjectID(id)
|
|
||||||
|
|
||||||
r, err := h.svc.Head(ctx, new(Prm).
|
|
||||||
WithAddress(a).
|
|
||||||
WithCommonPrm(prm),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not receive relation header", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.Header(), nil
|
|
||||||
}
|
|
|
@ -29,6 +29,8 @@ type RemoteHeadPrm struct {
|
||||||
node *network.Address
|
node *network.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
|
||||||
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
|
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
|
||||||
func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteHeader {
|
func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteHeader {
|
||||||
return &RemoteHeader{
|
return &RemoteHeader{
|
||||||
|
|
|
@ -1,13 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Response struct {
|
|
||||||
hdr *object.Object
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Response) Header() *object.Object {
|
|
||||||
return r.hdr
|
|
||||||
}
|
|
|
@ -1,125 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
|
||||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type RelationSearcher interface {
|
|
||||||
SearchRelation(context.Context, *objectSDK.Address, *objutil.CommonPrm) (*objectSDK.ID, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Service struct {
|
|
||||||
*cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
cnrSrc container.Source
|
|
||||||
|
|
||||||
netMapSrc netmap.Source
|
|
||||||
|
|
||||||
workerPool util.WorkerPool
|
|
||||||
|
|
||||||
localAddrSrc network.LocalAddressSource
|
|
||||||
|
|
||||||
localHeader localHeader
|
|
||||||
|
|
||||||
remoteHeader RemoteHeader
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
|
||||||
return &cfg{
|
|
||||||
workerPool: new(util.SyncWorkerPool),
|
|
||||||
log: zap.L(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewService(opts ...Option) *Service {
|
|
||||||
c := defaultCfg()
|
|
||||||
|
|
||||||
for i := range opts {
|
|
||||||
opts[i](c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Service{
|
|
||||||
cfg: c,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) Head(ctx context.Context, prm *Prm) (*Response, error) {
|
|
||||||
return (&distributedHeader{
|
|
||||||
cfg: s.cfg,
|
|
||||||
}).head(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithKeyStorage(v *objutil.KeyStorage) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.remoteHeader.keyStorage = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.localHeader.storage = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithContainerSource(v container.Source) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.cnrSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithNetworkMapSource(v netmap.Source) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.netMapSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithWorkerPool(v util.WorkerPool) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.workerPool = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLocalAddressSource(v network.LocalAddressSource) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.localAddrSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithClientCache(v *cache.ClientCache) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.remoteHeader.clientCache = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLogger(l *logger.Logger) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.log = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithClientOptions(opts ...client.Option) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.remoteHeader.clientOpts = opts
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
||||||
)
|
|
||||||
|
|
||||||
type onceHeaderWriter struct {
|
|
||||||
once *sync.Once
|
|
||||||
|
|
||||||
traverser *placement.Traverser
|
|
||||||
|
|
||||||
resp *Response
|
|
||||||
|
|
||||||
cancel context.CancelFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *onceHeaderWriter) write(hdr *object.Object) {
|
|
||||||
w.once.Do(func() {
|
|
||||||
w.resp.hdr = hdr
|
|
||||||
w.traverser.SubmitSuccess()
|
|
||||||
w.cancel()
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
||||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Service implements Head operation of Object service v2.
|
|
||||||
type Service struct {
|
|
||||||
*cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option represents Service constructor option.
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
svc *headsvc.Service
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
|
||||||
func NewService(opts ...Option) *Service {
|
|
||||||
c := new(cfg)
|
|
||||||
|
|
||||||
for i := range opts {
|
|
||||||
opts[i](c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Service{
|
|
||||||
cfg: c,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head calls internal service and returns v2 object header.
|
|
||||||
func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
|
||||||
r, err := s.svc.Head(ctx, toPrm(req))
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not get object header", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
var splitErr *object.SplitInfoError
|
|
||||||
|
|
||||||
if errors.As(err, &splitErr) {
|
|
||||||
return splitInfoResponse(splitErr.SplitInfo()), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fromResponse(r, req.GetBody().GetMainOnly()), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithInternalService(v *headsvc.Service) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.svc = v
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,69 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
||||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
func toPrm(req *objectV2.HeadRequest) *headsvc.Prm {
|
|
||||||
body := req.GetBody()
|
|
||||||
|
|
||||||
return new(headsvc.Prm).
|
|
||||||
WithAddress(
|
|
||||||
object.NewAddressFromV2(body.GetAddress()),
|
|
||||||
).
|
|
||||||
Short(body.GetMainOnly()).
|
|
||||||
WithCommonPrm(util.CommonPrmFromV2(req)).
|
|
||||||
WithRaw(body.GetRaw())
|
|
||||||
}
|
|
||||||
|
|
||||||
func fromResponse(r *headsvc.Response, short bool) *objectV2.HeadResponse {
|
|
||||||
fn := fullPartFromResponse
|
|
||||||
if short {
|
|
||||||
fn = shortPartFromResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
body := new(objectV2.HeadResponseBody)
|
|
||||||
body.SetHeaderPart(fn(r))
|
|
||||||
|
|
||||||
resp := new(objectV2.HeadResponse)
|
|
||||||
resp.SetBody(body)
|
|
||||||
|
|
||||||
return resp
|
|
||||||
}
|
|
||||||
|
|
||||||
func fullPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart {
|
|
||||||
obj := r.Header().ToV2()
|
|
||||||
|
|
||||||
hs := new(objectV2.HeaderWithSignature)
|
|
||||||
hs.SetHeader(obj.GetHeader())
|
|
||||||
hs.SetSignature(obj.GetSignature())
|
|
||||||
|
|
||||||
return hs
|
|
||||||
}
|
|
||||||
|
|
||||||
func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart {
|
|
||||||
hdr := r.Header().ToV2().GetHeader()
|
|
||||||
|
|
||||||
sh := new(objectV2.ShortHeader)
|
|
||||||
sh.SetOwnerID(hdr.GetOwnerID())
|
|
||||||
sh.SetCreationEpoch(hdr.GetCreationEpoch())
|
|
||||||
sh.SetPayloadLength(hdr.GetPayloadLength())
|
|
||||||
sh.SetVersion(hdr.GetVersion())
|
|
||||||
sh.SetObjectType(hdr.GetObjectType())
|
|
||||||
|
|
||||||
return sh
|
|
||||||
}
|
|
||||||
|
|
||||||
func splitInfoResponse(info *object.SplitInfo) *objectV2.HeadResponse {
|
|
||||||
resp := new(objectV2.HeadResponse)
|
|
||||||
|
|
||||||
body := new(objectV2.HeadResponseBody)
|
|
||||||
resp.SetBody(body)
|
|
||||||
|
|
||||||
body.SetHeaderPart(info.ToV2())
|
|
||||||
|
|
||||||
return resp
|
|
||||||
}
|
|
Loading…
Reference in a new issue