forked from TrueCloudLab/frostfs-node
[#57] services/object: Combine common service parameters
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
8cddbe58a6
commit
39c17253be
24 changed files with 109 additions and 61 deletions
|
@ -2,17 +2,20 @@ package getsvc
|
|||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
type Prm struct {
|
||||
local, full bool
|
||||
common *util.CommonPrm
|
||||
|
||||
full bool
|
||||
|
||||
addr *object.Address
|
||||
}
|
||||
|
||||
func (p *Prm) OnlyLocal(v bool) *Prm {
|
||||
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
p.common = v
|
||||
}
|
||||
|
||||
return p
|
||||
|
|
|
@ -37,7 +37,7 @@ func (s *Service) Get(ctx context.Context, prm *Prm) (*Streamer, error) {
|
|||
r, err := s.rngSvc.GetRange(ctx, new(rangesvc.Prm).
|
||||
WithAddress(prm.addr).
|
||||
FullRange().
|
||||
OnlyLocal(prm.local),
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not get range", s)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
func toPrm(req *objectV2.GetRequest) *getsvc.Prm {
|
||||
|
@ -11,7 +12,7 @@ func toPrm(req *objectV2.GetRequest) *getsvc.Prm {
|
|||
WithAddress(
|
||||
object.NewAddressFromV2(req.GetBody().GetAddress()),
|
||||
).
|
||||
OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant
|
||||
WithCommonPrm(util.CommonPrmFromV2(req))
|
||||
}
|
||||
|
||||
func fromResponse(res *getsvc.Streamer) objectV2.GetObjectStreamer {
|
||||
|
|
|
@ -61,7 +61,7 @@ func (h *distributedHeader) prepare(ctx context.Context, prm *Prm) error {
|
|||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.local {
|
||||
if prm.common.LocalOnly() {
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, h.localAddrSrc)
|
||||
}
|
||||
|
|
|
@ -2,17 +2,20 @@ package headsvc
|
|||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
type Prm struct {
|
||||
local, short bool
|
||||
common *util.CommonPrm
|
||||
|
||||
short bool
|
||||
|
||||
addr *object.Address
|
||||
}
|
||||
|
||||
func (p *Prm) OnlyLocal(v bool) *Prm {
|
||||
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
p.common = v
|
||||
}
|
||||
|
||||
return p
|
||||
|
|
|
@ -63,7 +63,7 @@ func (s *Service) Head(ctx context.Context, prm *Prm) (*Response, error) {
|
|||
r, err := (&distributedHeader{
|
||||
cfg: s.cfg,
|
||||
}).head(ctx, prm)
|
||||
if err == nil || prm.local {
|
||||
if err == nil || prm.common.LocalOnly() {
|
||||
return r, err
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
func toPrm(req *objectV2.HeadRequest) *headsvc.Prm {
|
||||
|
@ -14,7 +15,7 @@ func toPrm(req *objectV2.HeadRequest) *headsvc.Prm {
|
|||
object.NewAddressFromV2(body.GetAddress()),
|
||||
).
|
||||
Short(body.GetMainOnly()).
|
||||
OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant
|
||||
WithCommonPrm(util.CommonPrmFromV2(req))
|
||||
}
|
||||
|
||||
func fromResponse(r *headsvc.Response, short bool) *objectV2.HeadResponse {
|
||||
|
|
|
@ -3,11 +3,12 @@ package putsvc
|
|||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
)
|
||||
|
||||
type PutInitPrm struct {
|
||||
local bool
|
||||
common *util.CommonPrm
|
||||
|
||||
hdr *object.RawObject
|
||||
|
||||
|
@ -20,6 +21,14 @@ type PutChunkPrm struct {
|
|||
chunk []byte
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithCommonPrm(v *util.CommonPrm) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.common = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.hdr = v
|
||||
|
@ -36,14 +45,6 @@ func (p *PutInitPrm) WithSession(v *token.SessionToken) *PutInitPrm {
|
|||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) OnlyLocal(v bool) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
||||
if p != nil {
|
||||
p.chunk = v
|
||||
|
|
|
@ -106,7 +106,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
|||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.local {
|
||||
if prm.common.LocalOnly() {
|
||||
// restrict success count to 1 stored copy (to local storage)
|
||||
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ type streamer struct {
|
|||
func (s *streamer) Send(req *object.PutRequest) (err error) {
|
||||
switch v := req.GetBody().GetObjectPart().(type) {
|
||||
case *object.PutObjectPartInit:
|
||||
if err = s.stream.Init(toInitPrm(v, req.GetMetaHeader().GetSessionToken(), req.GetMetaHeader().GetTTL())); err != nil {
|
||||
if err = s.stream.Init(toInitPrm(v, req)); err != nil {
|
||||
err = errors.Wrapf(err, "(%T) could not init object put stream", s)
|
||||
}
|
||||
case *object.PutObjectPartChunk:
|
||||
|
|
|
@ -1,27 +1,23 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
func toInitPrm(req *objectV2.PutObjectPartInit, t *session.SessionToken, ttl uint32) *putsvc.PutInitPrm {
|
||||
func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) *putsvc.PutInitPrm {
|
||||
oV2 := new(objectV2.Object)
|
||||
oV2.SetObjectID(req.GetObjectID())
|
||||
oV2.SetSignature(req.GetSignature())
|
||||
oV2.SetHeader(req.GetHeader())
|
||||
oV2.SetObjectID(part.GetObjectID())
|
||||
oV2.SetSignature(part.GetSignature())
|
||||
oV2.SetHeader(part.GetHeader())
|
||||
|
||||
return new(putsvc.PutInitPrm).
|
||||
WithObject(
|
||||
object.NewRawFromV2(oV2),
|
||||
).
|
||||
WithSession(
|
||||
token.NewSessionTokenFromV2(t),
|
||||
).
|
||||
OnlyLocal(ttl == 1) // FIXME: use constant
|
||||
WithCommonPrm(util.CommonPrmFromV2(req))
|
||||
}
|
||||
|
||||
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
|
||||
|
|
|
@ -2,19 +2,22 @@ package rangesvc
|
|||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
type Prm struct {
|
||||
local, full bool
|
||||
common *util.CommonPrm
|
||||
|
||||
full bool
|
||||
|
||||
addr *object.Address
|
||||
|
||||
rng *object.Range
|
||||
}
|
||||
|
||||
func (p *Prm) OnlyLocal(v bool) *Prm {
|
||||
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
p.common = v
|
||||
}
|
||||
|
||||
return p
|
||||
|
|
|
@ -59,7 +59,7 @@ func NewService(opts ...Option) *Service {
|
|||
func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) {
|
||||
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
||||
WithAddress(prm.addr).
|
||||
OnlyLocal(prm.local),
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
|
||||
|
@ -114,7 +114,7 @@ func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *objuti
|
|||
|
||||
head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
||||
WithAddress(addr).
|
||||
OnlyLocal(prm.local),
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not receive object header", s)
|
||||
|
|
|
@ -104,7 +104,7 @@ func (p *streamer) switchToObject(id *object.ID) error {
|
|||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if p.prm.local {
|
||||
if p.prm.common.LocalOnly() {
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, p.localAddrSrc)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
func toPrm(req *objectV2.GetRangeRequest) *rangesvc.Prm {
|
||||
|
@ -14,7 +15,7 @@ func toPrm(req *objectV2.GetRangeRequest) *rangesvc.Prm {
|
|||
object.NewAddressFromV2(body.GetAddress()),
|
||||
).
|
||||
WithRange(object.NewRangeFromV2(body.GetRange())).
|
||||
OnlyLocal(req.GetMetaHeader().GetTTL() == 1) // FIXME: use constant
|
||||
WithCommonPrm(util.CommonPrmFromV2(req))
|
||||
}
|
||||
|
||||
func fromResponse(stream rangesvc.Streamer) objectV2.GetRangeObjectStreamer {
|
||||
|
|
|
@ -58,7 +58,7 @@ func (h *distributedHasher) prepare(ctx context.Context, prm *Prm) error {
|
|||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.local {
|
||||
if prm.common.LocalOnly() {
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, h.localAddrSrc)
|
||||
}
|
||||
|
|
|
@ -3,10 +3,11 @@ package rangehashsvc
|
|||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
type Prm struct {
|
||||
local bool
|
||||
common *util.CommonPrm
|
||||
|
||||
addr *object.Address
|
||||
|
||||
|
@ -17,9 +18,9 @@ type Prm struct {
|
|||
salt []byte
|
||||
}
|
||||
|
||||
func (p *Prm) OnlyLocal(v bool) *Prm {
|
||||
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
p.common = v
|
||||
}
|
||||
|
||||
return p
|
||||
|
|
|
@ -65,7 +65,7 @@ func NewService(opts ...Option) *Service {
|
|||
func (s *Service) GetRangeHash(ctx context.Context, prm *Prm) (*Response, error) {
|
||||
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
||||
WithAddress(prm.addr).
|
||||
OnlyLocal(prm.local),
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
|
||||
|
@ -124,7 +124,7 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra
|
|||
|
||||
head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
||||
WithAddress(addr).
|
||||
OnlyLocal(prm.local),
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive object header", s)
|
||||
|
@ -167,9 +167,9 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra
|
|||
// here we cannot receive SHA256 checksum through GetRangeHash service
|
||||
// since SHA256 is not homomorphic
|
||||
res, err := s.rangeSvc.GetRange(ctx, new(rangesvc.Prm).
|
||||
OnlyLocal(prm.local).
|
||||
WithAddress(addr).
|
||||
WithRange(nextRng),
|
||||
WithRange(nextRng).
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ)
|
||||
|
@ -187,10 +187,10 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra
|
|||
resp, err := (&distributedHasher{
|
||||
cfg: s.cfg,
|
||||
}).head(ctx, new(Prm).
|
||||
OnlyLocal(prm.local).
|
||||
WithAddress(addr).
|
||||
WithChecksumType(prm.typ).
|
||||
FromRanges(nextRng),
|
||||
FromRanges(nextRng).
|
||||
WithCommonPrm(prm.common),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not receive %v checksum", s, prm.typ)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
@ -33,9 +34,9 @@ func toPrm(req *objectV2.GetRangeHashRequest) (*rangehashsvc.Prm, error) {
|
|||
WithAddress(
|
||||
object.NewAddressFromV2(body.GetAddress()),
|
||||
).
|
||||
OnlyLocal(req.GetMetaHeader().GetTTL() == 1). // FIXME: use constant
|
||||
WithChecksumType(typ).
|
||||
FromRanges(rngs...), nil
|
||||
FromRanges(rngs...).
|
||||
WithCommonPrm(util.CommonPrmFromV2(req)), nil
|
||||
}
|
||||
|
||||
func fromResponse(r *rangehashsvc.Response, typ refs.ChecksumType) *objectV2.GetRangeHashResponse {
|
||||
|
|
|
@ -3,10 +3,11 @@ package searchsvc
|
|||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
type Prm struct {
|
||||
local bool
|
||||
common *util.CommonPrm
|
||||
|
||||
cid *container.ID
|
||||
|
||||
|
@ -29,9 +30,9 @@ func (p *Prm) WithSearchQuery(v query.Query) *Prm {
|
|||
return p
|
||||
}
|
||||
|
||||
func (p *Prm) OnlyLocal(v bool) *Prm {
|
||||
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
p.common = v
|
||||
}
|
||||
|
||||
return p
|
||||
|
|
|
@ -108,7 +108,7 @@ func (p *Streamer) preparePrm(prm *Prm) error {
|
|||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.local {
|
||||
if prm.common.LocalOnly() {
|
||||
// restrict success count to 1 stored copy (to local storage)
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(1))
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ func NewService(opts ...Option) *Service {
|
|||
|
||||
// Search calls internal service and returns v2 search object streamer.
|
||||
func (s *Service) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
|
||||
prm, err := toPrm(req.GetBody(), req.GetMetaHeader().GetTTL())
|
||||
prm, err := toPrm(req.GetBody(), req)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not convert search parameters", s)
|
||||
}
|
||||
|
|
|
@ -7,17 +7,18 @@ import (
|
|||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
|
||||
queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func toPrm(req *object.SearchRequestBody, ttl uint32) (*searchsvc.Prm, error) {
|
||||
func toPrm(body *object.SearchRequestBody, req *object.SearchRequest) (*searchsvc.Prm, error) {
|
||||
var q query.Query
|
||||
|
||||
switch v := req.GetVersion(); v {
|
||||
switch v := body.GetVersion(); v {
|
||||
default:
|
||||
return nil, errors.Errorf("unsupported query version #%d", v)
|
||||
case 1:
|
||||
fs := req.GetFilters()
|
||||
fs := body.GetFilters()
|
||||
fsV1 := make([]*queryV1.Filter, 0, len(fs))
|
||||
|
||||
for _, f := range fs {
|
||||
|
@ -34,10 +35,10 @@ func toPrm(req *object.SearchRequestBody, ttl uint32) (*searchsvc.Prm, error) {
|
|||
|
||||
return new(searchsvc.Prm).
|
||||
WithContainerID(
|
||||
container.NewIDFromV2(req.GetContainerID()),
|
||||
container.NewIDFromV2(body.GetContainerID()),
|
||||
).
|
||||
WithSearchQuery(q).
|
||||
OnlyLocal(ttl == 1), nil // FIXME: use constant
|
||||
WithCommonPrm(util.CommonPrmFromV2(req)), nil
|
||||
}
|
||||
|
||||
func fromResponse(r *searchsvc.Response) *object.SearchResponse {
|
||||
|
|
35
pkg/services/object/util/prm.go
Normal file
35
pkg/services/object/util/prm.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
)
|
||||
|
||||
type CommonPrm struct {
|
||||
local bool
|
||||
}
|
||||
|
||||
func (p *CommonPrm) WithLocalOnly(v bool) *CommonPrm {
|
||||
if p != nil {
|
||||
p.local = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *CommonPrm) LocalOnly() bool {
|
||||
if p != nil {
|
||||
return p.local
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func CommonPrmFromV2(req interface {
|
||||
GetMetaHeader() *session.RequestMetaHeader
|
||||
}) *CommonPrm {
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
return &CommonPrm{
|
||||
local: meta.GetTTL() <= 1, // FIXME: use constant
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue