Compare commits
4 commits
master
...
bugfix/del
Author | SHA1 | Date | |
---|---|---|---|
8cae3eea6b | |||
8e884349ee | |||
4360ca32f7 | |||
d2bc0b83b1 |
16 changed files with 135 additions and 30 deletions
1
Makefile
1
Makefile
|
@ -40,6 +40,7 @@ $(BINS): $(DIRS) dep
|
||||||
@echo "⇒ Build $@"
|
@echo "⇒ Build $@"
|
||||||
CGO_ENABLED=0 \
|
CGO_ENABLED=0 \
|
||||||
go build -v -trimpath \
|
go build -v -trimpath \
|
||||||
|
-gcflags "all=-N -l" \
|
||||||
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
|
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
|
||||||
-o $@ ./cmd/$(notdir $@)
|
-o $@ ./cmd/$(notdir $@)
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
@ -141,6 +142,7 @@ func (e *StorageEngine) reportShardError(
|
||||||
if isLogical(err) {
|
if isLogical(err) {
|
||||||
e.log.Warn(msg,
|
e.log.Warn(msg,
|
||||||
zap.Stringer("shard_id", sh.ID()),
|
zap.Stringer("shard_id", sh.ID()),
|
||||||
|
zap.String("logical", strconv.FormatBool(true)),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,6 +127,9 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
defer func() {
|
defer func() {
|
||||||
// if object is root we continue since information about it
|
// if object is root we continue since information about it
|
||||||
// can be presented in other shards
|
// can be presented in other shards
|
||||||
|
e.log.Debug("defer inhumeAddr",
|
||||||
|
zap.Bool("checkExists", checkExists),
|
||||||
|
zap.Bool("root", root), zap.Stringer("shard_id", sh.ID()), zap.Stringer("address", addr))
|
||||||
if checkExists && root {
|
if checkExists && root {
|
||||||
stop = false
|
stop = false
|
||||||
}
|
}
|
||||||
|
@ -136,6 +139,10 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
existPrm.SetAddress(addr)
|
existPrm.SetAddress(addr)
|
||||||
exRes, err := sh.Exists(existPrm)
|
exRes, err := sh.Exists(existPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
e.log.Debug("could not check for presents in shard",
|
||||||
|
zap.Stringer("shard_id", sh.ID()),
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||||
// inhumed once - no need to be inhumed again
|
// inhumed once - no need to be inhumed again
|
||||||
ok = true
|
ok = true
|
||||||
|
@ -156,6 +163,10 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
||||||
|
|
||||||
_, err := sh.Inhume(ctx, prm)
|
_, err := sh.Inhume(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
e.log.Debug("could not inhume object in shard full",
|
||||||
|
zap.Stringer("shard_id", sh.ID()),
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
switch {
|
switch {
|
||||||
case errors.As(err, &errLocked):
|
case errors.As(err, &errLocked):
|
||||||
retErr = apistatus.ObjectLocked{}
|
retErr = apistatus.ObjectLocked{}
|
||||||
|
@ -209,6 +220,7 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []me
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
e.log.Info("interrupt processing the expired tombstones by context")
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
@ -222,7 +234,8 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, l
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
e.log.Info("interrupt processing the expired locks", zap.Error(ctx.Err()))
|
e.log.Info("interrupt processing the expired locks", zap.Error(ctx.Err()),
|
||||||
|
zap.Stringer("sh", sh.ID()))
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
@ -236,7 +249,8 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
e.log.Info("interrupt processing the deleted locks", zap.Error(ctx.Err()))
|
e.log.Info("interrupt processing the deleted locks", zap.Error(ctx.Err()),
|
||||||
|
zap.Stringer("sh", sh.ID()))
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -132,15 +132,19 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool
|
||||||
|
|
||||||
_, err = sh.Put(putPrm)
|
_, err = sh.Put(putPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
e.log.Error("could not put object to shard full",
|
||||||
|
zap.Stringer("shard_id", sh.ID()),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("address", object.AddressOf(obj)),
|
||||||
|
)
|
||||||
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
|
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
|
||||||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
|
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
|
||||||
e.log.Warn("could not put object to shard",
|
e.log.Warn("could not put object to shard",
|
||||||
zap.Stringer("shard_id", sh.ID()),
|
zap.Stringer("shard_id", sh.ID()))
|
||||||
zap.String("error", err.Error()))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
e.reportShardError(sh, "could not put object to shard", err)
|
e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", object.AddressOf(obj)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SelectPrm groups the parameters of Select operation.
|
// SelectPrm groups the parameters of Select operation.
|
||||||
|
@ -62,6 +63,8 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) {
|
||||||
shPrm.SetFilters(prm.filters)
|
shPrm.SetFilters(prm.filters)
|
||||||
|
|
||||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||||
|
e.log.Debug("checking shard",
|
||||||
|
zap.Stringer("sh", sh.ID()))
|
||||||
res, err := sh.Select(shPrm)
|
res, err := sh.Select(shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "could not select objects from shard", err)
|
e.reportShardError(sh, "could not select objects from shard", err)
|
||||||
|
@ -69,6 +72,9 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, addr := range res.AddressList() { // save only unique values
|
for _, addr := range res.AddressList() { // save only unique values
|
||||||
|
e.log.Debug("found addr",
|
||||||
|
zap.Stringer("sh", sh.ID()),
|
||||||
|
zap.Stringer("addr", addr))
|
||||||
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
|
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
|
||||||
uniqueMap[addr.EncodeToString()] = struct{}{}
|
uniqueMap[addr.EncodeToString()] = struct{}{}
|
||||||
addrList = append(addrList, addr)
|
addrList = append(addrList, addr)
|
||||||
|
|
|
@ -7,10 +7,12 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DeletePrm groups the parameters of Delete operation.
|
// DeletePrm groups the parameters of Delete operation.
|
||||||
|
@ -226,8 +228,14 @@ func (db *DB) deleteObject(
|
||||||
obj *objectSDK.Object,
|
obj *objectSDK.Object,
|
||||||
isParent bool,
|
isParent bool,
|
||||||
) error {
|
) error {
|
||||||
err := delUniqueIndexes(tx, obj, isParent)
|
err := delUniqueIndexes(db.log, tx, obj, isParent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err != nil {
|
||||||
|
db.log.Debug("unable to delete from named bucket",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("type", obj.Type()),
|
||||||
|
zap.Stringer("addr", object.AddressOf(obj)))
|
||||||
|
}
|
||||||
return fmt.Errorf("can't remove unique indexes")
|
return fmt.Errorf("can't remove unique indexes")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,11 +269,12 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int {
|
||||||
return len(lst)
|
return len(lst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
|
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
bkt := tx.Bucket(item.name)
|
bkt := tx.Bucket(item.name)
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
_ = bkt.Delete(item.key) // ignore error, best effort there
|
return bkt.Delete(item.key) // ignore error, best effort there
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
|
@ -320,7 +329,8 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
func delUniqueIndexes(log *logger.Logger, tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
||||||
|
var err error
|
||||||
addr := object.AddressOf(obj)
|
addr := object.AddressOf(obj)
|
||||||
|
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
|
@ -340,32 +350,59 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
bucketName = bucketNameLockers(cnr, bucketName)
|
bucketName = bucketNameLockers(cnr, bucketName)
|
||||||
default:
|
default:
|
||||||
|
log.Debug("del uniq index - unknown object type")
|
||||||
return ErrUnknownObjectType
|
return ErrUnknownObjectType
|
||||||
}
|
}
|
||||||
|
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
err = delUniqueIndexItem(tx, namedBucketItem{
|
||||||
name: bucketName,
|
name: bucketName,
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("unable to delete from named bucket",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("type", obj.Type()),
|
||||||
|
zap.Stringer("addr", addr))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
err = delUniqueIndexItem(tx, namedBucketItem{
|
||||||
name: parentBucketName(cnr, bucketName),
|
name: parentBucketName(cnr, bucketName),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("unable to delete from parent bucket",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("addr", addr))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
|
err = delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
|
||||||
name: smallBucketName(cnr, bucketName),
|
name: smallBucketName(cnr, bucketName),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
})
|
||||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
|
if err != nil {
|
||||||
|
log.Debug("unable to delete from storage id index",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("addr", addr))
|
||||||
|
}
|
||||||
|
err = delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
|
||||||
name: rootBucketName(cnr, bucketName),
|
name: rootBucketName(cnr, bucketName),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
})
|
||||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from ToMoveIt index
|
if err != nil {
|
||||||
|
log.Debug("unable to delete from root index",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("addr", addr))
|
||||||
|
}
|
||||||
|
err = delUniqueIndexItem(tx, namedBucketItem{ // remove from ToMoveIt index
|
||||||
name: toMoveItBucketName,
|
name: toMoveItBucketName,
|
||||||
key: addrKey,
|
key: addrKey,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("unable to delete from tomoveit bucket",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
zap.Stringer("addr", addr))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -96,10 +97,12 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, c
|
||||||
|
|
||||||
if len(group.fastFilters) == 0 {
|
if len(group.fastFilters) == 0 {
|
||||||
expLen = 1
|
expLen = 1
|
||||||
|
db.log.Debug("select all")
|
||||||
db.selectAll(tx, cnr, mAddr)
|
db.selectAll(tx, cnr, mAddr)
|
||||||
} else {
|
} else {
|
||||||
|
db.log.Debug("select fastFilters")
|
||||||
for i := range group.fastFilters {
|
for i := range group.fastFilters {
|
||||||
|
db.log.Debug("fastFilter = " + group.fastFilters[i].Header())
|
||||||
db.selectFastFilter(tx, cnr, group.fastFilters[i], mAddr, i)
|
db.selectFastFilter(tx, cnr, group.fastFilters[i], mAddr, i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -138,22 +141,34 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, c
|
||||||
// selectAll adds to resulting cache all available objects in metabase.
|
// selectAll adds to resulting cache all available objects in metabase.
|
||||||
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
|
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, 0)
|
db.log.Debug("select from primary")
|
||||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, 0)
|
selectAllFromBucket(db.log, tx, primaryBucketName(cnr, bucketName), to, 0)
|
||||||
selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, 0)
|
db.log.Debug("select from tombstone")
|
||||||
selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0)
|
selectAllFromBucket(db.log, tx, tombstoneBucketName(cnr, bucketName), to, 0)
|
||||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0)
|
db.log.Debug("select from group")
|
||||||
|
selectAllFromBucket(db.log, tx, storageGroupBucketName(cnr, bucketName), to, 0)
|
||||||
|
db.log.Debug("select from parent")
|
||||||
|
selectAllFromBucket(db.log, tx, parentBucketName(cnr, bucketName), to, 0)
|
||||||
|
db.log.Debug("select from primary")
|
||||||
|
selectAllFromBucket(db.log, tx, bucketNameLockers(cnr, bucketName), to, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
||||||
// resulting cache. Keys should be stringed object ids.
|
// resulting cache. Keys should be stringed object ids.
|
||||||
func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
|
func selectAllFromBucket(log *logger.Logger, tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
|
||||||
bkt := tx.Bucket(name)
|
bkt := tx.Bucket(name)
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = bkt.ForEach(func(k, v []byte) error {
|
_ = bkt.ForEach(func(k, v []byte) error {
|
||||||
|
var id oid.ID
|
||||||
|
err := id.Decode(k)
|
||||||
|
if err == nil {
|
||||||
|
log.Debug("selectAllFromBucket = " + id.String())
|
||||||
|
} else {
|
||||||
|
log.Debug("error decode")
|
||||||
|
}
|
||||||
markAddressInCache(to, fNum, string(k))
|
markAddressInCache(to, fNum, string(k))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -182,7 +197,7 @@ func (db *DB) selectFastFilter(
|
||||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||||
case v2object.FilterHeaderObjectType:
|
case v2object.FilterHeaderObjectType:
|
||||||
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
||||||
selectAllFromBucket(tx, bucketName, to, fNum)
|
selectAllFromBucket(db.log, tx, bucketName, to, fNum)
|
||||||
}
|
}
|
||||||
case v2object.FilterHeaderParent:
|
case v2object.FilterHeaderParent:
|
||||||
bucketName := parentBucketName(cnr, bucketName)
|
bucketName := parentBucketName(cnr, bucketName)
|
||||||
|
@ -191,12 +206,12 @@ func (db *DB) selectFastFilter(
|
||||||
bucketName := splitBucketName(cnr, bucketName)
|
bucketName := splitBucketName(cnr, bucketName)
|
||||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||||
case v2object.FilterPropertyRoot:
|
case v2object.FilterPropertyRoot:
|
||||||
selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(db.log, tx, rootBucketName(cnr, bucketName), to, fNum)
|
||||||
case v2object.FilterPropertyPhy:
|
case v2object.FilterPropertyPhy:
|
||||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(db.log, tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(db.log, tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||||
selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(db.log, tx, storageGroupBucketName(cnr, bucketName), to, fNum)
|
||||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
selectAllFromBucket(db.log, tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
||||||
default: // user attribute
|
default: // user attribute
|
||||||
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
||||||
|
|
||||||
|
|
|
@ -102,6 +102,7 @@ func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug("can't remove object from blobStor",
|
s.log.Debug("can't remove object from blobStor",
|
||||||
zap.Stringer("object_address", prm.addr[i]),
|
zap.Stringer("object_address", prm.addr[i]),
|
||||||
|
zap.Stringer("my_shard", s.ID()),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,9 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||||
if err != nil || !tryCache {
|
if err != nil || !tryCache {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug("can't put object to the write-cache, trying blobstor",
|
s.log.Debug("can't put object to the write-cache, trying blobstor",
|
||||||
zap.String("err", err.Error()))
|
zap.String("err", err.Error()),
|
||||||
|
zap.Stringer("address", putPrm.Address),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = s.blobStor.Put(putPrm)
|
res, err = s.blobStor.Put(putPrm)
|
||||||
|
|
|
@ -266,6 +266,9 @@ func (c *cache) flushObject(obj *object.Object, data []byte) error {
|
||||||
|
|
||||||
_, err = c.metabase.UpdateStorageID(updPrm)
|
_, err = c.metabase.UpdateStorageID(updPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.log.Debug("can't update object storage ID full",
|
||||||
|
zap.String("address", addr.String()),
|
||||||
|
zap.Error(err))
|
||||||
c.reportFlushError("can't update object storage ID",
|
c.reportFlushError("can't update object storage ID",
|
||||||
addr.EncodeToString(), err)
|
addr.EncodeToString(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (s *Service) Delete(ctx context.Context, prm Prm) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) execute(ctx context.Context) {
|
func (exec *execCtx) execute(ctx context.Context) {
|
||||||
exec.log.Debug("serving request...")
|
exec.log.Debug("serving delete request...")
|
||||||
|
|
||||||
// perform local operation
|
// perform local operation
|
||||||
exec.executeLocal(ctx)
|
exec.executeLocal(ctx)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package deletesvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -40,6 +41,8 @@ func (exec *execCtx) formTombstone(ctx context.Context) (ok bool) {
|
||||||
)
|
)
|
||||||
exec.addMembers([]oid.ID{exec.address().Object()})
|
exec.addMembers([]oid.ID{exec.address().Object()})
|
||||||
|
|
||||||
|
exec.log.Debug("tombstone lifetime " + strconv.FormatUint(exec.tombstone.ExpirationEpoch(), 10))
|
||||||
|
exec.log.Debug("CurrentEpoch " + strconv.FormatUint(exec.svc.netInfo.CurrentEpoch(), 10))
|
||||||
exec.log.Debug("forming split info...")
|
exec.log.Debug("forming split info...")
|
||||||
|
|
||||||
ok = exec.formSplitInfo(ctx)
|
ok = exec.formSplitInfo(ctx)
|
||||||
|
|
|
@ -198,7 +198,10 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer.
|
||||||
if t.traversal.submitPrimaryPlacementFinish() {
|
if t.traversal.submitPrimaryPlacementFinish() {
|
||||||
_, err = t.iteratePlacement(ctx)
|
_, err = t.iteratePlacement(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Error("additional container broadcast failure", zap.Error(err))
|
t.log.Error("additional container broadcast failure",
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("address", object.AddressOf(t.obj).String()),
|
||||||
|
)
|
||||||
// we don't fail primary operation because of broadcast failure
|
// we don't fail primary operation because of broadcast failure
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,6 +109,11 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||||
|
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
exec.writeIDList(ids)
|
exec.writeIDList(ids)
|
||||||
|
for _, id := range ids {
|
||||||
|
exec.log.Debug("found id on node "+string(info.PublicKey()),
|
||||||
|
zap.String("id", id.String()),
|
||||||
|
)
|
||||||
|
}
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,5 +18,12 @@ func (exec *execCtx) executeLocal() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, id := range ids {
|
||||||
|
exec.log.Debug("found id",
|
||||||
|
zap.String("id", id.String()),
|
||||||
|
zap.String("for", exec.containerID().String()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
exec.writeIDList(ids)
|
exec.writeIDList(ids)
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,8 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
if !c.needLocalCopy && c.removeLocalCopy {
|
if !c.needLocalCopy && c.removeLocalCopy {
|
||||||
p.log.Info("redundant local object copy detected",
|
p.log.Info("redundant local object copy detected",
|
||||||
zap.Stringer("object", addr),
|
zap.Stringer("object", addr),
|
||||||
|
zap.Bool("c.needLocalCopy", c.needLocalCopy),
|
||||||
|
zap.Bool("c.removeLocalCopy", c.removeLocalCopy),
|
||||||
)
|
)
|
||||||
|
|
||||||
p.cbRedundantCopy(ctx, addr)
|
p.cbRedundantCopy(ctx, addr)
|
||||||
|
|
Loading…
Reference in a new issue