Respect TTL for EC put #1288

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/ec_put_respect_ttl into master 2024-08-02 13:01:45 +00:00

View file

@ -17,6 +17,7 @@ import (
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -39,7 +40,7 @@ type ecWriter struct {
} }
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx) relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil { if err != nil {
return err return err
} }
@ -65,7 +66,7 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return e.writeRawObject(ctx, obj) return e.writeRawObject(ctx, obj)
} }
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) { func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil { if e.relay == nil {
return false, nil return false, nil
} }
@ -77,7 +78,13 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
// object can be splitted or saved local // object can be splitted or saved local
return false, nil return false, nil
} }
if err := e.relayToContainerNode(ctx); err != nil { objID := object.AddressOf(obj).Object()
var index uint32
if obj.ECHeader() != nil {
objID = obj.ECHeader().Parent()
index = obj.ECHeader().Index()
}
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err return false, err
} }
return true, nil return true, nil
@ -102,18 +109,20 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
return false, nil return false, nil
} }
func (e *ecWriter) relayToContainerNode(ctx context.Context) error { func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(e.placementOpts...) t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil { if err != nil {
return err return err
} }
var lastErr error var lastErr error
offset := int(index)
for { for {
nodes := t.Next() nodes := t.Next()
if len(nodes) == 0 { if len(nodes) == 0 {
break break
} }
for _, node := range nodes { for idx := range nodes {
node := nodes[(idx+offset)%len(nodes)]
var info client.NodeInfo var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node) client.NodeInfoFromNetmapElement(&info, node)
@ -149,6 +158,10 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
} }
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj)
}
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...) t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil { if err != nil {
return err return err