Multiple copy numbers #221

Merged
fyrchik merged 1 commit from feat/multiple-copies-number into master 2023-05-05 16:07:14 +00:00
4 changed files with 40 additions and 8 deletions
Showing only changes of commit 604ca51264 - Show all commits

View file

@ -32,6 +32,7 @@ Changelog for FrostFS Node
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
- Tree service now saves the last synchronization height which persists across restarts (#82)
- Add tracing support (#135)
- Multiple (and a fix for single) copies number support for `PUT` requests (#221)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects

View file

@ -42,6 +42,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
return p
}
func (p *PutInitPrm) WithCopyNumbers(v []uint32) *PutInitPrm {
if p != nil && len(v) > 0 {
p.traverseOpts = append(p.traverseOpts, placement.WithCopyNumbers(v))
}
return p
}
func (p *PutInitPrm) WithRelay(f func(context.Context, client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
if p != nil {
p.relay = f

View file

@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
object.NewFromV2(oV2),
).
WithRelay(s.relayRequest).
WithCommonPrm(commonPrm), nil
WithCommonPrm(commonPrm).
WithCopyNumbers(part.GetCopiesNumber()), nil
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {

View file

@ -38,6 +38,7 @@ type Traverser struct {
type cfg struct {
trackCopies bool
copyNumbers []uint32
flatSuccess *uint32
@ -84,19 +85,23 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
return nil, fmt.Errorf("could not build placement: %w", err)
}
// backward compatibility for scalar `copies_number`
if len(cfg.copyNumbers) == 1 {
cfg.flatSuccess = &cfg.copyNumbers[0]
}
var rem []int
if cfg.flatSuccess != nil {
ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)}
} else {
replNum := cfg.policy.NumberOfReplicas()
rem = make([]int, 0, replNum)
rem = defaultCopiesVector(cfg.policy)
for i := 0; i < replNum; i++ {
if cfg.trackCopies {
rem = append(rem, int(cfg.policy.ReplicaNumberByIndex(i)))
} else {
rem = append(rem, -1)
for i := range rem {
if !cfg.trackCopies {
rem[i] = -1
} else if len(cfg.copyNumbers) > i {
rem[i] = int(cfg.copyNumbers[i])
}
}
}
@ -108,6 +113,17 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
}, nil
}
func defaultCopiesVector(policy netmap.PlacementPolicy) []int {
replNum := policy.NumberOfReplicas()
copyVector := make([]int, 0, replNum)
for i := 0; i < replNum; i++ {
copyVector = append(copyVector, int(policy.ReplicaNumberByIndex(i)))
}
return copyVector
}
func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
sz := 0
for i := range ns {
@ -265,3 +281,9 @@ func WithoutSuccessTracking() Option {
c.trackCopies = false
}
}
func WithCopyNumbers(v []uint32) Option {
return func(c *cfg) {
c.copyNumbers = v
}
}