forked from TrueCloudLab/frostfs-node
Compare commits
4 commits
master
...
bugfix/del
Author | SHA1 | Date | |
---|---|---|---|
8cae3eea6b | |||
8e884349ee | |||
4360ca32f7 | |||
d2bc0b83b1 |
16 changed files with 135 additions and 30 deletions
1
Makefile
1
Makefile
|
@ -40,6 +40,7 @@ $(BINS): $(DIRS) dep
|
|||
@echo "⇒ Build $@"
|
||||
CGO_ENABLED=0 \
|
||||
go build -v -trimpath \
|
||||
-gcflags "all=-N -l" \
|
||||
-ldflags "-X $(REPO)/misc.Version=$(VERSION)" \
|
||||
-o $@ ./cmd/$(notdir $@)
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package engine
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
|
@ -141,6 +142,7 @@ func (e *StorageEngine) reportShardError(
|
|||
if isLogical(err) {
|
||||
e.log.Warn(msg,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("logical", strconv.FormatBool(true)),
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
|
|
@ -127,6 +127,9 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
defer func() {
|
||||
// if object is root we continue since information about it
|
||||
// can be presented in other shards
|
||||
e.log.Debug("defer inhumeAddr",
|
||||
zap.Bool("checkExists", checkExists),
|
||||
zap.Bool("root", root), zap.Stringer("shard_id", sh.ID()), zap.Stringer("address", addr))
|
||||
if checkExists && root {
|
||||
stop = false
|
||||
}
|
||||
|
@ -136,6 +139,10 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
existPrm.SetAddress(addr)
|
||||
exRes, err := sh.Exists(existPrm)
|
||||
if err != nil {
|
||||
e.log.Debug("could not check for presents in shard",
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Stringer("address", addr),
|
||||
zap.String("error", err.Error()))
|
||||
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||
// inhumed once - no need to be inhumed again
|
||||
ok = true
|
||||
|
@ -156,6 +163,10 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
|
|||
|
||||
_, err := sh.Inhume(ctx, prm)
|
||||
if err != nil {
|
||||
e.log.Debug("could not inhume object in shard full",
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Stringer("address", addr),
|
||||
zap.String("error", err.Error()))
|
||||
switch {
|
||||
case errors.As(err, &errLocked):
|
||||
retErr = apistatus.ObjectLocked{}
|
||||
|
@ -209,6 +220,7 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []me
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
e.log.Info("interrupt processing the expired tombstones by context")
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
@ -222,7 +234,8 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, l
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
e.log.Info("interrupt processing the expired locks", zap.Error(ctx.Err()))
|
||||
e.log.Info("interrupt processing the expired locks", zap.Error(ctx.Err()),
|
||||
zap.Stringer("sh", sh.ID()))
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
@ -236,7 +249,8 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
e.log.Info("interrupt processing the deleted locks", zap.Error(ctx.Err()))
|
||||
e.log.Info("interrupt processing the deleted locks", zap.Error(ctx.Err()),
|
||||
zap.Stringer("sh", sh.ID()))
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
|
|
@ -132,15 +132,19 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool
|
|||
|
||||
_, err = sh.Put(putPrm)
|
||||
if err != nil {
|
||||
e.log.Error("could not put object to shard full",
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("address", object.AddressOf(obj)),
|
||||
)
|
||||
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
|
||||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
|
||||
e.log.Warn("could not put object to shard",
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
zap.Stringer("shard_id", sh.ID()))
|
||||
return
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not put object to shard", err)
|
||||
e.reportShardError(sh, "could not put object to shard", err, zap.Stringer("address", object.AddressOf(obj)))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// SelectPrm groups the parameters of Select operation.
|
||||
|
@ -62,6 +63,8 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) {
|
|||
shPrm.SetFilters(prm.filters)
|
||||
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
e.log.Debug("checking shard",
|
||||
zap.Stringer("sh", sh.ID()))
|
||||
res, err := sh.Select(shPrm)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not select objects from shard", err)
|
||||
|
@ -69,6 +72,9 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) {
|
|||
}
|
||||
|
||||
for _, addr := range res.AddressList() { // save only unique values
|
||||
e.log.Debug("found addr",
|
||||
zap.Stringer("sh", sh.ID()),
|
||||
zap.Stringer("addr", addr))
|
||||
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
|
||||
uniqueMap[addr.EncodeToString()] = struct{}{}
|
||||
addrList = append(addrList, addr)
|
||||
|
|
|
@ -7,10 +7,12 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// DeletePrm groups the parameters of Delete operation.
|
||||
|
@ -226,8 +228,14 @@ func (db *DB) deleteObject(
|
|||
obj *objectSDK.Object,
|
||||
isParent bool,
|
||||
) error {
|
||||
err := delUniqueIndexes(tx, obj, isParent)
|
||||
err := delUniqueIndexes(db.log, tx, obj, isParent)
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
db.log.Debug("unable to delete from named bucket",
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("type", obj.Type()),
|
||||
zap.Stringer("addr", object.AddressOf(obj)))
|
||||
}
|
||||
return fmt.Errorf("can't remove unique indexes")
|
||||
}
|
||||
|
||||
|
@ -261,11 +269,12 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int {
|
|||
return len(lst)
|
||||
}
|
||||
|
||||
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
|
||||
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt := tx.Bucket(item.name)
|
||||
if bkt != nil {
|
||||
_ = bkt.Delete(item.key) // ignore error, best effort there
|
||||
return bkt.Delete(item.key) // ignore error, best effort there
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
|
@ -320,7 +329,8 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
||||
func delUniqueIndexes(log *logger.Logger, tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
||||
var err error
|
||||
addr := object.AddressOf(obj)
|
||||
|
||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||
|
@ -340,32 +350,59 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
|||
case objectSDK.TypeLock:
|
||||
bucketName = bucketNameLockers(cnr, bucketName)
|
||||
default:
|
||||
log.Debug("del uniq index - unknown object type")
|
||||
return ErrUnknownObjectType
|
||||
}
|
||||
|
||||
delUniqueIndexItem(tx, namedBucketItem{
|
||||
err = delUniqueIndexItem(tx, namedBucketItem{
|
||||
name: bucketName,
|
||||
key: objKey,
|
||||
})
|
||||
if err != nil {
|
||||
log.Debug("unable to delete from named bucket",
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("type", obj.Type()),
|
||||
zap.Stringer("addr", addr))
|
||||
}
|
||||
} else {
|
||||
delUniqueIndexItem(tx, namedBucketItem{
|
||||
err = delUniqueIndexItem(tx, namedBucketItem{
|
||||
name: parentBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
})
|
||||
if err != nil {
|
||||
log.Debug("unable to delete from parent bucket",
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("addr", addr))
|
||||
}
|
||||
}
|
||||
|
||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
|
||||
err = delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
|
||||
name: smallBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
})
|
||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
|
||||
if err != nil {
|
||||
log.Debug("unable to delete from storage id index",
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("addr", addr))
|
||||
}
|
||||
err = delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
|
||||
name: rootBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
})
|
||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from ToMoveIt index
|
||||
if err != nil {
|
||||
log.Debug("unable to delete from root index",
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("addr", addr))
|
||||
}
|
||||
err = delUniqueIndexItem(tx, namedBucketItem{ // remove from ToMoveIt index
|
||||
name: toMoveItBucketName,
|
||||
key: addrKey,
|
||||
})
|
||||
if err != nil {
|
||||
log.Debug("unable to delete from tomoveit bucket",
|
||||
zap.String("error", err.Error()),
|
||||
zap.Stringer("addr", addr))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strings"
|
||||
|
||||
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -96,10 +97,12 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, c
|
|||
|
||||
if len(group.fastFilters) == 0 {
|
||||
expLen = 1
|
||||
|
||||
db.log.Debug("select all")
|
||||
db.selectAll(tx, cnr, mAddr)
|
||||
} else {
|
||||
db.log.Debug("select fastFilters")
|
||||
for i := range group.fastFilters {
|
||||
db.log.Debug("fastFilter = " + group.fastFilters[i].Header())
|
||||
db.selectFastFilter(tx, cnr, group.fastFilters[i], mAddr, i)
|
||||
}
|
||||
}
|
||||
|
@ -138,22 +141,34 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, c
|
|||
// selectAll adds to resulting cache all available objects in metabase.
|
||||
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0)
|
||||
db.log.Debug("select from primary")
|
||||
selectAllFromBucket(db.log, tx, primaryBucketName(cnr, bucketName), to, 0)
|
||||
db.log.Debug("select from tombstone")
|
||||
selectAllFromBucket(db.log, tx, tombstoneBucketName(cnr, bucketName), to, 0)
|
||||
db.log.Debug("select from group")
|
||||
selectAllFromBucket(db.log, tx, storageGroupBucketName(cnr, bucketName), to, 0)
|
||||
db.log.Debug("select from parent")
|
||||
selectAllFromBucket(db.log, tx, parentBucketName(cnr, bucketName), to, 0)
|
||||
db.log.Debug("select from primary")
|
||||
selectAllFromBucket(db.log, tx, bucketNameLockers(cnr, bucketName), to, 0)
|
||||
}
|
||||
|
||||
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
||||
// resulting cache. Keys should be stringed object ids.
|
||||
func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
|
||||
func selectAllFromBucket(log *logger.Logger, tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
_ = bkt.ForEach(func(k, v []byte) error {
|
||||
var id oid.ID
|
||||
err := id.Decode(k)
|
||||
if err == nil {
|
||||
log.Debug("selectAllFromBucket = " + id.String())
|
||||
} else {
|
||||
log.Debug("error decode")
|
||||
}
|
||||
markAddressInCache(to, fNum, string(k))
|
||||
|
||||
return nil
|
||||
|
@ -182,7 +197,7 @@ func (db *DB) selectFastFilter(
|
|||
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||
case v2object.FilterHeaderObjectType:
|
||||
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
||||
selectAllFromBucket(tx, bucketName, to, fNum)
|
||||
selectAllFromBucket(db.log, tx, bucketName, to, fNum)
|
||||
}
|
||||
case v2object.FilterHeaderParent:
|
||||
bucketName := parentBucketName(cnr, bucketName)
|
||||
|
@ -191,12 +206,12 @@ func (db *DB) selectFastFilter(
|
|||
bucketName := splitBucketName(cnr, bucketName)
|
||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||
case v2object.FilterPropertyRoot:
|
||||
selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(db.log, tx, rootBucketName(cnr, bucketName), to, fNum)
|
||||
case v2object.FilterPropertyPhy:
|
||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(db.log, tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(db.log, tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(db.log, tx, storageGroupBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(db.log, tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
||||
default: // user attribute
|
||||
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
||||
|
||||
|
|
|
@ -102,6 +102,7 @@ func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
|
|||
if err != nil {
|
||||
s.log.Debug("can't remove object from blobStor",
|
||||
zap.Stringer("object_address", prm.addr[i]),
|
||||
zap.Stringer("my_shard", s.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,7 +59,9 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
|||
if err != nil || !tryCache {
|
||||
if err != nil {
|
||||
s.log.Debug("can't put object to the write-cache, trying blobstor",
|
||||
zap.String("err", err.Error()))
|
||||
zap.String("err", err.Error()),
|
||||
zap.Stringer("address", putPrm.Address),
|
||||
)
|
||||
}
|
||||
|
||||
res, err = s.blobStor.Put(putPrm)
|
||||
|
|
|
@ -266,6 +266,9 @@ func (c *cache) flushObject(obj *object.Object, data []byte) error {
|
|||
|
||||
_, err = c.metabase.UpdateStorageID(updPrm)
|
||||
if err != nil {
|
||||
c.log.Debug("can't update object storage ID full",
|
||||
zap.String("address", addr.String()),
|
||||
zap.Error(err))
|
||||
c.reportFlushError("can't update object storage ID",
|
||||
addr.EncodeToString(), err)
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ func (s *Service) Delete(ctx context.Context, prm Prm) error {
|
|||
}
|
||||
|
||||
func (exec *execCtx) execute(ctx context.Context) {
|
||||
exec.log.Debug("serving request...")
|
||||
exec.log.Debug("serving delete request...")
|
||||
|
||||
// perform local operation
|
||||
exec.executeLocal(ctx)
|
||||
|
|
|
@ -2,6 +2,7 @@ package deletesvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -40,6 +41,8 @@ func (exec *execCtx) formTombstone(ctx context.Context) (ok bool) {
|
|||
)
|
||||
exec.addMembers([]oid.ID{exec.address().Object()})
|
||||
|
||||
exec.log.Debug("tombstone lifetime " + strconv.FormatUint(exec.tombstone.ExpirationEpoch(), 10))
|
||||
exec.log.Debug("CurrentEpoch " + strconv.FormatUint(exec.svc.netInfo.CurrentEpoch(), 10))
|
||||
exec.log.Debug("forming split info...")
|
||||
|
||||
ok = exec.formSplitInfo(ctx)
|
||||
|
|
|
@ -198,7 +198,10 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer.
|
|||
if t.traversal.submitPrimaryPlacementFinish() {
|
||||
_, err = t.iteratePlacement(ctx)
|
||||
if err != nil {
|
||||
t.log.Error("additional container broadcast failure", zap.Error(err))
|
||||
t.log.Error("additional container broadcast failure",
|
||||
zap.Error(err),
|
||||
zap.String("address", object.AddressOf(t.obj).String()),
|
||||
)
|
||||
// we don't fail primary operation because of broadcast failure
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,6 +109,11 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
|||
|
||||
mtx.Lock()
|
||||
exec.writeIDList(ids)
|
||||
for _, id := range ids {
|
||||
exec.log.Debug("found id on node "+string(info.PublicKey()),
|
||||
zap.String("id", id.String()),
|
||||
)
|
||||
}
|
||||
mtx.Unlock()
|
||||
}(i)
|
||||
}
|
||||
|
|
|
@ -18,5 +18,12 @@ func (exec *execCtx) executeLocal() {
|
|||
return
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
exec.log.Debug("found id",
|
||||
zap.String("id", id.String()),
|
||||
zap.String("for", exec.containerID().String()),
|
||||
)
|
||||
}
|
||||
|
||||
exec.writeIDList(ids)
|
||||
}
|
||||
|
|
|
@ -129,6 +129,8 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
|||
if !c.needLocalCopy && c.removeLocalCopy {
|
||||
p.log.Info("redundant local object copy detected",
|
||||
zap.Stringer("object", addr),
|
||||
zap.Bool("c.needLocalCopy", c.needLocalCopy),
|
||||
zap.Bool("c.removeLocalCopy", c.removeLocalCopy),
|
||||
)
|
||||
|
||||
p.cbRedundantCopy(ctx, addr)
|
||||
|
|
Loading…
Reference in a new issue