From ee58b390bb8954c7631188dec17cc3ae8ae59701 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Tue, 11 Apr 2023 21:01:00 +0300
Subject: [PATCH] [#221] node: Allow using vector `copies_number`
Also, take into account that value in general (it was not used before at
all).
Signed-off-by: Pavel Karpy
---
CHANGELOG.md | 1 +
pkg/services/object/put/prm.go | 8 +++++
pkg/services/object/put/v2/util.go | 3 +-
.../object_manager/placement/traverser.go | 36 +++++++++++++++----
4 files changed, 40 insertions(+), 8 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 281386256..6ed434da5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,7 @@ Changelog for FrostFS Node
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
- Tree service now saves the last synchronization height which persists across restarts (#82)
- Add tracing support (#135)
+- Multiple (and a fix for single) copies number support for `PUT` requests (#221)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go
index 27d9c9c7a..c8d1b29a2 100644
--- a/pkg/services/object/put/prm.go
+++ b/pkg/services/object/put/prm.go
@@ -42,6 +42,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
return p
}
+func (p *PutInitPrm) WithCopyNumbers(v []uint32) *PutInitPrm {
+ if p != nil && len(v) > 0 {
+ p.traverseOpts = append(p.traverseOpts, placement.WithCopyNumbers(v))
+ }
+
+ return p
+}
+
func (p *PutInitPrm) WithRelay(f func(context.Context, client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
if p != nil {
p.relay = f
diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go
index 790f061f1..758470f6c 100644
--- a/pkg/services/object/put/v2/util.go
+++ b/pkg/services/object/put/v2/util.go
@@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
object.NewFromV2(oV2),
).
WithRelay(s.relayRequest).
- WithCommonPrm(commonPrm), nil
+ WithCommonPrm(commonPrm).
+ WithCopyNumbers(part.GetCopiesNumber()), nil
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go
index 75d5fbfd1..e46240a86 100644
--- a/pkg/services/object_manager/placement/traverser.go
+++ b/pkg/services/object_manager/placement/traverser.go
@@ -38,6 +38,7 @@ type Traverser struct {
type cfg struct {
trackCopies bool
+ copyNumbers []uint32
flatSuccess *uint32
@@ -84,19 +85,23 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
return nil, fmt.Errorf("could not build placement: %w", err)
}
+ // backward compatibility for scalar `copies_number`
+ if len(cfg.copyNumbers) == 1 {
+ cfg.flatSuccess = &cfg.copyNumbers[0]
+ }
+
var rem []int
if cfg.flatSuccess != nil {
ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)}
} else {
- replNum := cfg.policy.NumberOfReplicas()
- rem = make([]int, 0, replNum)
+ rem = defaultCopiesVector(cfg.policy)
- for i := 0; i < replNum; i++ {
- if cfg.trackCopies {
- rem = append(rem, int(cfg.policy.ReplicaNumberByIndex(i)))
- } else {
- rem = append(rem, -1)
+ for i := range rem {
+ if !cfg.trackCopies {
+ rem[i] = -1
+ } else if len(cfg.copyNumbers) > i {
+ rem[i] = int(cfg.copyNumbers[i])
}
}
}
@@ -108,6 +113,17 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
}, nil
}
+func defaultCopiesVector(policy netmap.PlacementPolicy) []int {
+ replNum := policy.NumberOfReplicas()
+ copyVector := make([]int, 0, replNum)
+
+ for i := 0; i < replNum; i++ {
+ copyVector = append(copyVector, int(policy.ReplicaNumberByIndex(i)))
+ }
+
+ return copyVector
+}
+
func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
sz := 0
for i := range ns {
@@ -265,3 +281,9 @@ func WithoutSuccessTracking() Option {
c.trackCopies = false
}
}
+
+func WithCopyNumbers(v []uint32) Option {
+ return func(c *cfg) {
+ c.copyNumbers = v
+ }
+}