forked from TrueCloudLab/frostfs-node
Compare commits
7 commits
df05057ed4
...
7dfe1ee9c1
Author | SHA1 | Date | |
---|---|---|---|
7dfe1ee9c1 | |||
148d68933b | |||
51ee132ea3 | |||
226dd25dd0 | |||
bd0197eaa8 | |||
e44b84c18c | |||
bed49e6ace |
43 changed files with 137 additions and 170 deletions
|
@ -9,7 +9,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -43,6 +42,8 @@ func initObjectHashCmd() {
|
|||
_ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||
|
||||
flags.String("range", "", "Range to take hash from in the form offset1:length1,...")
|
||||
_ = objectHashCmd.MarkFlagRequired("range")
|
||||
|
||||
flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'")
|
||||
flags.String(getRangeHashSaltFlag, "", "Salt in hex format")
|
||||
}
|
||||
|
@ -66,36 +67,6 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
|
|||
pk := key.GetOrGenerate(cmd)
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
||||
tz := typ == hashTz
|
||||
fullHash := len(ranges) == 0
|
||||
if fullHash {
|
||||
var headPrm internalclient.HeadObjectPrm
|
||||
headPrm.SetClient(cli)
|
||||
Prepare(cmd, &headPrm)
|
||||
headPrm.SetAddress(objAddr)
|
||||
|
||||
// get hash of full payload through HEAD (may be user can do it through dedicated command?)
|
||||
res, err := internalclient.HeadObject(cmd.Context(), headPrm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
var cs checksum.Checksum
|
||||
var csSet bool
|
||||
|
||||
if tz {
|
||||
cs, csSet = res.Header().PayloadHomomorphicHash()
|
||||
} else {
|
||||
cs, csSet = res.Header().PayloadChecksum()
|
||||
}
|
||||
|
||||
if csSet {
|
||||
cmd.Println(hex.EncodeToString(cs.Value()))
|
||||
} else {
|
||||
cmd.Println("Missing checksum in object header.")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var hashPrm internalclient.HashPayloadRangesPrm
|
||||
hashPrm.SetClient(cli)
|
||||
Prepare(cmd, &hashPrm)
|
||||
|
@ -104,7 +75,7 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
|
|||
hashPrm.SetSalt(salt)
|
||||
hashPrm.SetRanges(ranges)
|
||||
|
||||
if tz {
|
||||
if typ == hashTz {
|
||||
hashPrm.TZ()
|
||||
}
|
||||
|
||||
|
|
|
@ -1220,9 +1220,9 @@ func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
|
|||
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
|
||||
// with the binary-encoded information from the current node's configuration.
|
||||
// The state is set using the provided setter which MUST NOT be nil.
|
||||
func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.NodeInfo)) error {
|
||||
func (c *cfg) bootstrapWithState(ctx context.Context, state netmap.NodeState) error {
|
||||
ni := c.cfgNodeInfo.localInfo
|
||||
stateSetter(&ni)
|
||||
ni.SetStatus(state)
|
||||
|
||||
prm := nmClient.AddPeerPrm{}
|
||||
prm.SetNodeInfo(ni)
|
||||
|
@ -1232,9 +1232,7 @@ func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.N
|
|||
|
||||
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
|
||||
func bootstrapOnline(ctx context.Context, c *cfg) error {
|
||||
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
|
||||
ni.SetStatus(netmap.Online)
|
||||
})
|
||||
return c.bootstrapWithState(ctx, netmap.Online)
|
||||
}
|
||||
|
||||
// bootstrap calls bootstrapWithState with:
|
||||
|
@ -1245,9 +1243,7 @@ func (c *cfg) bootstrap(ctx context.Context) error {
|
|||
st := c.cfgNetmap.state.controlNetmapStatus()
|
||||
if st == control.NetmapStatus_MAINTENANCE {
|
||||
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
|
||||
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
|
||||
ni.SetStatus(netmap.Maintenance)
|
||||
})
|
||||
return c.bootstrapWithState(ctx, netmap.Maintenance)
|
||||
}
|
||||
|
||||
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
|
||||
|
|
|
@ -12,7 +12,7 @@ type ApplicationInfo struct {
|
|||
func NewApplicationInfo(version string) *ApplicationInfo {
|
||||
appInfo := &ApplicationInfo{
|
||||
versionValue: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "app_info",
|
||||
Name: "frostfs_node_app_info",
|
||||
Help: "General information about the application.",
|
||||
}, []string{"version"}),
|
||||
}
|
||||
|
|
|
@ -129,7 +129,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
|
|||
})
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't determine DB size: %w", err)
|
||||
return fmt.Errorf("determine DB size: %w", err)
|
||||
}
|
||||
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
|
||||
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||
|
@ -140,7 +140,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
|
|||
return saveItemsCount(tx, items)
|
||||
}); err != nil {
|
||||
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
|
||||
return fmt.Errorf("save blobovnicza's size and items count: %w", err)
|
||||
}
|
||||
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||
}
|
||||
|
|
|
@ -146,7 +146,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
|
|||
if prm.ignoreErrors {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("could not decode address key: %w", err)
|
||||
return fmt.Errorf("decode address key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -115,13 +115,13 @@ func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnic
|
|||
// decompress the data
|
||||
data, err := b.compression.Decompress(res.Object())
|
||||
if err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
|
||||
}
|
||||
|
||||
// unmarshal the object
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
|
|
|
@ -130,13 +130,13 @@ func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blob
|
|||
// decompress the data
|
||||
data, err := b.compression.Decompress(res.Object())
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
return common.GetRangeRes{}, fmt.Errorf("decompress object data: %w", err)
|
||||
}
|
||||
|
||||
// unmarshal the object
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
return common.GetRangeRes{}, fmt.Errorf("unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
from := prm.Range.GetOffset()
|
||||
|
|
|
@ -49,7 +49,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
|||
zap.String("root_path", b.rootPath))
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("could not decompress object data: %w", err)
|
||||
return fmt.Errorf("decompress object data: %w", err)
|
||||
}
|
||||
|
||||
if prm.Handler != nil {
|
||||
|
@ -82,7 +82,7 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
|
|||
zap.String("root_path", b.rootPath))
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||
return false, fmt.Errorf("open blobovnicza %s: %w", p, err)
|
||||
}
|
||||
defer shBlz.Close(ctx)
|
||||
|
||||
|
|
|
@ -69,10 +69,10 @@ func (b *sharedDB) Open(ctx context.Context) (*blobovnicza.Blobovnicza, error) {
|
|||
)...)
|
||||
|
||||
if err := blz.Open(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
|
||||
return nil, fmt.Errorf("open blobovnicza %s: %w", b.path, err)
|
||||
}
|
||||
if err := blz.Init(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
|
||||
return nil, fmt.Errorf("init blobovnicza %s: %w", b.path, err)
|
||||
}
|
||||
|
||||
b.refCount++
|
||||
|
@ -127,7 +127,7 @@ func (b *sharedDB) CloseAndRemoveFile(ctx context.Context) error {
|
|||
zap.String("id", b.path),
|
||||
zap.Error(err),
|
||||
)
|
||||
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
|
||||
return fmt.Errorf("close blobovnicza (path = %s): %w", b.path, err)
|
||||
}
|
||||
|
||||
b.refCount = 0
|
||||
|
|
|
@ -538,7 +538,7 @@ func (t *FSTree) countFiles() (uint64, uint64, error) {
|
|||
},
|
||||
)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||
return 0, 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
|
||||
}
|
||||
|
||||
return count, size, nil
|
||||
|
@ -577,7 +577,7 @@ func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
|||
},
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||
return 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
|
||||
}
|
||||
success = true
|
||||
return result, nil
|
||||
|
|
|
@ -47,13 +47,13 @@ func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes,
|
|||
// Decompress the data.
|
||||
var err error
|
||||
if data, err = s.compression.Decompress(data); err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
|
||||
}
|
||||
|
||||
// Unmarshal the SDK object.
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
|
|
|
@ -27,7 +27,7 @@ func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
|
|||
}
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
|
||||
return fmt.Errorf("set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
|
||||
}
|
||||
|
||||
b.mode = m
|
||||
|
|
|
@ -52,7 +52,7 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
|
|||
// marshal object
|
||||
data, err := prm.Object.Marshal()
|
||||
if err != nil {
|
||||
return common.PutRes{}, fmt.Errorf("could not marshal the object: %w", err)
|
||||
return common.PutRes{}, fmt.Errorf("marshal the object: %w", err)
|
||||
}
|
||||
prm.RawData = data
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
|||
err := eg.Wait()
|
||||
close(errCh)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize shards: %w", err)
|
||||
return fmt.Errorf("initialize shards: %w", err)
|
||||
}
|
||||
|
||||
for res := range errCh {
|
||||
|
@ -117,7 +117,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
|||
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
||||
return fmt.Errorf("initialize shard %s: %w", res.id, res.err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,7 +320,7 @@ loop:
|
|||
for _, newID := range shardsToAdd {
|
||||
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
||||
return fmt.Errorf("add new shard with '%s' metabase path: %w", newID, err)
|
||||
}
|
||||
|
||||
idStr := sh.ID().String()
|
||||
|
@ -331,13 +331,13 @@ loop:
|
|||
}
|
||||
if err != nil {
|
||||
_ = sh.Close(ctx)
|
||||
return fmt.Errorf("could not init %s shard: %w", idStr, err)
|
||||
return fmt.Errorf("init %s shard: %w", idStr, err)
|
||||
}
|
||||
|
||||
err = e.addShard(sh)
|
||||
if err != nil {
|
||||
_ = sh.Close(ctx)
|
||||
return fmt.Errorf("could not add %s shard: %w", idStr, err)
|
||||
return fmt.Errorf("add %s shard: %w", idStr, err)
|
||||
}
|
||||
|
||||
e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr))
|
||||
|
|
|
@ -578,7 +578,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
|
|||
|
||||
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
|
||||
if prm.TreeHandler == nil {
|
||||
return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
||||
return false, "", fmt.Errorf("evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
||||
}
|
||||
|
||||
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
|
||||
|
|
|
@ -108,12 +108,12 @@ func (m *metricsWithID) SetEvacuationInProgress(value bool) {
|
|||
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
|
||||
sh, err := e.createShard(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a shard: %w", err)
|
||||
return nil, fmt.Errorf("create a shard: %w", err)
|
||||
}
|
||||
|
||||
err = e.addShard(sh)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err)
|
||||
return nil, fmt.Errorf("add %s shard: %w", sh.ID().String(), err)
|
||||
}
|
||||
|
||||
e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())
|
||||
|
@ -124,7 +124,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
|
|||
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||
id, err := generateShardID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
return nil, fmt.Errorf("generate shard ID: %w", err)
|
||||
}
|
||||
|
||||
opts = e.appendMetrics(id, opts)
|
||||
|
@ -180,7 +180,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
|||
|
||||
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create pool: %w", err)
|
||||
return fmt.Errorf("create pool: %w", err)
|
||||
}
|
||||
|
||||
strID := sh.ID().String()
|
||||
|
@ -374,7 +374,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
|
|||
zap.Error(err),
|
||||
)
|
||||
multiErrGuard.Lock()
|
||||
multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err))
|
||||
multiErr = errors.Join(multiErr, fmt.Errorf("change shard (id:%s) mode to disabled: %w", sh.ID(), err))
|
||||
multiErrGuard.Unlock()
|
||||
}
|
||||
|
||||
|
@ -385,7 +385,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
|
|||
zap.Error(err),
|
||||
)
|
||||
multiErrGuard.Lock()
|
||||
multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err))
|
||||
multiErr = errors.Join(multiErr, fmt.Errorf("close removed shard (id:%s): %w", sh.ID(), err))
|
||||
multiErrGuard.Unlock()
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -54,7 +54,7 @@ func (db *DB) Open(ctx context.Context, m mode.Mode) error {
|
|||
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
|
||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||
return fmt.Errorf("create dir %s for metabase: %w", db.info.Path, err)
|
||||
}
|
||||
|
||||
db.log.Debug(ctx, logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))
|
||||
|
@ -73,7 +73,7 @@ func (db *DB) openBolt(ctx context.Context) error {
|
|||
|
||||
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open boltDB database: %w", err)
|
||||
return fmt.Errorf("open boltDB database: %w", err)
|
||||
}
|
||||
db.boltDB.MaxBatchDelay = db.boltBatchDelay
|
||||
db.boltDB.MaxBatchSize = db.boltBatchSize
|
||||
|
@ -145,27 +145,27 @@ func (db *DB) init(reset bool) error {
|
|||
if reset {
|
||||
err := tx.DeleteBucket(name)
|
||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("could not delete static bucket %s: %w", k, err)
|
||||
return fmt.Errorf("delete static bucket %s: %w", k, err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err := tx.CreateBucketIfNotExists(name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create static bucket %s: %w", k, err)
|
||||
return fmt.Errorf("create static bucket %s: %w", k, err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, b := range deprecatedBuckets {
|
||||
err := tx.DeleteBucket(b)
|
||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
|
||||
return fmt.Errorf("delete deprecated bucket %s: %w", string(b), err)
|
||||
}
|
||||
}
|
||||
|
||||
if !reset { // counters will be recalculated by refill metabase
|
||||
err = syncCounter(tx, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not sync object counter: %w", err)
|
||||
return fmt.Errorf("sync object counter: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -238,14 +238,14 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
|||
}
|
||||
|
||||
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||
return fmt.Errorf("increase phy object counter: %w", err)
|
||||
}
|
||||
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
|
||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
||||
return fmt.Errorf("increase logical object counter: %w", err)
|
||||
}
|
||||
if isUserObject {
|
||||
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
|
||||
return fmt.Errorf("could not increase user object counter: %w", err)
|
||||
return fmt.Errorf("increase user object counter: %w", err)
|
||||
}
|
||||
}
|
||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
||||
|
@ -362,7 +362,7 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject
|
|||
func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||
shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get shard info bucket: %w", err)
|
||||
return fmt.Errorf("get shard info bucket: %w", err)
|
||||
}
|
||||
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
|
||||
len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
|
||||
|
@ -375,7 +375,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
|||
|
||||
containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get container counter bucket: %w", err)
|
||||
return fmt.Errorf("get container counter bucket: %w", err)
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
|
@ -428,7 +428,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate objects: %w", err)
|
||||
return fmt.Errorf("iterate objects: %w", err)
|
||||
}
|
||||
|
||||
return setObjectCounters(counters, shardInfoB, containerCounterB)
|
||||
|
@ -448,7 +448,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
|||
value := containerCounterValue(count)
|
||||
err := containerCounterB.Put(key, value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update phy container object counter: %w", err)
|
||||
return fmt.Errorf("update phy container object counter: %w", err)
|
||||
}
|
||||
}
|
||||
phyData := make([]byte, 8)
|
||||
|
@ -456,7 +456,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
|||
|
||||
err := shardInfoB.Put(objectPhyCounterKey, phyData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update phy object counter: %w", err)
|
||||
return fmt.Errorf("update phy object counter: %w", err)
|
||||
}
|
||||
|
||||
logData := make([]byte, 8)
|
||||
|
@ -464,7 +464,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
|||
|
||||
err = shardInfoB.Put(objectLogicCounterKey, logData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update logic object counter: %w", err)
|
||||
return fmt.Errorf("update logic object counter: %w", err)
|
||||
}
|
||||
|
||||
userData := make([]byte, 8)
|
||||
|
@ -472,7 +472,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
|||
|
||||
err = shardInfoB.Put(objectUserCounterKey, userData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update user object counter: %w", err)
|
||||
return fmt.Errorf("update user object counter: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -492,7 +492,7 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) {
|
|||
}
|
||||
var cnrID cid.ID
|
||||
if err := cnrID.Decode(buf); err != nil {
|
||||
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
|
||||
}
|
||||
return cnrID, nil
|
||||
}
|
||||
|
|
|
@ -163,26 +163,26 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
|||
if res.phyCount > 0 {
|
||||
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not decrease phy object counter: %w", err)
|
||||
return fmt.Errorf("decrease phy object counter: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if res.logicCount > 0 {
|
||||
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not decrease logical object counter: %w", err)
|
||||
return fmt.Errorf("decrease logical object counter: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if res.userCount > 0 {
|
||||
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not decrease user object counter: %w", err)
|
||||
return fmt.Errorf("decrease user object counter: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
||||
return fmt.Errorf("could not decrease container object counter: %w", err)
|
||||
return fmt.Errorf("decrease container object counter: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -259,7 +259,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
if garbageBKT != nil {
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||
return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
return deleteSingleResult{}, nil
|
||||
|
@ -280,7 +280,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
if garbageBKT != nil {
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||
return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -308,7 +308,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
// remove object
|
||||
err = db.deleteObject(tx, obj, false)
|
||||
if err != nil {
|
||||
return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
|
||||
return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
|
||||
}
|
||||
|
||||
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
|
||||
|
@ -335,12 +335,12 @@ func (db *DB) deleteObject(
|
|||
|
||||
err = updateListIndexes(tx, obj, delListIndexItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't remove list indexes: %w", err)
|
||||
return fmt.Errorf("remove list indexes: %w", err)
|
||||
}
|
||||
|
||||
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
||||
return fmt.Errorf("remove fake bucket tree indexes: %w", err)
|
||||
}
|
||||
|
||||
if isParent {
|
||||
|
@ -351,7 +351,7 @@ func (db *DB) deleteObject(
|
|||
addrKey := addressKey(object.AddressOf(obj), key)
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||
return fmt.Errorf("remove from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -529,7 +529,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
|||
addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
|
||||
return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -567,7 +567,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
|||
addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
|
||||
err := garbageBKT.Delete(addrKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
|
||||
return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, e
|
|||
|
||||
err := splitInfo.Unmarshal(bytes.Clone(rawSplitInfo))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err)
|
||||
return nil, fmt.Errorf("unmarshal split info from root index: %w", err)
|
||||
}
|
||||
|
||||
return splitInfo, nil
|
||||
|
|
|
@ -187,7 +187,7 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
|||
|
||||
err = child.Unmarshal(bytes.Clone(data))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't unmarshal child with parent: %w", err)
|
||||
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
|
||||
}
|
||||
|
||||
par := child.Parent()
|
||||
|
|
|
@ -177,7 +177,7 @@ type gcHandler struct {
|
|||
func (g gcHandler) handleKV(k, _ []byte) error {
|
||||
o, err := garbageFromKV(k)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse garbage object: %w", err)
|
||||
return fmt.Errorf("parse garbage object: %w", err)
|
||||
}
|
||||
|
||||
return g.h(o)
|
||||
|
@ -190,7 +190,7 @@ type graveyardHandler struct {
|
|||
func (g graveyardHandler) handleKV(k, v []byte) error {
|
||||
o, err := graveFromKV(k, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse grave: %w", err)
|
||||
return fmt.Errorf("parse grave: %w", err)
|
||||
}
|
||||
|
||||
return g.h(o)
|
||||
|
@ -240,7 +240,7 @@ func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address)
|
|||
func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
||||
err = decodeAddressFromKey(&res.addr, k)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not parse address: %w", err)
|
||||
err = fmt.Errorf("parse address: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -373,7 +373,7 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
|
|||
if data != nil {
|
||||
err := targetBucket.Delete(tombKey)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
||||
return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
|
|||
|
||||
if !db.mode.NoMetabase() {
|
||||
if err := db.Close(ctx); err != nil {
|
||||
return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
||||
return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
|
|||
err = db.Init(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
||||
return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -180,18 +180,18 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
|||
|
||||
err := putUniqueIndexes(tx, obj, si, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't put unique indexes: %w", err)
|
||||
return fmt.Errorf("put unique indexes: %w", err)
|
||||
}
|
||||
|
||||
err = updateListIndexes(tx, obj, putListIndexItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't put list indexes: %w", err)
|
||||
return fmt.Errorf("put list indexes: %w", err)
|
||||
}
|
||||
|
||||
if indexAttributes {
|
||||
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
||||
return fmt.Errorf("put fake bucket tree indexes: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -250,7 +250,7 @@ func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, ad
|
|||
}
|
||||
rawObject, err := obj.CutPayload().Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't marshal object header: %w", err)
|
||||
return fmt.Errorf("marshal object header: %w", err)
|
||||
}
|
||||
return putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: bucketName,
|
||||
|
@ -475,7 +475,7 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
|
|||
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
return fmt.Errorf("create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
data, err := update(bkt.Get(item.key), item.val)
|
||||
|
@ -492,12 +492,12 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
return fmt.Errorf("create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
||||
return fmt.Errorf("create fake bucket tree index %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
return fkbtRoot.Put(item.val, zeroValue)
|
||||
|
@ -506,19 +506,19 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
return fmt.Errorf("create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
lst, err := decodeList(bkt.Get(item.key))
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
|
||||
return fmt.Errorf("decode leaf list %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
lst = append(lst, item.val)
|
||||
|
||||
encodedLst, err := encodeList(lst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
|
||||
return fmt.Errorf("encode leaf list %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
return bkt.Put(item.key, encodedLst)
|
||||
|
|
|
@ -565,7 +565,7 @@ func groupFilters(filters objectSDK.SearchFilters, useAttributeIndex bool) (filt
|
|||
case v2object.FilterHeaderContainerID: // support deprecated field
|
||||
err := res.cnr.DecodeString(filters[i].Value())
|
||||
if err != nil {
|
||||
return filterGroup{}, fmt.Errorf("can't parse container id: %w", err)
|
||||
return filterGroup{}, fmt.Errorf("parse container id: %w", err)
|
||||
}
|
||||
|
||||
res.withCnrFilter = true
|
||||
|
|
|
@ -32,13 +32,13 @@ func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error
|
|||
}
|
||||
|
||||
if err := db.openDB(ctx, mode); err != nil {
|
||||
return nil, fmt.Errorf("failed to open metabase: %w", err)
|
||||
return nil, fmt.Errorf("open metabase: %w", err)
|
||||
}
|
||||
|
||||
id, err := db.readShardID()
|
||||
|
||||
if cErr := db.close(); cErr != nil {
|
||||
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
|
||||
err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
|
||||
}
|
||||
|
||||
return id, metaerr.Wrap(err)
|
||||
|
@ -70,7 +70,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
|
|||
}
|
||||
|
||||
if err := db.openDB(ctx, mode); err != nil {
|
||||
return fmt.Errorf("failed to open metabase: %w", err)
|
||||
return fmt.Errorf("open metabase: %w", err)
|
||||
}
|
||||
|
||||
err := db.writeShardID(id)
|
||||
|
@ -79,7 +79,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
|
|||
}
|
||||
|
||||
if cErr := db.close(); cErr != nil {
|
||||
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
|
||||
err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
|
||||
}
|
||||
|
||||
return metaerr.Wrap(err)
|
||||
|
|
|
@ -95,7 +95,7 @@ func compactDB(db *bbolt.DB) error {
|
|||
NoSync: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open new metabase to compact: %w", err)
|
||||
return fmt.Errorf("open new metabase to compact: %w", err)
|
||||
}
|
||||
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
|
||||
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
|
||||
|
@ -292,7 +292,7 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
|
|||
}
|
||||
expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
||||
return fmt.Errorf("parse expiration epoch: %w", err)
|
||||
}
|
||||
expirationEpochBucket := b.Bucket(attrValue)
|
||||
attrKeyValueC := expirationEpochBucket.Cursor()
|
||||
|
@ -399,7 +399,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
|
|||
for _, key := range keys {
|
||||
attr, ok := attributeFromAttributeBucket(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
|
||||
return nil, fmt.Errorf("parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
|
||||
}
|
||||
if !IsAtrributeIndexed(attr) {
|
||||
keysToDrop = append(keysToDrop, key)
|
||||
|
@ -407,7 +407,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
|
|||
}
|
||||
contID, ok := cidFromAttributeBucket(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
|
||||
return nil, fmt.Errorf("parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
|
||||
}
|
||||
info, err := cs.Info(contID)
|
||||
if err != nil {
|
||||
|
|
|
@ -231,11 +231,11 @@ func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) {
|
|||
epoch := binary.BigEndian.Uint64(key)
|
||||
var cnr cid.ID
|
||||
if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
|
||||
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err)
|
||||
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (container ID): %w", err)
|
||||
}
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
|
||||
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err)
|
||||
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (object ID): %w", err)
|
||||
}
|
||||
return epoch, cnr, obj, nil
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
|
|||
|
||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create auxiliary bucket: %w", err)
|
||||
return fmt.Errorf("create auxiliary bucket: %w", err)
|
||||
}
|
||||
return b.Put(versionKey, data)
|
||||
}
|
||||
|
|
|
@ -106,7 +106,7 @@ func (t *boltForest) SetMode(ctx context.Context, m mode.Mode) error {
|
|||
}
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
|
||||
return fmt.Errorf("set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
|
||||
}
|
||||
|
||||
t.mode = m
|
||||
|
@ -128,7 +128,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
|
|||
readOnly := m.ReadOnly()
|
||||
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
|
||||
return metaerr.Wrap(fmt.Errorf("create dir %s for the pilorama: %w", t.path, err))
|
||||
}
|
||||
|
||||
opts := *bbolt.DefaultOptions
|
||||
|
@ -139,7 +139,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
|
|||
|
||||
t.db, err = bbolt.Open(t.path, t.perm, &opts)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err))
|
||||
return metaerr.Wrap(fmt.Errorf("open the pilorama DB: %w", err))
|
||||
}
|
||||
|
||||
t.db.MaxBatchSize = t.maxBatchSize
|
||||
|
@ -1360,7 +1360,7 @@ func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, err
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err))
|
||||
return nil, metaerr.Wrap(fmt.Errorf("list trees: %w", err))
|
||||
}
|
||||
success = true
|
||||
return ids, nil
|
||||
|
@ -1504,7 +1504,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
|
|||
|
||||
var contID cidSDK.ID
|
||||
if err := contID.Decode(k[:32]); err != nil {
|
||||
return fmt.Errorf("failed to decode containerID: %w", err)
|
||||
return fmt.Errorf("decode container ID: %w", err)
|
||||
}
|
||||
res.Items = append(res.Items, ContainerIDTreeID{
|
||||
CID: contID,
|
||||
|
|
|
@ -36,7 +36,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
|||
|
||||
size, err := s.metaBase.ContainerSize(prm.cnr)
|
||||
if err != nil {
|
||||
return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err)
|
||||
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
|
||||
}
|
||||
|
||||
return ContainerSizeRes{
|
||||
|
@ -71,7 +71,7 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
|
|||
|
||||
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
|
||||
if err != nil {
|
||||
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
|
||||
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
|
||||
}
|
||||
|
||||
return ContainerCountRes{
|
||||
|
|
|
@ -38,7 +38,7 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err
|
|||
|
||||
err = s.SetMode(ctx, mode.DegradedReadOnly)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
||||
return fmt.Errorf("switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func (s *Shard) Open(ctx context.Context) error {
|
|||
for j := i + 1; j < len(components); j++ {
|
||||
if err := components[j].Open(ctx, m); err != nil {
|
||||
// Other components must be opened, fail.
|
||||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
||||
return fmt.Errorf("open %T: %w", components[j], err)
|
||||
}
|
||||
}
|
||||
err = s.handleMetabaseFailure(ctx, "open", err)
|
||||
|
@ -83,7 +83,7 @@ func (s *Shard) Open(ctx context.Context) error {
|
|||
break
|
||||
}
|
||||
|
||||
return fmt.Errorf("could not open %T: %w", component, err)
|
||||
return fmt.Errorf("open %T: %w", component, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -184,7 +184,7 @@ func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
|
|||
break
|
||||
}
|
||||
|
||||
return fmt.Errorf("could not initialize %T: %w", component, err)
|
||||
return fmt.Errorf("initialize %T: %w", component, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -205,7 +205,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
|
||||
err := s.metaBase.Reset()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not reset metabase: %w", err)
|
||||
return fmt.Errorf("reset metabase: %w", err)
|
||||
}
|
||||
|
||||
withCount := true
|
||||
|
@ -254,12 +254,12 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
|
||||
err = errors.Join(egErr, itErr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not put objects to the meta: %w", err)
|
||||
return fmt.Errorf("put objects to the meta: %w", err)
|
||||
}
|
||||
|
||||
err = s.metaBase.SyncCounters()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not sync object counters: %w", err)
|
||||
return fmt.Errorf("sync object counters: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
|
@ -318,7 +318,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
|
|||
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
var lock objectSDK.Lock
|
||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||
return fmt.Errorf("unmarshal lock content: %w", err)
|
||||
}
|
||||
|
||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||
|
@ -328,7 +328,7 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
|||
id, _ := obj.ID()
|
||||
err := s.metaBase.Lock(ctx, cnr, id, locked)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock objects: %w", err)
|
||||
return fmt.Errorf("lock objects: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -337,7 +337,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
|||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||
return fmt.Errorf("unmarshal tombstone content: %w", err)
|
||||
}
|
||||
|
||||
tombAddr := object.AddressOf(obj)
|
||||
|
@ -358,7 +358,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
|||
|
||||
_, err := s.metaBase.Inhume(ctx, inhumePrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not inhume objects: %w", err)
|
||||
return fmt.Errorf("inhume objects: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -175,7 +175,7 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
|
|||
|
||||
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||
return nil, true, fmt.Errorf("fetch blobovnicza id from metabase: %w", err)
|
||||
}
|
||||
|
||||
storageID := mExRes.StorageID()
|
||||
|
|
|
@ -36,7 +36,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
|||
modeDegraded := s.GetMode().NoMetabase()
|
||||
if !modeDegraded {
|
||||
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
|
||||
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
||||
err = fmt.Errorf("read shard id from metabase: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
|||
|
||||
if len(idFromMetabase) == 0 && !modeDegraded {
|
||||
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
|
||||
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
|
||||
err = errors.Join(err, fmt.Errorf("write shard id to metabase: %w", setErr))
|
||||
}
|
||||
}
|
||||
return
|
||||
|
|
|
@ -109,7 +109,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
|
|||
|
||||
lst, err := s.metaBase.Containers(ctx)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("can't list stored containers: %w", err)
|
||||
return res, fmt.Errorf("list stored containers: %w", err)
|
||||
}
|
||||
|
||||
filters := objectSDK.NewSearchFilters()
|
||||
|
@ -149,7 +149,7 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
|
|||
|
||||
containers, err := s.metaBase.Containers(ctx)
|
||||
if err != nil {
|
||||
return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err)
|
||||
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
|
||||
}
|
||||
|
||||
return ListContainersRes{
|
||||
|
@ -180,7 +180,7 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
|
|||
metaPrm.SetCursor(prm.cursor)
|
||||
res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err)
|
||||
return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err)
|
||||
}
|
||||
|
||||
return ListWithCursorRes{
|
||||
|
@ -208,7 +208,7 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
|
|||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate over containers: %w", err)
|
||||
return fmt.Errorf("iterate over containers: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -235,7 +235,7 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
|
|||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate over objects: %w", err)
|
||||
return fmt.Errorf("iterate over objects: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -258,7 +258,7 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
|
|||
metaPrm.ContainerID = prm.ContainerID
|
||||
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
||||
return 0, fmt.Errorf("count alive objects in bucket: %w", err)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
|
|
|
@ -81,7 +81,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
|
||||
res, err = s.blobStor.Put(ctx, putPrm)
|
||||
if err != nil {
|
||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||
return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,7 +94,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
if err != nil {
|
||||
// may we need to handle this case in a special way
|
||||
// since the object has been successfully written to BlobStor
|
||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
return PutRes{}, fmt.Errorf("put object to metabase: %w", err)
|
||||
}
|
||||
|
||||
if res.Inserted {
|
||||
|
|
|
@ -67,7 +67,7 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
|
|||
|
||||
mRes, err := s.metaBase.Select(ctx, selectPrm)
|
||||
if err != nil {
|
||||
return SelectRes{}, fmt.Errorf("could not select objects from metabase: %w", err)
|
||||
return SelectRes{}, fmt.Errorf("select objects from metabase: %w", err)
|
||||
}
|
||||
|
||||
return SelectRes{
|
||||
|
|
|
@ -30,7 +30,7 @@ func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
|
|||
return b.ForEach(func(k, _ []byte) error {
|
||||
err := addr.DecodeString(string(k))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse object address: %w", err)
|
||||
return fmt.Errorf("parse object address: %w", err)
|
||||
}
|
||||
|
||||
return f(addr)
|
||||
|
|
|
@ -83,7 +83,7 @@ func (c *cache) closeStorage(ctx context.Context, shrink bool) error {
|
|||
}
|
||||
if !shrink {
|
||||
if err := c.fsTree.Close(ctx); err != nil {
|
||||
return fmt.Errorf("can't close write-cache storage: %w", err)
|
||||
return fmt.Errorf("close write-cache storage: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -98,16 +98,16 @@ func (c *cache) closeStorage(ctx context.Context, shrink bool) error {
|
|||
if errors.Is(err, errIterationCompleted) {
|
||||
empty = false
|
||||
} else {
|
||||
return fmt.Errorf("failed to check write-cache items: %w", err)
|
||||
return fmt.Errorf("check write-cache items: %w", err)
|
||||
}
|
||||
}
|
||||
if err := c.fsTree.Close(ctx); err != nil {
|
||||
return fmt.Errorf("can't close write-cache storage: %w", err)
|
||||
return fmt.Errorf("close write-cache storage: %w", err)
|
||||
}
|
||||
if empty {
|
||||
err := os.RemoveAll(c.path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to remove write-cache files: %w", err)
|
||||
return fmt.Errorf("remove write-cache files: %w", err)
|
||||
}
|
||||
} else {
|
||||
c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty)
|
||||
|
|
|
@ -31,10 +31,10 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
|||
fstree.WithFileCounter(c.counter),
|
||||
)
|
||||
if err := c.fsTree.Open(mod); err != nil {
|
||||
return fmt.Errorf("could not open FSTree: %w", err)
|
||||
return fmt.Errorf("open FSTree: %w", err)
|
||||
}
|
||||
if err := c.fsTree.Init(); err != nil {
|
||||
return fmt.Errorf("could not init FSTree: %w", err)
|
||||
return fmt.Errorf("init FSTree: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -25,11 +25,11 @@ func (c *cache) flushAndDropBBoltDB(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not check write-cache database existence: %w", err)
|
||||
return fmt.Errorf("check write-cache database existence: %w", err)
|
||||
}
|
||||
db, err := OpenDB(c.path, true, os.OpenFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not open write-cache database: %w", err)
|
||||
return fmt.Errorf("open write-cache database: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = db.Close()
|
||||
|
|
2
pkg/network/cache/multi.go
vendored
2
pkg/network/cache/multi.go
vendored
|
@ -155,7 +155,7 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
|||
group.IterateAddresses(func(addr network.Address) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
firstErr = context.Canceled
|
||||
firstErr = fmt.Errorf("try %v: %w", addr, context.Canceled)
|
||||
return true
|
||||
default:
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue