Add EC replication #1129

Merged
dstepanov-yadro merged 7 commits from dstepanov-yadro/frostfs-node:feat/ec_restore into master 2024-09-04 19:51:08 +00:00
25 changed files with 1440 additions and 143 deletions

View file

@ -13,7 +13,7 @@ type keySpaceIterator struct {
cur *engine.Cursor
}
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.Info, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(it.cur)
prm.WithCount(batchSize)

View file

@ -29,7 +29,6 @@ import (
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
return err
}
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
pol := policer.New(
policer.WithLogger(c.log),
@ -242,11 +241,33 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
placement.NewNetworkMapSourceBuilder(c.netMapSource),
),
policer.WithRemoteObjectHeaderFunc(
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
return remoteHeader.Head(ctx, prm)
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
return remoteReader.Head(ctx, prm)
},
),
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
var prm engine.HeadPrm
fyrchik marked this conversation as resolved
Review

Why have you decided to go with 2 func options instead of an interface?

Review

Because this approach has already been used in policer.

prm.WithAddress(a)
res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}),
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
return remoteReader.Get(ctx, prm)
}),
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
var prm engine.GetPrm
prm.WithAddress(a)
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
if err != nil {
return nil, err
}
return res.Object(), nil
}),
policer.WithNetmapKeys(c),
policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg),
@ -265,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
}),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
policer.WithKeyStorage(keyStorage),
)
c.workers = append(c.workers, worker{
@ -297,6 +319,9 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache),
),
replicator.WithRemoteGetter(
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
),
replicator.WithMetrics(c.metricsCollector.Replicator()),
)
}
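
The review thread above asks why two function options were used instead of an interface. For context, a minimal, self-contained sketch of the functional-options style in question; the `cfg`/`Option` names below are illustrative, not the policer's actual internals, and only the closure signature is taken from the wiring above.

```go
package main

import (
	"context"
	"fmt"

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// LocalObjectHeaderFunc mirrors the closure signature passed to
// policer.WithLocalObjectHeaderFunc in the wiring above.
type LocalObjectHeaderFunc func(ctx context.Context, a oid.Address) (*objectSDK.Object, error)

// cfg and Option stand in for the policer's private configuration type.
type cfg struct {
	localHeader LocalObjectHeaderFunc
}

type Option func(*cfg)

// WithLocalObjectHeaderFunc follows the same functional-option style:
// the caller injects behaviour as a closure instead of implementing an interface.
func WithLocalObjectHeaderFunc(f LocalObjectHeaderFunc) Option {
	return func(c *cfg) { c.localHeader = f }
}

func New(opts ...Option) *cfg {
	c := &cfg{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	p := New(WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
		return nil, fmt.Errorf("no local object %s", a)
	}))
	_, err := p.localHeader(context.Background(), oid.Address{})
	fmt.Println(err)
}
```

Compared with an interface, this keeps the policer decoupled from the object service packages and matches the options already present in the package, which is the point made in the reply above.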

View file

@ -529,4 +529,16 @@ const (
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
ECFailedToSendToContainerNode = "failed to send EC object to container node"
ECFailedToSaveECPart = "failed to save EC part"
PolicerNodeIsNotECObjectNode = "current node is not EC object node"
PolicerFailedToGetLocalECChunks = "failed to get local EC chunks"
PolicerMissingECChunk = "failed to find EC chunk on any of the nodes"
PolicerFailedToDecodeECChunkID = "failed to decode EC chunk ID"
PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk"
ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage"
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage"
PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node"
PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks"
PolicerFailedToRestoreObject = "failed to restore EC object"
PolicerCouldNotGetChunk = "could not get EC chunk"
PolicerCouldNotGetChunks = "could not get EC chunks"
)

View file

@ -167,17 +167,29 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
token := obj.SessionToken()
ownerID := obj.OwnerID()
if token == nil && obj.ECHeader() != nil {
role, err := v.isIROrContainerNode(obj, binKey)
if err != nil {
return err
}
if role == acl.RoleContainer {
// EC part could be restored or created by a container node, so ownerID may not match the object signature
return nil
}
return v.checkOwnerKey(ownerID, key)
}
if token == nil || !token.AssertAuthKey(&key) {
return v.checkOwnerKey(ownerID, key)
}
if v.verifyTokenIssuer {
signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey)
role, err := v.isIROrContainerNode(obj, binKey)
if err != nil {
return err
}
if signerIsIROrContainerNode {
if role == acl.RoleContainer || role == acl.RoleInnerRing {
return nil
}
@ -190,10 +202,10 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
return nil
}
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) {
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (acl.Role, error) {
cnrID, containerIDSet := obj.ContainerID()
if !containerIDSet {
return false, errNilCID
return acl.RoleOthers, errNilCID
}
cnrIDBin := make([]byte, sha256.Size)
@ -201,14 +213,14 @@ func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey [
cnr, err := v.containers.Get(cnrID)
if err != nil {
return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
}
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
if err != nil {
return false, err
return acl.RoleOthers, err
}
return res.Role == acl.RoleContainer || res.Role == acl.RoleInnerRing, nil
return res.Role, nil
}
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {

View file

@ -7,14 +7,21 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// AddressWithType groups object address with its FrostFS
// object type.
type AddressWithType struct {
type ECInfo struct {
ParentID oid.ID
Index uint32
Total uint32
}
// Info groups object address with its FrostFS
// object info.
type Info struct {
Address oid.Address
Type objectSDK.Type
IsLinkingObject bool
ECInfo *ECInfo
}
func (v AddressWithType) String() string {
func (v Info) String() string {
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
}
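
To make the new fields concrete, a small hedged example of what a listing entry for a chunk produced by an `EC 2.1` policy might look like (values are made up; `oidtest` helpers are used only for illustration):

```go
package main

import (
	"fmt"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
)

func main() {
	// An EC 2.1 object is split into 3 chunks; each chunk is listed as a
	// regular object that carries a pointer back to its parent object.
	info := objectcore.Info{
		Address: oidtest.Address(), // address of the chunk itself
		Type:    objectSDK.TypeRegular,
		ECInfo: &objectcore.ECInfo{
			ParentID: oidtest.ID(), // ID of the parent (original) object
			Index:    1,
			Total:    3,
		},
	}
	fmt.Println(info) // note: String() prints only address, type and the linking flag
}
```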

View file

@ -622,7 +622,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
return shards, nil
}
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",

View file

@ -68,12 +68,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
addrList []objectcore.AddressWithType
addrList []objectcore.Info
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
func (l ListWithCursorRes) AddressList() []objectcore.Info {
return l.addrList
}
@ -98,7 +98,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
result := make([]objectcore.AddressWithType, 0, prm.count)
result := make([]objectcore.Info, 0, prm.count)
// Set initial cursors
cursor := prm.cursor

View file

@ -76,8 +76,8 @@ func TestListWithCursor(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
expected := make([]object.AddressWithType, 0, tt.objectNum)
got := make([]object.AddressWithType, 0, tt.objectNum)
expected := make([]object.Info, 0, tt.objectNum)
got := make([]object.Info, 0, tt.objectNum)
for i := 0; i < tt.objectNum; i++ {
containerID := cidtest.ID()
@ -88,7 +88,7 @@ func TestListWithCursor(t *testing.T) {
err := e.Put(context.Background(), prm)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
expected = append(expected, object.Info{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
}
var prm ListWithCursorPrm

View file

@ -47,12 +47,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
// ListRes contains values returned from ListWithCursor operation.
type ListRes struct {
addrList []objectcore.AddressWithType
addrList []objectcore.Info
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
func (l ListRes) AddressList() []objectcore.AddressWithType {
func (l ListRes) AddressList() []objectcore.Info {
return l.addrList
}
@ -89,7 +89,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
return res, ErrDegradedMode
}
result := make([]objectcore.AddressWithType, 0, prm.count)
result := make([]objectcore.Info, 0, prm.count)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
@ -99,7 +99,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
return res, metaerr.Wrap(err)
}
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte
var err error
@ -183,11 +183,11 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
to []objectcore.AddressWithType, // listing result
to []objectcore.Info, // listing result
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
) ([]objectcore.Info, []byte, *Cursor, error) {
if cursor == nil {
cursor = new(Cursor)
}
@ -219,18 +219,27 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
count++
}

View file

@ -77,7 +77,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
total = containers * 4 // regular + ts + child + lock
)
expected := make([]object.AddressWithType, 0, total)
expected := make([]object.Info, 0, total)
// fill metabase with objects
for i := 0; i < containers; i++ {
@ -88,21 +88,21 @@ func TestLisObjectsWithCursor(t *testing.T) {
obj.SetType(objectSDK.TypeRegular)
err := putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
// add one tombstone
obj = testutil.GenerateObjectWithCID(containerID)
obj.SetType(objectSDK.TypeTombstone)
err = putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
// add one lock
obj = testutil.GenerateObjectWithCID(containerID)
obj.SetType(objectSDK.TypeLock)
err = putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
// add one inhumed (do not include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
@ -124,12 +124,12 @@ func TestLisObjectsWithCursor(t *testing.T) {
child.SetSplitID(splitID)
err = putBig(db, child)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
}
t.Run("success with various count", func(t *testing.T) {
for countPerReq := 1; countPerReq <= total; countPerReq++ {
got := make([]object.AddressWithType, 0, total)
got := make([]object.Info, 0, total)
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
require.NoError(t, err, "count:%d", countPerReq)
@ -211,7 +211,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
}
}
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.Info, *meta.Cursor, error) {
var listPrm meta.ListPrm
listPrm.SetCount(count)
listPrm.SetCursor(cursor)

View file

@ -42,7 +42,7 @@ type ListWithCursorPrm struct {
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
addrList []objectcore.AddressWithType
addrList []objectcore.Info
cursor *Cursor
}
@ -59,7 +59,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
}
// AddressList returns addresses selected by ListWithCursor operation.
func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
func (r ListWithCursorRes) AddressList() []objectcore.Info {
return r.addrList
}

View file

@ -77,7 +77,7 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
Obj: obj,
Nodes: nodes,
}
fyrchik marked this conversation as resolved Outdated

`HandleReplicationTask`? `handle` is a verb, `replicate` is a verb too

Done

s.replicator.HandleTask(ctx, task, &res)
s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 {
return errors.New("object was not replicated")

View file

@ -0,0 +1,55 @@
package getsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type RemoteGetPrm struct {
Address oid.Address
Node netmapSDK.NodeInfo
}
type RemoteGetter struct {
s remoteStorageConstructor
es epochSource
ks keyStorage
}
func (g *RemoteGetter) Get(ctx context.Context, prm RemoteGetPrm) (*objectSDK.Object, error) {
var nodeInfo client.NodeInfo
if err := client.NodeInfoFromRawNetmapElement(&nodeInfo, netmapCore.Node(prm.Node)); err != nil {
return nil, err
}
rs, err := g.s.Get(nodeInfo)
if err != nil {
return nil, err
}
epoch, err := g.es.Epoch()
if err != nil {
return nil, err
}
key, err := g.ks.GetKey(nil)
if err != nil {
return nil, err
}
r := RemoteRequestParams{
Epoch: epoch,
TTL: 1,
PrivateKey: key,
}
return rs.Get(ctx, prm.Address, r)
}
func NewRemoteGetter(cc clientConstructor, es epochSource, ks keyStorage) *RemoteGetter {
return &RemoteGetter{
s: &multiclientRemoteStorageConstructor{clientConstructor: cc},
es: es,
ks: ks,
}
}
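
A hedged usage sketch for the new `RemoteGetter`: pulling a single object (for example, a missing EC chunk) from a specific container node, roughly what the replicator's pull path needs. The helper `pullChunk` and its package name are illustrative; the `RemoteGetPrm` fields and the `Get` signature come from the file above.

```go
package pullsketch // illustrative package name

import (
	"context"
	"fmt"

	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// pullChunk fetches one object from the given container node via RemoteGetter.
func pullChunk(ctx context.Context, g *getsvc.RemoteGetter, node netmapSDK.NodeInfo, addr oid.Address) (*objectSDK.Object, error) {
	obj, err := g.Get(ctx, getsvc.RemoteGetPrm{
		Address: addr,
		Node:    node,
	})
	if err != nil {
		return nil, fmt.Errorf("pull %s from %s: %w", addr, netmapSDK.StringifyPublicKey(node), err)
	}
	return obj, nil
}
```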

View file

@ -1,17 +0,0 @@
package headsvc
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type Prm struct {
addr oid.Address
}
func (p *Prm) WithAddress(v oid.Address) *Prm {
if p != nil {
p.addr = v
}
return p
}

View file

@ -34,10 +34,7 @@ import (
"go.uber.org/zap"
)
var (
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
errInvalidECObject = errors.New("object must be splitted to EC parts")
)
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
type putSingleRequestSigner struct {
req *objectAPI.PutSingleRequest
@ -181,10 +178,6 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
}
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
return errInvalidECObject
}
commonPrm, err := svcutil.CommonPrmFromV2(req)
if err != nil {
return err

View file

@ -1,4 +1,4 @@
package headsvc
package object
import (
"context"
@ -18,18 +18,18 @@ type ClientConstructor interface {
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
}
// RemoteHeader represents utility for getting
// the object header from a remote host.
type RemoteHeader struct {
// RemoteReader represents utility for getting
// the object from a remote host.
type RemoteReader struct {
keyStorage *util.KeyStorage
clientCache ClientConstructor
}
// RemoteHeadPrm groups remote header operation parameters.
type RemoteHeadPrm struct {
commonHeadPrm *Prm
// RemoteRequestPrm groups remote operation parameters.
type RemoteRequestPrm struct {
addr oid.Address
raw bool
node netmap.NodeInfo
}
@ -37,16 +37,16 @@ const remoteOpTTL = 1
var ErrNotFound = errors.New("object header not found")
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader {
return &RemoteHeader{
// NewRemoteReader creates, initializes and returns new RemoteReader instance.
func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader {
return &RemoteReader{
keyStorage: keyStorage,
clientCache: cache,
}
}
// WithNodeInfo sets information about the remote node.
func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm {
if p != nil {
p.node = v
}
@ -55,16 +55,23 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
}
// WithObjectAddress sets object address.
func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm {
func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm {
if p != nil {
p.commonHeadPrm = new(Prm).WithAddress(v)
p.addr = v
}
return p
}
func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm {
if p != nil {
p.raw = v
}
return p
}
// Head requests object header from the remote node.
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) {
func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
key, err := h.keyStorage.GetKey(nil)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
@ -86,8 +93,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
headPrm.SetClient(c)
headPrm.SetPrivateKey(key)
headPrm.SetAddress(prm.commonHeadPrm.addr)
headPrm.SetAddress(prm.addr)
headPrm.SetTTL(remoteOpTTL)
if prm.raw {
headPrm.SetRawFlag()
}
res, err := internalclient.HeadObject(ctx, headPrm)
if err != nil {
@ -96,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
return res.Header(), nil
}
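// Get requests the object from the remote node.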
func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
key, err := h.keyStorage.GetKey(nil)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
}
var info clientcore.NodeInfo
err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node))
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := h.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
}
var getPrm internalclient.GetObjectPrm
getPrm.SetClient(c)
getPrm.SetPrivateKey(key)
getPrm.SetAddress(prm.addr)
getPrm.SetTTL(remoteOpTTL)
if prm.raw {
getPrm.SetRawFlag()
}
res, err := internalclient.GetObject(ctx, getPrm)
if err != nil {
return nil, fmt.Errorf("(%T) could not get object from %s: %w", h, info.AddressGroup(), err)
}
return res.Object(), nil
}
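
The new `raw` flag is what lets the policer discover which EC chunks a node holds: a raw HEAD for the parent address makes the remote node reply with an `ECInfoError` listing its chunks instead of assembling the object. A hedged sketch of that pattern (the helper and its package name are illustrative; the import path of the `object` service package is assumed):

```go
package ecsketch // illustrative

import (
	"context"
	"errors"

	objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// nodeChunkIndexes returns the EC chunk indexes the given node reports for the
// parent object, or nil if the node does not answer with ECInfo.
func nodeChunkIndexes(ctx context.Context, r *objectService.RemoteReader, node netmap.NodeInfo, parent oid.Address) []uint32 {
	prm := new(objectService.RemoteRequestPrm).
		WithNodeInfo(node).
		WithObjectAddress(parent).
		WithRaw(true)
	_, err := r.Head(ctx, prm)

	var eiErr *objectSDK.ECInfoError
	if !errors.As(err, &eiErr) {
		return nil // either a full object, a plain error, or no EC info
	}

	indexes := make([]uint32, 0, len(eiErr.ECInfo().Chunks))
	for _, ch := range eiErr.ECInfo().Chunks {
		indexes = append(indexes, ch.Index)
	}
	return indexes
}
```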

View file

@ -18,19 +18,15 @@ import (
"go.uber.org/zap"
)
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error {
addr := addrWithType.Address
idCnr := addr.Container()
idObj := addr.Object()
cnr, err := p.cnrSrc.Get(idCnr)
func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
if err != nil {
if client.IsErrContainerNotFound(err) {
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr)
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container())
if errWasRemoved != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
} else if existed {
err := p.buryFn(ctx, addrWithType.Address)
err := p.buryFn(ctx, objInfo.Address)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
}
@ -41,11 +37,16 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
}
policy := cnr.Value.PlacementPolicy()
if policycore.IsECPlacement(policy) {
// EC not supported yet by policer
return nil
}
if policycore.IsECPlacement(policy) {
return p.processECContainerObject(ctx, objInfo, policy)
}
return p.processRepContainerObject(ctx, objInfo, policy)
}
func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
idObj := objInfo.Address.Object()
idCnr := objInfo.Address.Container()
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
@ -53,11 +54,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
c := &placementRequirements{}
var numOfContainerNodes int
fyrchik marked this conversation as resolved Outdated

Seems like a separate fix, unrelated to the commit


Done

for i := range nn {
numOfContainerNodes += len(nn[i])
}
// cached info about already checked nodes
checkedNodes := newNodeCache()
@ -68,15 +64,24 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
default:
}
p.processNodes(ctx, c, addrWithType, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes)
shortage := policy.ReplicaDescriptor(i).NumberOfObjects()
if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
// for correct object removal protection:
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
// - `LOCK` object removal is a prohibited action in the GC.
shortage = uint32(len(nn[i]))
}
p.processRepNodes(ctx, c, objInfo, nn[i], shortage, checkedNodes)
}
if !c.needLocalCopy && c.removeLocalCopy {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
zap.Stringer("object", addr),
zap.Stringer("object", objInfo.Address),
)
p.cbRedundantCopy(ctx, addr)
p.cbRedundantCopy(ctx, objInfo.Address)
}
return nil
}
@ -89,23 +94,13 @@ type placementRequirements struct {
removeLocalCopy bool
}
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info,
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
) {
addr := addrWithType.Address
typ := addrWithType.Type
addr := objInfo.Address
// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
// for correct object removal protection:
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
// - `LOCK` object removal is a prohibited action in the GC.
shortage = uint32(len(nodes))
}
for i := 0; shortage > 0 && i < len(nodes); i++ {
select {
case <-ctx.Done():
@ -133,7 +128,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader(callCtx, nodes[i], addr)
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
cancel()
@ -196,7 +191,7 @@ func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address
Nodes: nodes,
}
p.replicator.HandleTask(ctx, task, checkedNodes)
p.replicator.HandleReplicationTask(ctx, task, checkedNodes)
case uncheckedCopies > 0:
// If we have more copies than needed, but some of them are from the maintenance nodes,

pkg/services/policer/ec.go (new file, 390 lines)
View file

@ -0,0 +1,390 @@
package policer
import (
"context"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var errNoECinfoReturnded = errors.New("no EC info returned")
type ecChunkProcessResult struct {
validPlacement bool
removeLocal bool
}
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector")
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
if objInfo.ECInfo == nil {
return p.processECContainerRepObject(ctx, objInfo, policy)
}
return p.processECContainerECObject(ctx, objInfo, policy)
}
// processECContainerRepObject processes non-erasure-coded objects in an EC container: tombstones, locks and linking objects.
// All of them must be stored on all of the container nodes.
func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
objID := objInfo.Address.Object()
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
}
fyrchik marked this conversation as resolved Outdated

`len(nn)` is currently 1 for all EC policies.
Can we simplify the code having this in mind?
Because when it won't be 1, we will also likely have different `EC X.Y` or `REP N` descriptors for different indexes in this array.

The distinction between `REP`/`EC` could be done based on each `nn` element, not on the uppermost level.

fixed

if len(nn) != 1 {
return errInvalidECPlacement
}
c := &placementRequirements{}
checkedNodes := newNodeCache()
fyrchik marked this conversation as resolved Outdated

Could you elaborate a bit what this comment is trying to explain and why it is here?


Refactored, see last commit.

select {
case <-ctx.Done():
return ctx.Err()
default:
}
p.processRepNodes(ctx, c, objInfo, nn[0], uint32(len(nn[0])), checkedNodes)
if !c.needLocalCopy && c.removeLocalCopy {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
zap.Stringer("object", objInfo.Address),
)
p.cbRedundantCopy(ctx, objInfo.Address)
}
return nil
}
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
if err != nil {
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
}
if len(nn) != 1 {
return errInvalidECPlacement
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
res := p.processECChunk(ctx, objInfo, nn[0])
if !res.validPlacement {
// drop local chunk only if all required chunks are in place
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
}
p.adjustECPlacement(ctx, objInfo, nn[0], policy)
if res.removeLocal {
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
p.cbRedundantCopy(ctx, objInfo.Address)
}
return nil
}
// processECChunk replicates EC chunk if needed.
fyrchik marked this conversation as resolved Outdated

It returns a `struct`.

Fixed

func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) ecChunkProcessResult {
var removeLocalChunk bool
requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
// current node is required node, we are happy
return ecChunkProcessResult{
validPlacement: true,
}
}
if requiredNode.IsMaintenance() {
// consider a node under maintenance as having the object, but do not drop the local copy
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
return ecChunkProcessResult{}
}
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address, false)
cancel()
if err == nil {
removeLocalChunk = true
} else if client.IsErrObjectNotFound(err) {
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1))
task := replicator.Task{
NumCopies: 1,
Addr: objInfo.Address,
Nodes: []netmap.NodeInfo{requiredNode},
}
p.replicator.HandleReplicationTask(ctx, task, newNodeCache())
} else if isClientErrMaintenance(err) {
// consider a node under maintenance as having the object, but do not drop the local copy
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
} else {
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
}
return ecChunkProcessResult{
removeLocal: removeLocalChunk,
}
}
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID)
requiredChunkIndexes := p.collectRequiredECChunks(nodes, objInfo)
if len(requiredChunkIndexes) == 0 {
p.log.Info(logs.PolicerNodeIsNotECObjectNode, zap.Stringer("object", objInfo.ECInfo.ParentID))
return true
}
err := p.resolveLocalECChunks(ctx, parentAddress, requiredChunkIndexes)
if err != nil {
p.log.Error(logs.PolicerFailedToGetLocalECChunks, zap.Error(err), zap.Stringer("object", parentAddress))
return false
}
if len(requiredChunkIndexes) == 0 {
return true
}
indexToObjectID := make(map[uint32]oid.ID)
success := p.resolveRemoteECChunks(ctx, parentAddress, nodes, requiredChunkIndexes, indexToObjectID)
if !success {
return false
}
for index, candidates := range requiredChunkIndexes {
var addr oid.Address
addr.SetContainer(objInfo.Address.Container())
addr.SetObject(indexToObjectID[index])
p.replicator.HandlePullTask(ctx, replicator.Task{
Review

Just for curiosity: do we *always* need to pull missing chunks from the remote node?
`processECChunk` sets `removeLocal = true`, but the chunk being replicated is not removed until `pullRequiredECChunks` runs. Could the policer try to `reconstruct` the missing chunk if the local node, *by good fortune*, keeps the chunks required for reconstruction?

I suppose this is not a big deal and barely gives a big profit...
Review

By default the local node should keep only one chunk.
Addr: addr,
Nodes: candidates,
})
}
// there were some missing chunks, it's not ok
return false
}
func (p *Policer) collectRequiredECChunks(nodes []netmap.NodeInfo, objInfo objectcore.Info) map[uint32][]netmap.NodeInfo {
fyrchik marked this conversation as resolved Outdated

*By design* there is only 1 required chunk to store on the node (EC policy implies `UNIQUE`).
Why do we return a map here? It seems the code could be greatly simplified, having this in mind.

Not always: evacuation or incomplete replication may result in a chunk being stored on a node other than the required one.

Evacuation does not affect the set of required chunks in any way.


Agree, the current implementation does not affect it. But I think that in case of evacuation from a node, an EC chunk should move to another node, even if it does not comply with the storage policy.

Yes, but it is moved to a node which *should not* store it, and I thought this function determines whether the node *should* store it?
Another way to look at it: the policer *must* depend only on the network map and placement policy; evacuation is a separate process.

Actually, it may happen in a situation when we have `EC 3.1` and 1 node went offline, so we have 3 nodes in the network map.
What is the behaviour in this case?

> this function determines whether the node should store it

This function collects all nodes that store required chunks.

> What is the behaviour in this case?

The policer will get an error from the offline node and will skip the object.
requiredChunkIndexes := make(map[uint32][]netmap.NodeInfo)
for i, n := range nodes {
if uint32(i) == objInfo.ECInfo.Total {
break
}
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
requiredChunkIndexes[uint32(i)] = []netmap.NodeInfo{}
}
}
return requiredChunkIndexes
}
func (p *Policer) resolveLocalECChunks(ctx context.Context, parentAddress oid.Address, required map[uint32][]netmap.NodeInfo) error {
_, err := p.localHeader(ctx, parentAddress)
var eiErr *objectSDK.ECInfoError
if err == nil { // should not happen
return errNoECinfoReturnded
}
if !errors.As(err, &eiErr) {
return err
}
for _, ch := range eiErr.ECInfo().Chunks {
delete(required, ch.Index)
}
return nil
}
func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.Address, nodes []netmap.NodeInfo, required map[uint32][]netmap.NodeInfo, indexToObjectID map[uint32]oid.ID) bool {
var eiErr *objectSDK.ECInfoError
for _, n := range nodes {
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
continue
}
_, err := p.remoteHeader(ctx, n, parentAddress, true)
if !errors.As(err, &eiErr) {
continue
}
for _, ch := range eiErr.ECInfo().Chunks {
if candidates, ok := required[ch.Index]; ok {
candidates = append(candidates, n)
required[ch.Index] = candidates
var chunkID oid.ID
if err := chunkID.ReadFromV2(ch.ID); err != nil {
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
return false
}
if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
return false
}
indexToObjectID[ch.Index] = chunkID
}
}
}
for index, candidates := range required {
if len(candidates) == 0 {
p.log.Error(logs.PolicerMissingECChunk, zap.Stringer("object", parentAddress), zap.Uint32("index", index))
return false
}
}
return true
}
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
var parentAddress oid.Address
parentAddress.SetContainer(objInfo.Address.Container())
parentAddress.SetObject(objInfo.ECInfo.ParentID)
var eiErr *objectSDK.ECInfoError
resolved := make(map[uint32][]netmap.NodeInfo)
chunkIDs := make(map[uint32]oid.ID)
restore := true // do not restore EC chunks if some node returned error
for idx, n := range nodes {
if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total {
return
}
var err error
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
_, err = p.localHeader(ctx, parentAddress)
} else {
_, err = p.remoteHeader(ctx, n, parentAddress, true)
}
if errors.As(err, &eiErr) {
for _, ch := range eiErr.ECInfo().Chunks {
resolved[ch.Index] = append(resolved[ch.Index], n)
var ecInfoChunkID oid.ID
if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil {
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
return
}
if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID {
aarifullin marked this conversation as resolved Outdated

I found `existed, exist` is not an appropriate pair name. You could rename `chunkID` to `ecInfoChunkID`, but `existed` to `chunkID`.

Done

p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID),
aarifullin marked this conversation as resolved Outdated

Please, could you give a brief explanation how this could be that chunks related to the same parent may differ by IDs?


A bug that I hope we don't have :)

zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
return
}
chunkIDs[ch.Index] = ecInfoChunkID
}
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1,
Addr: objInfo.Address,
Nodes: []netmap.NodeInfo{n},
}, newNodeCache())
restore = false
}
}
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
return
}
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
var found []uint32
fyrchik marked this conversation as resolved Outdated

Do you mean `found`? It is already a past participle from `find`, `founded` is what happened with Saint-Petersburg in 1703.

Fixed

for i := range resolved {
found = append(found, i)
}
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
return
}
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
}
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs)
if parts == nil {
return
}
key, err := p.keyStorage.GetKey(nil)
if err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
required := make([]bool, len(parts))
for i, p := range parts {
if p == nil {
required[i] = true
}
}
fyrchik marked this conversation as resolved Outdated

You call `Reconstruct()`, then you call `Split` on the result. These operations are inverse to each other.
What is the purpose?

Fixed

if err := c.ReconstructParts(parts, required, key); err != nil {
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
return
}
for idx, part := range parts {
if _, exists := existedChunks[uint32(idx)]; exists {
continue
}
var addr oid.Address
addr.SetContainer(parentAddress.Container())
pID, _ := part.ID()
addr.SetObject(pID)
targetNode := nodes[idx%len(nodes)]
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
Addr: addr,
Obj: part,
})
} else {
p.replicator.HandleReplicationTask(ctx, replicator.Task{
NumCopies: 1,
Addr: addr,
Nodes: []netmap.NodeInfo{targetNode},
Obj: part,
}, newNodeCache())
}
}
}
func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object {
parts := make([]*objectSDK.Object, objInfo.ECInfo.Total)
errGroup, egCtx := errgroup.WithContext(ctx)
for idx, nodes := range existedChunks {
idx := idx
nodes := nodes
errGroup.Go(func() error {
var objID oid.Address
objID.SetContainer(parentAddress.Container())
objID.SetObject(chunkIDs[idx])
var obj *objectSDK.Object
var err error
for _, node := range nodes {
if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
obj, err = p.localObject(egCtx, objID)
} else {
obj, err = p.remoteObject(egCtx, node, objID)
}
if err == nil {
break
}
p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey())))
}
if obj != nil {
parts[idx] = obj
}
return nil
})
}
if err := errGroup.Wait(); err != nil {
p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err))
return nil
}
return parts
}
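
Two ideas in ec.go are worth a compact illustration: the placement rule (chunk i is expected on nodes[i % len(nodes)], as in processECChunk and restoreECObject) and the restore path that rebuilds missing chunks with the SDK erasure-code constructor. The sketch below mirrors the Split/ReconstructParts usage visible in this file and in the test that follows; everything outside those calls (payload, node count) is made up.

```go
package main

import (
	"crypto/rand"
	"fmt"

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)

func main() {
	// EC 2.1: 2 data chunks + 1 parity chunk.
	const dataCount, parityCount = 2, 1

	key, err := keys.NewPrivateKey()
	if err != nil {
		panic(err)
	}

	// Build a parent object the same way the test below does.
	payload := make([]byte, 64)
	rand.Read(payload)
	parent := objectSDK.New()
	addr := oidtest.Address()
	parent.SetContainerID(addr.Container())
	parent.SetPayload(payload)
	parent.SetPayloadSize(uint64(len(payload)))
	objectSDK.CalculateAndSetPayloadChecksum(parent)
	if err := objectSDK.CalculateAndSetID(parent); err != nil {
		panic(err)
	}

	c, err := erasurecode.NewConstructor(dataCount, parityCount)
	if err != nil {
		panic(err)
	}
	chunks, err := c.Split(parent, &key.PrivateKey)
	if err != nil {
		panic(err)
	}

	// Placement rule used by the policer: chunk i belongs to nodes[i % len(nodes)].
	const containerNodes = 4
	for i := range chunks {
		fmt.Printf("chunk %d -> node %d\n", i, i%containerNodes)
	}

	// Pretend chunk 2 is lost and reconstruct it from the others,
	// mirroring restoreECObject above.
	parts := make([]*objectSDK.Object, len(chunks))
	copy(parts, chunks)
	parts[2] = nil
	required := make([]bool, len(parts))
	required[2] = true
	if err := c.ReconstructParts(parts, required, &key.PrivateKey); err != nil {
		panic(err)
	}
	fmt.Println("chunk 2 restored:", parts[2] != nil)
}
```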

View file

@ -0,0 +1,565 @@
package policer
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
func TestECChunkHasValidPlacement(t *testing.T) {
t.Parallel()
chunkAddress := oidtest.Address()
parentID := oidtest.ID()
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(chunkAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
require.True(t, raw, "remote header for parent object must be called with raw flag")
index := int(ni.PublicKey()[0])
require.True(t, index == 1 || index == 2, "invalid node to get parent header")
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = uint32(index)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get local header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(remoteHeadFn),
WithLocalObjectHeaderFunc(localHeadFn),
WithPool(testPool(t)),
)
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentID,
Index: 0,
Total: 3,
},
}
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
}
func TestECChunkHasInvalidPlacement(t *testing.T) {
t.Parallel()
chunkAddress := oidtest.Address()
parentID := oidtest.ID()
chunkObject := objectSDK.New()
chunkObject.SetContainerID(chunkAddress.Container())
chunkObject.SetID(chunkAddress.Object())
chunkObject.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
chunkObject.SetPayloadSize(uint64(10))
chunkObject.SetECHeader(objectSDK.NewECHeader(parentID, 1, 3, []byte{}, 0))
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(chunkAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentID,
Index: 1,
Total: 3,
},
}
t.Run("node0 has chunk1, node1 has chunk0 and chunk1", func(t *testing.T) {
// policer should pull chunk0 on first run and drop chunk1 on second run
var allowDrop bool
requiredChunkID := oidtest.ID()
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
return chunkObject, nil
}
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.Index = 0
ch.SetID(requiredChunkID)
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 2
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
return nil, new(apistatus.ObjectNotFound)
}
require.Fail(t, "unexpected remote HEAD")
return nil, fmt.Errorf("unexpected remote HEAD")
}
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
if allowDrop {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.SetID(requiredChunkID)
ch.Index = 0
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var pullCounter atomic.Int64
var dropped []oid.Address
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF),
WithReplicator(&testReplicator{
handlePullTask: (func(ctx context.Context, r replicator.Task) {
require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID &&
len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task")
pullCounter.Add(1)
}),
}),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
require.True(t, allowDrop, "invalid redundant copy call")
dropped = append(dropped, a)
}),
WithPool(testPool(t)),
)
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
require.Equal(t, 0, len(dropped), "invalid dropped count")
allowDrop = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
require.Equal(t, 1, len(dropped), "invalid dropped count")
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
})
t.Run("node0 has chunk0 and chunk1, node1 has chunk1", func(t *testing.T) {
// policer should drop chunk1
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
return chunkObject, nil
}
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 2
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
return nil, new(apistatus.ObjectNotFound)
}
require.Fail(t, "unexpected remote HEAD")
return nil, fmt.Errorf("unexpected remote HEAD")
}
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.SetID(oidtest.ID())
ch.Index = 0
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var dropped []oid.Address
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
dropped = append(dropped, a)
}),
WithPool(testPool(t)),
)
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(dropped), "invalid dropped count")
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
})
t.Run("node0 has chunk0 and chunk1, node1 has no chunks", func(t *testing.T) {
// policer should replicate chunk1 to node1 on first run and drop chunk1 on node0 on second run
var secondRun bool
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
if !secondRun {
return nil, new(apistatus.ObjectNotFound)
}
return chunkObject, nil
}
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(oidtest.ID())
ch.Index = 2
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
a.Object() == parentID && raw {
return nil, new(apistatus.ObjectNotFound)
}
require.Fail(t, "unexpected remote HEAD")
return nil, fmt.Errorf("unexpected remote HEAD")
}
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkAddress.Object())
ch.Index = 1
ch.Total = 3
ei.AddChunk(ch)
ch.SetID(oidtest.ID())
ch.Index = 0
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var dropped []oid.Address
var replicated []replicator.Task
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(headFn),
WithLocalObjectHeaderFunc(localHeadF),
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
dropped = append(dropped, a)
}),
WithReplicator(&testReplicator{
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
replicated = append(replicated, t)
},
}),
WithPool(testPool(t)),
)
err := p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 0, len(dropped), "invalid dropped count")
require.Equal(t, 1, len(replicated), "invalid replicated count")
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
secondRun = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(replicated), "invalid replicated count")
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
require.Equal(t, 1, len(dropped), "invalid dropped count")
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
})
}
func TestECChunkRestore(t *testing.T) {
// node0 has chunk0, node1 has chunk1
// policer should replicate chunk0 to node2 on the first run
// then restore EC object and replicate chunk2 to node2 on the second run
t.Parallel()
payload := make([]byte, 64)
rand.Read(payload)
parentAddress := oidtest.Address()
parentObject := objectSDK.New()
parentObject.SetContainerID(parentAddress.Container())
parentObject.SetPayload(payload)
parentObject.SetPayloadSize(64)
objectSDK.CalculateAndSetPayloadChecksum(parentObject)
err := objectSDK.CalculateAndSetID(parentObject)
require.NoError(t, err)
id, _ := parentObject.ID()
parentAddress.SetObject(id)
chunkIDs := make([]oid.ID, 3)
c, err := erasurecode.NewConstructor(2, 1)
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
chunks, err := c.Split(parentObject, &key.PrivateKey)
require.NoError(t, err)
for i, ch := range chunks {
chunkIDs[i], _ = ch.ID()
}
var policy netmapSDK.PlacementPolicy
require.NoError(t, policy.DecodeString("EC 2.1"))
cnr := &container.Container{}
cnr.Value.Init()
cnr.Value.SetPlacementPolicy(policy)
containerSrc := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
if id.Equals(parentAddress.Container()) {
return cnr, nil
}
return nil, new(apistatus.ContainerNotFound)
},
}
nodes := make([]netmapSDK.NodeInfo, 4)
for i := range nodes {
nodes[i].SetPublicKey([]byte{byte(i)})
}
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) {
return [][]netmapSDK.NodeInfo{nodes}, nil
}
return nil, errors.New("unexpected placement build")
}
var secondRun bool
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
require.True(t, raw, "remote header for parent object must be called with raw flag")
index := int(ni.PublicKey()[0])
require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header")
require.True(t, a == parentAddress, "invalid address to get remote header")
if index == 1 {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[1])
ch.Index = uint32(1)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
if index == 2 && secondRun {
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[0])
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
return nil, new(apistatus.ObjectNotFound)
}
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a == parentAddress, "invalid address to get local header")
ei := objectSDK.NewECInfo()
var ch objectSDK.ECChunk
ch.SetID(chunkIDs[0])
ch.Index = uint32(0)
ch.Total = 3
ei.AddChunk(ch)
return nil, objectSDK.NewECInfoError(ei)
}
var replicatedObj []*objectSDK.Object
p := New(
WithContainerSource(containerSrc),
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
return bytes.Equal(k, nodes[0].PublicKey())
})),
WithRemoteObjectHeaderFunc(remoteHeadFn),
WithLocalObjectHeaderFunc(localHeadFn),
WithReplicator(&testReplicator{
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
if t.Obj != nil {
replicatedObj = append(replicatedObj, t.Obj)
}
},
}),
WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request")
return chunks[0], nil
}),
WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
index := ni.PublicKey()[0]
if index == 2 {
return nil, new(apistatus.ObjectNotFound)
}
return chunks[index], nil
}),
WithPool(testPool(t)),
WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)),
)
var chunkAddress oid.Address
chunkAddress.SetContainer(parentAddress.Container())
chunkAddress.SetObject(chunkIDs[0])
objInfo := objectcore.Info{
Address: chunkAddress,
Type: objectSDK.TypeRegular,
ECInfo: &objectcore.ECInfo{
ParentID: parentAddress.Object(),
Index: 0,
Total: 3,
},
}
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
secondRun = true
err = p.processObject(context.Background(), objInfo)
require.NoError(t, err)
require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count")
chunks[2].SetSignature(nil)
expectedData, err := chunks[2].MarshalJSON()
require.NoError(t, err)
replicatedObj[0].SetSignature(nil)
actualData, err := replicatedObj[0].MarshalJSON()
require.NoError(t, err)
require.EqualValues(t, string(expectedData), string(actualData), "invalid restored object")
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -22,7 +23,7 @@ import (
// Note that the underlying implementation might be circular: i.e. it can restart
// when the end of the key space is reached.
type KeySpaceIterator interface {
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
Next(context.Context, uint32) ([]objectcore.Info, error)
Rewind()
}
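For illustration, a minimal consumer sketch of this interface; it assumes engine.ErrEndOfListing marks the end of the key space (as the test iterator later in this diff does), and the batch size and helper name are arbitrary.
func drainKeySpace(ctx context.Context, it KeySpaceIterator) ([]objectcore.Info, error) {
	var all []objectcore.Info
	for {
		// request the next batch; the underlying iterator may be circular,
		// so callers must watch for the end-of-listing error
		batch, err := it.Next(ctx, 32)
		if errors.Is(err, engine.ErrEndOfListing) {
			return all, nil
		}
		if err != nil {
			return nil, err
		}
		all = append(all, batch...)
	}
}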
@ -35,11 +36,20 @@ type BuryFunc func(context.Context, oid.Address) error
// Replicator is the interface to a consumer of replication tasks.
type Replicator interface {
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
HandlePullTask(ctx context.Context, task replicator.Task)
HandleLocalPutTask(ctx context.Context, task replicator.Task)
}
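For illustration, a minimal dispatch sketch over the three handlers; the selection conditions and helper name below are illustrative only, not the policer's actual decision logic.
func dispatchRepair(ctx context.Context, r Replicator, task replicator.Task, res replicator.TaskResult) {
	switch {
	case task.Obj != nil:
		// a restored object is already in hand: store it on this node
		r.HandleLocalPutTask(ctx, task)
	case res != nil:
		// push the object at task.Addr to task.Nodes, reporting successes via res
		r.HandleReplicationTask(ctx, task, res)
	default:
		// fetch the object at task.Addr from task.Nodes into local storage
		r.HandlePullTask(ctx, task)
	}
}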
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
// RemoteObjectGetFunc is the function to get an object from a specific remote node.
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
// LocalObjectGetFunc is the function to get an object from the current node.
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
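The added raw flag matters for EC processing: a raw HEAD of the parent object is expected to fail with an ECInfo error listing the chunks held by the queried node, which is how the tests earlier in this diff stub it. A hedged helper sketch; the ECInfoError type assertion and its ECInfo accessor are assumptions about the SDK error shape.
func remoteECChunks(ctx context.Context, head RemoteObjectHeaderFunc, ni netmapSDK.NodeInfo, parent oid.Address) (*objectSDK.ECInfo, error) {
	// raw=true: do not assemble the parent, report the chunks the node stores instead
	_, err := head(ctx, ni, parent, true)
	var ecErr *objectSDK.ECInfoError
	if errors.As(err, &ecErr) {
		return ecErr.ECInfo(), nil
	}
	return nil, err
}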
type cfg struct {
headTimeout time.Duration
@ -56,6 +66,8 @@ type cfg struct {
remoteHeader RemoteObjectHeaderFunc
localHeader LocalObjectHeaderFunc
netmapKeys netmap.AnnouncedKeys
replicator Replicator
@ -69,6 +81,12 @@ type cfg struct {
evictDuration, sleepDuration time.Duration
metrics MetricsRegister
remoteObject RemoteObjectGetFunc
localObject LocalObjectGetFunc
keyStorage *util.KeyStorage
}
func defaultCfg() *cfg {
@ -125,13 +143,32 @@ func WithPlacementBuilder(v placement.Builder) Option {
}
}
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
// WithRemoteObjectHeaderFunc returns option to set remote object header receiver of Policer.
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
return func(c *cfg) {
c.remoteHeader = v
}
}
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
return func(c *cfg) {
c.localHeader = v
}
}
// WithRemoteObjectGetFunc returns option to set remote object getter of Policer.
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
return func(c *cfg) {
c.remoteObject = v
}
}
// WithLocalObjectGetFunc returns option to set local object getter of Policer.
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
return func(c *cfg) {
c.localObject = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) {
@ -169,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option {
c.metrics = m
}
}
// WithKeyStorage returns option to set key storage of Policer.
func WithKeyStorage(ks *util.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = ks
}
}

View file

@ -26,7 +26,7 @@ import (
func TestBuryObjectWithoutContainer(t *testing.T) {
// Key space
addr := oidtest.Address()
objs := []objectcore.AddressWithType{
objs := []objectcore.Info{
{
Address: addr,
Type: objectSDK.TypeRegular,
@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
maintenanceNodes []int
wantRemoveRedundant bool
wantReplicateTo []int
ecInfo *objectcore.ECInfo
}{
{
desc: "1 copy already held by local node",
@ -144,6 +145,22 @@ func TestProcessObject(t *testing.T) {
objHolders: []int{1},
maintenanceNodes: []int{2},
},
{
desc: "lock object must be replicated to all EC nodes",
objType: objectSDK.TypeLock,
nodeCount: 3,
policy: `EC 1.1`,
placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2},
},
{
desc: "tombstone object must be replicated to all EC nodes",
objType: objectSDK.TypeTombstone,
nodeCount: 3,
policy: `EC 1.1`,
placement: [][]int{{0, 1, 2}},
wantReplicateTo: []int{1, 2},
},
}
for i := range tests {
@ -173,12 +190,15 @@ func TestProcessObject(t *testing.T) {
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
return placementVectors, nil
}
if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) {
return placementVectors, nil
}
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
return nil, errors.New("unexpected placement build")
}
// Object remote header
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
index := int(ni.PublicKey()[0])
if a != addr || index < 1 || index >= ti.nodeCount {
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
@ -229,18 +249,21 @@ func TestProcessObject(t *testing.T) {
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
gotRemoveRedundant = true
}),
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
for _, node := range task.Nodes {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
}
})),
WithReplicator(&testReplicator{
handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
for _, node := range task.Nodes {
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
}
},
}),
WithPool(testPool(t)),
)
addrWithType := objectcore.AddressWithType{
addrWithType := objectcore.Info{
Address: addr,
Type: ti.objType,
ECInfo: ti.ecInfo,
}
err := p.processObject(context.Background(), addrWithType)
@ -276,7 +299,7 @@ func TestProcessObjectError(t *testing.T) {
WithPool(testPool(t)),
)
addrWithType := objectcore.AddressWithType{
addrWithType := objectcore.Info{
Address: addr,
}
@ -285,7 +308,7 @@ func TestProcessObjectError(t *testing.T) {
func TestIteratorContract(t *testing.T) {
addr := oidtest.Address()
objs := []objectcore.AddressWithType{{
objs := []objectcore.Info{{
Address: addr,
Type: objectSDK.TypeRegular,
}}
@ -350,7 +373,7 @@ func testPool(t *testing.T) *ants.Pool {
}
type nextResult struct {
objs []objectcore.AddressWithType
objs []objectcore.Info
err error
}
@ -361,7 +384,7 @@ type predefinedIterator struct {
calls []string
}
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) {
if it.pos == len(it.scenario) {
close(it.finishCh)
<-ctx.Done()
@ -380,11 +403,11 @@ func (it *predefinedIterator) Rewind() {
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
type sliceKeySpaceIterator struct {
objs []objectcore.AddressWithType
objs []objectcore.Info
cur int
}
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) {
if it.cur >= len(it.objs) {
return nil, engine.ErrEndOfListing
}
@ -419,9 +442,20 @@ type announcedKeysFunc func([]byte) bool
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
// replicatorFunc is a Replicator backed by a function.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
f(ctx, task, res)
type testReplicator struct {
handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult)
handleLocalPutTask func(ctx context.Context, task replicator.Task)
handlePullTask func(ctx context.Context, task replicator.Task)
}
func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
r.handleReplicationTask(ctx, task, res)
}
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) {
r.handleLocalPutTask(ctx, task)
}
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) {
r.handlePullTask(ctx, task)
}

View file

@ -21,9 +21,9 @@ type TaskResult interface {
SubmitSuccessfulReplication(netmap.NodeInfo)
}
// HandleTask executes replication task inside invoking goroutine.
// HandleReplicationTask executes replication task inside invoking goroutine.
// Passes all the nodes that accepted the replication to the TaskResult.
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res TaskResult) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() {
@ -32,7 +32,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask",
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleReplicationTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int64("number_of_copies", int64(task.NumCopies)),

View file

@ -0,0 +1,72 @@
package replicator
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node")
func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() {
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
}()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int("nodes_count", len(task.Nodes)),
))
defer span.End()
var obj *objectSDK.Object
for _, node := range task.Nodes {
var err error
obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{
Address: task.Addr,
Node: node,
})
if err == nil {
break
}
var endpoints []string
node.IterateNetworkEndpoints(func(s string) bool {
endpoints = append(endpoints, s)
return false
})
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.Strings("endpoints", endpoints),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
if obj == nil {
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
zap.Stringer("object", task.Addr),
zap.Error(errFailedToGetObjectFromAnyNode),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
err := engine.Put(ctx, p.localStorage, obj)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}
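For reference, a hedged usage sketch of the pull handler; the helper name and the way candidates are chosen are illustrative, and SDK imports (oid, netmap) are omitted for brevity.
func pullFromCandidates(ctx context.Context, r *Replicator, addr oid.Address, candidates []netmap.NodeInfo) {
	r.HandlePullTask(ctx, Task{
		Addr:  addr,       // object (e.g. a missing EC chunk) this node should hold
		Nodes: candidates, // nodes believed to have a copy
	})
}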

View file

@ -0,0 +1,47 @@
package replicator
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var errObjectNotDefined = errors.New("object is not defined")
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() {
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "local put"))
}()
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask",
trace.WithAttributes(
attribute.Stringer("address", task.Addr),
attribute.Int("nodes_count", len(task.Nodes)),
))
defer span.End()
if task.Obj == nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(errObjectNotDefined),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
err := engine.Put(ctx, p.localStorage, task.Obj)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(err),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}
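Similarly, a hedged usage sketch of the local put handler; the helper name is illustrative. A nil Obj is rejected with errObjectNotDefined above, so callers must attach the restored object.
func putRestoredLocally(ctx context.Context, r *Replicator, addr oid.Address, obj *objectSDK.Object) {
	r.HandleLocalPutTask(ctx, Task{
		Addr: addr, // address of the reconstructed EC chunk
		Obj:  obj,  // already restored object to persist via engine.Put
	})
}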

View file

@ -4,6 +4,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
@ -25,6 +26,8 @@ type cfg struct {
remoteSender *putsvc.RemoteSender
remoteGetter *getsvc.RemoteGetter
localStorage *engine.StorageEngine
metrics MetricsRegister
@ -70,6 +73,12 @@ func WithRemoteSender(v *putsvc.RemoteSender) Option {
}
}
func WithRemoteGetter(v *getsvc.RemoteGetter) Option {
return func(c *cfg) {
c.remoteGetter = v
}
}
// WithLocalStorage returns option to set local object storage of Replicator.
func WithLocalStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {