Add EC replication #1129

Merged
dstepanov-yadro merged 7 commits from dstepanov-yadro/frostfs-node:feat/ec_restore into master 2024-09-04 19:51:08 +00:00
25 changed files with 1440 additions and 143 deletions

View file

@ -13,7 +13,7 @@ type keySpaceIterator struct {
cur *engine.Cursor
}
-func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
+func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.Info, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(it.cur)
prm.WithCount(batchSize)

View file

@ -29,7 +29,6 @@ import (
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
-headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
return err
}
-remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
+remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
pol := policer.New(
policer.WithLogger(c.log),
@ -242,11 +241,33 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
placement.NewNetworkMapSourceBuilder(c.netMapSource),
),
policer.WithRemoteObjectHeaderFunc(
-func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
-prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
-return remoteHeader.Head(ctx, prm)
+func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
+prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
+return remoteReader.Head(ctx, prm)
},
),
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
var prm engine.HeadPrm
fyrchik marked this conversation as resolved
Review

Why have you decided to go with 2 func options instead of an interface?

Review

Because this approach has already been used in policer.
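For context, the functional-options wiring discussed above looks roughly like this. A minimal, self-contained sketch with hypothetical names (`cfg`, `WithRemoteHeaderFunc`, `WithLocalHeaderFunc`), not the actual policer types:

```go
package main

import "fmt"

// HeaderFunc stands in for the policer's header callbacks (hypothetical name).
type HeaderFunc func(addr string) (string, error)

// cfg collects the injected callbacks, similar to how the policer stores
// its remote/local header functions internally.
type cfg struct {
	remoteHeader HeaderFunc
	localHeader  HeaderFunc
}

// Option is a functional option that mutates cfg.
type Option func(*cfg)

func WithRemoteHeaderFunc(f HeaderFunc) Option { return func(c *cfg) { c.remoteHeader = f } }
func WithLocalHeaderFunc(f HeaderFunc) Option  { return func(c *cfg) { c.localHeader = f } }

// New applies the options in order, so each dependency is wired independently.
func New(opts ...Option) *cfg {
	c := &cfg{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := New(
		WithRemoteHeaderFunc(func(addr string) (string, error) { return "remote header of " + addr, nil }),
		WithLocalHeaderFunc(func(addr string) (string, error) { return "local header of " + addr, nil }),
	)
	h, _ := c.localHeader("oid")
	fmt.Println(h)
}
```

Compared to a single interface bundling both operations, separate func options let tests stub each call independently, which is what the policer tests in this PR do with `WithRemoteObjectHeaderFunc` and `WithLocalObjectHeaderFunc`.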
prm.WithAddress(a)
res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}),
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
return remoteReader.Get(ctx, prm)
}),
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
var prm engine.GetPrm
prm.WithAddress(a)
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
if err != nil {
return nil, err
}
return res.Object(), nil
}),
policer.WithNetmapKeys(c),
policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg),
@ -265,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
}),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
+policer.WithKeyStorage(keyStorage),
)
c.workers = append(c.workers, worker{
@ -297,6 +319,9 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache),
),
+replicator.WithRemoteGetter(
+getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
+),
replicator.WithMetrics(c.metricsCollector.Replicator()),
)
}

View file

@ -529,4 +529,16 @@ const (
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
ECFailedToSendToContainerNode = "failed to send EC object to container node"
ECFailedToSaveECPart = "failed to save EC part"
PolicerNodeIsNotECObjectNode = "current node is not EC object node"
PolicerFailedToGetLocalECChunks = "failed to get local EC chunks"
PolicerMissingECChunk = "failed to find EC chunk on any of the nodes"
PolicerFailedToDecodeECChunkID = "failed to decode EC chunk ID"
PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk"
ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage"
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage"
PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node"
PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks"
PolicerFailedToRestoreObject = "failed to restore EC object"
PolicerCouldNotGetChunk = "could not get EC chunk"
PolicerCouldNotGetChunks = "could not get EC chunks"
)

View file

@ -167,17 +167,29 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
token := obj.SessionToken()
ownerID := obj.OwnerID()
if token == nil && obj.ECHeader() != nil {
role, err := v.isIROrContainerNode(obj, binKey)
if err != nil {
return err
}
if role == acl.RoleContainer {
// EC part could be restored or created by container node, so ownerID could not match object signature
return nil
}
return v.checkOwnerKey(ownerID, key)
}
if token == nil || !token.AssertAuthKey(&key) {
return v.checkOwnerKey(ownerID, key)
}
if v.verifyTokenIssuer {
-signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey)
+role, err := v.isIROrContainerNode(obj, binKey)
if err != nil {
return err
}
-if signerIsIROrContainerNode {
+if role == acl.RoleContainer || role == acl.RoleInnerRing {
return nil
}
@ -190,10 +202,10 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
return nil
}
-func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) {
+func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (acl.Role, error) {
cnrID, containerIDSet := obj.ContainerID()
if !containerIDSet {
-return false, errNilCID
+return acl.RoleOthers, errNilCID
}
cnrIDBin := make([]byte, sha256.Size)
@ -201,14 +213,14 @@ func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey [
cnr, err := v.containers.Get(cnrID)
if err != nil {
-return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
+return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
}
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
if err != nil {
-return false, err
+return acl.RoleOthers, err
}
-return res.Role == acl.RoleContainer || res.Role == acl.RoleInnerRing, nil
+return res.Role, nil
}
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {

View file

@ -7,14 +7,21 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
-// AddressWithType groups object address with its FrostFS
-// object type.
-type AddressWithType struct {
+type ECInfo struct {
+ParentID oid.ID
+Index uint32
+Total uint32
+}
+// Info groups object address with its FrostFS
+// object info.
+type Info struct {
Address oid.Address
Type objectSDK.Type
IsLinkingObject bool
+ECInfo *ECInfo
}
-func (v AddressWithType) String() string {
+func (v Info) String() string {
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
}
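For illustration, this is how a listed EC chunk ends up described by the new type. A minimal sketch that mirrors the fixtures in the policer tests later in this PR, using the oidtest helpers only to fabricate identifiers:

```go
package main

import (
	"fmt"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
)

func main() {
	// A regular EC chunk as the policer sees it after listing: the chunk's own
	// address plus the parent object ID and the chunk's position in the EC group.
	info := objectcore.Info{
		Address: oidtest.Address(),
		Type:    objectSDK.TypeRegular,
		ECInfo: &objectcore.ECInfo{
			ParentID: oidtest.ID(),
			Index:    0,
			Total:    3, // data + parity chunks, e.g. for an EC 2.1 policy
		},
	}
	fmt.Println(info.String())
}
```

ECInfo stays nil for non-EC objects, so the existing REP processing path is unaffected by the new field.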

View file

@ -622,7 +622,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
return shards, nil
}
-func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
+func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",

View file

@ -68,12 +68,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
-addrList []objectcore.AddressWithType
+addrList []objectcore.Info
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
-func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
+func (l ListWithCursorRes) AddressList() []objectcore.Info {
return l.addrList
}
@ -98,7 +98,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
-result := make([]objectcore.AddressWithType, 0, prm.count)
+result := make([]objectcore.Info, 0, prm.count)
// Set initial cursors
cursor := prm.cursor

View file

@ -76,8 +76,8 @@ func TestListWithCursor(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
-expected := make([]object.AddressWithType, 0, tt.objectNum)
-got := make([]object.AddressWithType, 0, tt.objectNum)
+expected := make([]object.Info, 0, tt.objectNum)
+got := make([]object.Info, 0, tt.objectNum)
for i := 0; i < tt.objectNum; i++ {
containerID := cidtest.ID()
@ -88,7 +88,7 @@
err := e.Put(context.Background(), prm)
require.NoError(t, err)
-expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
+expected = append(expected, object.Info{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
}
var prm ListWithCursorPrm

View file

@ -47,12 +47,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
// ListRes contains values returned from ListWithCursor operation.
type ListRes struct {
-addrList []objectcore.AddressWithType
+addrList []objectcore.Info
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
-func (l ListRes) AddressList() []objectcore.AddressWithType {
+func (l ListRes) AddressList() []objectcore.Info {
return l.addrList
}
@ -89,7 +89,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
return res, ErrDegradedMode
}
-result := make([]objectcore.AddressWithType, 0, prm.count)
+result := make([]objectcore.Info, 0, prm.count)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
@ -99,7 +99,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
return res, metaerr.Wrap(err)
}
-func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
+func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte
var err error
@ -183,11 +183,11 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
-to []objectcore.AddressWithType, // listing result
+to []objectcore.Info, // listing result
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
-) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
+) ([]objectcore.Info, []byte, *Cursor, error) {
if cursor == nil {
cursor = new(Cursor)
}
@ -219,18 +219,27 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
}
var isLinkingObj bool
+var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
+ecHeader := o.ECHeader()
+if ecHeader != nil {
+ecInfo = &objectcore.ECInfo{
+ParentID: ecHeader.Parent(),
+Index: ecHeader.Index(),
+Total: ecHeader.Total(),
+}
+}
}
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
-to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
+to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
count++
}

View file

@ -77,7 +77,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
total = containers * 4 // regular + ts + child + lock
)
-expected := make([]object.AddressWithType, 0, total)
+expected := make([]object.Info, 0, total)
// fill metabase with objects
for i := 0; i < containers; i++ {
@ -88,21 +88,21 @@ func TestLisObjectsWithCursor(t *testing.T) {
obj.SetType(objectSDK.TypeRegular)
err := putBig(db, obj)
require.NoError(t, err)
-expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
+expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
// add one tombstone
obj = testutil.GenerateObjectWithCID(containerID)
obj.SetType(objectSDK.TypeTombstone)
err = putBig(db, obj)
require.NoError(t, err)
-expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
+expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
// add one lock
obj = testutil.GenerateObjectWithCID(containerID)
obj.SetType(objectSDK.TypeLock)
err = putBig(db, obj)
require.NoError(t, err)
-expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
+expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
// add one inhumed (do not include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
@ -124,12 +124,12 @@ func TestLisObjectsWithCursor(t *testing.T) {
child.SetSplitID(splitID)
err = putBig(db, child)
require.NoError(t, err)
-expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
+expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
}
t.Run("success with various count", func(t *testing.T) {
for countPerReq := 1; countPerReq <= total; countPerReq++ {
-got := make([]object.AddressWithType, 0, total)
+got := make([]object.Info, 0, total)
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
require.NoError(t, err, "count:%d", countPerReq)
@ -211,7 +211,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
}
}
-func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
+func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.Info, *meta.Cursor, error) {
var listPrm meta.ListPrm
listPrm.SetCount(count)
listPrm.SetCursor(cursor)

View file

@ -42,7 +42,7 @@ type ListWithCursorPrm struct {
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
-addrList []objectcore.AddressWithType
+addrList []objectcore.Info
cursor *Cursor
}
@ -59,7 +59,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
}
// AddressList returns addresses selected by ListWithCursor operation.
-func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
+func (r ListWithCursorRes) AddressList() []objectcore.Info {
return r.addrList
}

View file

@ -77,7 +77,7 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
Obj: obj,
Nodes: nodes,
}
fyrchik marked this conversation as resolved Outdated

HandleReplicationTask? handle is a verb, replicate is a verb too

Done
-s.replicator.HandleTask(ctx, task, &res)
+s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 {
return errors.New("object was not replicated")

View file

@ -0,0 +1,55 @@
package getsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type RemoteGetPrm struct {
Address oid.Address
Node netmapSDK.NodeInfo
}
type RemoteGetter struct {
s remoteStorageConstructor
es epochSource
ks keyStorage
}
func (g *RemoteGetter) Get(ctx context.Context, prm RemoteGetPrm) (*objectSDK.Object, error) {
var nodeInfo client.NodeInfo
if err := client.NodeInfoFromRawNetmapElement(&nodeInfo, netmapCore.Node(prm.Node)); err != nil {
return nil, err
}
rs, err := g.s.Get(nodeInfo)
if err != nil {
return nil, err
}
epoch, err := g.es.Epoch()
if err != nil {
return nil, err
}
key, err := g.ks.GetKey(nil)
if err != nil {
return nil, err
}
r := RemoteRequestParams{
Epoch: epoch,
TTL: 1,
PrivateKey: key,
}
return rs.Get(ctx, prm.Address, r)
}
func NewRemoteGetter(cc clientConstructor, es epochSource, ks keyStorage) *RemoteGetter {
return &RemoteGetter{
s: &multiclientRemoteStorageConstructor{clientConstructor: cc},
es: es,
ks: ks,
}
}

View file

@ -1,17 +0,0 @@
package headsvc
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type Prm struct {
addr oid.Address
}
func (p *Prm) WithAddress(v oid.Address) *Prm {
if p != nil {
p.addr = v
}
return p
}

View file

@ -34,10 +34,7 @@ import (
"go.uber.org/zap"
)
-var (
-errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
-errInvalidECObject = errors.New("object must be splitted to EC parts")
-)
+var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
type putSingleRequestSigner struct {
req *objectAPI.PutSingleRequest
@ -181,10 +178,6 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
}
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
-if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
-return errInvalidECObject
-}
commonPrm, err := svcutil.CommonPrmFromV2(req)
if err != nil {
return err

View file

@ -1,4 +1,4 @@
-package headsvc
+package object
import (
"context"
@ -18,18 +18,18 @@ type ClientConstructor interface {
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
}
-// RemoteHeader represents utility for getting
-// the object header from a remote host.
-type RemoteHeader struct {
+// RemoteReader represents utility for getting
+// the object from a remote host.
+type RemoteReader struct {
keyStorage *util.KeyStorage
clientCache ClientConstructor
}
-// RemoteHeadPrm groups remote header operation parameters.
-type RemoteHeadPrm struct {
-commonHeadPrm *Prm
+// RemoteRequestPrm groups remote operation parameters.
+type RemoteRequestPrm struct {
+addr oid.Address
+raw bool
node netmap.NodeInfo
}
@ -37,16 +37,16 @@ const remoteOpTTL = 1
var ErrNotFound = errors.New("object header not found")
-// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
-func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader {
-return &RemoteHeader{
+// NewRemoteReader creates, initializes and returns new RemoteHeader instance.
+func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader {
+return &RemoteReader{
keyStorage: keyStorage,
clientCache: cache,
}
}
// WithNodeInfo sets information about the remote node.
-func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
+func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm {
if p != nil {
p.node = v
}
@ -55,16 +55,23 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
}
// WithObjectAddress sets object address.
-func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm {
+func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm {
if p != nil {
-p.commonHeadPrm = new(Prm).WithAddress(v)
+p.addr = v
}
return p
}
+func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm {
+if p != nil {
+p.raw = v
+}
+return p
+}
// Head requests object header from the remote node.
-func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) {
+func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
key, err := h.keyStorage.GetKey(nil)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
@ -86,8 +93,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
headPrm.SetClient(c)
headPrm.SetPrivateKey(key)
-headPrm.SetAddress(prm.commonHeadPrm.addr)
+headPrm.SetAddress(prm.addr)
headPrm.SetTTL(remoteOpTTL)
+if prm.raw {
+headPrm.SetRawFlag()
+}
res, err := internalclient.HeadObject(ctx, headPrm)
if err != nil {
@ -96,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
return res.Header(), nil
}
func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
key, err := h.keyStorage.GetKey(nil)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
}
var info clientcore.NodeInfo
err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node))
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := h.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
}
var getPrm internalclient.GetObjectPrm
getPrm.SetClient(c)
getPrm.SetPrivateKey(key)
getPrm.SetAddress(prm.addr)
getPrm.SetTTL(remoteOpTTL)
if prm.raw {
getPrm.SetRawFlag()
}
res, err := internalclient.GetObject(ctx, getPrm)
if err != nil {
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err)
}
return res.Object(), nil
}

View file

@ -18,19 +18,15 @@ import (
"go.uber.org/zap"
)
-func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error {
-addr := addrWithType.Address
-idCnr := addr.Container()
-idObj := addr.Object()
-cnr, err := p.cnrSrc.Get(idCnr)
+func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
+cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
if err != nil {
if client.IsErrContainerNotFound(err) {
-existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr)
+existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container())
if errWasRemoved != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
} else if existed {
-err := p.buryFn(ctx, addrWithType.Address)
+err := p.buryFn(ctx, objInfo.Address)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
}
@ -41,11 +37,16 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
}
policy := cnr.Value.PlacementPolicy()
-if policycore.IsECPlacement(policy) {
-// EC not supported yet by policer
-return nil
-}
+if policycore.IsECPlacement(policy) {
+return p.processECContainerObject(ctx, objInfo, policy)
+}
+return p.processRepContainerObject(ctx, objInfo, policy)
+}
+func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
+idObj := objInfo.Address.Object()
+idCnr := objInfo.Address.Container()
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
@ -53,11 +54,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
c := &placementRequirements{}
-var numOfContainerNodes int
fyrchik marked this conversation as resolved Outdated

Seems like a separate fix, unrelated to the commit

Done
-for i := range nn {
-numOfContainerNodes += len(nn[i])
-}
// cached info about already checked nodes
checkedNodes := newNodeCache()
@ -68,15 +64,24 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
default:
}
-p.processNodes(ctx, c, addrWithType, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes)
+shortage := policy.ReplicaDescriptor(i).NumberOfObjects()
+if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
+// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
+// for correct object removal protection:
+// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
+// - `LOCK` object removal is a prohibited action in the GC.
+shortage = uint32(len(nn[i]))
+}
+p.processRepNodes(ctx, c, objInfo, nn[i], shortage, checkedNodes)
}
if !c.needLocalCopy && c.removeLocalCopy {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
-zap.Stringer("object", addr),
+zap.Stringer("object", objInfo.Address),
)
-p.cbRedundantCopy(ctx, addr)
+p.cbRedundantCopy(ctx, objInfo.Address)
}
return nil
}
@ -89,23 +94,13 @@ type placementRequirements struct {
removeLocalCopy bool
}
-func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
+func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info,
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
) {
-addr := addrWithType.Address
-typ := addrWithType.Type
+addr := objInfo.Address
// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int
-if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
-// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
-// for correct object removal protection:
-// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
-// - `LOCK` object removal is a prohibited action in the GC.
-shortage = uint32(len(nodes))
-}
for i := 0; shortage > 0 && i < len(nodes); i++ {
select {
case <-ctx.Done():
@ -133,7 +128,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
-_, err := p.remoteHeader(callCtx, nodes[i], addr)
+_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
cancel()
@ -196,7 +191,7 @@ func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address
Nodes: nodes,
}
-p.replicator.HandleTask(ctx, task, checkedNodes)
+p.replicator.HandleReplicationTask(ctx, task, checkedNodes)
case uncheckedCopies > 0:
// If we have more copies than needed, but some of them are from the maintenance nodes,

pkg/services/policer/ec.go (new file, +390 lines)
View file

@ -0,0 +1,390 @@
package policer
import (
"context"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var errNoECinfoReturnded = errors.New("no EC info returned")
type ecChunkProcessResult struct {
validPlacement bool
removeLocal bool
}
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector")
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
if objInfo.ECInfo == nil {
return p.processECContainerRepObject(ctx, objInfo, policy)
}
return p.processECContainerECObject(ctx, objInfo, policy)
}
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
// All of them must be stored on all of the container nodes.
func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
objID := objInfo.Address.Object()
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
}
fyrchik marked this conversation as resolved Outdated

len(nn) is currently 1 for all EC policies.
Can we simplify the code having this in mind?
Because when it won't be 1, we will also likely have different EC X.Y or REP N descriptors for different indexes in this array.

The distinction between REP/EC could be done based on each nn element, not on the uppermost level.

fixed
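As a side note on the len(nn) guard below: an EC policy currently carries a single replica descriptor, so BuildPlacement yields exactly one placement vector. A minimal sketch of where the data/parity counts used later in this file come from, relying only on the frostfs-sdk-go netmap calls already used in this PR:

```go
package main

import (
	"fmt"

	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

func main() {
	// "EC 2.1" declares one replica descriptor: 2 data chunks + 1 parity chunk,
	// which is why the policer expects a single placement vector for EC containers.
	var policy netmapSDK.PlacementPolicy
	if err := policy.DecodeString("EC 2.1"); err != nil {
		panic(err)
	}
	rd := policy.ReplicaDescriptor(0)
	fmt.Println("data:", rd.GetECDataCount(), "parity:", rd.GetECParityCount())
}
```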
if len(nn) != 1 {
return errInvalidECPlacement
}
c := &placementRequirements{}
checkedNodes := newNodeCache()
fyrchik marked this conversation as resolved Outdated

Could you elaborate a bit what this comment is trying to explain and why it is here?

Refactored, see last commit.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
p.processRepNodes(ctx, c, objInfo, nn[0], uint32(len(nn[0])), checkedNodes)
if !c.needLocalCopy && c.removeLocalCopy {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
zap.Stringer("object", objInfo.Address),
)
p.cbRedundantCopy(ctx, objInfo.Address)
}
return nil
}
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
}
if len(nn) != 1 {
return errInvalidECPlacement
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
res := p.processECChunk(ctx, objInfo, nn[0])
if !res.validPlacement {
// drop local chunk only if all required chunks are in place
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
}
p.adjustECPlacement(ctx, objInfo, nn[0], policy)
if res.removeLocal {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
p.cbRedundantCopy(ctx, objInfo.Address)
}
return nil
}
// processECChunk replicates EC chunk if needed.
fyrchik marked this conversation as resolved Outdated

It returns a struct.

Fixed
func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) ecChunkProcessResult {
var removeLocalChunk bool
requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
// current node is required node, we are happy
return ecChunkProcessResult{
validPlacement: true,
}
}
if requiredNode.IsMaintenance() {
// consider maintenance mode has object, but do not drop local copy
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
return ecChunkProcessResult{}
}
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address, false)
cancel()
if err == nil {
removeLocalChunk = true
} else if client.IsErrObjectNotFound(err) {
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1))
task := replicator.Task{
NumCopies: 1,
Addr: objInfo.Address,
Nodes: []netmap.NodeInfo{requiredNode},
}
p.replicator.HandleReplicationTask(ctx, task, newNodeCache())
} else if isClientErrMaintenance(err) {
// consider maintenance mode has object, but do not drop local copy
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
} else {
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
}
return ecChunkProcessResult{
removeLocal: removeLocalChunk,
}
}
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID)
requiredChunkIndexes := p.collectRequiredECChunks(nodes, objInfo)
if len(requiredChunkIndexes) == 0 {
p.log.Info(logs.PolicerNodeIsNotECObjectNode, zap.Stringer("object", objInfo.ECInfo.ParentID))
return true
}
err := p.resolveLocalECChunks(ctx, parentAddress, requiredChunkIndexes)
if err != nil {
p.log.Error(logs.PolicerFailedToGetLocalECChunks, zap.Error(err), zap.Stringer("object", parentAddress))
return false
}
if len(requiredChunkIndexes) == 0 {
return true
}
indexToObjectID := make(map[uint32]oid.ID)
success := p.resolveRemoteECChunks(ctx, parentAddress, nodes, requiredChunkIndexes, indexToObjectID)
if !success {
return false
}
for index, candidates := range requiredChunkIndexes {
var addr oid.Address
addr.SetContainer(objInfo.Address.Container())
addr.SetObject(indexToObjectID[index])
p.replicator.HandlePullTask(ctx, replicator.Task{
Review

Just for curiosity: do we always need to pull missing chunks from remote node?
processECChunk sets removeLocal = true but a chunk, that is being replicated, is not removed yet until pullRequiredECChunks's run. Could a policer try to reconstruct missing chunk if the local node, by good fortune, keeps required chunks for reconstruction?

I suppose this is not a big deal and barely gives a big profit...

Review

By default local node should keep only one chunk.
Addr: addr,
Nodes: candidates,
})
}
// there was some missing chunks, it's not ok
return false
}
func (p *Policer) collectRequiredECChunks(nodes []netmap.NodeInfo, objInfo objectcore.Info) map[uint32][]netmap.NodeInfo {
fyrchik marked this conversation as resolved Outdated

By design there is only 1 required chunk to store on the node (EC policy implies UNIQUE).
Why do we return a map here? It seems the code could be greatly simplified, having this in mind.

Not always: evacuation or not completed replication may lead that chunk may be stored on different chunks.

Evacuation does not affect the set of required chunks in any way.

Agree, current implementation does not affect. But I think that in case of evacuation from a node, EC chunk should move to another node, even if it does not comply with the storage policy.

Yes, but it is moved to a node which should not store it, and I have thought this function determines whether the node should store it?
Another way to look at it: policer must depend only on the network map and placement policy, evacuation is a separate process.

Actually, it may happen in a situation when we have EC 3.1 and 1 node went offline, so we have 3 nodes in the network map.
What is the behaviour in this case?

> this function determines whether the node should store it

This function collects all nodes that store required chunks.

> What is the behaviour in this case?

Policer will get an error from offline node and will skip object.
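To make the placement rule behind this discussion concrete: under UNIQUE EC placement the policer expects chunk i on nodes[i%len(nodes)] (see processECChunk above), and collectRequiredECChunks keeps only the indexes whose expected node is the local one. A tiny illustrative sketch in plain Go, with hypothetical node names:

```go
package main

import "fmt"

func main() {
	// EC 2.1 produces 3 chunks (2 data + 1 parity); assume 4 container nodes.
	const totalChunks = 3
	nodes := []string{"node0", "node1", "node2", "node3"}

	for idx := 0; idx < totalChunks; idx++ {
		// The required holder of chunk idx is determined by its index only.
		fmt.Printf("chunk %d is expected on %s\n", idx, nodes[idx%len(nodes)])
	}
	// Copies found elsewhere (e.g. left over after evacuation or an unfinished
	// replication) become pull candidates before the local extra copy is dropped.
}
```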
requiredChunkIndexes := make(map[uint32][]netmap.NodeInfo)
for i, n := range nodes {
if uint32(i) == objInfo.ECInfo.Total {
break
}
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
requiredChunkIndexes[uint32(i)] = []netmap.NodeInfo{}
}
}
return requiredChunkIndexes
}
func (p *Policer) resolveLocalECChunks(ctx context.Context, parentAddress oid.Address, required map[uint32][]netmap.NodeInfo) error {
_, err := p.localHeader(ctx, parentAddress)
var eiErr *objectSDK.ECInfoError
if err == nil { // should not be happen
return errNoECinfoReturnded
}
if !errors.As(err, &eiErr) {
return err
}
for _, ch := range eiErr.ECInfo().Chunks {
delete(required, ch.Index)
}
return nil
}
func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.Address, nodes []netmap.NodeInfo, required map[uint32][]netmap.NodeInfo, indexToObjectID map[uint32]oid.ID) bool {
var eiErr *objectSDK.ECInfoError
for _, n := range nodes {
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
continue
}
_, err := p.remoteHeader(ctx, n, parentAddress, true)
if !errors.As(err, &eiErr) {
continue
}
for _, ch := range eiErr.ECInfo().Chunks {
if candidates, ok := required[ch.Index]; ok {
candidates = append(candidates, n)
required[ch.Index] = candidates
var chunkID oid.ID
if err := chunkID.ReadFromV2(ch.ID); err != nil {
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
return false
}
if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
return false
}
indexToObjectID[ch.Index] = chunkID
}
}
}
for index, candidates := range required {
if len(candidates) == 0 {
p.log.Error(logs.PolicerMissingECChunk, zap.Stringer("object", parentAddress), zap.Uint32("index", index))
return false
}
}
return true
}
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID)
var eiErr *objectSDK.ECInfoError
resolved := make(map[uint32][]netmap.NodeInfo)
chunkIDs := make(map[uint32]oid.ID)
restore := true // do not restore EC chunks if some node returned error
for idx, n := range nodes {
if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total {
return
}
var err error
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
_, err = p.localHeader(ctx, parentAddress)
} else {
_, err = p.remoteHeader(ctx, n, parentAddress, true)
}
if errors.As(err, &eiErr) {
for _, ch := range eiErr.ECInfo().Chunks {
resolved[ch.Index] = append(resolved[ch.Index], n)
var ecInfoChunkID oid.ID
if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil {
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
return
}
if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID {
aarifullin marked this conversation as resolved Outdated

I found existed, exist is not appropriate pair name. You could rename chunkID to ecInfoChunkID, but existed to chunkID

Done
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID),
aarifullin marked this conversation as resolved Outdated

Please, could you give a brief explanation how this could be that chunks related to the same parent may differ by IDs?

A bug that I hope we don't have :)
zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
return
}
chunkIDs[ch.Index] = ecInfoChunkID
}
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1,
Addr: objInfo.Address,
Nodes: []netmap.NodeInfo{n},
}, newNodeCache())
restore = false
}
}
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
return
}
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
var found []uint32
fyrchik marked this conversation as resolved Outdated

Do you mean found? It is already a past participle from find, founded is what happened with Saint-Petersburg in 1703.

Fixed
for i := range resolved {
found = append(found, i)
}
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
return
}
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
}
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs)
if parts == nil {
return
}
key, err := p.keyStorage.GetKey(nil)
if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
required := make([]bool, len(parts))
for i, p := range parts {
if p == nil {
required[i] = true
}
}
fyrchik marked this conversation as resolved Outdated

You call Reconstruct(), then you call Split on the result. These operations are inverse to each other.
What is the purpose?

Fixed
if err := c.ReconstructParts(parts, required, key); err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
for idx, part := range parts {
if _, exists := existedChunks[uint32(idx)]; exists {
continue
}
var addr oid.Address
addr.SetContainer(parentAddress.Container())
pID, _ := part.ID()
addr.SetObject(pID)
targetNode := nodes[idx%len(nodes)]
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
Addr: addr,
Obj: part,
})
} else {
p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1,
Addr: addr,
Nodes: []netmap.NodeInfo{targetNode},
Obj: part,
}, newNodeCache())
}
}
}
func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object {
parts := make([]*objectSDK.Object, objInfo.ECInfo.Total)
errGroup, egCtx := errgroup.WithContext(ctx)
for idx, nodes := range existedChunks {
idx := idx
nodes := nodes
errGroup.Go(func() error {
var objID oid.Address
objID.SetContainer(parentAddress.Container())
objID.SetObject(chunkIDs[idx])
var obj *objectSDK.Object
var err error
for _, node := range nodes {
if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
obj, err = p.localObject(egCtx, objID)
} else {
obj, err = p.remoteObject(egCtx, node, objID)
}
if err == nil {
break
}
p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey())))
}
if obj != nil {
parts[idx] = obj
}
return nil
})
}
if err := errGroup.Wait(); err != nil {
p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err))
return nil
}
return parts
}

View file

@ -0,0 +1,565 @@
package policer
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
func TestECChunkHasValidPlacement(t *testing.T) {
t.Parallel()
chunkAddress := oidtest.Address()
parentID := oidtest.ID()
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(chunkAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
require.True(t, raw, "remote header for parent object must be called with raw flag")
index := int(ni.PublicKey()[0])
require.True(t, index == 1 || index == 2, "invalid node to get parent header")
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = uint32(index)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(remoteHeadFn),
WithLocalObjectHeaderFunc(localHeadFn),
WithPool(testPool(t)),
)
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentID,
Index: 0,
Total: 3,
},
}
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
}
func TestECChunkHasInvalidPlacement(t *testing.T) {
t.Parallel()
chunkAddress := oidtest.Address()
parentID := oidtest.ID()
chunkObject := objectSDK.New()
chunkObject.SetContainerID(chunkAddress.Container())
chunkObject.SetID(chunkAddress.Object())
chunkObject.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
chunkObject.SetPayloadSize(uint64(10))
chunkObject.SetECHeader(objectSDK.NewECHeader(parentID, 1, 3, []byte{}, 0))
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(chunkAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentID,
Index: 1,
Total: 3,
},
}
t.Run("node0 has chunk1, node1 has chunk0 and chunk1", func(t *testing.T) {
// policer should pull chunk0 on first run and drop chunk1 on second run
var allowDrop bool
requiredChunkID := oidtest.ID()
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
return chunkObject, nil
}
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.Index = 0
ch.SetID(requiredChunkID)
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 2
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
return nil, new(apistatus.ObjectNotFound)
}
require.Fail(t, "unexpected remote HEAD")
return nil, fmt.Errorf("unexpected remote HEAD")
}
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
if allowDrop {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.SetID(requiredChunkID)
ch.Index = 0
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var pullCounter atomic.Int64
var dropped []oid.Address
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF),
WithReplicator(&testReplicator{
handlePullTask: func(ctx context.Context, r replicator.Task) {
require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID &&
len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task")
pullCounter.Add(1)
},
}),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
require.True(t, allowDrop, "invalid redundant copy call")
dropped = append(dropped, a)
}),
WithPool(testPool(t)),
)
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
require.Equal(t, 0, len(dropped), "invalid dropped count")
allowDrop = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
require.Equal(t, 1, len(dropped), "invalid dropped count")
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
})
t.Run("node0 has chunk0 and chunk1, node1 has chunk1", func(t *testing.T) {
// policer should drop chunk1
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
return chunkObject, nil
}
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 2
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
return nil, new(apistatus.ObjectNotFound)
}
require.Fail(t, "unexpected remote HEAD")
return nil, fmt.Errorf("unexpected remote HEAD")
}
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.SetID(oidtest.ID())
ch.Index = 0
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var dropped []oid.Address
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
dropped = append(dropped, a)
}),
WithPool(testPool(t)),
)
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(dropped), "invalid dropped count")
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
})
t.Run("node0 has chunk0 and chunk1, node1 has no chunks", func(t *testing.T) {
// policer should replicate chunk1 to node1 on first run and drop chunk1 on node0 on second run
var secondRun bool
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
if !secondRun {
return nil, new(apistatus.ObjectNotFound)
}
return chunkObject, nil
}
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 2
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
return nil, new(apistatus.ObjectNotFound)
}
require.Fail(t, "unexpected remote HEAD")
return nil, fmt.Errorf("unexpected remote HEAD")
}
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.SetID(oidtest.ID())
ch.Index = 0
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var dropped []oid.Address
var replicated []replicator.Task
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
dropped = append(dropped, a)
}),
WithReplicator(&testReplicator{
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
replicated = append(replicated, t)
},
}),
WithPool(testPool(t)),
)
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 0, len(dropped), "invalid dropped count")
require.Equal(t, 1, len(replicated), "invalid replicated count")
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
secondRun = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(replicated), "invalid replicated count")
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
require.Equal(t, 1, len(dropped), "invalid dropped count")
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
})
}
func TestECChunkRestore(t *testing.T) {
// node0 has chunk0, node1 has chunk1
// policer should replicate chunk0 to node2 on the first run
// then restore EC object and replicate chunk2 to node2 on the second run
t.Parallel()
payload := make([]byte, 64)
rand.Read(payload)
parentAddress := oidtest.Address()
parentObject := objectSDK.New()
parentObject.SetContainerID(parentAddress.Container())
parentObject.SetPayload(payload)
parentObject.SetPayloadSize(64)
objectSDK.CalculateAndSetPayloadChecksum(parentObject)
err := objectSDK.CalculateAndSetID(parentObject)
require.NoError(t, err)
id, _ := parentObject.ID()
parentAddress.SetObject(id)
chunkIDs := make([]oid.ID, 3)
c, err := erasurecode.NewConstructor(2, 1)
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
chunks, err := c.Split(parentObject, &key.PrivateKey)
require.NoError(t, err)
for i, ch := range chunks {
chunkIDs[i], _ = ch.ID()
}
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(parentAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
var secondRun bool
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
require.True(t, raw, "remote header for parent object must be called with raw flag")
index := int(ni.PublicKey()[0])
require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header")
require.True(t, a == parentAddress, "invalid address to get remote header")
if index == 1 {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[1])
ch.Index = uint32(1)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if index == 2 && secondRun {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[0])
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
return nil, new(apistatus.ObjectNotFound)
}
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a == parentAddress, "invalid address to get local header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[0])
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var replicatedObj []*objectSDK.Object
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(remoteHeadFn),
WithLocalObjectHeaderFunc(localHeadFn),
WithReplicator(&testReplicator{
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
if t.Obj != nil {
replicatedObj = append(replicatedObj, t.Obj)
}
},
}),
WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request")
return chunks[0], nil
}),
WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
index := ni.PublicKey()[0]
if index == 2 {
return nil, new(apistatus.ObjectNotFound)
}
return chunks[index], nil
}),
WithPool(testPool(t)),
WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)),
)
var chunkAddress oid.Address
chunkAddress.SetContainer(parentAddress.Container())
chunkAddress.SetObject(chunkIDs[0])
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentAddress.Object(),
Index: 0,
Total: 3,
},
}
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
secondRun = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count")
chunks[2].SetSignature(nil)
expectedData, err := chunks[2].MarshalJSON()
require.NoError(t, err)
replicatedObj[0].SetSignature(nil)
actualData, err := replicatedObj[0].MarshalJSON()
require.NoError(t, err)
require.EqualValues(t, string(expectedData), string(actualData), "invalid restored objects")
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -22,7 +23,7 @@ import (
// Note that the underlying implementation might be circular: i.e. it can restart // Note that the underlying implementation might be circular: i.e. it can restart
// when the end of the key space is reached. // when the end of the key space is reached.
type KeySpaceIterator interface { type KeySpaceIterator interface {
Next(context.Context, uint32) ([]objectcore.AddressWithType, error) Next(context.Context, uint32) ([]objectcore.Info, error)
Rewind() Rewind()
} }
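For context, a minimal consumption sketch (not part of the diff), assuming only the KeySpaceIterator interface above and engine.ErrEndOfListing used elsewhere in this change; drainOnce and handle are hypothetical names:
package example
import (
	"context"
	"errors"
	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer"
)
// drainOnce walks the key space until the iterator reports the end of listing,
// then rewinds it so a circular implementation can start the next cycle.
func drainOnce(ctx context.Context, it policer.KeySpaceIterator, handle func([]objectcore.Info)) error {
	for {
		batch, err := it.Next(ctx, 100)
		if errors.Is(err, engine.ErrEndOfListing) {
			it.Rewind()
			return nil
		}
		if err != nil {
			return err
		}
		handle(batch)
	}
}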
@ -35,11 +36,20 @@ type BuryFunc func(context.Context, oid.Address) error
// Replicator is the interface to a consumer of replication tasks. // Replicator is the interface to a consumer of replication tasks.
type Replicator interface { type Replicator interface {
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
HandlePullTask(ctx context.Context, task replicator.Task)
HandleLocalPutTask(ctx context.Context, task replicator.Task)
} }
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error) type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
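// RemoteObjectGetFunc is the function to get an object from a specific remote node.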
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
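// LocalObjectGetFunc is the function to get an object from the current node.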
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
type cfg struct { type cfg struct {
headTimeout time.Duration headTimeout time.Duration
@ -56,6 +66,8 @@ type cfg struct {
remoteHeader RemoteObjectHeaderFunc remoteHeader RemoteObjectHeaderFunc
localHeader LocalObjectHeaderFunc
netmapKeys netmap.AnnouncedKeys netmapKeys netmap.AnnouncedKeys
replicator Replicator replicator Replicator
@ -69,6 +81,12 @@ type cfg struct {
evictDuration, sleepDuration time.Duration evictDuration, sleepDuration time.Duration
metrics MetricsRegister metrics MetricsRegister
remoteObject RemoteObjectGetFunc
localObject LocalObjectGetFunc
keyStorage *util.KeyStorage
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -125,13 +143,32 @@ func WithPlacementBuilder(v placement.Builder) Option {
} }
} }
// WithRemoteObjectHeader returns option to set object header receiver of Policer. // WithRemoteObjectHeader returns option to set remote object header receiver of Policer.
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option { func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
return func(c *cfg) { return func(c *cfg) {
c.remoteHeader = v c.remoteHeader = v
} }
} }
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
return func(c *cfg) {
c.localHeader = v
}
}
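// WithRemoteObjectGetFunc returns option to set remote object receiver of Policer.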
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
return func(c *cfg) {
c.remoteObject = v
}
}
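// WithLocalObjectGetFunc returns option to set local object receiver of Policer.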
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
return func(c *cfg) {
c.localObject = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys. // WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option { func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) { return func(c *cfg) {
@ -169,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option {
c.metrics = m c.metrics = m
} }
} }
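// WithKeyStorage returns option to set key storage of Policer.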
func WithKeyStorage(ks *util.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = ks
}
}

View file

@ -26,7 +26,7 @@ import (
func TestBuryObjectWithoutContainer(t *testing.T) { func TestBuryObjectWithoutContainer(t *testing.T) {
// Key space // Key space
addr := oidtest.Address() addr := oidtest.Address()
objs := []objectcore.AddressWithType{ objs := []objectcore.Info{
{ {
Address: addr, Address: addr,
Type: objectSDK.TypeRegular, Type: objectSDK.TypeRegular,
@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
maintenanceNodes []int maintenanceNodes []int
wantRemoveRedundant bool wantRemoveRedundant bool
wantReplicateTo []int wantReplicateTo []int
ecInfo *objectcore.ECInfo
}{ }{
{ {
desc: "1 copy already held by local node", desc: "1 copy already held by local node",
@ -144,6 +145,22 @@ func TestProcessObject(t *testing.T) {
objHolders: []int{1}, objHolders: []int{1},
maintenanceNodes: []int{2}, maintenanceNodes: []int{2},
}, },
{
desc: "lock object must be replicated to all EC nodes",
objType: objectSDK.TypeLock,
nodeCount: 3,
policy: `EC 1.1`,
placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2},
},
{
desc: "tombstone object must be replicated to all EC nodes",
objType: objectSDK.TypeTombstone,
nodeCount: 3,
policy: `EC 1.1`,
placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2},
},
} }
for i := range tests { for i := range tests {
@ -173,12 +190,15 @@ func TestProcessObject(t *testing.T) {
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) { if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
return placementVectors, nil return placementVectors, nil
} }
if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) {
return placementVectors, nil
}
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj) t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
return nil, errors.New("unexpected placement build") return nil, errors.New("unexpected placement build")
} }
// Object remote header // Object remote header
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) { headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
index := int(ni.PublicKey()[0]) index := int(ni.PublicKey()[0])
if a != addr || index < 1 || index >= ti.nodeCount { if a != addr || index < 1 || index >= ti.nodeCount {
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a) t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
@ -229,18 +249,21 @@ func TestProcessObject(t *testing.T) {
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a) require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
gotRemoveRedundant = true gotRemoveRedundant = true
}), }),
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) { WithReplicator(&testReplicator{
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task) handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
for _, node := range task.Nodes { require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) for _, node := range task.Nodes {
} gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
})), }
},
}),
WithPool(testPool(t)), WithPool(testPool(t)),
) )
addrWithType := objectcore.AddressWithType{ addrWithType := objectcore.Info{
Address: addr, Address: addr,
Type: ti.objType, Type: ti.objType,
ECInfo: ti.ecInfo,
} }
err := p.processObject(context.Background(), addrWithType) err := p.processObject(context.Background(), addrWithType)
@ -276,7 +299,7 @@ func TestProcessObjectError(t *testing.T) {
WithPool(testPool(t)), WithPool(testPool(t)),
) )
addrWithType := objectcore.AddressWithType{ addrWithType := objectcore.Info{
Address: addr, Address: addr,
} }
@ -285,7 +308,7 @@ func TestProcessObjectError(t *testing.T) {
func TestIteratorContract(t *testing.T) { func TestIteratorContract(t *testing.T) {
addr := oidtest.Address() addr := oidtest.Address()
objs := []objectcore.AddressWithType{{ objs := []objectcore.Info{{
Address: addr, Address: addr,
Type: objectSDK.TypeRegular, Type: objectSDK.TypeRegular,
}} }}
@ -350,7 +373,7 @@ func testPool(t *testing.T) *ants.Pool {
} }
type nextResult struct { type nextResult struct {
objs []objectcore.AddressWithType objs []objectcore.Info
err error err error
} }
@ -361,7 +384,7 @@ type predefinedIterator struct {
calls []string calls []string
} }
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) { func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) {
if it.pos == len(it.scenario) { if it.pos == len(it.scenario) {
close(it.finishCh) close(it.finishCh)
<-ctx.Done() <-ctx.Done()
@ -380,11 +403,11 @@ func (it *predefinedIterator) Rewind() {
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. // sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
type sliceKeySpaceIterator struct { type sliceKeySpaceIterator struct {
objs []objectcore.AddressWithType objs []objectcore.Info
cur int cur int
} }
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) { func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) {
if it.cur >= len(it.objs) { if it.cur >= len(it.objs) {
return nil, engine.ErrEndOfListing return nil, engine.ErrEndOfListing
} }
@ -419,9 +442,20 @@ type announcedKeysFunc func([]byte) bool
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
// replicatorFunc is a Replicator backed by a function. type testReplicator struct {
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult)
handleLocalPutTask func(ctx context.Context, task replicator.Task)
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { handlePullTask func(ctx context.Context, task replicator.Task)
f(ctx, task, res) }
func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
r.handleReplicationTask(ctx, task, res)
}
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) {
r.handleLocalPutTask(ctx, task)
}
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) {
r.handlePullTask(ctx, task)
} }

View file

@ -21,9 +21,9 @@ type TaskResult interface {
SubmitSuccessfulReplication(netmap.NodeInfo) SubmitSuccessfulReplication(netmap.NodeInfo)
} }
// HandleTask executes replication task inside invoking goroutine. // HandleReplicationTask executes replication task inside invoking goroutine.
// Passes all the nodes that accepted the replication to the TaskResult. // Passes all the nodes that accepted the replication to the TaskResult.
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) { func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res TaskResult) {
p.metrics.IncInFlightRequest() p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest() defer p.metrics.DecInFlightRequest()
defer func() { defer func() {
@ -32,7 +32,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
) )
}() }()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask", ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleReplicationTask",
trace.WithAttributes( trace.WithAttributes(
attribute.Stringer("address", task.Addr), attribute.Stringer("address", task.Addr),
attribute.Int64("number_of_copies", int64(task.NumCopies)), attribute.Int64("number_of_copies", int64(task.NumCopies)),
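For reference, a minimal caller-side sketch (not part of the diff) of the renamed API; nodeCollector and replicateTo are hypothetical names, and only the Task fields and the TaskResult method shown above are assumed:
package example
import (
	"context"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// nodeCollector records the nodes that confirmed the replication.
type nodeCollector struct {
	accepted []netmap.NodeInfo
}
func (c *nodeCollector) SubmitSuccessfulReplication(n netmap.NodeInfo) {
	c.accepted = append(c.accepted, n)
}
// replicateTo runs a replication task synchronously and returns the accepting nodes.
func replicateTo(ctx context.Context, p *replicator.Replicator, addr oid.Address, nodes []netmap.NodeInfo) []netmap.NodeInfo {
	res := &nodeCollector{}
	p.HandleReplicationTask(ctx, replicator.Task{Addr: addr, Nodes: nodes, NumCopies: 1}, res)
	return res.accepted
}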

View file

@ -0,0 +1,72 @@
package replicator
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node")
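// HandlePullTask tries to get the object specified by the task from one of the given nodes and stores it to the local storage engine.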
func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() {
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
}()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int("nodes_count", len(task.Nodes)),
))
defer span.End()
var obj *objectSDK.Object
for _, node := range task.Nodes {
var err error
obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{
Address: task.Addr,
Node: node,
})
if err == nil {
break
}
var endpoints []string
node.IterateNetworkEndpoints(func(s string) bool {
endpoints = append(endpoints, s)
return false
})
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.Strings("endpoints", endpoints),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
if obj == nil {
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
zap.Stringer("object", task.Addr),
zap.Error(errFailedToGetObjectFromAnyNode),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
err := engine.Put(ctx, p.localStorage, obj)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}
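A short caller-side sketch (not part of the diff), assuming only the Task fields used above; pullChunk is a hypothetical helper:
package example
import (
	"context"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// pullChunk asks the replicator to fetch a missing EC chunk from any of the listed
// holders and store it locally; HandlePullTask logs and gives up if no node has it.
func pullChunk(ctx context.Context, p *replicator.Replicator, chunk oid.Address, holders []netmap.NodeInfo) {
	p.HandlePullTask(ctx, replicator.Task{Addr: chunk, Nodes: holders})
}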

View file

@ -0,0 +1,47 @@
package replicator
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var errObjectNotDefined = errors.New("object is not defined")
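// HandleLocalPutTask stores the object from the task to the local storage engine.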
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() {
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "local"))
}()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int("nodes_count", len(task.Nodes)),
))
defer span.End()
if task.Obj == nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(errObjectNotDefined),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
err := engine.Put(ctx, p.localStorage, task.Obj)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}

View file

@ -4,6 +4,7 @@ import (
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
@ -25,6 +26,8 @@ type cfg struct {
remoteSender *putsvc.RemoteSender remoteSender *putsvc.RemoteSender
remoteGetter *getsvc.RemoteGetter
localStorage *engine.StorageEngine localStorage *engine.StorageEngine
metrics MetricsRegister metrics MetricsRegister
@ -70,6 +73,12 @@ func WithRemoteSender(v *putsvc.RemoteSender) Option {
} }
} }
func WithRemoteGetter(v *getsvc.RemoteGetter) Option {
return func(c *cfg) {
c.remoteGetter = v
}
}
// WithLocalStorage returns option to set local object storage of Replicator. // WithLocalStorage returns option to set local object storage of Replicator.
func WithLocalStorage(v *engine.StorageEngine) Option { func WithLocalStorage(v *engine.StorageEngine) Option {
return func(c *cfg) { return func(c *cfg) {