Add EC replication #1129
25 changed files with 1440 additions and 143 deletions
|
@ -13,7 +13,7 @@ type keySpaceIterator struct {
|
||||||
cur *engine.Cursor
|
cur *engine.Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.AddressWithType, error) {
|
func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objectcore.Info, error) {
|
||||||
var prm engine.ListWithCursorPrm
|
var prm engine.ListWithCursorPrm
|
||||||
prm.WithCursor(it.cur)
|
prm.WithCursor(it.cur)
|
||||||
prm.WithCount(batchSize)
|
prm.WithCount(batchSize)
|
||||||
|
|
|
@ -29,7 +29,6 @@ import (
|
||||||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
||||||
headsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/head"
|
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
||||||
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
||||||
|
@ -231,7 +230,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteHeader := headsvc.NewRemoteHeader(keyStorage, clientConstructor)
|
remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
|
||||||
|
|
||||||
pol := policer.New(
|
pol := policer.New(
|
||||||
policer.WithLogger(c.log),
|
policer.WithLogger(c.log),
|
||||||
|
@ -242,11 +241,33 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||||
),
|
),
|
||||||
policer.WithRemoteObjectHeaderFunc(
|
policer.WithRemoteObjectHeaderFunc(
|
||||||
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
prm := new(headsvc.RemoteHeadPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a).WithRaw(raw)
|
||||||
return remoteHeader.Head(ctx, prm)
|
return remoteReader.Head(ctx, prm)
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
policer.WithLocalObjectHeaderFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
var prm engine.HeadPrm
|
||||||
fyrchik marked this conversation as resolved
|
|||||||
|
prm.WithAddress(a)
|
||||||
|
res, err := c.cfgObject.cfgLocalStorage.localStorage.Head(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return res.Header(), nil
|
||||||
|
}),
|
||||||
|
policer.WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
prm := new(objectService.RemoteRequestPrm).WithNodeInfo(ni).WithObjectAddress(a)
|
||||||
|
return remoteReader.Get(ctx, prm)
|
||||||
|
}),
|
||||||
|
policer.WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
var prm engine.GetPrm
|
||||||
|
prm.WithAddress(a)
|
||||||
|
res, err := c.cfgObject.cfgLocalStorage.localStorage.Get(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return res.Object(), nil
|
||||||
|
}),
|
||||||
policer.WithNetmapKeys(c),
|
policer.WithNetmapKeys(c),
|
||||||
policer.WithHeadTimeout(
|
policer.WithHeadTimeout(
|
||||||
policerconfig.HeadTimeout(c.appCfg),
|
policerconfig.HeadTimeout(c.appCfg),
|
||||||
|
@ -265,6 +286,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
}),
|
}),
|
||||||
policer.WithPool(c.cfgObject.pool.replication),
|
policer.WithPool(c.cfgObject.pool.replication),
|
||||||
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
|
||||||
|
policer.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
|
||||||
c.workers = append(c.workers, worker{
|
c.workers = append(c.workers, worker{
|
||||||
|
@ -297,6 +319,9 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
||||||
replicator.WithRemoteSender(
|
replicator.WithRemoteSender(
|
||||||
putsvc.NewRemoteSender(keyStorage, cache),
|
putsvc.NewRemoteSender(keyStorage, cache),
|
||||||
),
|
),
|
||||||
|
replicator.WithRemoteGetter(
|
||||||
|
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
|
||||||
|
),
|
||||||
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -529,4 +529,16 @@ const (
|
||||||
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
||||||
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||||
ECFailedToSaveECPart = "failed to save EC part"
|
ECFailedToSaveECPart = "failed to save EC part"
|
||||||
|
PolicerNodeIsNotECObjectNode = "current node is not EC object node"
|
||||||
|
PolicerFailedToGetLocalECChunks = "failed to get local EC chunks"
|
||||||
|
PolicerMissingECChunk = "failed to find EC chunk on any of the nodes"
|
||||||
|
PolicerFailedToDecodeECChunkID = "failed to decode EC chunk ID"
|
||||||
|
PolicerDifferentObjectIDForTheSameECChunk = "different object IDs for the same EC chunk"
|
||||||
|
ReplicatorCouldNotGetObjectFromRemoteStorage = "could not get object from remote storage"
|
||||||
|
ReplicatorCouldNotPutObjectToLocalStorage = "could not put object to local storage"
|
||||||
|
PolicerCouldNotGetObjectFromNodeMoving = "could not get EC object from the node, moving current chunk to the node"
|
||||||
|
PolicerCouldNotRestoreObjectNotEnoughChunks = "could not restore EC object: not enough chunks"
|
||||||
|
PolicerFailedToRestoreObject = "failed to restore EC object"
|
||||||
|
PolicerCouldNotGetChunk = "could not get EC chunk"
|
||||||
|
PolicerCouldNotGetChunks = "could not get EC chunks"
|
||||||
)
|
)
|
||||||
|
|
|
@ -167,17 +167,29 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
|
||||||
token := obj.SessionToken()
|
token := obj.SessionToken()
|
||||||
ownerID := obj.OwnerID()
|
ownerID := obj.OwnerID()
|
||||||
|
|
||||||
|
if token == nil && obj.ECHeader() != nil {
|
||||||
|
role, err := v.isIROrContainerNode(obj, binKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if role == acl.RoleContainer {
|
||||||
|
// EC part could be restored or created by container node, so ownerID could not match object signature
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v.checkOwnerKey(ownerID, key)
|
||||||
|
}
|
||||||
|
|
||||||
if token == nil || !token.AssertAuthKey(&key) {
|
if token == nil || !token.AssertAuthKey(&key) {
|
||||||
return v.checkOwnerKey(ownerID, key)
|
return v.checkOwnerKey(ownerID, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.verifyTokenIssuer {
|
if v.verifyTokenIssuer {
|
||||||
signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey)
|
role, err := v.isIROrContainerNode(obj, binKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if signerIsIROrContainerNode {
|
if role == acl.RoleContainer || role == acl.RoleInnerRing {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,10 +202,10 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) {
|
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (acl.Role, error) {
|
||||||
cnrID, containerIDSet := obj.ContainerID()
|
cnrID, containerIDSet := obj.ContainerID()
|
||||||
if !containerIDSet {
|
if !containerIDSet {
|
||||||
return false, errNilCID
|
return acl.RoleOthers, errNilCID
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrIDBin := make([]byte, sha256.Size)
|
cnrIDBin := make([]byte, sha256.Size)
|
||||||
|
@ -201,14 +213,14 @@ func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey [
|
||||||
|
|
||||||
cnr, err := v.containers.Get(cnrID)
|
cnr, err := v.containers.Get(cnrID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
|
return acl.RoleOthers, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
|
res, err := v.senderClassifier.IsInnerRingOrContainerNode(signerKey, cnrID, cnr.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return acl.RoleOthers, err
|
||||||
}
|
}
|
||||||
return res.Role == acl.RoleContainer || res.Role == acl.RoleInnerRing, nil
|
return res.Role, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {
|
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {
|
||||||
|
|
|
@ -7,14 +7,21 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddressWithType groups object address with its FrostFS
|
type ECInfo struct {
|
||||||
// object type.
|
ParentID oid.ID
|
||||||
type AddressWithType struct {
|
Index uint32
|
||||||
|
Total uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
// Info groups object address with its FrostFS
|
||||||
|
// object info.
|
||||||
|
type Info struct {
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
Type objectSDK.Type
|
Type objectSDK.Type
|
||||||
IsLinkingObject bool
|
IsLinkingObject bool
|
||||||
|
ECInfo *ECInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v AddressWithType) String() string {
|
func (v Info) String() string {
|
||||||
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
|
||||||
}
|
}
|
|
@ -622,7 +622,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
||||||
return shards, nil
|
return shards, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||||
|
|
|
@ -68,12 +68,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []objectcore.AddressWithType
|
addrList []objectcore.Info
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
func (l ListWithCursorRes) AddressList() []objectcore.Info {
|
||||||
return l.addrList
|
return l.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
// parameter set to zero.
|
// parameter set to zero.
|
||||||
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
result := make([]objectcore.Info, 0, prm.count)
|
||||||
|
|
||||||
// Set initial cursors
|
// Set initial cursors
|
||||||
cursor := prm.cursor
|
cursor := prm.cursor
|
||||||
|
|
|
@ -76,8 +76,8 @@ func TestListWithCursor(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
expected := make([]object.Info, 0, tt.objectNum)
|
||||||
got := make([]object.AddressWithType, 0, tt.objectNum)
|
got := make([]object.Info, 0, tt.objectNum)
|
||||||
|
|
||||||
for i := 0; i < tt.objectNum; i++ {
|
for i := 0; i < tt.objectNum; i++ {
|
||||||
containerID := cidtest.ID()
|
containerID := cidtest.ID()
|
||||||
|
@ -88,7 +88,7 @@ func TestListWithCursor(t *testing.T) {
|
||||||
|
|
||||||
err := e.Put(context.Background(), prm)
|
err := e.Put(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
expected = append(expected, object.Info{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm ListWithCursorPrm
|
var prm ListWithCursorPrm
|
||||||
|
|
|
@ -47,12 +47,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
|
||||||
|
|
||||||
// ListRes contains values returned from ListWithCursor operation.
|
// ListRes contains values returned from ListWithCursor operation.
|
||||||
type ListRes struct {
|
type ListRes struct {
|
||||||
addrList []objectcore.AddressWithType
|
addrList []objectcore.Info
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (l ListRes) AddressList() []objectcore.AddressWithType {
|
func (l ListRes) AddressList() []objectcore.Info {
|
||||||
return l.addrList
|
return l.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
result := make([]objectcore.Info, 0, prm.count)
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
||||||
|
@ -99,7 +99,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
|
||||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||||
var bucketName []byte
|
var bucketName []byte
|
||||||
var err error
|
var err error
|
||||||
|
@ -183,11 +183,11 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
||||||
cidRaw []byte, // container ID prefix, optimization
|
cidRaw []byte, // container ID prefix, optimization
|
||||||
cnt cid.ID, // container ID
|
cnt cid.ID, // container ID
|
||||||
to []objectcore.AddressWithType, // listing result
|
to []objectcore.Info, // listing result
|
||||||
limit int, // stop listing at `limit` items in result
|
limit int, // stop listing at `limit` items in result
|
||||||
cursor *Cursor, // start from cursor object
|
cursor *Cursor, // start from cursor object
|
||||||
threshold bool, // ignore cursor and start immediately
|
threshold bool, // ignore cursor and start immediately
|
||||||
) ([]objectcore.AddressWithType, []byte, *Cursor, error) {
|
) ([]objectcore.Info, []byte, *Cursor, error) {
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
}
|
}
|
||||||
|
@ -219,18 +219,27 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
var isLinkingObj bool
|
var isLinkingObj bool
|
||||||
|
var ecInfo *objectcore.ECInfo
|
||||||
if objType == objectSDK.TypeRegular {
|
if objType == objectSDK.TypeRegular {
|
||||||
var o objectSDK.Object
|
var o objectSDK.Object
|
||||||
if err := o.Unmarshal(v); err != nil {
|
if err := o.Unmarshal(v); err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
isLinkingObj = isLinkObject(&o)
|
isLinkingObj = isLinkObject(&o)
|
||||||
|
ecHeader := o.ECHeader()
|
||||||
|
if ecHeader != nil {
|
||||||
|
ecInfo = &objectcore.ECInfo{
|
||||||
|
ParentID: ecHeader.Parent(),
|
||||||
|
Index: ecHeader.Index(),
|
||||||
|
Total: ecHeader.Total(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var a oid.Address
|
var a oid.Address
|
||||||
a.SetContainer(cnt)
|
a.SetContainer(cnt)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
to = append(to, objectcore.AddressWithType{Address: a, Type: objType, IsLinkingObject: isLinkingObj})
|
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
total = containers * 4 // regular + ts + child + lock
|
total = containers * 4 // regular + ts + child + lock
|
||||||
)
|
)
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, total)
|
expected := make([]object.Info, 0, total)
|
||||||
|
|
||||||
// fill metabase with objects
|
// fill metabase with objects
|
||||||
for i := 0; i < containers; i++ {
|
for i := 0; i < containers; i++ {
|
||||||
|
@ -88,21 +88,21 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
obj.SetType(objectSDK.TypeRegular)
|
obj.SetType(objectSDK.TypeRegular)
|
||||||
err := putBig(db, obj)
|
err := putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||||
|
|
||||||
// add one tombstone
|
// add one tombstone
|
||||||
obj = testutil.GenerateObjectWithCID(containerID)
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
obj.SetType(objectSDK.TypeTombstone)
|
obj.SetType(objectSDK.TypeTombstone)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
||||||
|
|
||||||
// add one lock
|
// add one lock
|
||||||
obj = testutil.GenerateObjectWithCID(containerID)
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
obj.SetType(objectSDK.TypeLock)
|
obj.SetType(objectSDK.TypeLock)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
||||||
|
|
||||||
// add one inhumed (do not include into expected)
|
// add one inhumed (do not include into expected)
|
||||||
obj = testutil.GenerateObjectWithCID(containerID)
|
obj = testutil.GenerateObjectWithCID(containerID)
|
||||||
|
@ -124,12 +124,12 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
child.SetSplitID(splitID)
|
child.SetSplitID(splitID)
|
||||||
err = putBig(db, child)
|
err = putBig(db, child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("success with various count", func(t *testing.T) {
|
t.Run("success with various count", func(t *testing.T) {
|
||||||
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
||||||
got := make([]object.AddressWithType, 0, total)
|
got := make([]object.Info, 0, total)
|
||||||
|
|
||||||
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
||||||
require.NoError(t, err, "count:%d", countPerReq)
|
require.NoError(t, err, "count:%d", countPerReq)
|
||||||
|
@ -211,7 +211,7 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
|
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.Info, *meta.Cursor, error) {
|
||||||
var listPrm meta.ListPrm
|
var listPrm meta.ListPrm
|
||||||
listPrm.SetCount(count)
|
listPrm.SetCount(count)
|
||||||
listPrm.SetCursor(cursor)
|
listPrm.SetCursor(cursor)
|
||||||
|
|
|
@ -42,7 +42,7 @@ type ListWithCursorPrm struct {
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []objectcore.AddressWithType
|
addrList []objectcore.Info
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
func (r ListWithCursorRes) AddressList() []objectcore.Info {
|
||||||
return r.addrList
|
return r.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
|
||||||
Obj: obj,
|
Obj: obj,
|
||||||
Nodes: nodes,
|
Nodes: nodes,
|
||||||
}
|
}
|
||||||
s.replicator.HandleTask(ctx, task, &res)
|
s.replicator.HandleReplicationTask(ctx, task, &res)
|
||||||
|
|
||||||
if res.count == 0 {
|
if res.count == 0 {
|
||||||
return errors.New("object was not replicated")
|
return errors.New("object was not replicated")
|
||||||
|
|
55
pkg/services/object/get/remote_getter.go
Normal file
55
pkg/services/object/get/remote_getter.go
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RemoteGetPrm struct {
|
||||||
|
Address oid.Address
|
||||||
|
Node netmapSDK.NodeInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoteGetter struct {
|
||||||
|
s remoteStorageConstructor
|
||||||
|
es epochSource
|
||||||
|
ks keyStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *RemoteGetter) Get(ctx context.Context, prm RemoteGetPrm) (*objectSDK.Object, error) {
|
||||||
|
var nodeInfo client.NodeInfo
|
||||||
|
if err := client.NodeInfoFromRawNetmapElement(&nodeInfo, netmapCore.Node(prm.Node)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rs, err := g.s.Get(nodeInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
epoch, err := g.es.Epoch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
key, err := g.ks.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r := RemoteRequestParams{
|
||||||
|
Epoch: epoch,
|
||||||
|
TTL: 1,
|
||||||
|
PrivateKey: key,
|
||||||
|
}
|
||||||
|
return rs.Get(ctx, prm.Address, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRemoteGetter(cc clientConstructor, es epochSource, ks keyStorage) *RemoteGetter {
|
||||||
|
return &RemoteGetter{
|
||||||
|
s: &multiclientRemoteStorageConstructor{clientConstructor: cc},
|
||||||
|
es: es,
|
||||||
|
ks: ks,
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,17 +0,0 @@
|
||||||
package headsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Prm struct {
|
|
||||||
addr oid.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Prm) WithAddress(v oid.Address) *Prm {
|
|
||||||
if p != nil {
|
|
||||||
p.addr = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
|
@ -34,10 +34,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||||
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
|
||||||
errInvalidECObject = errors.New("object must be splitted to EC parts")
|
|
||||||
)
|
|
||||||
|
|
||||||
type putSingleRequestSigner struct {
|
type putSingleRequestSigner struct {
|
||||||
req *objectAPI.PutSingleRequest
|
req *objectAPI.PutSingleRequest
|
||||||
|
@ -181,10 +178,6 @@ func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlac
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
|
|
||||||
return errInvalidECObject
|
|
||||||
}
|
|
||||||
|
|
||||||
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package headsvc
|
package object
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -18,18 +18,18 @@ type ClientConstructor interface {
|
||||||
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
|
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeader represents utility for getting
|
// RemoteReader represents utility for getting
|
||||||
// the object header from a remote host.
|
// the object from a remote host.
|
||||||
type RemoteHeader struct {
|
type RemoteReader struct {
|
||||||
keyStorage *util.KeyStorage
|
keyStorage *util.KeyStorage
|
||||||
|
|
||||||
clientCache ClientConstructor
|
clientCache ClientConstructor
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeadPrm groups remote header operation parameters.
|
// RemoteRequestPrm groups remote operation parameters.
|
||||||
type RemoteHeadPrm struct {
|
type RemoteRequestPrm struct {
|
||||||
commonHeadPrm *Prm
|
addr oid.Address
|
||||||
|
raw bool
|
||||||
node netmap.NodeInfo
|
node netmap.NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,16 +37,16 @@ const remoteOpTTL = 1
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
|
||||||
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
|
// NewRemoteReader creates, initializes and returns new RemoteHeader instance.
|
||||||
func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteHeader {
|
func NewRemoteReader(keyStorage *util.KeyStorage, cache ClientConstructor) *RemoteReader {
|
||||||
return &RemoteHeader{
|
return &RemoteReader{
|
||||||
keyStorage: keyStorage,
|
keyStorage: keyStorage,
|
||||||
clientCache: cache,
|
clientCache: cache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeInfo sets information about the remote node.
|
// WithNodeInfo sets information about the remote node.
|
||||||
func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
|
func (p *RemoteRequestPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteRequestPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
@ -55,16 +55,23 @@ func (p *RemoteHeadPrm) WithNodeInfo(v netmap.NodeInfo) *RemoteHeadPrm {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithObjectAddress sets object address.
|
// WithObjectAddress sets object address.
|
||||||
func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm {
|
func (p *RemoteRequestPrm) WithObjectAddress(v oid.Address) *RemoteRequestPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.commonHeadPrm = new(Prm).WithAddress(v)
|
p.addr = v
|
||||||
}
|
}
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *RemoteRequestPrm) WithRaw(v bool) *RemoteRequestPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.raw = v
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// Head requests object header from the remote node.
|
// Head requests object header from the remote node.
|
||||||
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK.Object, error) {
|
func (h *RemoteReader) Head(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
|
||||||
key, err := h.keyStorage.GetKey(nil)
|
key, err := h.keyStorage.GetKey(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||||
|
@ -86,8 +93,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
|
||||||
|
|
||||||
headPrm.SetClient(c)
|
headPrm.SetClient(c)
|
||||||
headPrm.SetPrivateKey(key)
|
headPrm.SetPrivateKey(key)
|
||||||
headPrm.SetAddress(prm.commonHeadPrm.addr)
|
headPrm.SetAddress(prm.addr)
|
||||||
headPrm.SetTTL(remoteOpTTL)
|
headPrm.SetTTL(remoteOpTTL)
|
||||||
|
if prm.raw {
|
||||||
|
headPrm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
res, err := internalclient.HeadObject(ctx, headPrm)
|
res, err := internalclient.HeadObject(ctx, headPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -96,3 +106,39 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*objectSDK
|
||||||
|
|
||||||
return res.Header(), nil
|
return res.Header(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *RemoteReader) Get(ctx context.Context, prm *RemoteRequestPrm) (*objectSDK.Object, error) {
|
||||||
|
key, err := h.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
|
err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(prm.node))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := h.clientCache.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var getPrm internalclient.GetObjectPrm
|
||||||
|
|
||||||
|
getPrm.SetClient(c)
|
||||||
|
getPrm.SetPrivateKey(key)
|
||||||
|
getPrm.SetAddress(prm.addr)
|
||||||
|
getPrm.SetTTL(remoteOpTTL)
|
||||||
|
if prm.raw {
|
||||||
|
getPrm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := internalclient.GetObject(ctx, getPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Object(), nil
|
||||||
|
}
|
|
@ -18,19 +18,15 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error {
|
func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
|
||||||
addr := addrWithType.Address
|
cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
|
||||||
idCnr := addr.Container()
|
|
||||||
idObj := addr.Object()
|
|
||||||
|
|
||||||
cnr, err := p.cnrSrc.Get(idCnr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if client.IsErrContainerNotFound(err) {
|
if client.IsErrContainerNotFound(err) {
|
||||||
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr)
|
existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container())
|
||||||
if errWasRemoved != nil {
|
if errWasRemoved != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
|
||||||
} else if existed {
|
} else if existed {
|
||||||
err := p.buryFn(ctx, addrWithType.Address)
|
err := p.buryFn(ctx, objInfo.Address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
|
||||||
}
|
}
|
||||||
|
@ -41,11 +37,16 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
}
|
}
|
||||||
|
|
||||||
policy := cnr.Value.PlacementPolicy()
|
policy := cnr.Value.PlacementPolicy()
|
||||||
if policycore.IsECPlacement(policy) {
|
|
||||||
// EC not supported yet by policer
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if policycore.IsECPlacement(policy) {
|
||||||
|
return p.processECContainerObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
return p.processRepContainerObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
idObj := objInfo.Address.Object()
|
||||||
|
idCnr := objInfo.Address.Container()
|
||||||
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
|
@ -53,11 +54,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
|
|
||||||
c := &placementRequirements{}
|
c := &placementRequirements{}
|
||||||
|
|
||||||
var numOfContainerNodes int
|
|
||||||
for i := range nn {
|
|
||||||
numOfContainerNodes += len(nn[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
// cached info about already checked nodes
|
// cached info about already checked nodes
|
||||||
checkedNodes := newNodeCache()
|
checkedNodes := newNodeCache()
|
||||||
|
|
||||||
|
@ -68,15 +64,24 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
p.processNodes(ctx, c, addrWithType, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes)
|
shortage := policy.ReplicaDescriptor(i).NumberOfObjects()
|
||||||
|
if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
|
||||||
|
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
||||||
|
// for correct object removal protection:
|
||||||
|
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
||||||
|
// - `LOCK` object removal is a prohibited action in the GC.
|
||||||
|
shortage = uint32(len(nn[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
p.processRepNodes(ctx, c, objInfo, nn[i], shortage, checkedNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !c.needLocalCopy && c.removeLocalCopy {
|
if !c.needLocalCopy && c.removeLocalCopy {
|
||||||
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
||||||
zap.Stringer("object", addr),
|
zap.Stringer("object", objInfo.Address),
|
||||||
)
|
)
|
||||||
|
|
||||||
p.cbRedundantCopy(ctx, addr)
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -89,23 +94,13 @@ type placementRequirements struct {
|
||||||
removeLocalCopy bool
|
removeLocalCopy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) processNodes(ctx context.Context, requirements *placementRequirements, addrWithType objectcore.AddressWithType,
|
func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info,
|
||||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
|
nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
|
||||||
) {
|
) {
|
||||||
addr := addrWithType.Address
|
addr := objInfo.Address
|
||||||
typ := addrWithType.Type
|
|
||||||
|
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
var uncheckedCopies int
|
var uncheckedCopies int
|
||||||
|
|
||||||
if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || addrWithType.IsLinkingObject {
|
|
||||||
// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
|
|
||||||
// for correct object removal protection:
|
|
||||||
// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
|
|
||||||
// - `LOCK` object removal is a prohibited action in the GC.
|
|
||||||
shortage = uint32(len(nodes))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -133,7 +128,7 @@ func (p *Policer) processNodes(ctx context.Context, requirements *placementRequi
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
|
||||||
_, err := p.remoteHeader(callCtx, nodes[i], addr)
|
_, err := p.remoteHeader(callCtx, nodes[i], addr, false)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -196,7 +191,7 @@ func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address
|
||||||
Nodes: nodes,
|
Nodes: nodes,
|
||||||
}
|
}
|
||||||
|
|
||||||
p.replicator.HandleTask(ctx, task, checkedNodes)
|
p.replicator.HandleReplicationTask(ctx, task, checkedNodes)
|
||||||
|
|
||||||
case uncheckedCopies > 0:
|
case uncheckedCopies > 0:
|
||||||
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
// If we have more copies than needed, but some of them are from the maintenance nodes,
|
||||||
|
|
390
pkg/services/policer/ec.go
Normal file
390
pkg/services/policer/ec.go
Normal file
|
@ -0,0 +1,390 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errNoECinfoReturnded = errors.New("no EC info returned")
|
||||||
|
|
||||||
|
type ecChunkProcessResult struct {
|
||||||
|
validPlacement bool
|
||||||
|
removeLocal bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var errInvalidECPlacement = errors.New("invalid EC placement: EC placement must have one placement vector")
|
||||||
|
|
||||||
|
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
if objInfo.ECInfo == nil {
|
||||||
|
return p.processECContainerRepObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
return p.processECContainerECObject(ctx, objInfo, policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
|
||||||
|
// All of them must be stored on all of the container nodes.
|
||||||
|
func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
objID := objInfo.Address.Object()
|
||||||
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
|
}
|
||||||
|
if len(nn) != 1 {
|
||||||
|
return errInvalidECPlacement
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &placementRequirements{}
|
||||||
|
checkedNodes := newNodeCache()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
p.processRepNodes(ctx, c, objInfo, nn[0], uint32(len(nn[0])), checkedNodes)
|
||||||
|
|
||||||
|
if !c.needLocalCopy && c.removeLocalCopy {
|
||||||
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
||||||
|
zap.Stringer("object", objInfo.Address),
|
||||||
|
)
|
||||||
|
|
||||||
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
||||||
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
||||||
|
}
|
||||||
|
if len(nn) != 1 {
|
||||||
|
return errInvalidECPlacement
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
res := p.processECChunk(ctx, objInfo, nn[0])
|
||||||
|
if !res.validPlacement {
|
||||||
|
// drop local chunk only if all required chunks are in place
|
||||||
|
res.removeLocal = res.removeLocal && p.pullRequiredECChunks(ctx, objInfo, nn[0])
|
||||||
|
}
|
||||||
|
p.adjustECPlacement(ctx, objInfo, nn[0], policy)
|
||||||
|
|
||||||
|
if res.removeLocal {
|
||||||
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
|
||||||
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processECChunk replicates EC chunk if needed.
|
||||||
|
func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) ecChunkProcessResult {
|
||||||
|
var removeLocalChunk bool
|
||||||
|
requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
|
||||||
|
// current node is required node, we are happy
|
||||||
|
return ecChunkProcessResult{
|
||||||
|
validPlacement: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if requiredNode.IsMaintenance() {
|
||||||
|
// consider maintenance mode has object, but do not drop local copy
|
||||||
|
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
||||||
|
return ecChunkProcessResult{}
|
||||||
|
}
|
||||||
|
|
||||||
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address, false)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
removeLocalChunk = true
|
||||||
|
} else if client.IsErrObjectNotFound(err) {
|
||||||
|
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1))
|
||||||
|
task := replicator.Task{
|
||||||
|
NumCopies: 1,
|
||||||
|
Addr: objInfo.Address,
|
||||||
|
Nodes: []netmap.NodeInfo{requiredNode},
|
||||||
|
}
|
||||||
|
p.replicator.HandleReplicationTask(ctx, task, newNodeCache())
|
||||||
|
} else if isClientErrMaintenance(err) {
|
||||||
|
// consider maintenance mode has object, but do not drop local copy
|
||||||
|
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
||||||
|
} else {
|
||||||
|
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return ecChunkProcessResult{
|
||||||
|
removeLocal: removeLocalChunk,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) pullRequiredECChunks(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
|
||||||
|
var parentAddress oid.Address
|
||||||
|
parentAddress.SetContainer(objInfo.Address.Container())
|
||||||
|
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||||
|
|
||||||
|
requiredChunkIndexes := p.collectRequiredECChunks(nodes, objInfo)
|
||||||
|
if len(requiredChunkIndexes) == 0 {
|
||||||
|
p.log.Info(logs.PolicerNodeIsNotECObjectNode, zap.Stringer("object", objInfo.ECInfo.ParentID))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
err := p.resolveLocalECChunks(ctx, parentAddress, requiredChunkIndexes)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToGetLocalECChunks, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if len(requiredChunkIndexes) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
indexToObjectID := make(map[uint32]oid.ID)
|
||||||
|
success := p.resolveRemoteECChunks(ctx, parentAddress, nodes, requiredChunkIndexes, indexToObjectID)
|
||||||
|
if !success {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, candidates := range requiredChunkIndexes {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(objInfo.Address.Container())
|
||||||
|
addr.SetObject(indexToObjectID[index])
|
||||||
|
p.replicator.HandlePullTask(ctx, replicator.Task{
|
||||||
aarifullin
commented
Just for curiosity: do we always need to pull missing chunks from remote node? I suppose this is not a big deal and barely gives a big profit... Just for curiosity: do we *always* need to pull missing chunks from remote node?
`processECChunk` sets `removeLocal = true` but a chunk, that is being replicated, is not removed yet until `pullRequiredECChunks`'s run. Could a policer try to `reconstruct` missing chunk if the local node, *by good fortune*, keeps required chunks for reconstruction?
I suppose this is not a big deal and barely gives a big profit...
dstepanov-yadro
commented
By default local node should keep only one chunk. By default local node should keep only one chunk.
|
|||||||
|
Addr: addr,
|
||||||
|
Nodes: candidates,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// there was some missing chunks, it's not ok
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) collectRequiredECChunks(nodes []netmap.NodeInfo, objInfo objectcore.Info) map[uint32][]netmap.NodeInfo {
|
||||||
|
requiredChunkIndexes := make(map[uint32][]netmap.NodeInfo)
|
||||||
|
for i, n := range nodes {
|
||||||
|
if uint32(i) == objInfo.ECInfo.Total {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||||
|
requiredChunkIndexes[uint32(i)] = []netmap.NodeInfo{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return requiredChunkIndexes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) resolveLocalECChunks(ctx context.Context, parentAddress oid.Address, required map[uint32][]netmap.NodeInfo) error {
|
||||||
|
_, err := p.localHeader(ctx, parentAddress)
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
if err == nil { // should not be happen
|
||||||
|
return errNoECinfoReturnded
|
||||||
|
}
|
||||||
|
if !errors.As(err, &eiErr) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, ch := range eiErr.ECInfo().Chunks {
|
||||||
|
delete(required, ch.Index)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) resolveRemoteECChunks(ctx context.Context, parentAddress oid.Address, nodes []netmap.NodeInfo, required map[uint32][]netmap.NodeInfo, indexToObjectID map[uint32]oid.ID) bool {
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
for _, n := range nodes {
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err := p.remoteHeader(ctx, n, parentAddress, true)
|
||||||
|
if !errors.As(err, &eiErr) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, ch := range eiErr.ECInfo().Chunks {
|
||||||
|
if candidates, ok := required[ch.Index]; ok {
|
||||||
|
candidates = append(candidates, n)
|
||||||
|
required[ch.Index] = candidates
|
||||||
|
|
||||||
|
var chunkID oid.ID
|
||||||
|
if err := chunkID.ReadFromV2(ch.ID); err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if existed, ok := indexToObjectID[ch.Index]; ok && existed != chunkID {
|
||||||
|
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", existed),
|
||||||
|
zap.Stringer("second", chunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
indexToObjectID[ch.Index] = chunkID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, candidates := range required {
|
||||||
|
if len(candidates) == 0 {
|
||||||
|
p.log.Error(logs.PolicerMissingECChunk, zap.Stringer("object", parentAddress), zap.Uint32("index", index))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) adjustECPlacement(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo, policy netmap.PlacementPolicy) {
|
||||||
|
var parentAddress oid.Address
|
||||||
|
parentAddress.SetContainer(objInfo.Address.Container())
|
||||||
|
parentAddress.SetObject(objInfo.ECInfo.ParentID)
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
resolved := make(map[uint32][]netmap.NodeInfo)
|
||||||
|
chunkIDs := make(map[uint32]oid.ID)
|
||||||
|
restore := true // do not restore EC chunks if some node returned error
|
||||||
|
for idx, n := range nodes {
|
||||||
|
if uint32(idx) >= objInfo.ECInfo.Total && uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
||||||
|
_, err = p.localHeader(ctx, parentAddress)
|
||||||
|
} else {
|
||||||
|
_, err = p.remoteHeader(ctx, n, parentAddress, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.As(err, &eiErr) {
|
||||||
|
for _, ch := range eiErr.ECInfo().Chunks {
|
||||||
|
resolved[ch.Index] = append(resolved[ch.Index], n)
|
||||||
|
var ecInfoChunkID oid.ID
|
||||||
|
if err := ecInfoChunkID.ReadFromV2(ch.ID); err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToDecodeECChunkID, zap.Error(err), zap.Stringer("object", parentAddress))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if chunkID, exist := chunkIDs[ch.Index]; exist && chunkID != ecInfoChunkID {
|
||||||
|
p.log.Error(logs.PolicerDifferentObjectIDForTheSameECChunk, zap.Stringer("first", chunkID),
|
||||||
|
zap.Stringer("second", ecInfoChunkID), zap.Stringer("object", parentAddress), zap.Uint32("index", ch.Index))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
chunkIDs[ch.Index] = ecInfoChunkID
|
||||||
|
}
|
||||||
|
} else if !p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) && uint32(idx) < objInfo.ECInfo.Total {
|
||||||
|
p.log.Warn(logs.PolicerCouldNotGetObjectFromNodeMoving, zap.String("node", hex.EncodeToString(n.PublicKey())), zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
NumCopies: 1,
|
||||||
|
Addr: objInfo.Address,
|
||||||
|
Nodes: []netmap.NodeInfo{n},
|
||||||
|
}, newNodeCache())
|
||||||
|
restore = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !restore || uint32(len(resolved)) == objInfo.ECInfo.Total {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if objInfo.ECInfo.Total-uint32(len(resolved)) > policy.ReplicaDescriptor(0).GetECParityCount() {
|
||||||
|
var found []uint32
|
||||||
|
for i := range resolved {
|
||||||
|
found = append(found, i)
|
||||||
|
}
|
||||||
|
p.log.Error(logs.PolicerCouldNotRestoreObjectNotEnoughChunks, zap.Stringer("object", parentAddress), zap.Uint32s("found_chunks", found))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.restoreECObject(ctx, objInfo, parentAddress, nodes, resolved, chunkIDs, policy)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) restoreECObject(ctx context.Context, objInfo objectcore.Info, parentAddress oid.Address, nodes []netmap.NodeInfo, existedChunks map[uint32][]netmap.NodeInfo, chunkIDs map[uint32]oid.ID, policy netmap.PlacementPolicy) {
|
||||||
|
c, err := erasurecode.NewConstructor(int(policy.ReplicaDescriptor(0).GetECDataCount()), int(policy.ReplicaDescriptor(0).GetECParityCount()))
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
parts := p.collectExistedChunks(ctx, objInfo, existedChunks, parentAddress, chunkIDs)
|
||||||
|
if parts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key, err := p.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
required := make([]bool, len(parts))
|
||||||
|
for i, p := range parts {
|
||||||
|
if p == nil {
|
||||||
|
required[i] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := c.ReconstructParts(parts, required, key); err != nil {
|
||||||
|
p.log.Error(logs.PolicerFailedToRestoreObject, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for idx, part := range parts {
|
||||||
|
if _, exists := existedChunks[uint32(idx)]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(parentAddress.Container())
|
||||||
|
pID, _ := part.ID()
|
||||||
|
addr.SetObject(pID)
|
||||||
|
targetNode := nodes[idx%len(nodes)]
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(targetNode.PublicKey()) {
|
||||||
|
p.replicator.HandleLocalPutTask(ctx, replicator.Task{
|
||||||
|
Addr: addr,
|
||||||
|
Obj: part,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
p.replicator.HandleReplicationTask(ctx, replicator.Task{
|
||||||
|
NumCopies: 1,
|
||||||
|
Addr: addr,
|
||||||
|
Nodes: []netmap.NodeInfo{targetNode},
|
||||||
|
Obj: part,
|
||||||
|
}, newNodeCache())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Policer) collectExistedChunks(ctx context.Context, objInfo objectcore.Info, existedChunks map[uint32][]netmap.NodeInfo, parentAddress oid.Address, chunkIDs map[uint32]oid.ID) []*objectSDK.Object {
|
||||||
|
parts := make([]*objectSDK.Object, objInfo.ECInfo.Total)
|
||||||
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for idx, nodes := range existedChunks {
|
||||||
|
idx := idx
|
||||||
|
nodes := nodes
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
var objID oid.Address
|
||||||
|
objID.SetContainer(parentAddress.Container())
|
||||||
|
objID.SetObject(chunkIDs[idx])
|
||||||
|
var obj *objectSDK.Object
|
||||||
|
var err error
|
||||||
|
for _, node := range nodes {
|
||||||
|
if p.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
obj, err = p.localObject(egCtx, objID)
|
||||||
|
} else {
|
||||||
|
obj, err = p.remoteObject(egCtx, node, objID)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
p.log.Warn(logs.PolicerCouldNotGetChunk, zap.Stringer("object", parentAddress), zap.Stringer("chunkID", objID), zap.Error(err), zap.String("node", hex.EncodeToString(node.PublicKey())))
|
||||||
|
}
|
||||||
|
if obj != nil {
|
||||||
|
parts[idx] = obj
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := errGroup.Wait(); err != nil {
|
||||||
|
p.log.Error(logs.PolicerCouldNotGetChunks, zap.Stringer("object", parentAddress), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return parts
|
||||||
|
}
|
565
pkg/services/policer/ec_test.go
Normal file
565
pkg/services/policer/ec_test.go
Normal file
|
@ -0,0 +1,565 @@
|
||||||
|
package policer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestECChunkHasValidPlacement(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
chunkAddress := oidtest.Address()
|
||||||
|
parentID := oidtest.ID()
|
||||||
|
|
||||||
|
var policy netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||||
|
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := containerSrc{
|
||||||
|
get: func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(chunkAddress.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
|
||||||
|
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, raw, "remote header for parent object must be called with raw flag")
|
||||||
|
index := int(ni.PublicKey()[0])
|
||||||
|
require.True(t, index == 1 || index == 2, "invalid node to get parent header")
|
||||||
|
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = uint32(index)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, a.Container() == chunkAddress.Container() && a.Object() == parentID, "invalid address to get remote header")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = uint32(0)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(remoteHeadFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadFn),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
objInfo := objectcore.Info{
|
||||||
|
Address: chunkAddress,
|
||||||
|
Type: objectSDK.TypeRegular,
|
||||||
|
ECInfo: &objectcore.ECInfo{
|
||||||
|
ParentID: parentID,
|
||||||
|
Index: 0,
|
||||||
|
Total: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestECChunkHasInvalidPlacement(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
chunkAddress := oidtest.Address()
|
||||||
|
parentID := oidtest.ID()
|
||||||
|
chunkObject := objectSDK.New()
|
||||||
|
chunkObject.SetContainerID(chunkAddress.Container())
|
||||||
|
chunkObject.SetID(chunkAddress.Object())
|
||||||
|
chunkObject.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||||
|
chunkObject.SetPayloadSize(uint64(10))
|
||||||
|
chunkObject.SetECHeader(objectSDK.NewECHeader(parentID, 1, 3, []byte{}, 0))
|
||||||
|
|
||||||
|
var policy netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||||
|
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := containerSrc{
|
||||||
|
get: func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(chunkAddress.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
if cnr.Equals(chunkAddress.Container()) && obj.Equals(parentID) {
|
||||||
|
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
|
||||||
|
objInfo := objectcore.Info{
|
||||||
|
Address: chunkAddress,
|
||||||
|
Type: objectSDK.TypeRegular,
|
||||||
|
ECInfo: &objectcore.ECInfo{
|
||||||
|
ParentID: parentID,
|
||||||
|
Index: 1,
|
||||||
|
Total: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("node0 has chunk1, node1 has chunk0 and chunk1", func(t *testing.T) {
|
||||||
|
// policer should pull chunk0 on first run and drop chunk1 on second run
|
||||||
|
var allowDrop bool
|
||||||
|
requiredChunkID := oidtest.ID()
|
||||||
|
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||||
|
return chunkObject, nil
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.Index = 0
|
||||||
|
ch.SetID(requiredChunkID)
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 2
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
require.Fail(t, "unexpected remote HEAD")
|
||||||
|
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||||
|
if allowDrop {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.SetID(requiredChunkID)
|
||||||
|
ch.Index = 0
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pullCounter atomic.Int64
|
||||||
|
var dropped []oid.Address
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadF),
|
||||||
|
WithReplicator(&testReplicator{
|
||||||
|
handlePullTask: (func(ctx context.Context, r replicator.Task) {
|
||||||
|
require.True(t, r.Addr.Container() == chunkAddress.Container() && r.Addr.Object() == requiredChunkID &&
|
||||||
|
len(r.Nodes) == 1 && bytes.Equal(r.Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid pull task")
|
||||||
|
pullCounter.Add(1)
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||||
|
require.True(t, allowDrop, "invalid redundent copy call")
|
||||||
|
dropped = append(dropped, a)
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
|
||||||
|
require.Equal(t, 0, len(dropped), "invalid dropped count")
|
||||||
|
allowDrop = true
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(1), pullCounter.Load(), "invalid pull count")
|
||||||
|
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||||
|
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("node0 has chunk0 and chunk1, node1 has chunk1", func(t *testing.T) {
|
||||||
|
// policer should drop chunk1
|
||||||
|
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||||
|
return chunkObject, nil
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 2
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
require.Fail(t, "unexpected remote HEAD")
|
||||||
|
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 0
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var dropped []oid.Address
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadF),
|
||||||
|
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||||
|
dropped = append(dropped, a)
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||||
|
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("node0 has chunk0 and chunk1, node1 has no chunks", func(t *testing.T) {
|
||||||
|
// policer should replicate chunk1 to node1 on first run and drop chunk1 on node0 on second run
|
||||||
|
var secondRun bool
|
||||||
|
headFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a == chunkAddress && !raw {
|
||||||
|
if !secondRun {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
return chunkObject, nil
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[1].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[2].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 2
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if bytes.Equal(ni.PublicKey(), nodes[3].PublicKey()) && a.Container() == chunkAddress.Container() &&
|
||||||
|
a.Object() == parentID && raw {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
require.Fail(t, "unexpected remote HEAD")
|
||||||
|
return nil, fmt.Errorf("unexpected remote HEAD")
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadF := func(_ context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, addr.Container() == chunkAddress.Container() && addr.Object() == parentID, "unexpected local HEAD")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkAddress.Object())
|
||||||
|
ch.Index = 1
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
ch.SetID(oidtest.ID())
|
||||||
|
ch.Index = 0
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var dropped []oid.Address
|
||||||
|
var replicated []replicator.Task
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(headFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadF),
|
||||||
|
WithRedundantCopyCallback(func(ctx context.Context, a oid.Address) {
|
||||||
|
dropped = append(dropped, a)
|
||||||
|
}),
|
||||||
|
WithReplicator(&testReplicator{
|
||||||
|
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
|
||||||
|
replicated = append(replicated, t)
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
)
|
||||||
|
|
||||||
|
err := p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(dropped), "invalid dropped count")
|
||||||
|
require.Equal(t, 1, len(replicated), "invalid replicated count")
|
||||||
|
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
|
||||||
|
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
|
||||||
|
|
||||||
|
secondRun = true
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(replicated), "invalid replicated count")
|
||||||
|
require.Equal(t, chunkAddress, replicated[0].Addr, "invalid replicated object")
|
||||||
|
require.True(t, bytes.Equal(replicated[0].Nodes[0].PublicKey(), nodes[1].PublicKey()), "invalid replicate target")
|
||||||
|
require.Equal(t, 1, len(dropped), "invalid dropped count")
|
||||||
|
require.True(t, chunkAddress.Equals(dropped[0]), "invalid dropped object")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestECChunkRestore(t *testing.T) {
|
||||||
|
// node0 has chunk0, node1 has chunk1
|
||||||
|
// policer should replicate chunk0 to node2 on the first run
|
||||||
|
// then restore EC object and replicate chunk2 to node2 on the second run
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
payload := make([]byte, 64)
|
||||||
|
rand.Read(payload)
|
||||||
|
parentAddress := oidtest.Address()
|
||||||
|
parentObject := objectSDK.New()
|
||||||
|
parentObject.SetContainerID(parentAddress.Container())
|
||||||
|
parentObject.SetPayload(payload)
|
||||||
|
parentObject.SetPayloadSize(64)
|
||||||
|
objectSDK.CalculateAndSetPayloadChecksum(parentObject)
|
||||||
|
err := objectSDK.CalculateAndSetID(parentObject)
|
||||||
|
require.NoError(t, err)
|
||||||
|
id, _ := parentObject.ID()
|
||||||
|
parentAddress.SetObject(id)
|
||||||
|
|
||||||
|
chunkIDs := make([]oid.ID, 3)
|
||||||
|
c, err := erasurecode.NewConstructor(2, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
chunks, err := c.Split(parentObject, &key.PrivateKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i, ch := range chunks {
|
||||||
|
chunkIDs[i], _ = ch.ID()
|
||||||
|
}
|
||||||
|
|
||||||
|
var policy netmapSDK.PlacementPolicy
|
||||||
|
require.NoError(t, policy.DecodeString("EC 2.1"))
|
||||||
|
|
||||||
|
cnr := &container.Container{}
|
||||||
|
cnr.Value.Init()
|
||||||
|
cnr.Value.SetPlacementPolicy(policy)
|
||||||
|
containerSrc := containerSrc{
|
||||||
|
get: func(id cid.ID) (*container.Container, error) {
|
||||||
|
if id.Equals(parentAddress.Container()) {
|
||||||
|
return cnr, nil
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ContainerNotFound)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]netmapSDK.NodeInfo, 4)
|
||||||
|
for i := range nodes {
|
||||||
|
nodes[i].SetPublicKey([]byte{byte(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmapSDK.PlacementPolicy) ([][]netmapSDK.NodeInfo, error) {
|
||||||
|
if cnr.Equals(parentAddress.Container()) && obj.Equals(parentAddress.Object()) {
|
||||||
|
return [][]netmapSDK.NodeInfo{nodes}, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("unexpected placement build")
|
||||||
|
}
|
||||||
|
var secondRun bool
|
||||||
|
remoteHeadFn := func(_ context.Context, ni netmapSDK.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, raw, "remote header for parent object must be called with raw flag")
|
||||||
|
index := int(ni.PublicKey()[0])
|
||||||
|
require.True(t, index == 1 || index == 2 || index == 3, "invalid node to get parent header")
|
||||||
|
require.True(t, a == parentAddress, "invalid address to get remote header")
|
||||||
|
if index == 1 {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkIDs[1])
|
||||||
|
ch.Index = uint32(1)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
if index == 2 && secondRun {
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkIDs[0])
|
||||||
|
ch.Index = uint32(0)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
localHeadFn := func(_ context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, a == parentAddress, "invalid address to get remote header")
|
||||||
|
ei := objectSDK.NewECInfo()
|
||||||
|
var ch objectSDK.ECChunk
|
||||||
|
ch.SetID(chunkIDs[0])
|
||||||
|
ch.Index = uint32(0)
|
||||||
|
ch.Total = 3
|
||||||
|
ei.AddChunk(ch)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
|
}
|
||||||
|
|
||||||
|
var replicatedObj []*objectSDK.Object
|
||||||
|
p := New(
|
||||||
|
WithContainerSource(containerSrc),
|
||||||
|
WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
|
||||||
|
WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
|
||||||
|
return bytes.Equal(k, nodes[0].PublicKey())
|
||||||
|
})),
|
||||||
|
WithRemoteObjectHeaderFunc(remoteHeadFn),
|
||||||
|
WithLocalObjectHeaderFunc(localHeadFn),
|
||||||
|
WithReplicator(&testReplicator{
|
||||||
|
handleReplicationTask: func(ctx context.Context, t replicator.Task, tr replicator.TaskResult) {
|
||||||
|
if t.Obj != nil {
|
||||||
|
replicatedObj = append(replicatedObj, t.Obj)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
WithLocalObjectGetFunc(func(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
require.True(t, a.Container() == parentAddress.Container() && a.Object() == chunkIDs[0], "invalid local object request")
|
||||||
|
return chunks[0], nil
|
||||||
|
}),
|
||||||
|
WithRemoteObjectGetFunc(func(ctx context.Context, ni netmapSDK.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
||||||
|
index := ni.PublicKey()[0]
|
||||||
|
if index == 2 {
|
||||||
|
return nil, new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
return chunks[index], nil
|
||||||
|
}),
|
||||||
|
WithPool(testPool(t)),
|
||||||
|
WithKeyStorage(util.NewKeyStorage(&key.PrivateKey, nil, nil)),
|
||||||
|
)
|
||||||
|
|
||||||
|
var chunkAddress oid.Address
|
||||||
|
chunkAddress.SetContainer(parentAddress.Container())
|
||||||
|
chunkAddress.SetObject(chunkIDs[0])
|
||||||
|
objInfo := objectcore.Info{
|
||||||
|
Address: chunkAddress,
|
||||||
|
Type: objectSDK.TypeRegular,
|
||||||
|
ECInfo: &objectcore.ECInfo{
|
||||||
|
ParentID: parentAddress.Object(),
|
||||||
|
Index: 0,
|
||||||
|
Total: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
secondRun = true
|
||||||
|
err = p.processObject(context.Background(), objInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, 1, len(replicatedObj), "invalid replicated objects count")
|
||||||
|
chunks[2].SetSignature(nil)
|
||||||
|
expectedData, err := chunks[2].MarshalJSON()
|
||||||
|
require.NoError(t, err)
|
||||||
|
replicatedObj[0].SetSignature(nil)
|
||||||
|
actualData, err := replicatedObj[0].MarshalJSON()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, string(expectedData), string(actualData), "invalid restored objects")
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
@ -22,7 +23,7 @@ import (
|
||||||
// Note that the underlying implementation might be circular: i.e. it can restart
|
// Note that the underlying implementation might be circular: i.e. it can restart
|
||||||
// when the end of the key space is reached.
|
// when the end of the key space is reached.
|
||||||
type KeySpaceIterator interface {
|
type KeySpaceIterator interface {
|
||||||
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
Next(context.Context, uint32) ([]objectcore.Info, error)
|
||||||
Rewind()
|
Rewind()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,11 +36,20 @@ type BuryFunc func(context.Context, oid.Address) error
|
||||||
|
|
||||||
// Replicator is the interface to a consumer of replication tasks.
|
// Replicator is the interface to a consumer of replication tasks.
|
||||||
type Replicator interface {
|
type Replicator interface {
|
||||||
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||||
|
HandlePullTask(ctx context.Context, task replicator.Task)
|
||||||
|
HandleLocalPutTask(ctx context.Context, task replicator.Task)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||||||
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address, bool) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
// LocalObjectHeaderFunc is the function to obtain HEAD info from the current node.
|
||||||
|
type LocalObjectHeaderFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
type RemoteObjectGetFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
type LocalObjectGetFunc func(context.Context, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
headTimeout time.Duration
|
headTimeout time.Duration
|
||||||
|
@ -56,6 +66,8 @@ type cfg struct {
|
||||||
|
|
||||||
remoteHeader RemoteObjectHeaderFunc
|
remoteHeader RemoteObjectHeaderFunc
|
||||||
|
|
||||||
|
localHeader LocalObjectHeaderFunc
|
||||||
|
|
||||||
netmapKeys netmap.AnnouncedKeys
|
netmapKeys netmap.AnnouncedKeys
|
||||||
|
|
||||||
replicator Replicator
|
replicator Replicator
|
||||||
|
@ -69,6 +81,12 @@ type cfg struct {
|
||||||
evictDuration, sleepDuration time.Duration
|
evictDuration, sleepDuration time.Duration
|
||||||
|
|
||||||
metrics MetricsRegister
|
metrics MetricsRegister
|
||||||
|
|
||||||
|
remoteObject RemoteObjectGetFunc
|
||||||
|
|
||||||
|
localObject LocalObjectGetFunc
|
||||||
|
|
||||||
|
keyStorage *util.KeyStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -125,13 +143,32 @@ func WithPlacementBuilder(v placement.Builder) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
// WithRemoteObjectHeader returns option to set remote object header receiver of Policer.
|
||||||
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.remoteHeader = v
|
c.remoteHeader = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLocalObjectHeaderFunc returns option to set local object header receiver of Policer.
|
||||||
|
func WithLocalObjectHeaderFunc(v LocalObjectHeaderFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.localHeader = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRemoteObjectGetFunc(v RemoteObjectGetFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.remoteObject = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithLocalObjectGetFunc(v LocalObjectGetFunc) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.localObject = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
||||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
@ -169,3 +206,9 @@ func WithMetrics(m MetricsRegister) Option {
|
||||||
c.metrics = m
|
c.metrics = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithKeyStorage(ks *util.KeyStorage) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.keyStorage = ks
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ import (
|
||||||
func TestBuryObjectWithoutContainer(t *testing.T) {
|
func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
// Key space
|
// Key space
|
||||||
addr := oidtest.Address()
|
addr := oidtest.Address()
|
||||||
objs := []objectcore.AddressWithType{
|
objs := []objectcore.Info{
|
||||||
{
|
{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Type: objectSDK.TypeRegular,
|
Type: objectSDK.TypeRegular,
|
||||||
|
@ -78,6 +78,7 @@ func TestProcessObject(t *testing.T) {
|
||||||
maintenanceNodes []int
|
maintenanceNodes []int
|
||||||
wantRemoveRedundant bool
|
wantRemoveRedundant bool
|
||||||
wantReplicateTo []int
|
wantReplicateTo []int
|
||||||
|
ecInfo *objectcore.ECInfo
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "1 copy already held by local node",
|
desc: "1 copy already held by local node",
|
||||||
|
@ -144,6 +145,22 @@ func TestProcessObject(t *testing.T) {
|
||||||
objHolders: []int{1},
|
objHolders: []int{1},
|
||||||
maintenanceNodes: []int{2},
|
maintenanceNodes: []int{2},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "lock object must be replicated to all EC nodes",
|
||||||
|
objType: objectSDK.TypeLock,
|
||||||
|
nodeCount: 3,
|
||||||
|
policy: `EC 1.1`,
|
||||||
|
placement: [][]int{{0, 1, 2}},
|
||||||
|
wantReplicateTo: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "tombstone object must be replicated to all EC nodes",
|
||||||
|
objType: objectSDK.TypeTombstone,
|
||||||
|
nodeCount: 3,
|
||||||
|
policy: `EC 1.1`,
|
||||||
|
placement: [][]int{{0, 1, 2}},
|
||||||
|
wantReplicateTo: []int{1, 2},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range tests {
|
for i := range tests {
|
||||||
|
@ -173,12 +190,15 @@ func TestProcessObject(t *testing.T) {
|
||||||
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
|
||||||
return placementVectors, nil
|
return placementVectors, nil
|
||||||
}
|
}
|
||||||
|
if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) {
|
||||||
|
return placementVectors, nil
|
||||||
|
}
|
||||||
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
|
||||||
return nil, errors.New("unexpected placement build")
|
return nil, errors.New("unexpected placement build")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object remote header
|
// Object remote header
|
||||||
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
|
headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||||
index := int(ni.PublicKey()[0])
|
index := int(ni.PublicKey()[0])
|
||||||
if a != addr || index < 1 || index >= ti.nodeCount {
|
if a != addr || index < 1 || index >= ti.nodeCount {
|
||||||
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
|
||||||
|
@ -229,18 +249,21 @@ func TestProcessObject(t *testing.T) {
|
||||||
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
|
require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
|
||||||
gotRemoveRedundant = true
|
gotRemoveRedundant = true
|
||||||
}),
|
}),
|
||||||
WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
WithReplicator(&testReplicator{
|
||||||
|
handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||||
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
|
require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
|
||||||
for _, node := range task.Nodes {
|
for _, node := range task.Nodes {
|
||||||
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
|
||||||
}
|
}
|
||||||
})),
|
},
|
||||||
|
}),
|
||||||
WithPool(testPool(t)),
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
addrWithType := objectcore.AddressWithType{
|
addrWithType := objectcore.Info{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Type: ti.objType,
|
Type: ti.objType,
|
||||||
|
ECInfo: ti.ecInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.processObject(context.Background(), addrWithType)
|
err := p.processObject(context.Background(), addrWithType)
|
||||||
|
@ -276,7 +299,7 @@ func TestProcessObjectError(t *testing.T) {
|
||||||
WithPool(testPool(t)),
|
WithPool(testPool(t)),
|
||||||
)
|
)
|
||||||
|
|
||||||
addrWithType := objectcore.AddressWithType{
|
addrWithType := objectcore.Info{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,7 +308,7 @@ func TestProcessObjectError(t *testing.T) {
|
||||||
|
|
||||||
func TestIteratorContract(t *testing.T) {
|
func TestIteratorContract(t *testing.T) {
|
||||||
addr := oidtest.Address()
|
addr := oidtest.Address()
|
||||||
objs := []objectcore.AddressWithType{{
|
objs := []objectcore.Info{{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Type: objectSDK.TypeRegular,
|
Type: objectSDK.TypeRegular,
|
||||||
}}
|
}}
|
||||||
|
@ -350,7 +373,7 @@ func testPool(t *testing.T) *ants.Pool {
|
||||||
}
|
}
|
||||||
|
|
||||||
type nextResult struct {
|
type nextResult struct {
|
||||||
objs []objectcore.AddressWithType
|
objs []objectcore.Info
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,7 +384,7 @@ type predefinedIterator struct {
|
||||||
calls []string
|
calls []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) {
|
||||||
if it.pos == len(it.scenario) {
|
if it.pos == len(it.scenario) {
|
||||||
close(it.finishCh)
|
close(it.finishCh)
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
@ -380,11 +403,11 @@ func (it *predefinedIterator) Rewind() {
|
||||||
|
|
||||||
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||||
type sliceKeySpaceIterator struct {
|
type sliceKeySpaceIterator struct {
|
||||||
objs []objectcore.AddressWithType
|
objs []objectcore.Info
|
||||||
cur int
|
cur int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) {
|
||||||
if it.cur >= len(it.objs) {
|
if it.cur >= len(it.objs) {
|
||||||
return nil, engine.ErrEndOfListing
|
return nil, engine.ErrEndOfListing
|
||||||
}
|
}
|
||||||
|
@ -419,9 +442,20 @@ type announcedKeysFunc func([]byte) bool
|
||||||
|
|
||||||
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
||||||
|
|
||||||
// replicatorFunc is a Replicator backed by a function.
|
type testReplicator struct {
|
||||||
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
||||||
|
handleLocalPutTask func(ctx context.Context, task replicator.Task)
|
||||||
func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
handlePullTask func(ctx context.Context, task replicator.Task)
|
||||||
f(ctx, task, res)
|
}
|
||||||
|
|
||||||
|
func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
|
||||||
|
r.handleReplicationTask(ctx, task, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) {
|
||||||
|
r.handleLocalPutTask(ctx, task)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) {
|
||||||
|
r.handlePullTask(ctx, task)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,9 +21,9 @@ type TaskResult interface {
|
||||||
SubmitSuccessfulReplication(netmap.NodeInfo)
|
SubmitSuccessfulReplication(netmap.NodeInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleTask executes replication task inside invoking goroutine.
|
// HandleReplicationTask executes replication task inside invoking goroutine.
|
||||||
// Passes all the nodes that accepted the replication to the TaskResult.
|
// Passes all the nodes that accepted the replication to the TaskResult.
|
||||||
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
|
func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res TaskResult) {
|
||||||
p.metrics.IncInFlightRequest()
|
p.metrics.IncInFlightRequest()
|
||||||
defer p.metrics.DecInFlightRequest()
|
defer p.metrics.DecInFlightRequest()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -32,7 +32,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
||||||
)
|
)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleTask",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleReplicateTask",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.Stringer("address", task.Addr),
|
attribute.Stringer("address", task.Addr),
|
||||||
attribute.Int64("number_of_copies", int64(task.NumCopies)),
|
attribute.Int64("number_of_copies", int64(task.NumCopies)),
|
||||||
|
|
72
pkg/services/replicator/pull.go
Normal file
72
pkg/services/replicator/pull.go
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
package replicator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node")
|
||||||
|
|
||||||
|
func (p *Replicator) HandlePullTask(ctx context.Context, task Task) {
|
||||||
|
p.metrics.IncInFlightRequest()
|
||||||
|
defer p.metrics.DecInFlightRequest()
|
||||||
|
defer func() {
|
||||||
|
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("address", task.Addr),
|
||||||
|
attribute.Int("nodes_count", len(task.Nodes)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var obj *objectSDK.Object
|
||||||
|
|
||||||
|
for _, node := range task.Nodes {
|
||||||
|
var err error
|
||||||
|
obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{
|
||||||
|
Address: task.Addr,
|
||||||
|
Node: node,
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var endpoints []string
|
||||||
|
node.IterateNetworkEndpoints(func(s string) bool {
|
||||||
|
endpoints = append(endpoints, s)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Strings("endpoints", endpoints),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj == nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(errFailedToGetObjectFromAnyNode),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := engine.Put(ctx, p.localStorage, obj)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
}
|
47
pkg/services/replicator/put.go
Normal file
47
pkg/services/replicator/put.go
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
package replicator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errObjectNotDefined = errors.New("object is not defined")
|
||||||
|
|
||||||
|
func (p *Replicator) HandleLocalPutTask(ctx context.Context, task Task) {
|
||||||
|
p.metrics.IncInFlightRequest()
|
||||||
|
defer p.metrics.DecInFlightRequest()
|
||||||
|
defer func() {
|
||||||
|
p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandleLocalPutTask",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("address", task.Addr),
|
||||||
|
attribute.Int("nodes_count", len(task.Nodes)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if task.Obj == nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(errObjectNotDefined),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := engine.Put(ctx, p.localStorage, task.Obj)
|
||||||
|
if err != nil {
|
||||||
|
p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage,
|
||||||
|
zap.Stringer("object", task.Addr),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -25,6 +26,8 @@ type cfg struct {
|
||||||
|
|
||||||
remoteSender *putsvc.RemoteSender
|
remoteSender *putsvc.RemoteSender
|
||||||
|
|
||||||
|
remoteGetter *getsvc.RemoteGetter
|
||||||
|
|
||||||
localStorage *engine.StorageEngine
|
localStorage *engine.StorageEngine
|
||||||
|
|
||||||
metrics MetricsRegister
|
metrics MetricsRegister
|
||||||
|
@ -70,6 +73,12 @@ func WithRemoteSender(v *putsvc.RemoteSender) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithRemoteGetter(v *getsvc.RemoteGetter) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.remoteGetter = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithLocalStorage returns option to set local object storage of Replicator.
|
// WithLocalStorage returns option to set local object storage of Replicator.
|
||||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
func WithLocalStorage(v *engine.StorageEngine) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
|
Loading…
Reference in a new issue
Why have you decided to go with 2 func options instead of an interface?
Because this approach has already been used in policer.