forked from TrueCloudLab/frostfs-node
[#38] service/object: Implement simplified object Head service
Implement Head service w/o linking object processing and restoration from split-chain. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
21fc85540a
commit
05f3963975
9 changed files with 487 additions and 0 deletions
138
pkg/services/object/head/distributed.go
Normal file
138
pkg/services/object/head/distributed.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
package headsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type distributedHeader struct {
|
||||
*cfg
|
||||
|
||||
w *onceHeaderWriter
|
||||
|
||||
traverser *placement.Traverser
|
||||
}
|
||||
|
||||
func (h *distributedHeader) head(ctx context.Context, prm *Prm) (*Response, error) {
|
||||
if err := h.prepare(ctx, prm); err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h)
|
||||
}
|
||||
|
||||
return h.finish(ctx, prm)
|
||||
}
|
||||
|
||||
func (h *distributedHeader) prepare(ctx context.Context, prm *Prm) error {
|
||||
var err error
|
||||
|
||||
// get latest network map
|
||||
nm, err := netmap.GetLatestNetworkMap(h.netMapSrc)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not get latest network map", h)
|
||||
}
|
||||
|
||||
// get container to store the object
|
||||
cnr, err := h.cnrSrc.Get(prm.addr.GetContainerID())
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not get container by ID", h)
|
||||
}
|
||||
|
||||
// allocate placement traverser options
|
||||
traverseOpts := make([]placement.Option, 0, 4)
|
||||
|
||||
// add common options
|
||||
traverseOpts = append(traverseOpts,
|
||||
// set processing container
|
||||
placement.ForContainer(cnr),
|
||||
// set success count (1st incoming header)
|
||||
placement.SuccessAfter(1),
|
||||
)
|
||||
|
||||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.local {
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(placement.NewNetworkMapBuilder(nm), h.localAddrSrc)
|
||||
}
|
||||
|
||||
// set placement builder
|
||||
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
|
||||
|
||||
// build placement traverser
|
||||
if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not build placement traverser", h)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *distributedHeader) finish(ctx context.Context, prm *Prm) (*Response, error) {
|
||||
resp := new(Response)
|
||||
|
||||
h.w = &onceHeaderWriter{
|
||||
once: new(sync.Once),
|
||||
traverser: h.traverser,
|
||||
resp: resp,
|
||||
}
|
||||
|
||||
ctx, h.w.cancel = context.WithCancel(ctx)
|
||||
|
||||
loop:
|
||||
for {
|
||||
addrs := h.traverser.Next()
|
||||
if len(addrs) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
|
||||
for i := range addrs {
|
||||
wg.Add(1)
|
||||
|
||||
addr := addrs[i]
|
||||
|
||||
if err := h.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
var header interface {
|
||||
head(context.Context, *Prm, func(*object.Object)) error
|
||||
}
|
||||
|
||||
if network.IsLocalAddress(h.localAddrSrc, addr) {
|
||||
header = &localHeader{
|
||||
storage: h.localStore,
|
||||
}
|
||||
} else {
|
||||
header = &remoteHeader{
|
||||
key: h.key,
|
||||
node: addr,
|
||||
}
|
||||
}
|
||||
|
||||
if err := header.head(ctx, prm, h.w.write); err != nil {
|
||||
// TODO: log error
|
||||
return
|
||||
}
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
// TODO: log error
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
if !h.traverser.Success() {
|
||||
return nil, errors.Errorf("(%T) incomplete object Head operation", h)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue