Add EC replication #1129
|
@ -13,7 +13,7 @@ type keySpaceIterator struct {
|
||||||
cur *engine.Cursor
|
cur *engine.Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
|
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.Info, error) {
|
||||||
var prm engine.ListWithCursorPrm
|
var prm engine.ListWithCursorPrm
|
||||||
prm.WithCursor(it.cur)
|
prm.WithCursor(it.cur)
|
||||||
prm.WithCount(batchSize)
|
prm.WithCount(batchSize)
|
||||||
|
|
|
@ -29,7 +29,6 @@ import (
|
||||||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
||||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
||||||
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
||||||
|
@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
|
remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
|
||||||
|
|
||||||
pol := policer.New(
|
pol := policer.New(
|
||||||
policer.WithLogger(c.log),
|
policer.WithLogger(c.log),
|
||||||
|
@ -242,11 +241,33 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||||
),
|
),
|
||||||
policer.WithRemoteObjectHeaderFunc(
|
policer.WithRemoteObjectHeaderFunc(
|
||||||
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
|
||||||
return remoteHeader.Head(ctx, prm)
|
return remoteReader.Head(ctx, prm)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
var prm engine.HeadPrm
|
||||||
fyrchik marked this conversation as resolved
|
|||||||
|
prm.WithAddress(a)
|
||||||
|
res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return res.Header(), nil
|
||||||
|
}),
|
||||||
|
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
||||||
|
return remoteReader.Get(ctx, prm)
|
||||||
|
}),
|
||||||
|
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
var prm engine.GetPrm
|
||||||
|
prm.WithAddress(a)
|
||||||
|
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return res.Object(), nil
|
||||||
|
}),
|
||||||
policer.WithNetmapKeys(c),
|
policer.WithNetmapKeys(c),
|
||||||
policer.WithHeadTimeout(
|
policer.WithHeadTimeout(
|
||||||
policerconfig.HeadTimeout(c.appCfg),
|
policerconfig.HeadTimeout(c.appCfg),
|
||||||
|
@ -265,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
}),
|
}),
|
||||||
policer.WithPool(c.cfgObject.pool.replication),
|
policer.WithPool(c.cfgObject.pool.replication),
|
||||||
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
||||||
|
policer.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
|
||||||
c.workers = append(c.workers, worker{
|
c.workers = append(c.workers, worker{
|
||||||
|
@ -297,6 +319,9 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
||||||
replicator.WithRemoteSender(
|
replicator.WithRemoteSender(
|
||||||
putsvc.NewRemoteSender(keyStorage, cache),
|
putsvc.NewRemoteSender(keyStorage, cache),
|
||||||
),
|
),
|
||||||
|
replicator.WithRemoteGetter(
|
||||||
|
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
|
||||||
|
),
|
||||||
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -529,4 +529,16 @@ const (
|
||||||
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
||||||
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||||
ECFailedToSaveECPart = "failed to save EC part"
|
ECFailedToSaveECPart = "failed to save EC part"
|
||||||
|
PolicerNodeIsNotECObjectNode = "current node is not EC object node"
|
||||||
|
PolicerFailedToGetLocalECChunks = "failed to get local EC chunks"
|
||||||
|
PolicerMissingECChunk = "failed to find EC chunk on any of the nodes"
|
||||||
|
PolicerFailedToDecodeECChunkID = "failed to decode EC chunk ID"
|
||||||
|
PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk"
|
||||||
|
ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage"
|
||||||
|
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage"
|
||||||
|
PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node"
|
||||||
|
PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks"
|
||||||
|
PolicerFailedToRestoreObject = "failed to restore EC object"
|
||||||
|
PolicerCouldNotGetChunk = "could not get EC chunk"
|
||||||
|
PolicerCouldNotGetChunks = "could not get EC chunks"
|
||||||
)
|
)
|
||||||
|
|
|
@ -167,17 +167,29 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
|
||||||
token := obj.SessionToken()
|
token := obj.SessionToken()
|
||||||
ownerID := obj.OwnerID()
|
ownerID := obj.OwnerID()
|
||||||
|
|
||||||
|
if token == nil && obj.ECHeader() != nil {
|
||||||
|
role, err := v.isIROrContainerNode(obj, binKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if role == acl.RoleContainer {
|
||||||
|
// EC part could be restored or created by container node, so ownerID could not match object signature
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v.checkOwnerKey(ownerID, key)
|
||||||
|
}
|
||||||
|
|
||||||
if token == nil || !token.AssertAuthKey(&key) {
|
if token == nil || !token.AssertAuthKey(&key) {
|
||||||
return v.checkOwnerKey(ownerID, key)
|
return v.checkOwnerKey(ownerID, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.verifyTokenIssuer {
|
if v.verifyTokenIssuer {
|
||||||
signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey)
|
role, err := v.isIROrContainerNode(obj, binKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if signerIsIROrContainerNode {
|
if role == acl.RoleContainer || role == acl.RoleInnerRing {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,10 +202,10 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) {
|
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (acl.Role, error) {
|
||||||
cnrID, containerIDSet := obj.ContainerID()
|
cnrID, containerIDSet := obj.ContainerID()
|
||||||
if !containerIDSet {
|
if !containerIDSet {
|
||||||
return false, errNilCID
|
return acl.RoleOthers, errNilCID
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrIDBin := make([]byte, sha256.Size)
|
cnrIDBin := make([]byte, sha256.Size)
|
||||||
|
@ -201,14 +213,14 @@ func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey [
|
||||||
|
|
||||||
cnr, err := v.containers.Get(cnrID)
|
cnr, err := v.containers.Get(cnrID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
|
return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
|
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return acl.RoleOthers, err
|
||||||
}
|
}
|
||||||
return res.Role == acl.RoleContainer || res.Role == acl.RoleInnerRing, nil
|
return res.Role, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {
|
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {
|
||||||
|
|
|
@ -7,14 +7,21 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddressWithType groups object address with its FrostFS
|
type ECInfo struct {
|
||||||
// object type.
|
ParentID oid.ID
|
||||||
type AddressWithType struct {
|
Index uint32
|
||||||
|
Total uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
// Info groups object address with its FrostFS
|
||||||
|
// object info.
|
||||||
|
type Info struct {
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
Type objectSDK.Type
|
Type objectSDK.Type
|
||||||
IsLinkingObject bool
|
IsLinkingObject bool
|
||||||
|
ECInfo *ECInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v AddressWithType) String() string {
|
func (v Info) String() string {
|
||||||
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
||||||
}
|
}
|
|
@ -622,7 +622,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
||||||
return shards, nil
|
return shards, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||||
|
|
|
@ -68,12 +68,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []objectcore.AddressWithType
|
addrList []objectcore.Info
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
func (l ListWithCursorRes) AddressList() []objectcore.Info {
|
||||||
return l.addrList
|
return l.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
// parameter set to zero.
|
// parameter set to zero.
|
||||||
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
result := make([]objectcore.Info, 0, prm.count)
|
||||||
|
|
||||||
// Set initial cursors
|
// Set initial cursors
|
||||||
cursor := prm.cursor
|
cursor := prm.cursor
|
||||||
|
|
|
@ -76,8 +76,8 @@ func TestListWithCursor(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
expected := make([]object.Info, 0, tt.objectNum)
|
||||||
got := make([]object.AddressWithType, 0, tt.objectNum)
|
got := make([]object.Info, 0, tt.objectNum)
|
||||||
|
|
||||||
for i := 0; i < tt.objectNum; i++ {
|
for i := 0; i < tt.objectNum; i++ {
|
||||||
containerID := cidtest.ID()
|
containerID := cidtest.ID()
|
||||||
|
@ -88,7 +88,7 @@ func TestListWithCursor(t *testing.T) {
|
||||||
|
|
||||||
err := e.Put(context.Background(), prm)
|
err := e.Put(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
expected = append(expected, object.Info{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm ListWithCursorPrm
|
var prm ListWithCursorPrm
|
||||||
|
|
|
@ -47,12 +47,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
|
||||||
|
|
||||||
// ListRes contains values returned from ListWithCursor operation.
|
// ListRes contains values returned from ListWithCursor operation.
|
||||||
type ListRes struct {
|
type ListRes struct {
|
||||||
addrList []objectcore.AddressWithType
|
addrList []objectcore.Info
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (l ListRes) AddressList() []objectcore.AddressWithType {
|
func (l ListRes) AddressList() []objectcore.Info {
|
||||||
return l.addrList
|
return l.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
result := make([]objectcore.Info, 0, prm.count)
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
||||||
|
@ -99,7 +99,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
|
||||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||||
var bucketName []byte
|
var bucketName []byte
|
||||||
var err error
|
var err error
|
||||||
|
@ -183,11 +183,11 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
||||||
cidRaw []byte, // container ID prefix, optimization
|
cidRaw []byte, // container ID prefix, optimization
|
||||||
cnt cid.ID, // container ID
|
cnt cid.ID, // container ID
|
||||||
to []objectcore.AddressWithType, // listing result
|
to []objectcore.Info, // listing result
|
||||||
limit int, // stop listing at `limit` items in result
|
limit int, // stop listing at `limit` items in result
|
||||||
cursor *Cursor, // start from cursor object
|
cursor *Cursor, // start from cursor object
|
||||||
threshold bool, // ignore cursor and start immediately
|
threshold bool, // ignore cursor and start immediately
|
||||||
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
|
) ([]objectcore.Info, []byte, *Cursor, error) {
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
}
|
}
|
||||||
|
@ -219,18 +219,27 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
var isLinkingObj bool
|
var isLinkingObj bool
|
||||||
|
var ecInfo *objectcore.ECInfo
|
||||||
if objType == objectSDK.TypeRegular {
|
if objType == objectSDK.TypeRegular {
|
||||||
var o objectSDK.Object
|
var o objectSDK.Object
|
||||||
if err := o.Unmarshal(v); err != nil {
|
if err := o.Unmarshal(v); err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
isLinkingObj = isLinkObject(&o)
|
isLinkingObj = isLinkObject(&o)
|
||||||
|
ecHeader := o.ECHeader()
|
||||||
|
if ecHeader != nil {
|
||||||
|
ecInfo = &objectcore.ECInfo{
|
||||||
|
ParentID: ecHeader.Parent(),
|
||||||
|
Index: ecHeader.Index(),
|
||||||
|
Total: ecHeader.Total(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var a oid.Address
|
var a oid.Address
|
||||||
a.SetContainer(cnt)
|
a.SetContainer(cnt)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
|
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
total = containers * 4 // regular + ts + child + lock
|
total = containers * 4 // regular + ts + child + lock
|
||||||
)
|
)
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, total)
|
expected := make([]object.Info, 0, total)
|
||||||
|
|
||||||
// fill metabase with objects
|
// fill metabase with objects
|
||||||
for i := 0; i < containers; i++ {
|
for i := 0; i < containers; i++ {
|
||||||
|
@ -88,21 +88,21 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
obj.SetType(objectSDK.TypeRegular)
|
obj.SetType(objectSDK.TypeRegular)
|
||||||
err := putBig(db, obj)
|
err := putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||||
|
|
||||||
// add one tombstone
|
// add one tombstone
|
||||||
obj = testutil.GenerateObjectWithCID(containerID)
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
obj.SetType(objectSDK.TypeTombstone)
|
obj.SetType(objectSDK.TypeTombstone)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
||||||
|
|
||||||
// add one lock
|
// add one lock
|
||||||
obj = testutil.GenerateObjectWithCID(containerID)
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
obj.SetType(objectSDK.TypeLock)
|
obj.SetType(objectSDK.TypeLock)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
||||||
|
|
||||||
// add one inhumed (do not include into expected)
|
// add one inhumed (do not include into expected)
|
||||||
obj = testutil.GenerateObjectWithCID(containerID)
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
|
@ -124,12 +124,12 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
child.SetSplitID(splitID)
|
child.SetSplitID(splitID)
|
||||||
err = putBig(db, child)
|
err = putBig(db, child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("success with various count", func(t *testing.T) {
|
t.Run("success with various count", func(t *testing.T) {
|
||||||
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
||||||
got := make([]object.AddressWithType, 0, total)
|
got := make([]object.Info, 0, total)
|
||||||
|
|
||||||
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
||||||
require.NoError(t, err, "count:%d", countPerReq)
|
require.NoError(t, err, "count:%d", countPerReq)
|
||||||
|
@ -211,7 +211,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
|
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.Info, *meta.Cursor, error) {
|
||||||
var listPrm meta.ListPrm
|
var listPrm meta.ListPrm
|
||||||
listPrm.SetCount(count)
|
listPrm.SetCount(count)
|
||||||
listPrm.SetCursor(cursor)
|
listPrm.SetCursor(cursor)
|
||||||
|
|
|
@ -42,7 +42,7 @@ type ListWithCursorPrm struct {
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []objectcore.AddressWithType
|
addrList []objectcore.Info
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
func (r ListWithCursorRes) AddressList() []objectcore.Info {
|
||||||
return r.addrList
|
return r.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
|
||||||
Obj: obj,
|
Obj: obj,
|
||||||
Nodes: nodes,
|
Nodes: nodes,
|
||||||
}
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`HandleReplicationTask`? `handle` is a verb, `replicate` is a verb too
dstepanov-yadro
commented
Done Done
|
|||||||
s.replicator.HandleTask(ctx, task, &res)
|
s.replicator.HandleReplicationTask(ctx, task, &res)
|
||||||
|
|
||||||
if res.count == 0 {
|
if res.count == 0 {
|
||||||
return errors.New("object was not replicated")
|
return errors.New("object was not replicated")
|
||||||
|
|
55
pkg/services/object/get/remote_getter.go
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RemoteGetPrm struct {
|
||||||
|
Address oid.Address
|
||||||
|
Node netmapSDK.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoteGetter struct {
|
||||||
|
s remoteStorageConstructor
|
||||||
|
es epochSource
|
||||||
|
ks keyStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *RemoteGetter) Get(ctx context.Context, prm RemoteGetPrm) (*objectSDK.Object, error) {
|
||||||
|
var nodeInfo client.NodeInfo
|
||||||
|
if err := client.NodeInfoFromRawNetmapElement(&nodeInfo, netmapCore.Node(prm.Node)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rs, err := g.s.Get(nodeInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
epoch, err := g.es.Epoch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
key, err := g.ks.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r := RemoteRequestParams{
|
||||||
|
Epoch: epoch,
|
||||||
|
TTL: 1,
|
||||||
|
PrivateKey: key,
|
||||||
|
}
|
||||||
|
return rs.Get(ctx, prm.Address, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRemoteGetter(cc clientConstructor, es epochSource, ks keyStorage) *RemoteGetter {
|
||||||
|
return &RemoteGetter{
|
||||||
|
s: &multiclientRemoteStorageConstructor{clientConstructor: cc},
|
||||||
|
es: es,
|
||||||
|
ks: ks,
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,17 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Prm struct {
|
|
||||||
addr oid.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Prm) WithAddress(v oid.Address) *Prm {
|
|
||||||
if p != nil {
|
|
||||||
p.addr = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
|
@ -34,10 +34,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||||
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
|
||||||
errInvalidECObject = errors.New("object must be splitted to EC parts")
|
|
||||||
)
|
|
||||||
|
|
||||||
type putSingleRequestSigner struct {
|
type putSingleRequestSigner struct {
|
||||||
req *objectAPI.PutSingleRequest
|
req *objectAPI.PutSingleRequest
|
||||||
|
@ -181,10 +178,6 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
|
|
||||||
return errInvalidECObject
|
|
||||||
}
|
|
||||||
|
|
||||||
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package headsvc
|
package object
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -18,18 +18,18 @@ type ClientConstructor interface {
|
||||||
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
|
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeader represents utility for getting
|
// RemoteReader represents utility for getting
|
||||||
// the object header from a remote host.
|
// the object from a remote host.
|
||||||
type RemoteHeader struct {
|
type RemoteReader struct {
|
||||||
keyStorage *util.KeyStorage
|
keyStorage *util.KeyStorage
|
||||||
|
|
||||||
clientCache ClientConstructor
|
clientCache ClientConstructor
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeadPrm groups remote header operation parameters.
|
// RemoteRequestPrm groups remote operation parameters.
|
||||||
type RemoteHeadPrm struct {
|
type RemoteRequestPrm struct {
|
||||||
commonHeadPrm *Prm
|
addr oid.Address
|
||||||
|
raw bool
|
||||||
node netmap.NodeInfo
|
node netmap.NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,16 +37,16 @@ const remoteOpTTL = 1
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
|
||||||
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
|
// NewRemoteReader creates, initializes and returns new RemoteHeader instance.
|
||||||
func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader {
|
func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader {
|
||||||
return &RemoteHeader{
|
return &RemoteReader{
|
||||||
keyStorage: keyStorage,
|
keyStorage: keyStorage,
|
||||||
clientCache: cache,
|
clientCache: cache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeInfo sets information about the remote node.
|
// WithNodeInfo sets information about the remote node.
|
||||||
func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
|
func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
@ -55,16 +55,23 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithObjectAddress sets object address.
|
// WithObjectAddress sets object address.
|
||||||
func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm {
|
func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.commonHeadPrm = new(Prm).WithAddress(v)
|
p.addr = v
|
||||||
}
|
}
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.raw = v
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// Head requests object header from the remote node.
|
// Head requests object header from the remote node.
|
||||||
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) {
|
func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
|
||||||
key, err := h.keyStorage.GetKey(nil)
|
key, err := h.keyStorage.GetKey(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||||
|
@ -86,8 +93,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
|
||||||
|
|
||||||
headPrm.SetClient(c)
|
headPrm.SetClient(c)
|
||||||
headPrm.SetPrivateKey(key)
|
headPrm.SetPrivateKey(key)
|
||||||
headPrm.SetAddress(prm.commonHeadPrm.addr)
|
headPrm.SetAddress(prm.addr)
|
||||||
headPrm.SetTTL(remoteOpTTL)
|
headPrm.SetTTL(remoteOpTTL)
|
||||||
|
if prm.raw {
|
||||||
|
headPrm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
res, err := internalclient.HeadObject(ctx, headPrm)
|
res, err := internalclient.HeadObject(ctx, headPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -96,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
|
||||||
|
|
||||||
return res.Header(), nil
|
return res.Header(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
|
||||||
|
key, err := h.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
|
err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := h.clientCache.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var getPrm internalclient.GetObjectPrm
|
||||||
|
|
||||||
|
getPrm.SetClient(c)
|
||||||
|
getPrm.SetPrivateKey(key)
|
||||||
|
getPrm.SetAddress(prm.addr)
|
||||||
|
getPrm.SetTTL(remoteOpTTL)
|
||||||
|
if prm.raw {
|
||||||
|
getPrm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := internalclient.GetObject(ctx, getPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Object(), nil
|
||||||
|
}
|
|
@ -18,19 +18,15 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error {
|
func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
|
||||||
addr := addrWithType.Address
|
cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
|
||||||
idCnr := addr.Container()
|
|
||||||
idObj := addr.Object()
|
|
||||||
|
|
||||||
cnr, err := p.cnrSrc.Get(idCnr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if client.IsErrContainerNotFound(err) {
|
if client.IsErrContainerNotFound(err) {
|
||||||
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr)
|
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container())
|
||||||
if errWasRemoved != nil {
|
if errWasRemoved != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
|
||||||
} else if existed {
|
} else if existed {
|
||||||
err := p.buryFn(ctx, addrWithType.Address)
|
err := p.buryFn(ctx, objInfo.Address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
|
||||||
}
|
}
|
||||||
|
@ -41,11 +37,16 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
}
|
}
|
||||||
|
|
||||||
policy := cnr.Value.PlacementPolicy()
|
policy := cnr.Value.PlacementPolicy()
|
||||||
if policycore.IsECPlacement(policy) {
|
|
||||||
// EC not supported yet by policer
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if policycore.IsECPlacement(policy) {
|
||||||
|
return p.processECContainerObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
return p.processRepContainerObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
idObj := objInfo.Address.Object()
|
||||||
|
idCnr := objInfo.Address.Container()
|
||||||
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
|
@ -53,11 +54,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
|
|
||||||
c := &placementRequirements{}
|
c := &placementRequirements{}
|
||||||
|
|
||||||
var numOfContainerNodes int
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Seems like a separate fix, unrelated to the commit Seems like a separate fix, unrelated to the commit
dstepanov-yadro
commented
Done Done
|
|||||||
for i := range nn {
|
|
||||||
numOfContainerNodes += len(nn[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
// cached info about already checked nodes
|
// cached info about already checked nodes
|
||||||
checkedNodes := newNodeCache()
|
checkedNodes := newNodeCache()
|
||||||
|
|
||||||
|
@ -68,15 +64,24 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
p.processNodes(ctx, c, addrWithType, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes)
|
shortage := policy.ReplicaDescriptor(i).NumberOfObjects()
|
||||||
|
if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
|
||||||
|
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||||
|
// for correct object removal protection:
|
||||||
|
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||||
|
// - `LOCK` object removal is a prohibited action in the GC.
|
||||||
|
shortage = uint32(len(nn[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
p.processRepNodes(ctx, c, objInfo, nn[i], shortage, checkedNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !c.needLocalCopy && c.removeLocalCopy {
|
if !c.needLocalCopy && c.removeLocalCopy {
|
||||||
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
||||||
zap.Stringer("object", addr),
|
zap.Stringer("object", objInfo.Address),
|
||||||
)
|
)
|
||||||
|
|
||||||
p.cbRedundantCopy(ctx, addr)
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -89,23 +94,13 @@ type placementRequirements struct {
|
||||||
removeLocalCopy bool
|
removeLocalCopy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
|
func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info,
|
||||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
|
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
|
||||||
) {
|
) {
|
||||||
addr := addrWithType.Address
|
addr := objInfo.Address
|
||||||
typ := addrWithType.Type
|
|
||||||
|
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
var uncheckedCopies int
|
var uncheckedCopies int
|
||||||
|
|
||||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
|
|
||||||
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
|
||||||
// for correct object removal protection:
|
|
||||||
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
|
||||||
// - `LOCK` object removal is a prohibited action in the GC.
|
|
||||||
shortage = uint32(len(nodes))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -133,7 +128,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
|
||||||
_, err := p.remoteHeader(callCtx, nodes[i], addr)
|
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -196,7 +191,7 @@ func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address
|
||||||
Nodes: nodes,
|
Nodes: nodes,
|
||||||
}
|
}
|
||||||
|
|
||||||
p.replicator.HandleTask(ctx, task, checkedNodes)
|
p.replicator.HandleReplicationTask(ctx, task, checkedNodes)
|
||||||
|
|
||||||
case uncheckedCopies > 0:
|
case uncheckedCopies > 0:
|
||||||
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
||||||
|
|
390
pkg/services/policer/ec.go
Normal file
|
@ -0,0 +1,390 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errNoECinfoReturnded = errors.New("no EC info returned")
|
||||||
|
|
||||||
|
type ecChunkProcessResult struct {
|
||||||
|
validPlacement bool
|
||||||
|
removeLocal bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector")
|
||||||
|
|
||||||
|
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
if objInfo.ECInfo == nil {
|
||||||
|
return p.processECContainerRepObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
return p.processECContainerECObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
|
||||||
|
// All of them must be stored on all of the container nodes.
|
||||||
|
func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
objID := objInfo.Address.Object()
|
||||||
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
The distinction between `len(nn)` is currently 1 for all EC policies.
Can we simplify the code having this in mind?
Because when it won't be 1, we will also likely have different `EC X.Y` or `REP N` descriptors for different indexes in this array.
The distinction between `REP`/`EC` could be done based on each `nn` element, not on the uppermost level.
dstepanov-yadro
commented
fixed fixed
|
|||||||
|
if len(nn) != 1 {
|
||||||
|
return errInvalidECPlacement
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &placementRequirements{}
|
||||||
|
checkedNodes := newNodeCache()
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Could you elaborate a bit what this comment is trying to explain and why it is here? Could you elaborate a bit what this comment is trying to explain and why it is here?
dstepanov-yadro
commented
Refactored, see last commit. Refactored, see last commit.
|
|||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
p.processRepNodes(ctx, c, objInfo, nn[0], uint32(len(nn[0])), checkedNodes)
|
||||||
|
|
||||||
|
if !c.needLocalCopy && c.removeLocalCopy {
|
||||||
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
||||||
|
zap.Stringer("object", objInfo.Address),
|
||||||
|
)
|
||||||
|
|
||||||
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
|
}
|
||||||
|
if len(nn) != 1 {
|
||||||
|
return errInvalidECPlacement
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
res := p.processECChunk(ctx, objInfo, nn[0])
|
||||||
|
if !res.validPlacement {
|
||||||
|
// drop local chunk only if all required chunks are in place
|
||||||
|
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
|
||||||
|
}
|
||||||
|
p.adjustECPlacement(ctx, objInfo, nn[0], policy)
|
||||||
|
|
||||||
|
if res.removeLocal {
|
||||||
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
|
||||||
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processECChunk replicates EC chunk if needed.
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It returns a It returns a `struct`.
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) ecChunkProcessResult {
|
||||||
|
var removeLocalChunk bool
|
||||||
|
requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
|
||||||
|
// current node is required node, we are happy
|
||||||
|
return ecChunkProcessResult{
|
||||||
|
validPlacement: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if requiredNode.IsMaintenance() {
|
||||||
|
// consider maintenance mode has object, but do not drop local copy
|
||||||
|
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
||||||
|
return ecChunkProcessResult{}
|
||||||
|
}
|
||||||
|
|
||||||
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address, false)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
removeLocalChunk = true
|
||||||
|
} else if client.IsErrObjectNotFound(err) {
|
||||||
|
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1))
|
||||||
|
task := replicator.Task{
|
||||||
|
NumCopies: 1,
|
||||||
|
Addr: objInfo.Address,
|
||||||
|
Nodes: []netmap.NodeInfo{requiredNode},
|
||||||
|
}
|
||||||
|
p.replicator.HandleReplicationTask(ctx, task, newNodeCache())
|
||||||
|
} else if isClientErrMaintenance(err) {
|
||||||
|
// consider maintenance mode has object, but do not drop local copy
|
||||||
|
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
||||||
|
} else {
|
||||||
|
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return ecChunkProcessResult{
|
||||||
|
removeLocal: removeLocalChunk,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
|
||||||
|
var parentAddress oid.Address
|
||||||
|
parentAddress.SetContainer(objInfo.Address.Container())
|
||||||
|
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||||
|
|
||||||
|
requiredChunkIndexes := p.collectRequiredECChunks(nodes, objInfo)
|
||||||
|
if len(requiredChunkIndexes) == 0 {
|
||||||
|
p.log.Info(logs.PolicerNodeIsNotECObjectNode, zap.Stringer("object", objInfo.ECInfo.ParentID))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
err := p.resolveLocalECChunks(ctx, parentAddress, requiredChunkIndexes)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToGetLocalECChunks, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if len(requiredChunkIndexes) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
indexToObjectID := make(map[uint32]oid.ID)
|
||||||
|
success := p.resolveRemoteECChunks(ctx, parentAddress, nodes, requiredChunkIndexes, indexToObjectID)
|
||||||
|
if !success {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, candidates := range requiredChunkIndexes {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(objInfo.Address.Container())
|
||||||
|
addr.SetObject(indexToObjectID[index])
|
||||||
|
p.replicator.HandlePullTask(ctx, replicator.Task{
|
||||||
aarifullin
commented
Just for curiosity: do we always need to pull missing chunks from remote node? I suppose this is not a big deal and barely gives a big profit... Just for curiosity: do we *always* need to pull missing chunks from remote node?
`processECChunk` sets `removeLocal = true` but a chunk, that is being replicated, is not removed yet until `pullRequiredECChunks`'s run. Could a policer try to `reconstruct` missing chunk if the local node, *by good fortune*, keeps required chunks for reconstruction?
I suppose this is not a big deal and barely gives a big profit...
dstepanov-yadro
commented
By default local node should keep only one chunk. By default local node should keep only one chunk.
|
|||||||
|
Addr: addr,
|
||||||
|
Nodes: candidates,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// there was some missing chunks, it's not ok
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) collectRequiredECChunks(nodes []netmap.NodeInfo, objInfo objectcore.Info) map[uint32][]netmap.NodeInfo {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
By design there is only 1 required chunk to store on the node (EC policy implies _By design_ there is only 1 required chunk to store on the node (EC policy implies `UNIQUE`).
Why do we return a map here? It seems the code could be greatly simplified, having this in mind.
dstepanov-yadro
commented
Not always: evacuation or not completed replication may lead that chunk may be stored on different chunks. Not always: evacuation or not completed replication may lead that chunk may be stored on different chunks.
fyrchik
commented
Evacuation does not affect the set of required chunks in any way. Evacuation does not affect the set of required chunks in any way.
dstepanov-yadro
commented
Agree, current implementation does not affect. But I think that in case of evacuation from a node, EC chunk should move to another node, even if it does not comply with the storage policy. Agree, current implementation does not affect. But I think that in case of evacuation from a node, EC chunk should move to another node, even if it does not comply with the storage policy.
fyrchik
commented
Yes, but it is moved to a node which should not store it, and I have thought this function determines whether the node should store it? Yes, but it is moved to a node which _should not_ store it, and I have thought this function determines whether the node _should_ store it?
Another way to look at it: policer _must_ depend only on the network map and placement policy, evacuation is a separate process.
fyrchik
commented
Actually, it may happen in a situation when we have Actually, it may happen in a situation when we have `EC 3.1` and 1 node went offline, so we have 3 nodes in the network map.
What is the behaviour in this case?
dstepanov-yadro
commented
This function collects all nodes that store required chunks.
Policer will get an error from offline node and will skip object. > this function determines whether the node should store it
This function collects all nodes that store required chunks.
> What is the behaviour in this case?
Policer will get an error from offline node and will skip object.
|
|||||||
|
requiredChunkIndexes := make(map[uint32][]netmap.NodeInfo)
|
||||||
|
for i, n := range nodes {
|
||||||
|
if uint32(i) == objInfo.ECInfo.Total {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||||
|
requiredChunkIndexes[uint32(i)] = []netmap.NodeInfo{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return requiredChunkIndexes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) resolveLocalECChunks(ctx context.Context, parentAddress oid.Address, required map[uint32][]netmap.NodeInfo) error {
|
||||||
|
_, err := p.localHeader(ctx, parentAddress)
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
if err == nil { // should not be happen
|
||||||
|
return errNoECinfoReturnded
|
||||||
|
}
|
||||||
|
if !errors.As(err, &eiErr) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, ch := range eiErr.ECInfo().Chunks {
|
||||||
|
delete(required, ch.Index)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.Address, nodes []netmap.NodeInfo, required map[uint32][]netmap.NodeInfo, indexToObjectID map[uint32]oid.ID) bool {
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
for _, n := range nodes {
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err := p.remoteHeader(ctx, n, parentAddress, true)
|
||||||
|
if !errors.As(err, &eiErr) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, ch := range eiErr.ECInfo().Chunks {
|
||||||
|
if candidates, ok := required[ch.Index]; ok {
|
||||||
|
candidates = append(candidates, n)
|
||||||
|
required[ch.Index] = candidates
|
||||||
|
|
||||||
|
var chunkID oid.ID
|
||||||
|
if err := chunkID.ReadFromV2(ch.ID); err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
|
||||||
|
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
|
||||||
|
zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
indexToObjectID[ch.Index] = chunkID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, candidates := range required {
|
||||||
|
if len(candidates) == 0 {
|
||||||
|
p.log.Error(logs.PolicerMissingECChunk, zap.Stringer("object", parentAddress), zap.Uint32("index", index))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
|
||||||
|
var parentAddress oid.Address
|
||||||
|
parentAddress.SetContainer(objInfo.Address.Container())
|
||||||
|
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
resolved := make(map[uint32][]netmap.NodeInfo)
|
||||||
|
chunkIDs := make(map[uint32]oid.ID)
|
||||||
|
restore := true // do not restore EC chunks if some node returned error
|
||||||
|
for idx, n := range nodes {
|
||||||
|
if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||||
|
_, err = p.localHeader(ctx, parentAddress)
|
||||||
|
} else {
|
||||||
|
_, err = p.remoteHeader(ctx, n, parentAddress, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.As(err, &eiErr) {
|
||||||
|
for _, ch := range eiErr.ECInfo().Chunks {
|
||||||
|
resolved[ch.Index] = append(resolved[ch.Index], n)
|
||||||
|
var ecInfoChunkID oid.ID
|
||||||
|
if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID {
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
I found I found `existed, exist` is not appropriate pair name. You could rename `chunkID` to `ecInfoChunkID`, but `existed` to `chunkID`
dstepanov-yadro
commented
Done Done
|
|||||||
|
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID),
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Please, could you give a brief explanation how this could be that chunks related to the same parent may differ by IDs? Please, could you give a brief explanation how this could be that chunks related to the same parent may differ by IDs?
dstepanov-yadro
commented
A bug that I hope we don't have :) A bug that I hope we don't have :)
|
|||||||
|
zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
chunkIDs[ch.Index] = ecInfoChunkID
|
||||||
|
}
|
||||||
|
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||||
|
p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
NumCopies: 1,
|
||||||
|
Addr: objInfo.Address,
|
||||||
|
Nodes: []netmap.NodeInfo{n},
|
||||||
|
}, newNodeCache())
|
||||||
|
restore = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
|
||||||
|
var found []uint32
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Do you mean Do you mean `found`? It is already a past participle from `find`, `founded` is what happened with Saint-Petersburg in 1703.
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
for i := range resolved {
|
||||||
|
found = append(found, i)
|
||||||
|
}
|
||||||
|
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
|
||||||
|
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs)
|
||||||
|
if parts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key, err := p.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
required := make([]bool, len(parts))
|
||||||
|
for i, p := range parts {
|
||||||
|
if p == nil {
|
||||||
|
required[i] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
You call You call `Reconstruct()`, then you call `Split` on the result. These operations are inverse to each other.
What is the purpose?
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
if err := c.ReconstructParts(parts, required, key); err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for idx, part := range parts {
|
||||||
|
if _, exists := existedChunks[uint32(idx)]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(parentAddress.Container())
|
||||||
|
pID, _ := part.ID()
|
||||||
|
addr.SetObject(pID)
|
||||||
|
targetNode := nodes[idx%len(nodes)]
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
|
||||||
|
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
|
||||||
|
Addr: addr,
|
||||||
|
Obj: part,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
NumCopies: 1,
|
||||||
|
Addr: addr,
|
||||||
|
Nodes: []netmap.NodeInfo{targetNode},
|
||||||
|
Obj: part,
|
||||||
|
}, newNodeCache())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object {
|
||||||
|
parts := make([]*objectSDK.Object, objInfo.ECInfo.Total)
|
||||||
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for idx, nodes := range existedChunks {
|
||||||
|
idx := idx
|
||||||
|
nodes := nodes
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
var objID oid.Address
|
||||||
|
objID.SetContainer(parentAddress.Container())
|
||||||
|
objID.SetObject(chunkIDs[idx])
|
||||||
|
var obj *objectSDK.Object
|
||||||
|
var err error
|
||||||
|
for _, node := range nodes {
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
obj, err = p.localObject(egCtx, objID)
|
||||||
|
} else {
|
||||||
|
obj, err = p.remoteObject(egCtx, node, objID)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey())))
|
||||||
|
}
|
||||||
|
if obj != nil {
|
||||||
|
parts[idx] = obj
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := errGroup.Wait(); err != nil {
|
||||||
|
p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return parts
|
||||||
|
}
|
565
pkg/services/policer/ec_test.go
Normal file
|
@ -0,0 +1,565 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestECChunkHasValidPlacement(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
chunkAddress := oidtest.Address()
|
||||||
|
parentID := oidtest.ID()
|
||||||
|
|
||||||
|
var policy netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||||
|
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := containerSrc{
|
||||||
|
get: func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(chunkAddress.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
|
||||||
|
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, raw, "remote header for parent object must be called with raw flag")
|
||||||
|
index := int(ni.PublicKey()[0])
|
||||||
|
require.True(t, index == 1 || index == 2, "invalid node to get parent header")
|
||||||
|
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = uint32(index)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = uint32(0)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(remoteHeadFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadFn),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
objInfo := objectcore.Info{
|
||||||
|
Address: chunkAddress,
|
||||||
|
Type: objectSDK.TypeRegular,
|
||||||
|
ECInfo: &objectcore.ECInfo{
|
||||||
|
ParentID: parentID,
|
||||||
|
Index: 0,
|
||||||
|
Total: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestECChunkHasInvalidPlacement(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
chunkAddress := oidtest.Address()
|
||||||
|
parentID := oidtest.ID()
|
||||||
|
chunkObject := objectSDK.New()
|
||||||
|
chunkObject.SetContainerID(chunkAddress.Container())
|
||||||
|
chunkObject.SetID(chunkAddress.Object())
|
||||||
|
chunkObject.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||||
|
chunkObject.SetPayloadSize(uint64(10))
|
||||||
|
chunkObject.SetECHeader(objectSDK.NewECHeader(parentID, 1, 3, []byte{}, 0))
|
||||||
|
|
||||||
|
var policy netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||||
|
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := containerSrc{
|
||||||
|
get: func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(chunkAddress.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
|
||||||
|
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
|
||||||
|
objInfo := objectcore.Info{
|
||||||
|
Address: chunkAddress,
|
||||||
|
Type: objectSDK.TypeRegular,
|
||||||
|
ECInfo: &objectcore.ECInfo{
|
||||||
|
ParentID: parentID,
|
||||||
|
Index: 1,
|
||||||
|
Total: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("node0 has chunk1, node1 has chunk0 and chunk1", func(t *testing.T) {
|
||||||
|
// policer should pull chunk0 on first run and drop chunk1 on second run
|
||||||
|
var allowDrop bool
|
||||||
|
requiredChunkID := oidtest.ID()
|
||||||
|
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||||
|
return chunkObject, nil
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.Index = 0
|
||||||
|
ch.SetID(requiredChunkID)
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 2
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
require.Fail(t, "unexpected remote HEAD")
|
||||||
|
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||||
|
if allowDrop {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.SetID(requiredChunkID)
|
||||||
|
ch.Index = 0
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pullCounter atomic.Int64
|
||||||
|
var dropped []oid.Address
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadF),
|
||||||
|
WithReplicator(&testReplicator{
|
||||||
|
handlePullTask: (func(ctx context.Context, r replicator.Task) {
|
||||||
|
require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID &&
|
||||||
|
len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task")
|
||||||
|
pullCounter.Add(1)
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||||
|
require.True(t, allowDrop, "invalid redundent copy call")
|
||||||
|
dropped = append(dropped, a)
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
|
||||||
|
require.Equal(t, 0, len(dropped), "invalid dropped count")
|
||||||
|
allowDrop = true
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
|
||||||
|
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||||
|
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("node0 has chunk0 and chunk1, node1 has chunk1", func(t *testing.T) {
|
||||||
|
// policer should drop chunk1
|
||||||
|
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||||
|
return chunkObject, nil
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 2
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
require.Fail(t, "unexpected remote HEAD")
|
||||||
|
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 0
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var dropped []oid.Address
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadF),
|
||||||
|
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||||
|
dropped = append(dropped, a)
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||||
|
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("node0 has chunk0 and chunk1, node1 has no chunks", func(t *testing.T) {
|
||||||
|
// policer should replicate chunk1 to node1 on first run and drop chunk1 on node0 on second run
|
||||||
|
var secondRun bool
|
||||||
|
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||||
|
if !secondRun {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
return chunkObject, nil
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 2
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
require.Fail(t, "unexpected remote HEAD")
|
||||||
|
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 0
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var dropped []oid.Address
|
||||||
|
var replicated []replicator.Task
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadF),
|
||||||
|
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||||
|
dropped = append(dropped, a)
|
||||||
|
}),
|
||||||
|
WithReplicator(&testReplicator{
|
||||||
|
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
|
||||||
|
replicated = append(replicated, t)
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(dropped), "invalid dropped count")
|
||||||
|
require.Equal(t, 1, len(replicated), "invalid replicated count")
|
||||||
|
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
|
||||||
|
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
|
||||||
|
|
||||||
|
secondRun = true
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(replicated), "invalid replicated count")
|
||||||
|
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
|
||||||
|
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
|
||||||
|
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||||
|
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestECChunkRestore(t *testing.T) {
|
||||||
|
// node0 has chunk0, node1 has chunk1
|
||||||
|
// policer should replicate chunk0 to node2 on the first run
|
||||||
|
// then restore EC object and replicate chunk2 to node2 on the second run
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
payload := make([]byte, 64)
|
||||||
|
rand.Read(payload)
|
||||||
|
parentAddress := oidtest.Address()
|
||||||
|
parentObject := objectSDK.New()
|
||||||
|
parentObject.SetContainerID(parentAddress.Container())
|
||||||
|
parentObject.SetPayload(payload)
|
||||||
|
parentObject.SetPayloadSize(64)
|
||||||
|
objectSDK.CalculateAndSetPayloadChecksum(parentObject)
|
||||||
|
err := objectSDK.CalculateAndSetID(parentObject)
|
||||||
|
require.NoError(t, err)
|
||||||
|
id, _ := parentObject.ID()
|
||||||
|
parentAddress.SetObject(id)
|
||||||
|
|
||||||
|
chunkIDs := make([]oid.ID, 3)
|
||||||
|
c, err := erasurecode.NewConstructor(2, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
chunks, err := c.Split(parentObject, &key.PrivateKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i, ch := range chunks {
|
||||||
|
chunkIDs[i], _ = ch.ID()
|
||||||
|
}
|
||||||
|
|
||||||
|
var policy netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||||
|
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := containerSrc{
|
||||||
|
get: func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(parentAddress.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) {
|
||||||
|
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
var secondRun bool
|
||||||
|
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, raw, "remote header for parent object must be called with raw flag")
|
||||||
|
index := int(ni.PublicKey()[0])
|
||||||
|
require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header")
|
||||||
|
require.True(t, a == parentAddress, "invalid address to get remote header")
|
||||||
|
if index == 1 {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkIDs[1])
|
||||||
|
ch.Index = uint32(1)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if index == 2 && secondRun {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkIDs[0])
|
||||||
|
ch.Index = uint32(0)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, a == parentAddress, "invalid address to get remote header")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkIDs[0])
|
||||||
|
ch.Index = uint32(0)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var replicatedObj []*objectSDK.Object
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(remoteHeadFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadFn),
|
||||||
|
WithReplicator(&testReplicator{
|
||||||
|
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
|
||||||
|
if t.Obj != nil {
|
||||||
|
replicatedObj = append(replicatedObj, t.Obj)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request")
|
||||||
|
return chunks[0], nil
|
||||||
|
}),
|
||||||
|
WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
index := ni.PublicKey()[0]
|
||||||
|
if index == 2 {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
return chunks[index], nil
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)),
|
||||||
|
)
|
||||||
|
|
||||||
|
var chunkAddress oid.Address
|
||||||
|
chunkAddress.SetContainer(parentAddress.Container())
|
||||||
|
chunkAddress.SetObject(chunkIDs[0])
|
||||||
|
objInfo := objectcore.Info{
|
||||||
|
Address: chunkAddress,
|
||||||
|
Type: objectSDK.TypeRegular,
|
||||||
|
ECInfo: &objectcore.ECInfo{
|
||||||
|
ParentID: parentAddress.Object(),
|
||||||
|
Index: 0,
|
||||||
|
Total: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
secondRun = true
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count")
|
||||||
|
chunks[2].SetSignature(nil)
|
||||||
|
expectedData, err := chunks[2].MarshalJSON()
|
||||||
|
require.NoError(t, err)
|
||||||
|
replicatedObj[0].SetSignature(nil)
|
||||||
|
actualData, err := replicatedObj[0].MarshalJSON()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, string(expectedData), string(actualData), "invalid restored objects")
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
@ -22,7 +23,7 @@ import (
|
||||||
// Note that the underlying implementation might be circular: i.e. it can restart
|
// Note that the underlying implementation might be circular: i.e. it can restart
|
||||||
// when the end of the key space is reached.
|
// when the end of the key space is reached.
|
||||||
type KeySpaceIterator interface {
|
type KeySpaceIterator interface {
|
||||||
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
Next(context.Context, uint32) ([]objectcore.Info, error)
|
||||||
Rewind()
|
Rewind()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,11 +36,20 @@ type BuryFunc func(context.Context, oid.Address) error
|
||||||
|
|
||||||
// Replicator is the interface to a consumer of replication tasks.
|
// Replicator is the interface to a consumer of replication tasks.
|
||||||
type Replicator interface {
|
type Replicator interface {
|
||||||
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||||
|
HandlePullTask(ctx context.Context, task replicator.Task)
|
||||||
|
HandleLocalPutTask(ctx context.Context, task replicator.Task)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||||||
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
|
||||||
|
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
headTimeout time.Duration
|
headTimeout time.Duration
|
||||||
|
@ -56,6 +66,8 @@ type cfg struct {
|
||||||
|
|
||||||
remoteHeader RemoteObjectHeaderFunc
|
remoteHeader RemoteObjectHeaderFunc
|
||||||
|
|
||||||
|
localHeader LocalObjectHeaderFunc
|
||||||
|
|
||||||
netmapKeys netmap.AnnouncedKeys
|
netmapKeys netmap.AnnouncedKeys
|
||||||
|
|
||||||
replicator Replicator
|
replicator Replicator
|
||||||
|
@ -69,6 +81,12 @@ type cfg struct {
|
||||||
evictDuration, sleepDuration time.Duration
|
evictDuration, sleepDuration time.Duration
|
||||||
|
|
||||||
metrics MetricsRegister
|
metrics MetricsRegister
|
||||||
|
|
||||||
|
remoteObject RemoteObjectGetFunc
|
||||||
|
|
||||||
|
localObject LocalObjectGetFunc
|
||||||
|
|
||||||
|
keyStorage *util.KeyStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -125,13 +143,32 @@ func WithPlacementBuilder(v placement.Builder) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
// WithRemoteObjectHeader returns option to set remote object header receiver of Policer.
|
||||||
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.remoteHeader = v
|
c.remoteHeader = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
|
||||||
|
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.localHeader = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.remoteObject = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.localObject = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
@ -169,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option {
|
||||||
c.metrics = m
|
c.metrics = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithKeyStorage(ks *util.KeyStorage) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.keyStorage = ks
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ import (
|
||||||
func TestBuryObjectWithoutContainer(t *testing.T) {
|
func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
// Key space
|
// Key space
|
||||||
addr := oidtest.Address()
|
addr := oidtest.Address()
|
||||||
objs := []objectcore.AddressWithType{
|
objs := []objectcore.Info{
|
||||||
{
|
{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Type: objectSDK.TypeRegular,
|
Type: objectSDK.TypeRegular,
|
||||||
|
@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
|
||||||
maintenanceNodes []int
|
maintenanceNodes []int
|
||||||
wantRemoveRedundant bool
|
wantRemoveRedundant bool
|
||||||
wantReplicateTo []int
|
wantReplicateTo []int
|
||||||
|
ecInfo *objectcore.ECInfo
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "1 copy already held by local node",
|
desc: "1 copy already held by local node",
|
||||||
|
@ -144,6 +145,22 @@ func TestProcessObject(t *testing.T) {
|
||||||
objHolders: []int{1},
|
objHolders: []int{1},
|
||||||
maintenanceNodes: []int{2},
|
maintenanceNodes: []int{2},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "lock object must be replicated to all EC nodes",
|
||||||
|
objType: objectSDK.TypeLock,
|
||||||
|
nodeCount: 3,
|
||||||
|
policy: `EC 1.1`,
|
||||||
|
placement: [][]int{{0, 1, 2}},
|
||||||
|
wantReplicateTo: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "tombstone object must be replicated to all EC nodes",
|
||||||
|
objType: objectSDK.TypeTombstone,
|
||||||
|
nodeCount: 3,
|
||||||
|
policy: `EC 1.1`,
|
||||||
|
placement: [][]int{{0, 1, 2}},
|
||||||
|
wantReplicateTo: []int{1, 2},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range tests {
|
for i := range tests {
|
||||||
|
@ -173,12 +190,15 @@ func TestProcessObject(t *testing.T) {
|
||||||
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
||||||
return placementVectors, nil
|
return placementVectors, nil
|
||||||
}
|
}
|
||||||
|
if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) {
|
||||||
|
return placementVectors, nil
|
||||||
|
}
|
||||||
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
||||||
return nil, errors.New("unexpected placement build")
|
return nil, errors.New("unexpected placement build")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object remote header
|
// Object remote header
|
||||||
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
index := int(ni.PublicKey()[0])
|
index := int(ni.PublicKey()[0])
|
||||||
if a != addr || index < 1 || index >= ti.nodeCount {
|
if a != addr || index < 1 || index >= ti.nodeCount {
|
||||||
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
||||||
|
@ -229,18 +249,21 @@ func TestProcessObject(t *testing.T) {
|
||||||
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
|
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
|
||||||
gotRemoveRedundant = true
|
gotRemoveRedundant = true
|
||||||
}),
|
}),
|
||||||
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
WithReplicator(&testReplicator{
|
||||||
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
|
handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||||
for _, node := range task.Nodes {
|
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
|
||||||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
for _, node := range task.Nodes {
|
||||||
}
|
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||||
})),
|
}
|
||||||
|
},
|
||||||
|
}),
|
||||||
WithPool(testPool(t)),
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
addrWithType := objectcore.AddressWithType{
|
addrWithType := objectcore.Info{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Type: ti.objType,
|
Type: ti.objType,
|
||||||
|
ECInfo: ti.ecInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.processObject(context.Background(), addrWithType)
|
err := p.processObject(context.Background(), addrWithType)
|
||||||
|
@ -276,7 +299,7 @@ func TestProcessObjectError(t *testing.T) {
|
||||||
WithPool(testPool(t)),
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
addrWithType := objectcore.AddressWithType{
|
addrWithType := objectcore.Info{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,7 +308,7 @@ func TestProcessObjectError(t *testing.T) {
|
||||||
|
|
||||||
func TestIteratorContract(t *testing.T) {
|
func TestIteratorContract(t *testing.T) {
|
||||||
addr := oidtest.Address()
|
addr := oidtest.Address()
|
||||||
objs := []objectcore.AddressWithType{{
|
objs := []objectcore.Info{{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Type: objectSDK.TypeRegular,
|
Type: objectSDK.TypeRegular,
|
||||||
}}
|
}}
|
||||||
|
@ -350,7 +373,7 @@ func testPool(t *testing.T) *ants.Pool {
|
||||||
}
|
}
|
||||||
|
|
||||||
type nextResult struct {
|
type nextResult struct {
|
||||||
objs []objectcore.AddressWithType
|
objs []objectcore.Info
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,7 +384,7 @@ type predefinedIterator struct {
|
||||||
calls []string
|
calls []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) {
|
||||||
if it.pos == len(it.scenario) {
|
if it.pos == len(it.scenario) {
|
||||||
close(it.finishCh)
|
close(it.finishCh)
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
@ -380,11 +403,11 @@ func (it *predefinedIterator) Rewind() {
|
||||||
|
|
||||||
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||||
type sliceKeySpaceIterator struct {
|
type sliceKeySpaceIterator struct {
|
||||||
objs []objectcore.AddressWithType
|
objs []objectcore.Info
|
||||||
cur int
|
cur int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) {
|
||||||
if it.cur >= len(it.objs) {
|
if it.cur >= len(it.objs) {
|
||||||
return nil, engine.ErrEndOfListing
|
return nil, engine.ErrEndOfListing
|
||||||
}
|
}
|
||||||
|
@ -419,9 +442,20 @@ type announcedKeysFunc func([]byte) bool
|
||||||
|
|
||||||
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
||||||
|
|
||||||
// replicatorFunc is a Replicator backed by a function.
|
type testReplicator struct {
|
||||||
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||||
|
handleLocalPutTask func(ctx context.Context, task replicator.Task)
|
||||||
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
handlePullTask func(ctx context.Context, task replicator.Task)
|
||||||
f(ctx, task, res)
|
}
|
||||||
|
|
||||||
|
func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||||
|
r.handleReplicationTask(ctx, task, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) {
|
||||||
|
r.handleLocalPutTask(ctx, task)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) {
|
||||||
|
r.handlePullTask(ctx, task)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,9 +21,9 @@ type TaskResult interface {
|
||||||
SubmitSuccessfulReplication(netmap.NodeInfo)
|
SubmitSuccessfulReplication(netmap.NodeInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleTask executes replication task inside invoking goroutine.
|
// HandleReplicationTask executes replication task inside invoking goroutine.
|
||||||
// Passes all the nodes that accepted the replication to the TaskResult.
|
// Passes all the nodes that accepted the replication to the TaskResult.
|
||||||
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
|
func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res TaskResult) {
|
||||||
p.metrics.IncInFlightRequest()
|
p.metrics.IncInFlightRequest()
|
||||||
defer p.metrics.DecInFlightRequest()
|
defer p.metrics.DecInFlightRequest()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -32,7 +32,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
||||||
)
|
)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleReplicateTask",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.Stringer("address", task.Addr),
|
attribute.Stringer("address", task.Addr),
|
||||||
attribute.Int64("number_of_copies", int64(task.NumCopies)),
|
attribute.Int64("number_of_copies", int64(task.NumCopies)),
|
||||||
|
|
72
pkg/services/replicator/pull.go
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
package replicator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node")
|
||||||
|
|
||||||
|
func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
|
||||||
|
p.metrics.IncInFlightRequest()
|
||||||
|
defer p.metrics.DecInFlightRequest()
|
||||||
|
defer func() {
|
||||||
|
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("address", task.Addr),
|
||||||
|
attribute.Int("nodes_count", len(task.Nodes)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var obj *objectSDK.Object
|
||||||
|
|
||||||
|
for _, node := range task.Nodes {
|
||||||
|
var err error
|
||||||
|
obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{
|
||||||
|
Address: task.Addr,
|
||||||
|
Node: node,
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var endpoints []string
|
||||||
|
node.IterateNetworkEndpoints(func(s string) bool {
|
||||||
|
endpoints = append(endpoints, s)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Strings("endpoints", endpoints),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj == nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(errFailedToGetObjectFromAnyNode),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := engine.Put(ctx, p.localStorage, obj)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
}
|
47
pkg/services/replicator/put.go
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
package replicator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errObjectNotDefined = errors.New("object is not defined")
|
||||||
|
|
||||||
|
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
|
||||||
|
p.metrics.IncInFlightRequest()
|
||||||
|
defer p.metrics.DecInFlightRequest()
|
||||||
|
defer func() {
|
||||||
|
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("address", task.Addr),
|
||||||
|
attribute.Int("nodes_count", len(task.Nodes)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if task.Obj == nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(errObjectNotDefined),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := engine.Put(ctx, p.localStorage, task.Obj)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -25,6 +26,8 @@ type cfg struct {
|
||||||
|
|
||||||
remoteSender *putsvc.RemoteSender
|
remoteSender *putsvc.RemoteSender
|
||||||
|
|
||||||
|
remoteGetter *getsvc.RemoteGetter
|
||||||
|
|
||||||
localStorage *engine.StorageEngine
|
localStorage *engine.StorageEngine
|
||||||
|
|
||||||
metrics MetricsRegister
|
metrics MetricsRegister
|
||||||
|
@ -70,6 +73,12 @@ func WithRemoteSender(v *putsvc.RemoteSender) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithRemoteGetter(v *getsvc.RemoteGetter) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.remoteGetter = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithLocalStorage returns option to set local object storage of Replicator.
|
// WithLocalStorage returns option to set local object storage of Replicator.
|
||||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
func WithLocalStorage(v *engine.StorageEngine) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
|
Why have you decided to go with 2 func options instead of an interface?
Because this approach has already been used in policer.