Add EC replication #1129
|
@ -13,7 +13,7 @@ type keySpaceIterator struct {
|
|||
cur *engine.Cursor
|
||||
}
|
||||
|
||||
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
|
||||
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.Info, error) {
|
||||
var prm engine.ListWithCursorPrm
|
||||
prm.WithCursor(it.cur)
|
||||
prm.WithCount(batchSize)
|
||||
|
|
|
@ -29,7 +29,6 @@ import (
|
|||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
||||
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
||||
|
@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
return err
|
||||
}
|
||||
|
||||
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
|
||||
remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
|
||||
|
||||
pol := policer.New(
|
||||
policer.WithLogger(c.log),
|
||||
|
@ -242,11 +241,33 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||
),
|
||||
policer.WithRemoteObjectHeaderFunc(
|
||||
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
||||
return remoteHeader.Head(ctx, prm)
|
||||
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
|
||||
return remoteReader.Head(ctx, prm)
|
||||
},
|
||||
),
|
||||
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||
var prm engine.HeadPrm
|
||||
fyrchik marked this conversation as resolved
|
||||
prm.WithAddress(a)
|
||||
res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res.Header(), nil
|
||||
}),
|
||||
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
||||
return remoteReader.Get(ctx, prm)
|
||||
}),
|
||||
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||
var prm engine.GetPrm
|
||||
prm.WithAddress(a)
|
||||
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res.Object(), nil
|
||||
}),
|
||||
policer.WithNetmapKeys(c),
|
||||
policer.WithHeadTimeout(
|
||||
policerconfig.HeadTimeout(c.appCfg),
|
||||
|
@ -265,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
|||
}),
|
||||
policer.WithPool(c.cfgObject.pool.replication),
|
||||
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
||||
policer.WithKeyStorage(keyStorage),
|
||||
)
|
||||
|
||||
c.workers = append(c.workers, worker{
|
||||
|
@ -297,6 +319,9 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
replicator.WithRemoteSender(
|
||||
putsvc.NewRemoteSender(keyStorage, cache),
|
||||
),
|
||||
replicator.WithRemoteGetter(
|
||||
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
|
||||
),
|
||||
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -529,4 +529,16 @@ const (
|
|||
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
||||
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||
ECFailedToSaveECPart = "failed to save EC part"
|
||||
PolicerNodeIsNotECObjectNode = "current node is not EC object node"
|
||||
PolicerFailedToGetLocalECChunks = "failed to get local EC chunks"
|
||||
PolicerMissingECChunk = "failed to find EC chunk on any of the nodes"
|
||||
PolicerFailedToDecodeECChunkID = "failed to decode EC chunk ID"
|
||||
PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk"
|
||||
ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage"
|
||||
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage"
|
||||
PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node"
|
||||
PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks"
|
||||
PolicerFailedToRestoreObject = "failed to restore EC object"
|
||||
PolicerCouldNotGetChunk = "could not get EC chunk"
|
||||
PolicerCouldNotGetChunks = "could not get EC chunks"
|
||||
)
|
||||
|
|
|
@ -167,17 +167,29 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
|
|||
token := obj.SessionToken()
|
||||
ownerID := obj.OwnerID()
|
||||
|
||||
if token == nil && obj.ECHeader() != nil {
|
||||
role, err := v.isIROrContainerNode(obj, binKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if role == acl.RoleContainer {
|
||||
// EC part could be restored or created by container node, so ownerID could not match object signature
|
||||
return nil
|
||||
}
|
||||
return v.checkOwnerKey(ownerID, key)
|
||||
}
|
||||
|
||||
if token == nil || !token.AssertAuthKey(&key) {
|
||||
return v.checkOwnerKey(ownerID, key)
|
||||
}
|
||||
|
||||
if v.verifyTokenIssuer {
|
||||
signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey)
|
||||
role, err := v.isIROrContainerNode(obj, binKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if signerIsIROrContainerNode {
|
||||
if role == acl.RoleContainer || role == acl.RoleInnerRing {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -190,10 +202,10 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) {
|
||||
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (acl.Role, error) {
|
||||
cnrID, containerIDSet := obj.ContainerID()
|
||||
if !containerIDSet {
|
||||
return false, errNilCID
|
||||
return acl.RoleOthers, errNilCID
|
||||
}
|
||||
|
||||
cnrIDBin := make([]byte, sha256.Size)
|
||||
|
@ -201,14 +213,14 @@ func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey [
|
|||
|
||||
cnr, err := v.containers.Get(cnrID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
|
||||
return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return acl.RoleOthers, err
|
||||
}
|
||||
return res.Role == acl.RoleContainer || res.Role == acl.RoleInnerRing, nil
|
||||
return res.Role, nil
|
||||
}
|
||||
|
||||
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {
|
||||
|
|
|
@ -7,14 +7,21 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// AddressWithType groups object address with its FrostFS
|
||||
// object type.
|
||||
type AddressWithType struct {
|
||||
type ECInfo struct {
|
||||
ParentID oid.ID
|
||||
Index uint32
|
||||
Total uint32
|
||||
}
|
||||
|
||||
// Info groups object address with its FrostFS
|
||||
// object info.
|
||||
type Info struct {
|
||||
Address oid.Address
|
||||
Type objectSDK.Type
|
||||
IsLinkingObject bool
|
||||
ECInfo *ECInfo
|
||||
}
|
||||
|
||||
func (v AddressWithType) String() string {
|
||||
func (v Info) String() string {
|
||||
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
||||
}
|
|
@ -622,7 +622,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
|||
return shards, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||
|
|
|
@ -68,12 +68,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
|||
|
||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||
type ListWithCursorRes struct {
|
||||
addrList []objectcore.AddressWithType
|
||||
addrList []objectcore.Info
|
||||
cursor *Cursor
|
||||
}
|
||||
|
||||
// AddressList returns addresses selected by ListWithCursor operation.
|
||||
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
||||
func (l ListWithCursorRes) AddressList() []objectcore.Info {
|
||||
return l.addrList
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
|||
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||
// parameter set to zero.
|
||||
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||
result := make([]objectcore.Info, 0, prm.count)
|
||||
|
||||
// Set initial cursors
|
||||
cursor := prm.cursor
|
||||
|
|
|
@ -76,8 +76,8 @@ func TestListWithCursor(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
||||
got := make([]object.AddressWithType, 0, tt.objectNum)
|
||||
expected := make([]object.Info, 0, tt.objectNum)
|
||||
got := make([]object.Info, 0, tt.objectNum)
|
||||
|
||||
for i := 0; i < tt.objectNum; i++ {
|
||||
containerID := cidtest.ID()
|
||||
|
@ -88,7 +88,7 @@ func TestListWithCursor(t *testing.T) {
|
|||
|
||||
err := e.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||
expected = append(expected, object.Info{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||
}
|
||||
|
||||
var prm ListWithCursorPrm
|
||||
|
|
|
@ -47,12 +47,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
|
|||
|
||||
// ListRes contains values returned from ListWithCursor operation.
|
||||
type ListRes struct {
|
||||
addrList []objectcore.AddressWithType
|
||||
addrList []objectcore.Info
|
||||
cursor *Cursor
|
||||
}
|
||||
|
||||
// AddressList returns addresses selected by ListWithCursor operation.
|
||||
func (l ListRes) AddressList() []objectcore.AddressWithType {
|
||||
func (l ListRes) AddressList() []objectcore.Info {
|
||||
return l.addrList
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
|||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||
result := make([]objectcore.Info, 0, prm.count)
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
||||
|
@ -99,7 +99,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
|
||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||
var bucketName []byte
|
||||
var err error
|
||||
|
@ -183,11 +183,11 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
||||
cidRaw []byte, // container ID prefix, optimization
|
||||
cnt cid.ID, // container ID
|
||||
to []objectcore.AddressWithType, // listing result
|
||||
to []objectcore.Info, // listing result
|
||||
limit int, // stop listing at `limit` items in result
|
||||
cursor *Cursor, // start from cursor object
|
||||
threshold bool, // ignore cursor and start immediately
|
||||
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
|
||||
) ([]objectcore.Info, []byte, *Cursor, error) {
|
||||
if cursor == nil {
|
||||
cursor = new(Cursor)
|
||||
}
|
||||
|
@ -219,18 +219,27 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
var ecInfo *objectcore.ECInfo
|
||||
if objType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(v); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
isLinkingObj = isLinkObject(&o)
|
||||
ecHeader := o.ECHeader()
|
||||
if ecHeader != nil {
|
||||
ecInfo = &objectcore.ECInfo{
|
||||
ParentID: ecHeader.Parent(),
|
||||
Index: ecHeader.Index(),
|
||||
Total: ecHeader.Total(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var a oid.Address
|
||||
a.SetContainer(cnt)
|
||||
a.SetObject(obj)
|
||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
|
||||
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
||||
count++
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
|||
total = containers * 4 // regular + ts + child + lock
|
||||
)
|
||||
|
||||
expected := make([]object.AddressWithType, 0, total)
|
||||
expected := make([]object.Info, 0, total)
|
||||
|
||||
// fill metabase with objects
|
||||
for i := 0; i < containers; i++ {
|
||||
|
@ -88,21 +88,21 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
|||
obj.SetType(objectSDK.TypeRegular)
|
||||
err := putBig(db, obj)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||
|
||||
// add one tombstone
|
||||
obj = testutil.GenerateObjectWithCID(containerID)
|
||||
obj.SetType(objectSDK.TypeTombstone)
|
||||
err = putBig(db, obj)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
||||
|
||||
// add one lock
|
||||
obj = testutil.GenerateObjectWithCID(containerID)
|
||||
obj.SetType(objectSDK.TypeLock)
|
||||
err = putBig(db, obj)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
||||
|
||||
// add one inhumed (do not include into expected)
|
||||
obj = testutil.GenerateObjectWithCID(containerID)
|
||||
|
@ -124,12 +124,12 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
|||
child.SetSplitID(splitID)
|
||||
err = putBig(db, child)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||
}
|
||||
|
||||
t.Run("success with various count", func(t *testing.T) {
|
||||
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
||||
got := make([]object.AddressWithType, 0, total)
|
||||
got := make([]object.Info, 0, total)
|
||||
|
||||
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
||||
require.NoError(t, err, "count:%d", countPerReq)
|
||||
|
@ -211,7 +211,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
|
||||
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.Info, *meta.Cursor, error) {
|
||||
var listPrm meta.ListPrm
|
||||
listPrm.SetCount(count)
|
||||
listPrm.SetCursor(cursor)
|
||||
|
|
|
@ -42,7 +42,7 @@ type ListWithCursorPrm struct {
|
|||
|
||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||
type ListWithCursorRes struct {
|
||||
addrList []objectcore.AddressWithType
|
||||
addrList []objectcore.Info
|
||||
cursor *Cursor
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
|||
}
|
||||
|
||||
// AddressList returns addresses selected by ListWithCursor operation.
|
||||
func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
||||
func (r ListWithCursorRes) AddressList() []objectcore.Info {
|
||||
return r.addrList
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
|
|||
Obj: obj,
|
||||
Nodes: nodes,
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`HandleReplicationTask`? `handle` is a verb, `replicate` is a verb too
dstepanov-yadro
commented
Done Done
|
||||
s.replicator.HandleTask(ctx, task, &res)
|
||||
s.replicator.HandleReplicationTask(ctx, task, &res)
|
||||
|
||||
if res.count == 0 {
|
||||
return errors.New("object was not replicated")
|
||||
|
|
55
pkg/services/object/get/remote_getter.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type RemoteGetPrm struct {
|
||||
Address oid.Address
|
||||
Node netmapSDK.NodeInfo
|
||||
}
|
||||
|
||||
type RemoteGetter struct {
|
||||
s remoteStorageConstructor
|
||||
es epochSource
|
||||
ks keyStorage
|
||||
}
|
||||
|
||||
func (g *RemoteGetter) Get(ctx context.Context, prm RemoteGetPrm) (*objectSDK.Object, error) {
|
||||
var nodeInfo client.NodeInfo
|
||||
if err := client.NodeInfoFromRawNetmapElement(&nodeInfo, netmapCore.Node(prm.Node)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs, err := g.s.Get(nodeInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
epoch, err := g.es.Epoch()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key, err := g.ks.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := RemoteRequestParams{
|
||||
Epoch: epoch,
|
||||
TTL: 1,
|
||||
PrivateKey: key,
|
||||
}
|
||||
return rs.Get(ctx, prm.Address, r)
|
||||
}
|
||||
|
||||
func NewRemoteGetter(cc clientConstructor, es epochSource, ks keyStorage) *RemoteGetter {
|
||||
return &RemoteGetter{
|
||||
s: &multiclientRemoteStorageConstructor{clientConstructor: cc},
|
||||
es: es,
|
||||
ks: ks,
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
package headsvc
|
||||
|
||||
import (
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type Prm struct {
|
||||
addr oid.Address
|
||||
}
|
||||
|
||||
func (p *Prm) WithAddress(v oid.Address) *Prm {
|
||||
if p != nil {
|
||||
p.addr = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
|
@ -34,10 +34,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||
errInvalidECObject = errors.New("object must be splitted to EC parts")
|
||||
)
|
||||
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||
|
||||
type putSingleRequestSigner struct {
|
||||
req *objectAPI.PutSingleRequest
|
||||
|
@ -181,10 +178,6 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
|
|||
}
|
||||
|
||||
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||
if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
|
||||
return errInvalidECObject
|
||||
}
|
||||
|
||||
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package headsvc
|
||||
package object
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -18,18 +18,18 @@ type ClientConstructor interface {
|
|||
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
// RemoteHeader represents utility for getting
|
||||
// the object header from a remote host.
|
||||
type RemoteHeader struct {
|
||||
// RemoteReader represents utility for getting
|
||||
// the object from a remote host.
|
||||
type RemoteReader struct {
|
||||
keyStorage *util.KeyStorage
|
||||
|
||||
clientCache ClientConstructor
|
||||
}
|
||||
|
||||
// RemoteHeadPrm groups remote header operation parameters.
|
||||
type RemoteHeadPrm struct {
|
||||
commonHeadPrm *Prm
|
||||
|
||||
// RemoteRequestPrm groups remote operation parameters.
|
||||
type RemoteRequestPrm struct {
|
||||
addr oid.Address
|
||||
raw bool
|
||||
node netmap.NodeInfo
|
||||
}
|
||||
|
||||
|
@ -37,16 +37,16 @@ const remoteOpTTL = 1
|
|||
|
||||
var ErrNotFound = errors.New("object header not found")
|
||||
|
||||
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
|
||||
func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader {
|
||||
return &RemoteHeader{
|
||||
// NewRemoteReader creates, initializes and returns new RemoteHeader instance.
|
||||
func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader {
|
||||
return &RemoteReader{
|
||||
keyStorage: keyStorage,
|
||||
clientCache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
// WithNodeInfo sets information about the remote node.
|
||||
func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
|
||||
func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm {
|
||||
if p != nil {
|
||||
p.node = v
|
||||
}
|
||||
|
@ -55,16 +55,23 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
|
|||
}
|
||||
|
||||
// WithObjectAddress sets object address.
|
||||
func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm {
|
||||
func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm {
|
||||
if p != nil {
|
||||
p.commonHeadPrm = new(Prm).WithAddress(v)
|
||||
p.addr = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm {
|
||||
if p != nil {
|
||||
p.raw = v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// Head requests object header from the remote node.
|
||||
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) {
|
||||
func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
|
||||
key, err := h.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||
|
@ -86,8 +93,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
|
|||
|
||||
headPrm.SetClient(c)
|
||||
headPrm.SetPrivateKey(key)
|
||||
headPrm.SetAddress(prm.commonHeadPrm.addr)
|
||||
headPrm.SetAddress(prm.addr)
|
||||
headPrm.SetTTL(remoteOpTTL)
|
||||
if prm.raw {
|
||||
headPrm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.HeadObject(ctx, headPrm)
|
||||
if err != nil {
|
||||
|
@ -96,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
|
|||
|
||||
return res.Header(), nil
|
||||
}
|
||||
|
||||
func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
|
||||
key, err := h.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||
}
|
||||
|
||||
var info clientcore.NodeInfo
|
||||
|
||||
err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||
}
|
||||
|
||||
c, err := h.clientCache.Get(info)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
var getPrm internalclient.GetObjectPrm
|
||||
|
||||
getPrm.SetClient(c)
|
||||
getPrm.SetPrivateKey(key)
|
||||
getPrm.SetAddress(prm.addr)
|
||||
getPrm.SetTTL(remoteOpTTL)
|
||||
if prm.raw {
|
||||
getPrm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.GetObject(ctx, getPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
}
|
|
@ -18,19 +18,15 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error {
|
||||
addr := addrWithType.Address
|
||||
idCnr := addr.Container()
|
||||
idObj := addr.Object()
|
||||
|
||||
cnr, err := p.cnrSrc.Get(idCnr)
|
||||
func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
|
||||
cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
|
||||
if err != nil {
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr)
|
||||
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container())
|
||||
if errWasRemoved != nil {
|
||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
|
||||
} else if existed {
|
||||
err := p.buryFn(ctx, addrWithType.Address)
|
||||
err := p.buryFn(ctx, objInfo.Address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
|
||||
}
|
||||
|
@ -41,11 +37,16 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
}
|
||||
|
||||
policy := cnr.Value.PlacementPolicy()
|
||||
if policycore.IsECPlacement(policy) {
|
||||
// EC not supported yet by policer
|
||||
return nil
|
||||
}
|
||||
|
||||
if policycore.IsECPlacement(policy) {
|
||||
return p.processECContainerObject(ctx, objInfo, policy)
|
||||
}
|
||||
return p.processRepContainerObject(ctx, objInfo, policy)
|
||||
}
|
||||
|
||||
func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||
idObj := objInfo.Address.Object()
|
||||
idCnr := objInfo.Address.Container()
|
||||
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||
|
@ -53,11 +54,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
|
||||
c := &placementRequirements{}
|
||||
|
||||
var numOfContainerNodes int
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Seems like a separate fix, unrelated to the commit Seems like a separate fix, unrelated to the commit
dstepanov-yadro
commented
Done Done
|
||||
for i := range nn {
|
||||
numOfContainerNodes += len(nn[i])
|
||||
}
|
||||
|
||||
// cached info about already checked nodes
|
||||
checkedNodes := newNodeCache()
|
||||
|
||||
|
@ -68,15 +64,24 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
default:
|
||||
}
|
||||
|
||||
p.processNodes(ctx, c, addrWithType, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes)
|
||||
shortage := policy.ReplicaDescriptor(i).NumberOfObjects()
|
||||
if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
|
||||
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||
// for correct object removal protection:
|
||||
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||
// - `LOCK` object removal is a prohibited action in the GC.
|
||||
shortage = uint32(len(nn[i]))
|
||||
}
|
||||
|
||||
p.processRepNodes(ctx, c, objInfo, nn[i], shortage, checkedNodes)
|
||||
}
|
||||
|
||||
if !c.needLocalCopy && c.removeLocalCopy {
|
||||
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
||||
zap.Stringer("object", addr),
|
||||
zap.Stringer("object", objInfo.Address),
|
||||
)
|
||||
|
||||
p.cbRedundantCopy(ctx, addr)
|
||||
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -89,23 +94,13 @@ type placementRequirements struct {
|
|||
removeLocalCopy bool
|
||||
}
|
||||
|
||||
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
|
||||
func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info,
|
||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
|
||||
) {
|
||||
addr := addrWithType.Address
|
||||
typ := addrWithType.Type
|
||||
addr := objInfo.Address
|
||||
|
||||
// Number of copies that are stored on maintenance nodes.
|
||||
var uncheckedCopies int
|
||||
|
||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
|
||||
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||
// for correct object removal protection:
|
||||
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||
// - `LOCK` object removal is a prohibited action in the GC.
|
||||
shortage = uint32(len(nodes))
|
||||
}
|
||||
|
||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -133,7 +128,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
|||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
|
||||
_, err := p.remoteHeader(callCtx, nodes[i], addr)
|
||||
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
||||
|
||||
cancel()
|
||||
|
||||
|
@ -196,7 +191,7 @@ func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address
|
|||
Nodes: nodes,
|
||||
}
|
||||
|
||||
p.replicator.HandleTask(ctx, task, checkedNodes)
|
||||
p.replicator.HandleReplicationTask(ctx, task, checkedNodes)
|
||||
|
||||
case uncheckedCopies > 0:
|
||||
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
||||
|
|
390
pkg/services/policer/ec.go
Normal file
|
@ -0,0 +1,390 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var errNoECinfoReturnded = errors.New("no EC info returned")
|
||||
|
||||
type ecChunkProcessResult struct {
|
||||
validPlacement bool
|
||||
removeLocal bool
|
||||
}
|
||||
|
||||
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector")
|
||||
|
||||
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||
if objInfo.ECInfo == nil {
|
||||
return p.processECContainerRepObject(ctx, objInfo, policy)
|
||||
}
|
||||
return p.processECContainerECObject(ctx, objInfo, policy)
|
||||
}
|
||||
|
||||
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
|
||||
// All of them must be stored on all of the container nodes.
|
||||
func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||
objID := objInfo.Address.Object()
|
||||
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
The distinction between `len(nn)` is currently 1 for all EC policies.
Can we simplify the code having this in mind?
Because when it won't be 1, we will also likely have different `EC X.Y` or `REP N` descriptors for different indexes in this array.
The distinction between `REP`/`EC` could be done based on each `nn` element, not on the uppermost level.
dstepanov-yadro
commented
fixed fixed
|
||||
if len(nn) != 1 {
|
||||
return errInvalidECPlacement
|
||||
}
|
||||
|
||||
c := &placementRequirements{}
|
||||
checkedNodes := newNodeCache()
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Could you elaborate a bit what this comment is trying to explain and why it is here? Could you elaborate a bit what this comment is trying to explain and why it is here?
dstepanov-yadro
commented
Refactored, see last commit. Refactored, see last commit.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
p.processRepNodes(ctx, c, objInfo, nn[0], uint32(len(nn[0])), checkedNodes)
|
||||
|
||||
if !c.needLocalCopy && c.removeLocalCopy {
|
||||
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
||||
zap.Stringer("object", objInfo.Address),
|
||||
)
|
||||
|
||||
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||
}
|
||||
if len(nn) != 1 {
|
||||
return errInvalidECPlacement
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
res := p.processECChunk(ctx, objInfo, nn[0])
|
||||
if !res.validPlacement {
|
||||
// drop local chunk only if all required chunks are in place
|
||||
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
|
||||
}
|
||||
p.adjustECPlacement(ctx, objInfo, nn[0], policy)
|
||||
|
||||
if res.removeLocal {
|
||||
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
|
||||
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processECChunk replicates EC chunk if needed.
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It returns a It returns a `struct`.
dstepanov-yadro
commented
Fixed Fixed
|
||||
func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) ecChunkProcessResult {
|
||||
var removeLocalChunk bool
|
||||
requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
|
||||
if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
|
||||
// current node is required node, we are happy
|
||||
return ecChunkProcessResult{
|
||||
validPlacement: true,
|
||||
}
|
||||
}
|
||||
if requiredNode.IsMaintenance() {
|
||||
// consider maintenance mode has object, but do not drop local copy
|
||||
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
||||
return ecChunkProcessResult{}
|
||||
}
|
||||
|
||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||
_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address, false)
|
||||
cancel()
|
||||
|
||||
if err == nil {
|
||||
removeLocalChunk = true
|
||||
} else if client.IsErrObjectNotFound(err) {
|
||||
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1))
|
||||
task := replicator.Task{
|
||||
NumCopies: 1,
|
||||
Addr: objInfo.Address,
|
||||
Nodes: []netmap.NodeInfo{requiredNode},
|
||||
}
|
||||
p.replicator.HandleReplicationTask(ctx, task, newNodeCache())
|
||||
} else if isClientErrMaintenance(err) {
|
||||
// consider maintenance mode has object, but do not drop local copy
|
||||
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
||||
} else {
|
||||
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
return ecChunkProcessResult{
|
||||
removeLocal: removeLocalChunk,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
|
||||
var parentAddress oid.Address
|
||||
parentAddress.SetContainer(objInfo.Address.Container())
|
||||
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||
|
||||
requiredChunkIndexes := p.collectRequiredECChunks(nodes, objInfo)
|
||||
if len(requiredChunkIndexes) == 0 {
|
||||
p.log.Info(logs.PolicerNodeIsNotECObjectNode, zap.Stringer("object", objInfo.ECInfo.ParentID))
|
||||
return true
|
||||
}
|
||||
|
||||
err := p.resolveLocalECChunks(ctx, parentAddress, requiredChunkIndexes)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerFailedToGetLocalECChunks, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||
return false
|
||||
}
|
||||
if len(requiredChunkIndexes) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
indexToObjectID := make(map[uint32]oid.ID)
|
||||
success := p.resolveRemoteECChunks(ctx, parentAddress, nodes, requiredChunkIndexes, indexToObjectID)
|
||||
if !success {
|
||||
return false
|
||||
}
|
||||
|
||||
for index, candidates := range requiredChunkIndexes {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(objInfo.Address.Container())
|
||||
addr.SetObject(indexToObjectID[index])
|
||||
p.replicator.HandlePullTask(ctx, replicator.Task{
|
||||
aarifullin
commented
Just for curiosity: do we always need to pull missing chunks from remote node? I suppose this is not a big deal and barely gives a big profit... Just for curiosity: do we *always* need to pull missing chunks from remote node?
`processECChunk` sets `removeLocal = true` but a chunk, that is being replicated, is not removed yet until `pullRequiredECChunks`'s run. Could a policer try to `reconstruct` missing chunk if the local node, *by good fortune*, keeps required chunks for reconstruction?
I suppose this is not a big deal and barely gives a big profit...
dstepanov-yadro
commented
By default local node should keep only one chunk. By default local node should keep only one chunk.
|
||||
Addr: addr,
|
||||
Nodes: candidates,
|
||||
})
|
||||
}
|
||||
// there was some missing chunks, it's not ok
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *Policer) collectRequiredECChunks(nodes []netmap.NodeInfo, objInfo objectcore.Info) map[uint32][]netmap.NodeInfo {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
By design there is only 1 required chunk to store on the node (EC policy implies _By design_ there is only 1 required chunk to store on the node (EC policy implies `UNIQUE`).
Why do we return a map here? It seems the code could be greatly simplified, having this in mind.
dstepanov-yadro
commented
Not always: evacuation or not completed replication may lead that chunk may be stored on different chunks. Not always: evacuation or not completed replication may lead that chunk may be stored on different chunks.
fyrchik
commented
Evacuation does not affect the set of required chunks in any way. Evacuation does not affect the set of required chunks in any way.
dstepanov-yadro
commented
Agree, current implementation does not affect. But I think that in case of evacuation from a node, EC chunk should move to another node, even if it does not comply with the storage policy. Agree, current implementation does not affect. But I think that in case of evacuation from a node, EC chunk should move to another node, even if it does not comply with the storage policy.
fyrchik
commented
Yes, but it is moved to a node which should not store it, and I have thought this function determines whether the node should store it? Yes, but it is moved to a node which _should not_ store it, and I have thought this function determines whether the node _should_ store it?
Another way to look at it: policer _must_ depend only on the network map and placement policy, evacuation is a separate process.
fyrchik
commented
Actually, it may happen in a situation when we have Actually, it may happen in a situation when we have `EC 3.1` and 1 node went offline, so we have 3 nodes in the network map.
What is the behaviour in this case?
dstepanov-yadro
commented
This function collects all nodes that store required chunks.
Policer will get an error from offline node and will skip object. > this function determines whether the node should store it
This function collects all nodes that store required chunks.
> What is the behaviour in this case?
Policer will get an error from offline node and will skip object.
|
||||
requiredChunkIndexes := make(map[uint32][]netmap.NodeInfo)
|
||||
for i, n := range nodes {
|
||||
if uint32(i) == objInfo.ECInfo.Total {
|
||||
break
|
||||
}
|
||||
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||
requiredChunkIndexes[uint32(i)] = []netmap.NodeInfo{}
|
||||
}
|
||||
}
|
||||
return requiredChunkIndexes
|
||||
}
|
||||
|
||||
func (p *Policer) resolveLocalECChunks(ctx context.Context, parentAddress oid.Address, required map[uint32][]netmap.NodeInfo) error {
|
||||
_, err := p.localHeader(ctx, parentAddress)
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
if err == nil { // should not be happen
|
||||
return errNoECinfoReturnded
|
||||
}
|
||||
if !errors.As(err, &eiErr) {
|
||||
return err
|
||||
}
|
||||
for _, ch := range eiErr.ECInfo().Chunks {
|
||||
delete(required, ch.Index)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.Address, nodes []netmap.NodeInfo, required map[uint32][]netmap.NodeInfo, indexToObjectID map[uint32]oid.ID) bool {
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
for _, n := range nodes {
|
||||
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||
continue
|
||||
}
|
||||
_, err := p.remoteHeader(ctx, n, parentAddress, true)
|
||||
if !errors.As(err, &eiErr) {
|
||||
continue
|
||||
}
|
||||
for _, ch := range eiErr.ECInfo().Chunks {
|
||||
if candidates, ok := required[ch.Index]; ok {
|
||||
candidates = append(candidates, n)
|
||||
required[ch.Index] = candidates
|
||||
|
||||
var chunkID oid.ID
|
||||
if err := chunkID.ReadFromV2(ch.ID); err != nil {
|
||||
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||
return false
|
||||
}
|
||||
if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
|
||||
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
|
||||
zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
|
||||
return false
|
||||
}
|
||||
indexToObjectID[ch.Index] = chunkID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for index, candidates := range required {
|
||||
if len(candidates) == 0 {
|
||||
p.log.Error(logs.PolicerMissingECChunk, zap.Stringer("object", parentAddress), zap.Uint32("index", index))
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
|
||||
var parentAddress oid.Address
|
||||
parentAddress.SetContainer(objInfo.Address.Container())
|
||||
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
resolved := make(map[uint32][]netmap.NodeInfo)
|
||||
chunkIDs := make(map[uint32]oid.ID)
|
||||
restore := true // do not restore EC chunks if some node returned error
|
||||
for idx, n := range nodes {
|
||||
if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||
_, err = p.localHeader(ctx, parentAddress)
|
||||
} else {
|
||||
_, err = p.remoteHeader(ctx, n, parentAddress, true)
|
||||
}
|
||||
|
||||
if errors.As(err, &eiErr) {
|
||||
for _, ch := range eiErr.ECInfo().Chunks {
|
||||
resolved[ch.Index] = append(resolved[ch.Index], n)
|
||||
var ecInfoChunkID oid.ID
|
||||
if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil {
|
||||
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||
return
|
||||
}
|
||||
if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID {
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
I found I found `existed, exist` is not appropriate pair name. You could rename `chunkID` to `ecInfoChunkID`, but `existed` to `chunkID`
dstepanov-yadro
commented
Done Done
|
||||
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID),
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Please, could you give a brief explanation how this could be that chunks related to the same parent may differ by IDs? Please, could you give a brief explanation how this could be that chunks related to the same parent may differ by IDs?
dstepanov-yadro
commented
A bug that I hope we don't have :) A bug that I hope we don't have :)
|
||||
zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
|
||||
return
|
||||
}
|
||||
chunkIDs[ch.Index] = ecInfoChunkID
|
||||
}
|
||||
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||
p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||
NumCopies: 1,
|
||||
Addr: objInfo.Address,
|
||||
Nodes: []netmap.NodeInfo{n},
|
||||
}, newNodeCache())
|
||||
restore = false
|
||||
}
|
||||
}
|
||||
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||
return
|
||||
}
|
||||
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
|
||||
var found []uint32
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Do you mean Do you mean `found`? It is already a past participle from `find`, `founded` is what happened with Saint-Petersburg in 1703.
dstepanov-yadro
commented
Fixed Fixed
|
||||
for i := range resolved {
|
||||
found = append(found, i)
|
||||
}
|
||||
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
|
||||
return
|
||||
}
|
||||
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
|
||||
}
|
||||
|
||||
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
|
||||
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
return
|
||||
}
|
||||
parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs)
|
||||
if parts == nil {
|
||||
return
|
||||
}
|
||||
key, err := p.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
return
|
||||
}
|
||||
required := make([]bool, len(parts))
|
||||
for i, p := range parts {
|
||||
if p == nil {
|
||||
required[i] = true
|
||||
}
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
You call You call `Reconstruct()`, then you call `Split` on the result. These operations are inverse to each other.
What is the purpose?
dstepanov-yadro
commented
Fixed Fixed
|
||||
if err := c.ReconstructParts(parts, required, key); err != nil {
|
||||
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
return
|
||||
}
|
||||
for idx, part := range parts {
|
||||
if _, exists := existedChunks[uint32(idx)]; exists {
|
||||
continue
|
||||
}
|
||||
var addr oid.Address
|
||||
addr.SetContainer(parentAddress.Container())
|
||||
pID, _ := part.ID()
|
||||
addr.SetObject(pID)
|
||||
targetNode := nodes[idx%len(nodes)]
|
||||
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
|
||||
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
|
||||
Addr: addr,
|
||||
Obj: part,
|
||||
})
|
||||
} else {
|
||||
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||
NumCopies: 1,
|
||||
Addr: addr,
|
||||
Nodes: []netmap.NodeInfo{targetNode},
|
||||
Obj: part,
|
||||
}, newNodeCache())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object {
|
||||
parts := make([]*objectSDK.Object, objInfo.ECInfo.Total)
|
||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||
for idx, nodes := range existedChunks {
|
||||
idx := idx
|
||||
nodes := nodes
|
||||
errGroup.Go(func() error {
|
||||
var objID oid.Address
|
||||
objID.SetContainer(parentAddress.Container())
|
||||
objID.SetObject(chunkIDs[idx])
|
||||
var obj *objectSDK.Object
|
||||
var err error
|
||||
for _, node := range nodes {
|
||||
if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
obj, err = p.localObject(egCtx, objID)
|
||||
} else {
|
||||
obj, err = p.remoteObject(egCtx, node, objID)
|
||||
}
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey())))
|
||||
}
|
||||
if obj != nil {
|
||||
parts[idx] = obj
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if err := errGroup.Wait(); err != nil {
|
||||
p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
return parts
|
||||
}
|
565
pkg/services/policer/ec_test.go
Normal file
|
@ -0,0 +1,565 @@
|
|||
package policer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestECChunkHasValidPlacement(t *testing.T) {
|
||||
t.Parallel()
|
||||
chunkAddress := oidtest.Address()
|
||||
parentID := oidtest.ID()
|
||||
|
||||
var policy netmapSDK.PlacementPolicy
|
||||
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||
|
||||
cnr := &container.Container{}
|
||||
cnr.Value.Init()
|
||||
cnr.Value.SetPlacementPolicy(policy)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(chunkAddress.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
}
|
||||
|
||||
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||
for i := range nodes {
|
||||
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||
}
|
||||
|
||||
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
|
||||
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||
}
|
||||
return nil, errors.New("unexpected placement build")
|
||||
}
|
||||
|
||||
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
require.True(t, raw, "remote header for parent object must be called with raw flag")
|
||||
index := int(ni.PublicKey()[0])
|
||||
require.True(t, index == 1 || index == 2, "invalid node to get parent header")
|
||||
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = uint32(index)
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = uint32(0)
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
p := New(
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
})),
|
||||
WithRemoteObjectHeaderFunc(remoteHeadFn),
|
||||
WithLocalObjectHeaderFunc(localHeadFn),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
objInfo := objectcore.Info{
|
||||
Address: chunkAddress,
|
||||
Type: objectSDK.TypeRegular,
|
||||
ECInfo: &objectcore.ECInfo{
|
||||
ParentID: parentID,
|
||||
Index: 0,
|
||||
Total: 3,
|
||||
},
|
||||
}
|
||||
err := p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestECChunkHasInvalidPlacement(t *testing.T) {
|
||||
t.Parallel()
|
||||
chunkAddress := oidtest.Address()
|
||||
parentID := oidtest.ID()
|
||||
chunkObject := objectSDK.New()
|
||||
chunkObject.SetContainerID(chunkAddress.Container())
|
||||
chunkObject.SetID(chunkAddress.Object())
|
||||
chunkObject.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
chunkObject.SetPayloadSize(uint64(10))
|
||||
chunkObject.SetECHeader(objectSDK.NewECHeader(parentID, 1, 3, []byte{}, 0))
|
||||
|
||||
var policy netmapSDK.PlacementPolicy
|
||||
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||
|
||||
cnr := &container.Container{}
|
||||
cnr.Value.Init()
|
||||
cnr.Value.SetPlacementPolicy(policy)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(chunkAddress.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
}
|
||||
|
||||
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||
for i := range nodes {
|
||||
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||
}
|
||||
|
||||
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
|
||||
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||
}
|
||||
return nil, errors.New("unexpected placement build")
|
||||
}
|
||||
|
||||
objInfo := objectcore.Info{
|
||||
Address: chunkAddress,
|
||||
Type: objectSDK.TypeRegular,
|
||||
ECInfo: &objectcore.ECInfo{
|
||||
ParentID: parentID,
|
||||
Index: 1,
|
||||
Total: 3,
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("node0 has chunk1, node1 has chunk0 and chunk1", func(t *testing.T) {
|
||||
// policer should pull chunk0 on first run and drop chunk1 on second run
|
||||
var allowDrop bool
|
||||
requiredChunkID := oidtest.ID()
|
||||
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||
return chunkObject, nil
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
ch.Index = 0
|
||||
ch.SetID(requiredChunkID)
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 2
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
require.Fail(t, "unexpected remote HEAD")
|
||||
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||
}
|
||||
|
||||
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||
if allowDrop {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
ch.SetID(requiredChunkID)
|
||||
ch.Index = 0
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
var pullCounter atomic.Int64
|
||||
var dropped []oid.Address
|
||||
p := New(
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
})),
|
||||
WithRemoteObjectHeaderFunc(headFn),
|
||||
WithLocalObjectHeaderFunc(localHeadF),
|
||||
WithReplicator(&testReplicator{
|
||||
handlePullTask: (func(ctx context.Context, r replicator.Task) {
|
||||
require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID &&
|
||||
len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task")
|
||||
pullCounter.Add(1)
|
||||
}),
|
||||
}),
|
||||
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||
require.True(t, allowDrop, "invalid redundent copy call")
|
||||
dropped = append(dropped, a)
|
||||
}),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
err := p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
|
||||
require.Equal(t, 0, len(dropped), "invalid dropped count")
|
||||
allowDrop = true
|
||||
err = p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
|
||||
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||
})
|
||||
|
||||
t.Run("node0 has chunk0 and chunk1, node1 has chunk1", func(t *testing.T) {
|
||||
// policer should drop chunk1
|
||||
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||
return chunkObject, nil
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkAddress.Object())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 2
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
require.Fail(t, "unexpected remote HEAD")
|
||||
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||
}
|
||||
|
||||
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkAddress.Object())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 0
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
var dropped []oid.Address
|
||||
p := New(
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
})),
|
||||
WithRemoteObjectHeaderFunc(headFn),
|
||||
WithLocalObjectHeaderFunc(localHeadF),
|
||||
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||
dropped = append(dropped, a)
|
||||
}),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
err := p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||
})
|
||||
|
||||
t.Run("node0 has chunk0 and chunk1, node1 has no chunks", func(t *testing.T) {
|
||||
// policer should replicate chunk1 to node1 on first run and drop chunk1 on node0 on second run
|
||||
var secondRun bool
|
||||
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||
if !secondRun {
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
return chunkObject, nil
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkAddress.Object())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 2
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||
a.Object() == parentID && raw {
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
require.Fail(t, "unexpected remote HEAD")
|
||||
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||
}
|
||||
|
||||
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkAddress.Object())
|
||||
ch.Index = 1
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
ch.SetID(oidtest.ID())
|
||||
ch.Index = 0
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
var dropped []oid.Address
|
||||
var replicated []replicator.Task
|
||||
p := New(
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
})),
|
||||
WithRemoteObjectHeaderFunc(headFn),
|
||||
WithLocalObjectHeaderFunc(localHeadF),
|
||||
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||
dropped = append(dropped, a)
|
||||
}),
|
||||
WithReplicator(&testReplicator{
|
||||
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
|
||||
replicated = append(replicated, t)
|
||||
},
|
||||
}),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
err := p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(dropped), "invalid dropped count")
|
||||
require.Equal(t, 1, len(replicated), "invalid replicated count")
|
||||
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
|
||||
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
|
||||
|
||||
secondRun = true
|
||||
err = p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(replicated), "invalid replicated count")
|
||||
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
|
||||
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
|
||||
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||
})
|
||||
}
|
||||
|
||||
func TestECChunkRestore(t *testing.T) {
|
||||
// node0 has chunk0, node1 has chunk1
|
||||
// policer should replicate chunk0 to node2 on the first run
|
||||
// then restore EC object and replicate chunk2 to node2 on the second run
|
||||
t.Parallel()
|
||||
|
||||
payload := make([]byte, 64)
|
||||
rand.Read(payload)
|
||||
parentAddress := oidtest.Address()
|
||||
parentObject := objectSDK.New()
|
||||
parentObject.SetContainerID(parentAddress.Container())
|
||||
parentObject.SetPayload(payload)
|
||||
parentObject.SetPayloadSize(64)
|
||||
objectSDK.CalculateAndSetPayloadChecksum(parentObject)
|
||||
err := objectSDK.CalculateAndSetID(parentObject)
|
||||
require.NoError(t, err)
|
||||
id, _ := parentObject.ID()
|
||||
parentAddress.SetObject(id)
|
||||
|
||||
chunkIDs := make([]oid.ID, 3)
|
||||
c, err := erasurecode.NewConstructor(2, 1)
|
||||
require.NoError(t, err)
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
chunks, err := c.Split(parentObject, &key.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
for i, ch := range chunks {
|
||||
chunkIDs[i], _ = ch.ID()
|
||||
}
|
||||
|
||||
var policy netmapSDK.PlacementPolicy
|
||||
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||
|
||||
cnr := &container.Container{}
|
||||
cnr.Value.Init()
|
||||
cnr.Value.SetPlacementPolicy(policy)
|
||||
containerSrc := containerSrc{
|
||||
get: func(id cid.ID) (*container.Container, error) {
|
||||
if id.Equals(parentAddress.Container()) {
|
||||
return cnr, nil
|
||||
}
|
||||
return nil, new(apistatus.ContainerNotFound)
|
||||
},
|
||||
}
|
||||
|
||||
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||
for i := range nodes {
|
||||
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||
}
|
||||
|
||||
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||
if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) {
|
||||
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||
}
|
||||
return nil, errors.New("unexpected placement build")
|
||||
}
|
||||
var secondRun bool
|
||||
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
require.True(t, raw, "remote header for parent object must be called with raw flag")
|
||||
index := int(ni.PublicKey()[0])
|
||||
require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header")
|
||||
require.True(t, a == parentAddress, "invalid address to get remote header")
|
||||
if index == 1 {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkIDs[1])
|
||||
ch.Index = uint32(1)
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
if index == 2 && secondRun {
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkIDs[0])
|
||||
ch.Index = uint32(0)
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
|
||||
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||
require.True(t, a == parentAddress, "invalid address to get remote header")
|
||||
ei := objectSDK.NewECInfo()
|
||||
var ch objectSDK.ECChunk
|
||||
ch.SetID(chunkIDs[0])
|
||||
ch.Index = uint32(0)
|
||||
ch.Total = 3
|
||||
ei.AddChunk(ch)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
var replicatedObj []*objectSDK.Object
|
||||
p := New(
|
||||
WithContainerSource(containerSrc),
|
||||
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||
return bytes.Equal(k, nodes[0].PublicKey())
|
||||
})),
|
||||
WithRemoteObjectHeaderFunc(remoteHeadFn),
|
||||
WithLocalObjectHeaderFunc(localHeadFn),
|
||||
WithReplicator(&testReplicator{
|
||||
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
|
||||
if t.Obj != nil {
|
||||
replicatedObj = append(replicatedObj, t.Obj)
|
||||
}
|
||||
},
|
||||
}),
|
||||
WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||
require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request")
|
||||
return chunks[0], nil
|
||||
}),
|
||||
WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||
index := ni.PublicKey()[0]
|
||||
if index == 2 {
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
return chunks[index], nil
|
||||
}),
|
||||
WithPool(testPool(t)),
|
||||
WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)),
|
||||
)
|
||||
|
||||
var chunkAddress oid.Address
|
||||
chunkAddress.SetContainer(parentAddress.Container())
|
||||
chunkAddress.SetObject(chunkIDs[0])
|
||||
objInfo := objectcore.Info{
|
||||
Address: chunkAddress,
|
||||
Type: objectSDK.TypeRegular,
|
||||
ECInfo: &objectcore.ECInfo{
|
||||
ParentID: parentAddress.Object(),
|
||||
Index: 0,
|
||||
Total: 3,
|
||||
},
|
||||
}
|
||||
err = p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
secondRun = true
|
||||
err = p.processObject(context.Background(), objInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count")
|
||||
chunks[2].SetSignature(nil)
|
||||
expectedData, err := chunks[2].MarshalJSON()
|
||||
require.NoError(t, err)
|
||||
replicatedObj[0].SetSignature(nil)
|
||||
actualData, err := replicatedObj[0].MarshalJSON()
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, string(expectedData), string(actualData), "invalid restored objects")
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -22,7 +23,7 @@ import (
|
|||
// Note that the underlying implementation might be circular: i.e. it can restart
|
||||
// when the end of the key space is reached.
|
||||
type KeySpaceIterator interface {
|
||||
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
||||
Next(context.Context, uint32) ([]objectcore.Info, error)
|
||||
Rewind()
|
||||
}
|
||||
|
||||
|
@ -35,11 +36,20 @@ type BuryFunc func(context.Context, oid.Address) error
|
|||
|
||||
// Replicator is the interface to a consumer of replication tasks.
|
||||
type Replicator interface {
|
||||
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||
HandlePullTask(ctx context.Context, task replicator.Task)
|
||||
HandleLocalPutTask(ctx context.Context, task replicator.Task)
|
||||
}
|
||||
|
||||
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||||
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
|
||||
|
||||
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
|
||||
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||
|
||||
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||
|
||||
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||
|
||||
type cfg struct {
|
||||
headTimeout time.Duration
|
||||
|
@ -56,6 +66,8 @@ type cfg struct {
|
|||
|
||||
remoteHeader RemoteObjectHeaderFunc
|
||||
|
||||
localHeader LocalObjectHeaderFunc
|
||||
|
||||
netmapKeys netmap.AnnouncedKeys
|
||||
|
||||
replicator Replicator
|
||||
|
@ -69,6 +81,12 @@ type cfg struct {
|
|||
evictDuration, sleepDuration time.Duration
|
||||
|
||||
metrics MetricsRegister
|
||||
|
||||
remoteObject RemoteObjectGetFunc
|
||||
|
||||
localObject LocalObjectGetFunc
|
||||
|
||||
keyStorage *util.KeyStorage
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -125,13 +143,32 @@ func WithPlacementBuilder(v placement.Builder) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
||||
// WithRemoteObjectHeader returns option to set remote object header receiver of Policer.
|
||||
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteHeader = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
|
||||
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.localHeader = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteObject = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
|
||||
return func(c *cfg) {
|
||||
c.localObject = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||
return func(c *cfg) {
|
||||
|
@ -169,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option {
|
|||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
||||
func WithKeyStorage(ks *util.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStorage = ks
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import (
|
|||
func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||
// Key space
|
||||
addr := oidtest.Address()
|
||||
objs := []objectcore.AddressWithType{
|
||||
objs := []objectcore.Info{
|
||||
{
|
||||
Address: addr,
|
||||
Type: objectSDK.TypeRegular,
|
||||
|
@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
|
|||
maintenanceNodes []int
|
||||
wantRemoveRedundant bool
|
||||
wantReplicateTo []int
|
||||
ecInfo *objectcore.ECInfo
|
||||
}{
|
||||
{
|
||||
desc: "1 copy already held by local node",
|
||||
|
@ -144,6 +145,22 @@ func TestProcessObject(t *testing.T) {
|
|||
objHolders: []int{1},
|
||||
maintenanceNodes: []int{2},
|
||||
},
|
||||
{
|
||||
desc: "lock object must be replicated to all EC nodes",
|
||||
objType: objectSDK.TypeLock,
|
||||
nodeCount: 3,
|
||||
policy: `EC 1.1`,
|
||||
placement: [][]int{{0, 1, 2}},
|
||||
wantReplicateTo: []int{1, 2},
|
||||
},
|
||||
{
|
||||
desc: "tombstone object must be replicated to all EC nodes",
|
||||
objType: objectSDK.TypeTombstone,
|
||||
nodeCount: 3,
|
||||
policy: `EC 1.1`,
|
||||
placement: [][]int{{0, 1, 2}},
|
||||
wantReplicateTo: []int{1, 2},
|
||||
},
|
||||
}
|
||||
|
||||
for i := range tests {
|
||||
|
@ -173,12 +190,15 @@ func TestProcessObject(t *testing.T) {
|
|||
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
||||
return placementVectors, nil
|
||||
}
|
||||
if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) {
|
||||
return placementVectors, nil
|
||||
}
|
||||
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
||||
return nil, errors.New("unexpected placement build")
|
||||
}
|
||||
|
||||
// Object remote header
|
||||
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
index := int(ni.PublicKey()[0])
|
||||
if a != addr || index < 1 || index >= ti.nodeCount {
|
||||
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
||||
|
@ -229,18 +249,21 @@ func TestProcessObject(t *testing.T) {
|
|||
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
|
||||
gotRemoveRedundant = true
|
||||
}),
|
||||
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
|
||||
for _, node := range task.Nodes {
|
||||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||
}
|
||||
})),
|
||||
WithReplicator(&testReplicator{
|
||||
handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
|
||||
for _, node := range task.Nodes {
|
||||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||
}
|
||||
},
|
||||
}),
|
||||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
addrWithType := objectcore.AddressWithType{
|
||||
addrWithType := objectcore.Info{
|
||||
Address: addr,
|
||||
Type: ti.objType,
|
||||
ECInfo: ti.ecInfo,
|
||||
}
|
||||
|
||||
err := p.processObject(context.Background(), addrWithType)
|
||||
|
@ -276,7 +299,7 @@ func TestProcessObjectError(t *testing.T) {
|
|||
WithPool(testPool(t)),
|
||||
)
|
||||
|
||||
addrWithType := objectcore.AddressWithType{
|
||||
addrWithType := objectcore.Info{
|
||||
Address: addr,
|
||||
}
|
||||
|
||||
|
@ -285,7 +308,7 @@ func TestProcessObjectError(t *testing.T) {
|
|||
|
||||
func TestIteratorContract(t *testing.T) {
|
||||
addr := oidtest.Address()
|
||||
objs := []objectcore.AddressWithType{{
|
||||
objs := []objectcore.Info{{
|
||||
Address: addr,
|
||||
Type: objectSDK.TypeRegular,
|
||||
}}
|
||||
|
@ -350,7 +373,7 @@ func testPool(t *testing.T) *ants.Pool {
|
|||
}
|
||||
|
||||
type nextResult struct {
|
||||
objs []objectcore.AddressWithType
|
||||
objs []objectcore.Info
|
||||
err error
|
||||
}
|
||||
|
||||
|
@ -361,7 +384,7 @@ type predefinedIterator struct {
|
|||
calls []string
|
||||
}
|
||||
|
||||
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) {
|
||||
if it.pos == len(it.scenario) {
|
||||
close(it.finishCh)
|
||||
<-ctx.Done()
|
||||
|
@ -380,11 +403,11 @@ func (it *predefinedIterator) Rewind() {
|
|||
|
||||
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||
type sliceKeySpaceIterator struct {
|
||||
objs []objectcore.AddressWithType
|
||||
objs []objectcore.Info
|
||||
cur int
|
||||
}
|
||||
|
||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) {
|
||||
if it.cur >= len(it.objs) {
|
||||
return nil, engine.ErrEndOfListing
|
||||
}
|
||||
|
@ -419,9 +442,20 @@ type announcedKeysFunc func([]byte) bool
|
|||
|
||||
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
||||
|
||||
// replicatorFunc is a Replicator backed by a function.
|
||||
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
||||
|
||||
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||
f(ctx, task, res)
|
||||
type testReplicator struct {
|
||||
handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||
handleLocalPutTask func(ctx context.Context, task replicator.Task)
|
||||
handlePullTask func(ctx context.Context, task replicator.Task)
|
||||
}
|
||||
|
||||
func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||
r.handleReplicationTask(ctx, task, res)
|
||||
}
|
||||
|
||||
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) {
|
||||
r.handleLocalPutTask(ctx, task)
|
||||
}
|
||||
|
||||
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) {
|
||||
r.handlePullTask(ctx, task)
|
||||
}
|
||||
|
|
|
@ -21,9 +21,9 @@ type TaskResult interface {
|
|||
SubmitSuccessfulReplication(netmap.NodeInfo)
|
||||
}
|
||||
|
||||
// HandleTask executes replication task inside invoking goroutine.
|
||||
// HandleReplicationTask executes replication task inside invoking goroutine.
|
||||
// Passes all the nodes that accepted the replication to the TaskResult.
|
||||
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
|
||||
func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res TaskResult) {
|
||||
p.metrics.IncInFlightRequest()
|
||||
defer p.metrics.DecInFlightRequest()
|
||||
defer func() {
|
||||
|
@ -32,7 +32,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
|||
)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask",
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleReplicateTask",
|
||||
trace.WithAttributes(
|
||||
attribute.Stringer("address", task.Addr),
|
||||
attribute.Int64("number_of_copies", int64(task.NumCopies)),
|
||||
|
|
72
pkg/services/replicator/pull.go
Normal file
|
@ -0,0 +1,72 @@
|
|||
package replicator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node")
|
||||
|
||||
func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
|
||||
p.metrics.IncInFlightRequest()
|
||||
defer p.metrics.DecInFlightRequest()
|
||||
defer func() {
|
||||
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask",
|
||||
trace.WithAttributes(
|
||||
attribute.Stringer("address", task.Addr),
|
||||
attribute.Int("nodes_count", len(task.Nodes)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var obj *objectSDK.Object
|
||||
|
||||
for _, node := range task.Nodes {
|
||||
var err error
|
||||
obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{
|
||||
Address: task.Addr,
|
||||
Node: node,
|
||||
})
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
var endpoints []string
|
||||
node.IterateNetworkEndpoints(func(s string) bool {
|
||||
endpoints = append(endpoints, s)
|
||||
return false
|
||||
})
|
||||
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||
zap.Stringer("object", task.Addr),
|
||||
zap.Error(err),
|
||||
zap.Strings("endpoints", endpoints),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
|
||||
if obj == nil {
|
||||
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||
zap.Stringer("object", task.Addr),
|
||||
zap.Error(errFailedToGetObjectFromAnyNode),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return
|
||||
}
|
||||
|
||||
err := engine.Put(ctx, p.localStorage, obj)
|
||||
if err != nil {
|
||||
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||
zap.Stringer("object", task.Addr),
|
||||
zap.Error(err),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
}
|
47
pkg/services/replicator/put.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
package replicator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errObjectNotDefined = errors.New("object is not defined")
|
||||
|
||||
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
|
||||
p.metrics.IncInFlightRequest()
|
||||
defer p.metrics.DecInFlightRequest()
|
||||
defer func() {
|
||||
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask",
|
||||
trace.WithAttributes(
|
||||
attribute.Stringer("address", task.Addr),
|
||||
attribute.Int("nodes_count", len(task.Nodes)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if task.Obj == nil {
|
||||
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||
zap.Stringer("object", task.Addr),
|
||||
zap.Error(errObjectNotDefined),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return
|
||||
}
|
||||
|
||||
err := engine.Put(ctx, p.localStorage, task.Obj)
|
||||
if err != nil {
|
||||
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||
zap.Stringer("object", task.Addr),
|
||||
zap.Error(err),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
}
|
|
@ -4,6 +4,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
|
@ -25,6 +26,8 @@ type cfg struct {
|
|||
|
||||
remoteSender *putsvc.RemoteSender
|
||||
|
||||
remoteGetter *getsvc.RemoteGetter
|
||||
|
||||
localStorage *engine.StorageEngine
|
||||
|
||||
metrics MetricsRegister
|
||||
|
@ -70,6 +73,12 @@ func WithRemoteSender(v *putsvc.RemoteSender) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithRemoteGetter(v *getsvc.RemoteGetter) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteGetter = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalStorage returns option to set local object storage of Replicator.
|
||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
||||
return func(c *cfg) {
|
||||
|
|
Why have you decided to go with 2 func options instead of an interface?
Because this approach has already been used in policer.