diff --git a/cmd/frostfs-node/keyspaceiterator.go b/cmd/frostfs-node/keyspaceiterator.go index e7214aacb..09a8f7f73 100644 --- a/cmd/frostfs-node/keyspaceiterator.go +++ b/cmd/frostfs-node/keyspaceiterator.go @@ -13,7 +13,7 @@ type keySpaceIterator struct { cur *engine.Cursor } -func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) { +func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.Info, error) { var prm engine.ListWithCursorPrm prm.WithCursor(it.cur) prm.WithCount(batchSize) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 4576e2e59..f51c72e73 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -529,4 +529,5 @@ const ( EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available" ECFailedToSendToContainerNode = "failed to send EC object to container node" ECFailedToSaveECPart = "failed to save EC part" + PolicerNodeIsNotContainerNodeForECObject = "current node is not container node for EC object" ) diff --git a/pkg/core/object/address.go b/pkg/core/object/info.go similarity index 62% rename from pkg/core/object/address.go rename to pkg/core/object/info.go index a25d853eb..67c9a3188 100644 --- a/pkg/core/object/address.go +++ b/pkg/core/object/info.go @@ -7,14 +7,21 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -// AddressWithType groups object address with its FrostFS -// object type. -type AddressWithType struct { +type ECInfo struct { + ParentID oid.ID + Index uint32 + Total uint32 +} + +// Info groups object address with its FrostFS +// object info. +type Info struct { Address oid.Address Type objectSDK.Type IsLinkingObject bool + ECInfo *ECInfo } -func (v AddressWithType) String() string { +func (v Info) String() string { return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject) } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 81fe47e65..87542eec1 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -622,7 +622,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) return shards, nil } -func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, +func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 7245caeeb..cb3830b7c 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -68,12 +68,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { - addrList []objectcore.AddressWithType + addrList []objectcore.Info cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. -func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType { +func (l ListWithCursorRes) AddressList() []objectcore.Info { return l.addrList } @@ -98,7 +98,7 @@ func (l ListWithCursorRes) Cursor() *Cursor { // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) { - result := make([]objectcore.AddressWithType, 0, prm.count) + result := make([]objectcore.Info, 0, prm.count) // Set initial cursors cursor := prm.cursor diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 4fc9569c7..dd8a2e8a0 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -76,8 +76,8 @@ func TestListWithCursor(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - expected := make([]object.AddressWithType, 0, tt.objectNum) - got := make([]object.AddressWithType, 0, tt.objectNum) + expected := make([]object.Info, 0, tt.objectNum) + got := make([]object.Info, 0, tt.objectNum) for i := 0; i < tt.objectNum; i++ { containerID := cidtest.ID() @@ -88,7 +88,7 @@ func TestListWithCursor(t *testing.T) { err := e.Put(context.Background(), prm) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) + expected = append(expected, object.Info{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) } var prm ListWithCursorPrm diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index f7a314452..544b2e666 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -47,12 +47,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) { // ListRes contains values returned from ListWithCursor operation. type ListRes struct { - addrList []objectcore.AddressWithType + addrList []objectcore.Info cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. -func (l ListRes) AddressList() []objectcore.AddressWithType { +func (l ListRes) AddressList() []objectcore.Info { return l.addrList } @@ -89,7 +89,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err return res, ErrDegradedMode } - result := make([]objectcore.AddressWithType, 0, prm.count) + result := make([]objectcore.Info, 0, prm.count) err = db.boltDB.View(func(tx *bbolt.Tx) error { res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) @@ -99,7 +99,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err return res, metaerr.Wrap(err) } -func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { +func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor var bucketName []byte var err error @@ -183,11 +183,11 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets cidRaw []byte, // container ID prefix, optimization cnt cid.ID, // container ID - to []objectcore.AddressWithType, // listing result + to []objectcore.Info, // listing result limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately -) ([]objectcore.AddressWithType, []byte, *Cursor, error) { +) ([]objectcore.Info, []byte, *Cursor, error) { if cursor == nil { cursor = new(Cursor) } @@ -219,18 +219,27 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket } var isLinkingObj bool + var ecInfo *objectcore.ECInfo if objType == objectSDK.TypeRegular { var o objectSDK.Object if err := o.Unmarshal(v); err != nil { return nil, nil, nil, err } isLinkingObj = isLinkObject(&o) + ecHeader := o.ECHeader() + if ecHeader != nil { + ecInfo = &objectcore.ECInfo{ + ParentID: ecHeader.Parent(), + Index: ecHeader.Index(), + Total: ecHeader.Total(), + } + } } var a oid.Address a.SetContainer(cnt) a.SetObject(obj) - to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj}) + to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}) count++ } diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go index e1ccb4e06..a92e2eff4 100644 --- a/pkg/local_object_storage/metabase/list_test.go +++ b/pkg/local_object_storage/metabase/list_test.go @@ -77,7 +77,7 @@ func TestLisObjectsWithCursor(t *testing.T) { total = containers * 4 // regular + ts + child + lock ) - expected := make([]object.AddressWithType, 0, total) + expected := make([]object.Info, 0, total) // fill metabase with objects for i := 0; i < containers; i++ { @@ -88,21 +88,21 @@ func TestLisObjectsWithCursor(t *testing.T) { obj.SetType(objectSDK.TypeRegular) err := putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular}) + expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular}) // add one tombstone obj = testutil.GenerateObjectWithCID(containerID) obj.SetType(objectSDK.TypeTombstone) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone}) + expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone}) // add one lock obj = testutil.GenerateObjectWithCID(containerID) obj.SetType(objectSDK.TypeLock) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock}) + expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeLock}) // add one inhumed (do not include into expected) obj = testutil.GenerateObjectWithCID(containerID) @@ -124,12 +124,12 @@ func TestLisObjectsWithCursor(t *testing.T) { child.SetSplitID(splitID) err = putBig(db, child) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) + expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) } t.Run("success with various count", func(t *testing.T) { for countPerReq := 1; countPerReq <= total; countPerReq++ { - got := make([]object.AddressWithType, 0, total) + got := make([]object.Info, 0, total) res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil) require.NoError(t, err, "count:%d", countPerReq) @@ -211,7 +211,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) { } } -func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) { +func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.Info, *meta.Cursor, error) { var listPrm meta.ListPrm listPrm.SetCount(count) listPrm.SetCursor(cursor) diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index d6e4d7e50..08ea81a0c 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -42,7 +42,7 @@ type ListWithCursorPrm struct { // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { - addrList []objectcore.AddressWithType + addrList []objectcore.Info cursor *Cursor } @@ -59,7 +59,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { } // AddressList returns addresses selected by ListWithCursor operation. -func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType { +func (r ListWithCursorRes) AddressList() []objectcore.Info { return r.addrList } diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 3b65bff2c..794bc199a 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -77,7 +77,7 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj Obj: obj, Nodes: nodes, } - s.replicator.HandleTask(ctx, task, &res) + s.replicator.HandleReplicationTask(ctx, task, &res) if res.count == 0 { return errors.New("object was not replicated") diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index d68d99d18..2153275cc 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -18,19 +18,15 @@ import ( "go.uber.org/zap" ) -func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error { - addr := addrWithType.Address - idCnr := addr.Container() - idObj := addr.Object() - - cnr, err := p.cnrSrc.Get(idCnr) +func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error { + cnr, err := p.cnrSrc.Get(objInfo.Address.Container()) if err != nil { if client.IsErrContainerNotFound(err) { - existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr) + existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container()) if errWasRemoved != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved) } else if existed { - err := p.buryFn(ctx, addrWithType.Address) + err := p.buryFn(ctx, objInfo.Address) if err != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err) } @@ -41,11 +37,16 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add } policy := cnr.Value.PlacementPolicy() - if policycore.IsECPlacement(policy) { - // EC not supported yet by policer - return nil - } + if policycore.IsECPlacement(policy) { + return p.processECContainerObject(ctx, objInfo, policy) + } + return p.processRepContainerObject(ctx, objInfo, policy) +} + +func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { + idObj := objInfo.Address.Object() + idCnr := objInfo.Address.Container() nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy) if err != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) @@ -68,15 +69,15 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add default: } - p.processNodes(ctx, c, addrWithType, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes) + p.processRepNodes(ctx, c, objInfo, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes) } if !c.needLocalCopy && c.removeLocalCopy { p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, - zap.Stringer("object", addr), + zap.Stringer("object", objInfo.Address), ) - p.cbRedundantCopy(ctx, addr) + p.cbRedundantCopy(ctx, objInfo.Address) } return nil } @@ -89,16 +90,16 @@ type placementRequirements struct { removeLocalCopy bool } -func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType, +func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info, nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache, ) { - addr := addrWithType.Address - typ := addrWithType.Type + addr := objInfo.Address + typ := objInfo.Type // Number of copies that are stored on maintenance nodes. var uncheckedCopies int - if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject { + if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || objInfo.IsLinkingObject { // all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects // for correct object removal protection: // - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests; @@ -196,7 +197,7 @@ func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address Nodes: nodes, } - p.replicator.HandleTask(ctx, task, checkedNodes) + p.replicator.HandleReplicationTask(ctx, task, checkedNodes) case uncheckedCopies > 0: // If we have more copies than needed, but some of them are from the maintenance nodes, diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go new file mode 100644 index 000000000..194b0ddbe --- /dev/null +++ b/pkg/services/policer/ec.go @@ -0,0 +1,137 @@ +package policer + +import ( + "context" + "errors" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "go.uber.org/zap" +) + +var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector") + +func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { + if objInfo.ECInfo == nil { + return p.processECContainerRepObject(ctx, objInfo, policy) + } + return p.processECContainerECObject(ctx, objInfo, policy) +} + +// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects. +// All of them must be stored on all of the container nodes. +func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { + objID := objInfo.Address.Object() + nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy) + if err != nil { + return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) + } + if len(nn) != 1 { + return errInvalidECPlacement + } + + c := &placementRequirements{} + checkedNodes := newNodeCache() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + rd := policy.ReplicaDescriptor(0) + // processNodes replaces rd.GetECDataCount() + rd.GetECParityCount() for len(nn[0]) for locks, tomstones and linking objects. + p.processRepNodes(ctx, c, objInfo, nn[0], rd.GetECDataCount()+rd.GetECParityCount(), checkedNodes) + + if !c.needLocalCopy && c.removeLocalCopy { + p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, + zap.Stringer("object", objInfo.Address), + ) + + p.cbRedundantCopy(ctx, objInfo.Address) + } + return nil +} + +func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { + nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy) + if err != nil { + return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) + } + if len(nn) != 1 { + return errInvalidECPlacement + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + validPlacement := p.processECChunk(ctx, objInfo, nn[0]) + if !validPlacement { + p.pullRequiredECChunks(ctx, objInfo, nn[0]) + } + return nil +} + +// processECChunk replicates EC chunk if needed. +// Returns True if current chunk should be stored on current node. +func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool { + var removeLocalChunk bool + requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))] + if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) { + // current node is required node, we are happy + return true + } + if requiredNode.IsMaintenance() { + // consider maintenance mode has object, but do not drop local copy + p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode))) + return false + } + + callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) + _, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address) + cancel() + + if err == nil { + removeLocalChunk = true + } else if client.IsErrObjectNotFound(err) { + p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1)) + task := replicator.Task{ + NumCopies: 1, + Addr: objInfo.Address, + Nodes: []netmap.NodeInfo{requiredNode}, + } + p.replicator.HandleReplicationTask(ctx, task, newNodeCache()) + } else if isClientErrMaintenance(err) { + // consider maintenance mode has object, but do not drop local copy + p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode))) + } else { + p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error())) + } + + if removeLocalChunk { + p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address)) + p.cbRedundantCopy(ctx, objInfo.Address) + } + + return false +} + +func (p *Policer) pullRequiredECChunks(_ context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) { + requiredChunkIndexes := make(map[uint32]struct{}) + for i, n := range nodes { + if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) { + requiredChunkIndexes[uint32(i)] = struct{}{} + } + } + if len(requiredChunkIndexes) == 0 { + p.log.Info(logs.PolicerNodeIsNotContainerNodeForECObject, zap.Stringer("object", objInfo.ECInfo.ParentID)) + return + } +} diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 58c74ee1a..849e5ed8b 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -22,7 +22,7 @@ import ( // Note that the underlying implementation might be circular: i.e. it can restart // when the end of the key space is reached. type KeySpaceIterator interface { - Next(context.Context, uint32) ([]objectcore.AddressWithType, error) + Next(context.Context, uint32) ([]objectcore.Info, error) Rewind() } @@ -35,7 +35,7 @@ type BuryFunc func(context.Context, oid.Address) error // Replicator is the interface to a consumer of replication tasks. type Replicator interface { - HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) + HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) } // RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node. diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index be0974c39..2c70b1c04 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -26,7 +26,7 @@ import ( func TestBuryObjectWithoutContainer(t *testing.T) { // Key space addr := oidtest.Address() - objs := []objectcore.AddressWithType{ + objs := []objectcore.Info{ { Address: addr, Type: objectSDK.TypeRegular, @@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) { maintenanceNodes []int wantRemoveRedundant bool wantReplicateTo []int + ecInfo *objectcore.ECInfo }{ { desc: "1 copy already held by local node", @@ -144,6 +145,47 @@ func TestProcessObject(t *testing.T) { objHolders: []int{1}, maintenanceNodes: []int{2}, }, + { + desc: "lock object must be replicated to all EC nodes", + objType: objectSDK.TypeLock, + nodeCount: 3, + policy: `EC 1.1`, + placement: [][]int{{0, 1, 2}}, + wantReplicateTo: []int{1, 2}, + }, + { + desc: "tombstone object must be replicated to all EC nodes", + objType: objectSDK.TypeTombstone, + nodeCount: 3, + policy: `EC 1.1`, + placement: [][]int{{0, 1, 2}}, + wantReplicateTo: []int{1, 2}, + }, + { + desc: "EC chunk stored valid on current node", + objType: objectSDK.TypeRegular, + nodeCount: 2, + policy: `EC 1.1`, + placement: [][]int{{0}}, + ecInfo: &objectcore.ECInfo{ + ParentID: oidtest.ID(), + Index: 1, + Total: 2, + }, + }, + { + desc: "EC chunk must be replicated to other EC node", + objType: objectSDK.TypeRegular, + nodeCount: 2, + policy: `EC 1.1`, + placement: [][]int{{1}}, + wantReplicateTo: []int{1}, + ecInfo: &objectcore.ECInfo{ + ParentID: oidtest.ID(), + Index: 1, + Total: 2, + }, + }, } for i := range tests { @@ -173,6 +215,9 @@ func TestProcessObject(t *testing.T) { if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) { return placementVectors, nil } + if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) { + return placementVectors, nil + } t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj) return nil, errors.New("unexpected placement build") } @@ -238,9 +283,10 @@ func TestProcessObject(t *testing.T) { WithPool(testPool(t)), ) - addrWithType := objectcore.AddressWithType{ + addrWithType := objectcore.Info{ Address: addr, Type: ti.objType, + ECInfo: ti.ecInfo, } err := p.processObject(context.Background(), addrWithType) @@ -276,7 +322,7 @@ func TestProcessObjectError(t *testing.T) { WithPool(testPool(t)), ) - addrWithType := objectcore.AddressWithType{ + addrWithType := objectcore.Info{ Address: addr, } @@ -285,7 +331,7 @@ func TestProcessObjectError(t *testing.T) { func TestIteratorContract(t *testing.T) { addr := oidtest.Address() - objs := []objectcore.AddressWithType{{ + objs := []objectcore.Info{{ Address: addr, Type: objectSDK.TypeRegular, }} @@ -350,7 +396,7 @@ func testPool(t *testing.T) *ants.Pool { } type nextResult struct { - objs []objectcore.AddressWithType + objs []objectcore.Info err error } @@ -361,7 +407,7 @@ type predefinedIterator struct { calls []string } -func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) { +func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) { if it.pos == len(it.scenario) { close(it.finishCh) <-ctx.Done() @@ -380,11 +426,11 @@ func (it *predefinedIterator) Rewind() { // sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. type sliceKeySpaceIterator struct { - objs []objectcore.AddressWithType + objs []objectcore.Info cur int } -func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) { +func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) { if it.cur >= len(it.objs) { return nil, engine.ErrEndOfListing } @@ -422,6 +468,6 @@ func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } // replicatorFunc is a Replicator backed by a function. type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) -func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { +func (f replicatorFunc) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { f(ctx, task, res) } diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 3a46e5f04..06d41b74e 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -21,9 +21,9 @@ type TaskResult interface { SubmitSuccessfulReplication(netmap.NodeInfo) } -// HandleTask executes replication task inside invoking goroutine. +// HandleReplicationTask executes replication task inside invoking goroutine. // Passes all the nodes that accepted the replication to the TaskResult. -func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) { +func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res TaskResult) { p.metrics.IncInFlightRequest() defer p.metrics.DecInFlightRequest() defer func() {