forked from TrueCloudLab/frostfs-node
[#1288] putSvc: Respect TTL for EC put
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
ef4cea6d19
commit
8021bacc43
1 changed files with 19 additions and 6 deletions
|
@ -17,6 +17,7 @@ import (
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
@ -39,7 +40,7 @@ type ecWriter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
relayed, err := e.relayIfNotContainerNode(ctx)
|
relayed, err := e.relayIfNotContainerNode(ctx, obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -65,7 +66,7 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
||||||
return e.writeRawObject(ctx, obj)
|
return e.writeRawObject(ctx, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
|
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||||
if e.relay == nil {
|
if e.relay == nil {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -77,7 +78,13 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
|
||||||
// object can be splitted or saved local
|
// object can be splitted or saved local
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if err := e.relayToContainerNode(ctx); err != nil {
|
objID := object.AddressOf(obj).Object()
|
||||||
|
var index uint32
|
||||||
|
if obj.ECHeader() != nil {
|
||||||
|
objID = obj.ECHeader().Parent()
|
||||||
|
index = obj.ECHeader().Index()
|
||||||
|
}
|
||||||
|
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
|
@ -102,18 +109,20 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
|
||||||
t, err := placement.NewTraverser(e.placementOpts...)
|
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var lastErr error
|
var lastErr error
|
||||||
|
offset := int(index)
|
||||||
for {
|
for {
|
||||||
nodes := t.Next()
|
nodes := t.Next()
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
for _, node := range nodes {
|
for idx := range nodes {
|
||||||
|
node := nodes[(idx+offset)%len(nodes)]
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
client.NodeInfoFromNetmapElement(&info, node)
|
client.NodeInfoFromNetmapElement(&info, node)
|
||||||
|
|
||||||
|
@ -149,6 +158,10 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
if e.commonPrm.LocalOnly() {
|
||||||
|
return e.writePartLocal(ctx, obj)
|
||||||
|
}
|
||||||
|
|
||||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in a new issue