forked from TrueCloudLab/frostfs-node
Compare commits
7 commits
df05057ed4
...
7dfe1ee9c1
Author | SHA1 | Date | |
---|---|---|---|
7dfe1ee9c1 | |||
148d68933b | |||
51ee132ea3 | |||
226dd25dd0 | |||
bd0197eaa8 | |||
e44b84c18c | |||
bed49e6ace |
43 changed files with 137 additions and 170 deletions
|
@ -9,7 +9,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
@ -43,6 +42,8 @@ func initObjectHashCmd() {
|
||||||
_ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag)
|
_ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||||
|
|
||||||
flags.String("range", "", "Range to take hash from in the form offset1:length1,...")
|
flags.String("range", "", "Range to take hash from in the form offset1:length1,...")
|
||||||
|
_ = objectHashCmd.MarkFlagRequired("range")
|
||||||
|
|
||||||
flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'")
|
flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'")
|
||||||
flags.String(getRangeHashSaltFlag, "", "Salt in hex format")
|
flags.String(getRangeHashSaltFlag, "", "Salt in hex format")
|
||||||
}
|
}
|
||||||
|
@ -66,36 +67,6 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
|
||||||
pk := key.GetOrGenerate(cmd)
|
pk := key.GetOrGenerate(cmd)
|
||||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||||
|
|
||||||
tz := typ == hashTz
|
|
||||||
fullHash := len(ranges) == 0
|
|
||||||
if fullHash {
|
|
||||||
var headPrm internalclient.HeadObjectPrm
|
|
||||||
headPrm.SetClient(cli)
|
|
||||||
Prepare(cmd, &headPrm)
|
|
||||||
headPrm.SetAddress(objAddr)
|
|
||||||
|
|
||||||
// get hash of full payload through HEAD (may be user can do it through dedicated command?)
|
|
||||||
res, err := internalclient.HeadObject(cmd.Context(), headPrm)
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
||||||
|
|
||||||
var cs checksum.Checksum
|
|
||||||
var csSet bool
|
|
||||||
|
|
||||||
if tz {
|
|
||||||
cs, csSet = res.Header().PayloadHomomorphicHash()
|
|
||||||
} else {
|
|
||||||
cs, csSet = res.Header().PayloadChecksum()
|
|
||||||
}
|
|
||||||
|
|
||||||
if csSet {
|
|
||||||
cmd.Println(hex.EncodeToString(cs.Value()))
|
|
||||||
} else {
|
|
||||||
cmd.Println("Missing checksum in object header.")
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var hashPrm internalclient.HashPayloadRangesPrm
|
var hashPrm internalclient.HashPayloadRangesPrm
|
||||||
hashPrm.SetClient(cli)
|
hashPrm.SetClient(cli)
|
||||||
Prepare(cmd, &hashPrm)
|
Prepare(cmd, &hashPrm)
|
||||||
|
@ -104,7 +75,7 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
|
||||||
hashPrm.SetSalt(salt)
|
hashPrm.SetSalt(salt)
|
||||||
hashPrm.SetRanges(ranges)
|
hashPrm.SetRanges(ranges)
|
||||||
|
|
||||||
if tz {
|
if typ == hashTz {
|
||||||
hashPrm.TZ()
|
hashPrm.TZ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1220,9 +1220,9 @@ func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
|
||||||
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
|
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
|
||||||
// with the binary-encoded information from the current node's configuration.
|
// with the binary-encoded information from the current node's configuration.
|
||||||
// The state is set using the provided setter which MUST NOT be nil.
|
// The state is set using the provided setter which MUST NOT be nil.
|
||||||
func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.NodeInfo)) error {
|
func (c *cfg) bootstrapWithState(ctx context.Context, state netmap.NodeState) error {
|
||||||
ni := c.cfgNodeInfo.localInfo
|
ni := c.cfgNodeInfo.localInfo
|
||||||
stateSetter(&ni)
|
ni.SetStatus(state)
|
||||||
|
|
||||||
prm := nmClient.AddPeerPrm{}
|
prm := nmClient.AddPeerPrm{}
|
||||||
prm.SetNodeInfo(ni)
|
prm.SetNodeInfo(ni)
|
||||||
|
@ -1232,9 +1232,7 @@ func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.N
|
||||||
|
|
||||||
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
|
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
|
||||||
func bootstrapOnline(ctx context.Context, c *cfg) error {
|
func bootstrapOnline(ctx context.Context, c *cfg) error {
|
||||||
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
|
return c.bootstrapWithState(ctx, netmap.Online)
|
||||||
ni.SetStatus(netmap.Online)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// bootstrap calls bootstrapWithState with:
|
// bootstrap calls bootstrapWithState with:
|
||||||
|
@ -1245,9 +1243,7 @@ func (c *cfg) bootstrap(ctx context.Context) error {
|
||||||
st := c.cfgNetmap.state.controlNetmapStatus()
|
st := c.cfgNetmap.state.controlNetmapStatus()
|
||||||
if st == control.NetmapStatus_MAINTENANCE {
|
if st == control.NetmapStatus_MAINTENANCE {
|
||||||
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
|
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
|
||||||
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
|
return c.bootstrapWithState(ctx, netmap.Maintenance)
|
||||||
ni.SetStatus(netmap.Maintenance)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
|
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
|
||||||
|
|
|
@ -12,7 +12,7 @@ type ApplicationInfo struct {
|
||||||
func NewApplicationInfo(version string) *ApplicationInfo {
|
func NewApplicationInfo(version string) *ApplicationInfo {
|
||||||
appInfo := &ApplicationInfo{
|
appInfo := &ApplicationInfo{
|
||||||
versionValue: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
versionValue: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
Name: "app_info",
|
Name: "frostfs_node_app_info",
|
||||||
Help: "General information about the application.",
|
Help: "General information about the application.",
|
||||||
}, []string{"version"}),
|
}, []string{"version"}),
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't determine DB size: %w", err)
|
return fmt.Errorf("determine DB size: %w", err)
|
||||||
}
|
}
|
||||||
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
|
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
|
||||||
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
|
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||||
|
@ -140,7 +140,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
|
||||||
return saveItemsCount(tx, items)
|
return saveItemsCount(tx, items)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
|
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||||
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
|
return fmt.Errorf("save blobovnicza's size and items count: %w", err)
|
||||||
}
|
}
|
||||||
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
|
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,7 +146,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
|
||||||
if prm.ignoreErrors {
|
if prm.ignoreErrors {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not decode address key: %w", err)
|
return fmt.Errorf("decode address key: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,13 +115,13 @@ func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnic
|
||||||
// decompress the data
|
// decompress the data
|
||||||
data, err := b.compression.Decompress(res.Object())
|
data, err := b.compression.Decompress(res.Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// unmarshal the object
|
// unmarshal the object
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return common.GetRes{Object: obj, RawData: data}, nil
|
return common.GetRes{Object: obj, RawData: data}, nil
|
||||||
|
|
|
@ -130,13 +130,13 @@ func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blob
|
||||||
// decompress the data
|
// decompress the data
|
||||||
data, err := b.compression.Decompress(res.Object())
|
data, err := b.compression.Decompress(res.Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
return common.GetRangeRes{}, fmt.Errorf("decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// unmarshal the object
|
// unmarshal the object
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
return common.GetRangeRes{}, fmt.Errorf("unmarshal the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
from := prm.Range.GetOffset()
|
from := prm.Range.GetOffset()
|
||||||
|
|
|
@ -49,7 +49,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
||||||
zap.String("root_path", b.rootPath))
|
zap.String("root_path", b.rootPath))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not decompress object data: %w", err)
|
return fmt.Errorf("decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.Handler != nil {
|
if prm.Handler != nil {
|
||||||
|
@ -82,7 +82,7 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
|
||||||
zap.String("root_path", b.rootPath))
|
zap.String("root_path", b.rootPath))
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
return false, fmt.Errorf("open blobovnicza %s: %w", p, err)
|
||||||
}
|
}
|
||||||
defer shBlz.Close(ctx)
|
defer shBlz.Close(ctx)
|
||||||
|
|
||||||
|
|
|
@ -69,10 +69,10 @@ func (b *sharedDB) Open(ctx context.Context) (*blobovnicza.Blobovnicza, error) {
|
||||||
)...)
|
)...)
|
||||||
|
|
||||||
if err := blz.Open(ctx); err != nil {
|
if err := blz.Open(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
|
return nil, fmt.Errorf("open blobovnicza %s: %w", b.path, err)
|
||||||
}
|
}
|
||||||
if err := blz.Init(ctx); err != nil {
|
if err := blz.Init(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
|
return nil, fmt.Errorf("init blobovnicza %s: %w", b.path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.refCount++
|
b.refCount++
|
||||||
|
@ -127,7 +127,7 @@ func (b *sharedDB) CloseAndRemoveFile(ctx context.Context) error {
|
||||||
zap.String("id", b.path),
|
zap.String("id", b.path),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
|
return fmt.Errorf("close blobovnicza (path = %s): %w", b.path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.refCount = 0
|
b.refCount = 0
|
||||||
|
|
|
@ -538,7 +538,7 @@ func (t *FSTree) countFiles() (uint64, uint64, error) {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
return 0, 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return count, size, nil
|
return count, size, nil
|
||||||
|
@ -577,7 +577,7 @@ func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
return 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
|
||||||
}
|
}
|
||||||
success = true
|
success = true
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|
|
@ -47,13 +47,13 @@ func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes,
|
||||||
// Decompress the data.
|
// Decompress the data.
|
||||||
var err error
|
var err error
|
||||||
if data, err = s.compression.Decompress(data); err != nil {
|
if data, err = s.compression.Decompress(data); err != nil {
|
||||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal the SDK object.
|
// Unmarshal the SDK object.
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return common.GetRes{Object: obj, RawData: data}, nil
|
return common.GetRes{Object: obj, RawData: data}, nil
|
||||||
|
|
|
@ -27,7 +27,7 @@ func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
|
return fmt.Errorf("set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.mode = m
|
b.mode = m
|
||||||
|
|
|
@ -52,7 +52,7 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
|
||||||
// marshal object
|
// marshal object
|
||||||
data, err := prm.Object.Marshal()
|
data, err := prm.Object.Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.PutRes{}, fmt.Errorf("could not marshal the object: %w", err)
|
return common.PutRes{}, fmt.Errorf("marshal the object: %w", err)
|
||||||
}
|
}
|
||||||
prm.RawData = data
|
prm.RawData = data
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
err := eg.Wait()
|
err := eg.Wait()
|
||||||
close(errCh)
|
close(errCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to initialize shards: %w", err)
|
return fmt.Errorf("initialize shards: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for res := range errCh {
|
for res := range errCh {
|
||||||
|
@ -117,7 +117,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
|
return fmt.Errorf("initialize shard %s: %w", res.id, res.err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,7 +320,7 @@ loop:
|
||||||
for _, newID := range shardsToAdd {
|
for _, newID := range shardsToAdd {
|
||||||
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
return fmt.Errorf("add new shard with '%s' metabase path: %w", newID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
idStr := sh.ID().String()
|
idStr := sh.ID().String()
|
||||||
|
@ -331,13 +331,13 @@ loop:
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = sh.Close(ctx)
|
_ = sh.Close(ctx)
|
||||||
return fmt.Errorf("could not init %s shard: %w", idStr, err)
|
return fmt.Errorf("init %s shard: %w", idStr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = e.addShard(sh)
|
err = e.addShard(sh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = sh.Close(ctx)
|
_ = sh.Close(ctx)
|
||||||
return fmt.Errorf("could not add %s shard: %w", idStr, err)
|
return fmt.Errorf("add %s shard: %w", idStr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr))
|
e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr))
|
||||||
|
|
|
@ -578,7 +578,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
|
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
|
||||||
if prm.TreeHandler == nil {
|
if prm.TreeHandler == nil {
|
||||||
return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
return false, "", fmt.Errorf("evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
|
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
|
||||||
|
|
|
@ -108,12 +108,12 @@ func (m *metricsWithID) SetEvacuationInProgress(value bool) {
|
||||||
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
|
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
|
||||||
sh, err := e.createShard(ctx, opts)
|
sh, err := e.createShard(ctx, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create a shard: %w", err)
|
return nil, fmt.Errorf("create a shard: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = e.addShard(sh)
|
err = e.addShard(sh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err)
|
return nil, fmt.Errorf("add %s shard: %w", sh.ID().String(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())
|
e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())
|
||||||
|
@ -124,7 +124,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
|
||||||
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||||
id, err := generateShardID()
|
id, err := generateShardID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
return nil, fmt.Errorf("generate shard ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
opts = e.appendMetrics(id, opts)
|
opts = e.appendMetrics(id, opts)
|
||||||
|
@ -180,7 +180,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
||||||
|
|
||||||
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create pool: %w", err)
|
return fmt.Errorf("create pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
strID := sh.ID().String()
|
strID := sh.ID().String()
|
||||||
|
@ -374,7 +374,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
multiErrGuard.Lock()
|
multiErrGuard.Lock()
|
||||||
multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err))
|
multiErr = errors.Join(multiErr, fmt.Errorf("change shard (id:%s) mode to disabled: %w", sh.ID(), err))
|
||||||
multiErrGuard.Unlock()
|
multiErrGuard.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,7 +385,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
multiErrGuard.Lock()
|
multiErrGuard.Lock()
|
||||||
multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err))
|
multiErr = errors.Join(multiErr, fmt.Errorf("close removed shard (id:%s): %w", sh.ID(), err))
|
||||||
multiErrGuard.Unlock()
|
multiErrGuard.Unlock()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -54,7 +54,7 @@ func (db *DB) Open(ctx context.Context, m mode.Mode) error {
|
||||||
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
|
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
|
||||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
return fmt.Errorf("create dir %s for metabase: %w", db.info.Path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
db.log.Debug(ctx, logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))
|
db.log.Debug(ctx, logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))
|
||||||
|
@ -73,7 +73,7 @@ func (db *DB) openBolt(ctx context.Context) error {
|
||||||
|
|
||||||
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't open boltDB database: %w", err)
|
return fmt.Errorf("open boltDB database: %w", err)
|
||||||
}
|
}
|
||||||
db.boltDB.MaxBatchDelay = db.boltBatchDelay
|
db.boltDB.MaxBatchDelay = db.boltBatchDelay
|
||||||
db.boltDB.MaxBatchSize = db.boltBatchSize
|
db.boltDB.MaxBatchSize = db.boltBatchSize
|
||||||
|
@ -145,27 +145,27 @@ func (db *DB) init(reset bool) error {
|
||||||
if reset {
|
if reset {
|
||||||
err := tx.DeleteBucket(name)
|
err := tx.DeleteBucket(name)
|
||||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||||
return fmt.Errorf("could not delete static bucket %s: %w", k, err)
|
return fmt.Errorf("delete static bucket %s: %w", k, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := tx.CreateBucketIfNotExists(name)
|
_, err := tx.CreateBucketIfNotExists(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create static bucket %s: %w", k, err)
|
return fmt.Errorf("create static bucket %s: %w", k, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range deprecatedBuckets {
|
for _, b := range deprecatedBuckets {
|
||||||
err := tx.DeleteBucket(b)
|
err := tx.DeleteBucket(b)
|
||||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||||
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
|
return fmt.Errorf("delete deprecated bucket %s: %w", string(b), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reset { // counters will be recalculated by refill metabase
|
if !reset { // counters will be recalculated by refill metabase
|
||||||
err = syncCounter(tx, false)
|
err = syncCounter(tx, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not sync object counter: %w", err)
|
return fmt.Errorf("sync object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -238,14 +238,14 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
return fmt.Errorf("increase phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
|
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
return fmt.Errorf("increase logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
if isUserObject {
|
if isUserObject {
|
||||||
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
|
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase user object counter: %w", err)
|
return fmt.Errorf("increase user object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
||||||
|
@ -362,7 +362,7 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject
|
||||||
func syncCounter(tx *bbolt.Tx, force bool) error {
|
func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket)
|
shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get shard info bucket: %w", err)
|
return fmt.Errorf("get shard info bucket: %w", err)
|
||||||
}
|
}
|
||||||
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
|
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
|
||||||
len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
|
len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
|
||||||
|
@ -375,7 +375,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
|
|
||||||
containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName)
|
containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get container counter bucket: %w", err)
|
return fmt.Errorf("get container counter bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
@ -428,7 +428,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not iterate objects: %w", err)
|
return fmt.Errorf("iterate objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return setObjectCounters(counters, shardInfoB, containerCounterB)
|
return setObjectCounters(counters, shardInfoB, containerCounterB)
|
||||||
|
@ -448,7 +448,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
||||||
value := containerCounterValue(count)
|
value := containerCounterValue(count)
|
||||||
err := containerCounterB.Put(key, value)
|
err := containerCounterB.Put(key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update phy container object counter: %w", err)
|
return fmt.Errorf("update phy container object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
phyData := make([]byte, 8)
|
phyData := make([]byte, 8)
|
||||||
|
@ -456,7 +456,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
||||||
|
|
||||||
err := shardInfoB.Put(objectPhyCounterKey, phyData)
|
err := shardInfoB.Put(objectPhyCounterKey, phyData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update phy object counter: %w", err)
|
return fmt.Errorf("update phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logData := make([]byte, 8)
|
logData := make([]byte, 8)
|
||||||
|
@ -464,7 +464,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
||||||
|
|
||||||
err = shardInfoB.Put(objectLogicCounterKey, logData)
|
err = shardInfoB.Put(objectLogicCounterKey, logData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update logic object counter: %w", err)
|
return fmt.Errorf("update logic object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
userData := make([]byte, 8)
|
userData := make([]byte, 8)
|
||||||
|
@ -472,7 +472,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
||||||
|
|
||||||
err = shardInfoB.Put(objectUserCounterKey, userData)
|
err = shardInfoB.Put(objectUserCounterKey, userData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update user object counter: %w", err)
|
return fmt.Errorf("update user object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -492,7 +492,7 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) {
|
||||||
}
|
}
|
||||||
var cnrID cid.ID
|
var cnrID cid.ID
|
||||||
if err := cnrID.Decode(buf); err != nil {
|
if err := cnrID.Decode(buf); err != nil {
|
||||||
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
|
return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
|
||||||
}
|
}
|
||||||
return cnrID, nil
|
return cnrID, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -163,26 +163,26 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
||||||
if res.phyCount > 0 {
|
if res.phyCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
|
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not decrease phy object counter: %w", err)
|
return fmt.Errorf("decrease phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.logicCount > 0 {
|
if res.logicCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
|
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not decrease logical object counter: %w", err)
|
return fmt.Errorf("decrease logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.userCount > 0 {
|
if res.userCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
|
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not decrease user object counter: %w", err)
|
return fmt.Errorf("decrease user object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
||||||
return fmt.Errorf("could not decrease container object counter: %w", err)
|
return fmt.Errorf("decrease container object counter: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
if garbageBKT != nil {
|
if garbageBKT != nil {
|
||||||
err := garbageBKT.Delete(addrKey)
|
err := garbageBKT.Delete(addrKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return deleteSingleResult{}, nil
|
return deleteSingleResult{}, nil
|
||||||
|
@ -280,7 +280,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
if garbageBKT != nil {
|
if garbageBKT != nil {
|
||||||
err := garbageBKT.Delete(addrKey)
|
err := garbageBKT.Delete(addrKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,7 +308,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
// remove object
|
// remove object
|
||||||
err = db.deleteObject(tx, obj, false)
|
err = db.deleteObject(tx, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
|
return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
|
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
|
||||||
|
@ -335,12 +335,12 @@ func (db *DB) deleteObject(
|
||||||
|
|
||||||
err = updateListIndexes(tx, obj, delListIndexItem)
|
err = updateListIndexes(tx, obj, delListIndexItem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't remove list indexes: %w", err)
|
return fmt.Errorf("remove list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
|
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
return fmt.Errorf("remove fake bucket tree indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if isParent {
|
if isParent {
|
||||||
|
@ -351,7 +351,7 @@ func (db *DB) deleteObject(
|
||||||
addrKey := addressKey(object.AddressOf(obj), key)
|
addrKey := addressKey(object.AddressOf(obj), key)
|
||||||
err := garbageBKT.Delete(addrKey)
|
err := garbageBKT.Delete(addrKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not remove from garbage bucket: %w", err)
|
return fmt.Errorf("remove from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -529,7 +529,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
||||||
addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
|
addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
|
||||||
err := garbageBKT.Delete(addrKey)
|
err := garbageBKT.Delete(addrKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
|
return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,7 +567,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
||||||
addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
|
addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
|
||||||
err := garbageBKT.Delete(addrKey)
|
err := garbageBKT.Delete(addrKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
|
return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -229,7 +229,7 @@ func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, e
|
||||||
|
|
||||||
err := splitInfo.Unmarshal(bytes.Clone(rawSplitInfo))
|
err := splitInfo.Unmarshal(bytes.Clone(rawSplitInfo))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err)
|
return nil, fmt.Errorf("unmarshal split info from root index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return splitInfo, nil
|
return splitInfo, nil
|
||||||
|
|
|
@ -187,7 +187,7 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
||||||
|
|
||||||
err = child.Unmarshal(bytes.Clone(data))
|
err = child.Unmarshal(bytes.Clone(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't unmarshal child with parent: %w", err)
|
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
par := child.Parent()
|
par := child.Parent()
|
||||||
|
|
|
@ -177,7 +177,7 @@ type gcHandler struct {
|
||||||
func (g gcHandler) handleKV(k, _ []byte) error {
|
func (g gcHandler) handleKV(k, _ []byte) error {
|
||||||
o, err := garbageFromKV(k)
|
o, err := garbageFromKV(k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse garbage object: %w", err)
|
return fmt.Errorf("parse garbage object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return g.h(o)
|
return g.h(o)
|
||||||
|
@ -190,7 +190,7 @@ type graveyardHandler struct {
|
||||||
func (g graveyardHandler) handleKV(k, v []byte) error {
|
func (g graveyardHandler) handleKV(k, v []byte) error {
|
||||||
o, err := graveFromKV(k, v)
|
o, err := graveFromKV(k, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse grave: %w", err)
|
return fmt.Errorf("parse grave: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return g.h(o)
|
return g.h(o)
|
||||||
|
@ -240,7 +240,7 @@ func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address)
|
||||||
func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
||||||
err = decodeAddressFromKey(&res.addr, k)
|
err = decodeAddressFromKey(&res.addr, k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("could not parse address: %w", err)
|
err = fmt.Errorf("parse address: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -373,7 +373,7 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
|
||||||
if data != nil {
|
if data != nil {
|
||||||
err := targetBucket.Delete(tombKey)
|
err := targetBucket.Delete(tombKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
|
|
||||||
if !db.mode.NoMetabase() {
|
if !db.mode.NoMetabase() {
|
||||||
if err := db.Close(ctx); err != nil {
|
if err := db.Close(ctx); err != nil {
|
||||||
return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
err = db.Init(ctx)
|
err = db.Init(ctx)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -180,18 +180,18 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
|
|
||||||
err := putUniqueIndexes(tx, obj, si, id)
|
err := putUniqueIndexes(tx, obj, si, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't put unique indexes: %w", err)
|
return fmt.Errorf("put unique indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateListIndexes(tx, obj, putListIndexItem)
|
err = updateListIndexes(tx, obj, putListIndexItem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't put list indexes: %w", err)
|
return fmt.Errorf("put list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if indexAttributes {
|
if indexAttributes {
|
||||||
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
return fmt.Errorf("put fake bucket tree indexes: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,7 +250,7 @@ func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, ad
|
||||||
}
|
}
|
||||||
rawObject, err := obj.CutPayload().Marshal()
|
rawObject, err := obj.CutPayload().Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't marshal object header: %w", err)
|
return fmt.Errorf("marshal object header: %w", err)
|
||||||
}
|
}
|
||||||
return putUniqueIndexItem(tx, namedBucketItem{
|
return putUniqueIndexItem(tx, namedBucketItem{
|
||||||
name: bucketName,
|
name: bucketName,
|
||||||
|
@ -475,7 +475,7 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
|
||||||
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
return fmt.Errorf("create index %v: %w", item.name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := update(bkt.Get(item.key), item.val)
|
data, err := update(bkt.Get(item.key), item.val)
|
||||||
|
@ -492,12 +492,12 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
return fmt.Errorf("create index %v: %w", item.name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
return fmt.Errorf("create fake bucket tree index %v: %w", item.key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return fkbtRoot.Put(item.val, zeroValue)
|
return fkbtRoot.Put(item.val, zeroValue)
|
||||||
|
@ -506,19 +506,19 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
return fmt.Errorf("create index %v: %w", item.name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lst, err := decodeList(bkt.Get(item.key))
|
lst, err := decodeList(bkt.Get(item.key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
|
return fmt.Errorf("decode leaf list %v: %w", item.key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lst = append(lst, item.val)
|
lst = append(lst, item.val)
|
||||||
|
|
||||||
encodedLst, err := encodeList(lst)
|
encodedLst, err := encodeList(lst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
|
return fmt.Errorf("encode leaf list %v: %w", item.key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return bkt.Put(item.key, encodedLst)
|
return bkt.Put(item.key, encodedLst)
|
||||||
|
|
|
@ -565,7 +565,7 @@ func groupFilters(filters objectSDK.SearchFilters, useAttributeIndex bool) (filt
|
||||||
case v2object.FilterHeaderContainerID: // support deprecated field
|
case v2object.FilterHeaderContainerID: // support deprecated field
|
||||||
err := res.cnr.DecodeString(filters[i].Value())
|
err := res.cnr.DecodeString(filters[i].Value())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return filterGroup{}, fmt.Errorf("can't parse container id: %w", err)
|
return filterGroup{}, fmt.Errorf("parse container id: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res.withCnrFilter = true
|
res.withCnrFilter = true
|
||||||
|
|
|
@ -32,13 +32,13 @@ func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.openDB(ctx, mode); err != nil {
|
if err := db.openDB(ctx, mode); err != nil {
|
||||||
return nil, fmt.Errorf("failed to open metabase: %w", err)
|
return nil, fmt.Errorf("open metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := db.readShardID()
|
id, err := db.readShardID()
|
||||||
|
|
||||||
if cErr := db.close(); cErr != nil {
|
if cErr := db.close(); cErr != nil {
|
||||||
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
|
err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
|
||||||
}
|
}
|
||||||
|
|
||||||
return id, metaerr.Wrap(err)
|
return id, metaerr.Wrap(err)
|
||||||
|
@ -70,7 +70,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.openDB(ctx, mode); err != nil {
|
if err := db.openDB(ctx, mode); err != nil {
|
||||||
return fmt.Errorf("failed to open metabase: %w", err)
|
return fmt.Errorf("open metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.writeShardID(id)
|
err := db.writeShardID(id)
|
||||||
|
@ -79,7 +79,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
|
||||||
}
|
}
|
||||||
|
|
||||||
if cErr := db.close(); cErr != nil {
|
if cErr := db.close(); cErr != nil {
|
||||||
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
|
err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
|
||||||
}
|
}
|
||||||
|
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
|
|
|
@ -95,7 +95,7 @@ func compactDB(db *bbolt.DB) error {
|
||||||
NoSync: true,
|
NoSync: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't open new metabase to compact: %w", err)
|
return fmt.Errorf("open new metabase to compact: %w", err)
|
||||||
}
|
}
|
||||||
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
|
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
|
||||||
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
|
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
|
||||||
|
@ -292,7 +292,7 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
|
||||||
}
|
}
|
||||||
expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
|
expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
return fmt.Errorf("parse expiration epoch: %w", err)
|
||||||
}
|
}
|
||||||
expirationEpochBucket := b.Bucket(attrValue)
|
expirationEpochBucket := b.Bucket(attrValue)
|
||||||
attrKeyValueC := expirationEpochBucket.Cursor()
|
attrKeyValueC := expirationEpochBucket.Cursor()
|
||||||
|
@ -399,7 +399,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
attr, ok := attributeFromAttributeBucket(key)
|
attr, ok := attributeFromAttributeBucket(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
|
return nil, fmt.Errorf("parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
|
||||||
}
|
}
|
||||||
if !IsAtrributeIndexed(attr) {
|
if !IsAtrributeIndexed(attr) {
|
||||||
keysToDrop = append(keysToDrop, key)
|
keysToDrop = append(keysToDrop, key)
|
||||||
|
@ -407,7 +407,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
|
||||||
}
|
}
|
||||||
contID, ok := cidFromAttributeBucket(key)
|
contID, ok := cidFromAttributeBucket(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
|
return nil, fmt.Errorf("parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
|
||||||
}
|
}
|
||||||
info, err := cs.Info(contID)
|
info, err := cs.Info(contID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -231,11 +231,11 @@ func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) {
|
||||||
epoch := binary.BigEndian.Uint64(key)
|
epoch := binary.BigEndian.Uint64(key)
|
||||||
var cnr cid.ID
|
var cnr cid.ID
|
||||||
if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
|
if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
|
||||||
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err)
|
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (container ID): %w", err)
|
||||||
}
|
}
|
||||||
var obj oid.ID
|
var obj oid.ID
|
||||||
if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
|
if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
|
||||||
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err)
|
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (object ID): %w", err)
|
||||||
}
|
}
|
||||||
return epoch, cnr, obj, nil
|
return epoch, cnr, obj, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
|
||||||
|
|
||||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create auxiliary bucket: %w", err)
|
return fmt.Errorf("create auxiliary bucket: %w", err)
|
||||||
}
|
}
|
||||||
return b.Put(versionKey, data)
|
return b.Put(versionKey, data)
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (t *boltForest) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
|
return fmt.Errorf("set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.mode = m
|
t.mode = m
|
||||||
|
@ -128,7 +128,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
|
||||||
readOnly := m.ReadOnly()
|
readOnly := m.ReadOnly()
|
||||||
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
|
return metaerr.Wrap(fmt.Errorf("create dir %s for the pilorama: %w", t.path, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := *bbolt.DefaultOptions
|
opts := *bbolt.DefaultOptions
|
||||||
|
@ -139,7 +139,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
|
||||||
|
|
||||||
t.db, err = bbolt.Open(t.path, t.perm, &opts)
|
t.db, err = bbolt.Open(t.path, t.perm, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err))
|
return metaerr.Wrap(fmt.Errorf("open the pilorama DB: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
t.db.MaxBatchSize = t.maxBatchSize
|
t.db.MaxBatchSize = t.maxBatchSize
|
||||||
|
@ -1360,7 +1360,7 @@ func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, err
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err))
|
return nil, metaerr.Wrap(fmt.Errorf("list trees: %w", err))
|
||||||
}
|
}
|
||||||
success = true
|
success = true
|
||||||
return ids, nil
|
return ids, nil
|
||||||
|
@ -1504,7 +1504,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
|
||||||
|
|
||||||
var contID cidSDK.ID
|
var contID cidSDK.ID
|
||||||
if err := contID.Decode(k[:32]); err != nil {
|
if err := contID.Decode(k[:32]); err != nil {
|
||||||
return fmt.Errorf("failed to decode containerID: %w", err)
|
return fmt.Errorf("decode container ID: %w", err)
|
||||||
}
|
}
|
||||||
res.Items = append(res.Items, ContainerIDTreeID{
|
res.Items = append(res.Items, ContainerIDTreeID{
|
||||||
CID: contID,
|
CID: contID,
|
||||||
|
|
|
@ -36,7 +36,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
||||||
|
|
||||||
size, err := s.metaBase.ContainerSize(prm.cnr)
|
size, err := s.metaBase.ContainerSize(prm.cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err)
|
return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ContainerSizeRes{
|
return ContainerSizeRes{
|
||||||
|
@ -71,7 +71,7 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
|
||||||
|
|
||||||
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
|
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
|
return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ContainerCountRes{
|
return ContainerCountRes{
|
||||||
|
|
|
@ -38,7 +38,7 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err
|
||||||
|
|
||||||
err = s.SetMode(ctx, mode.DegradedReadOnly)
|
err = s.SetMode(ctx, mode.DegradedReadOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
return fmt.Errorf("switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ func (s *Shard) Open(ctx context.Context) error {
|
||||||
for j := i + 1; j < len(components); j++ {
|
for j := i + 1; j < len(components); j++ {
|
||||||
if err := components[j].Open(ctx, m); err != nil {
|
if err := components[j].Open(ctx, m); err != nil {
|
||||||
// Other components must be opened, fail.
|
// Other components must be opened, fail.
|
||||||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
return fmt.Errorf("open %T: %w", components[j], err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = s.handleMetabaseFailure(ctx, "open", err)
|
err = s.handleMetabaseFailure(ctx, "open", err)
|
||||||
|
@ -83,7 +83,7 @@ func (s *Shard) Open(ctx context.Context) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("could not open %T: %w", component, err)
|
return fmt.Errorf("open %T: %w", component, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -184,7 +184,7 @@ func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("could not initialize %T: %w", component, err)
|
return fmt.Errorf("initialize %T: %w", component, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -205,7 +205,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
|
|
||||||
err := s.metaBase.Reset()
|
err := s.metaBase.Reset()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not reset metabase: %w", err)
|
return fmt.Errorf("reset metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
withCount := true
|
withCount := true
|
||||||
|
@ -254,12 +254,12 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
|
|
||||||
err = errors.Join(egErr, itErr)
|
err = errors.Join(egErr, itErr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not put objects to the meta: %w", err)
|
return fmt.Errorf("put objects to the meta: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.metaBase.SyncCounters()
|
err = s.metaBase.SyncCounters()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not sync object counters: %w", err)
|
return fmt.Errorf("sync object counters: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
success = true
|
success = true
|
||||||
|
@ -318,7 +318,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
|
||||||
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
var lock objectSDK.Lock
|
var lock objectSDK.Lock
|
||||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
return fmt.Errorf("unmarshal lock content: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||||
|
@ -328,7 +328,7 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
err := s.metaBase.Lock(ctx, cnr, id, locked)
|
err := s.metaBase.Lock(ctx, cnr, id, locked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not lock objects: %w", err)
|
return fmt.Errorf("lock objects: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -337,7 +337,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
||||||
tombstone := objectSDK.NewTombstone()
|
tombstone := objectSDK.NewTombstone()
|
||||||
|
|
||||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
return fmt.Errorf("unmarshal tombstone content: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tombAddr := object.AddressOf(obj)
|
tombAddr := object.AddressOf(obj)
|
||||||
|
@ -358,7 +358,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
||||||
|
|
||||||
_, err := s.metaBase.Inhume(ctx, inhumePrm)
|
_, err := s.metaBase.Inhume(ctx, inhumePrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not inhume objects: %w", err)
|
return fmt.Errorf("inhume objects: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,7 +175,7 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
|
||||||
|
|
||||||
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
|
mExRes, err := s.metaBase.StorageID(ctx, mPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
return nil, true, fmt.Errorf("fetch blobovnicza id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
storageID := mExRes.StorageID()
|
storageID := mExRes.StorageID()
|
||||||
|
|
|
@ -36,7 +36,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||||
modeDegraded := s.GetMode().NoMetabase()
|
modeDegraded := s.GetMode().NoMetabase()
|
||||||
if !modeDegraded {
|
if !modeDegraded {
|
||||||
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
|
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
|
||||||
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
err = fmt.Errorf("read shard id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||||
|
|
||||||
if len(idFromMetabase) == 0 && !modeDegraded {
|
if len(idFromMetabase) == 0 && !modeDegraded {
|
||||||
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
|
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
|
||||||
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
|
err = errors.Join(err, fmt.Errorf("write shard id to metabase: %w", setErr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|
|
@ -109,7 +109,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
|
||||||
|
|
||||||
lst, err := s.metaBase.Containers(ctx)
|
lst, err := s.metaBase.Containers(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("can't list stored containers: %w", err)
|
return res, fmt.Errorf("list stored containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
filters := objectSDK.NewSearchFilters()
|
filters := objectSDK.NewSearchFilters()
|
||||||
|
@ -149,7 +149,7 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
|
||||||
|
|
||||||
containers, err := s.metaBase.Containers(ctx)
|
containers, err := s.metaBase.Containers(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err)
|
return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ListContainersRes{
|
return ListContainersRes{
|
||||||
|
@ -180,7 +180,7 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
|
||||||
metaPrm.SetCursor(prm.cursor)
|
metaPrm.SetCursor(prm.cursor)
|
||||||
res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
|
res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err)
|
return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return ListWithCursorRes{
|
return ListWithCursorRes{
|
||||||
|
@ -208,7 +208,7 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
|
||||||
metaPrm.Handler = prm.Handler
|
metaPrm.Handler = prm.Handler
|
||||||
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not iterate over containers: %w", err)
|
return fmt.Errorf("iterate over containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -235,7 +235,7 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
|
||||||
metaPrm.Handler = prm.Handler
|
metaPrm.Handler = prm.Handler
|
||||||
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not iterate over objects: %w", err)
|
return fmt.Errorf("iterate over objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -258,7 +258,7 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
|
||||||
metaPrm.ContainerID = prm.ContainerID
|
metaPrm.ContainerID = prm.ContainerID
|
||||||
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
|
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
return 0, fmt.Errorf("count alive objects in bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return count, nil
|
return count, nil
|
||||||
|
|
|
@ -81,7 +81,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
|
|
||||||
res, err = s.blobStor.Put(ctx, putPrm)
|
res, err = s.blobStor.Put(ctx, putPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// may we need to handle this case in a special way
|
// may we need to handle this case in a special way
|
||||||
// since the object has been successfully written to BlobStor
|
// since the object has been successfully written to BlobStor
|
||||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
return PutRes{}, fmt.Errorf("put object to metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.Inserted {
|
if res.Inserted {
|
||||||
|
|
|
@ -67,7 +67,7 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
|
||||||
|
|
||||||
mRes, err := s.metaBase.Select(ctx, selectPrm)
|
mRes, err := s.metaBase.Select(ctx, selectPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return SelectRes{}, fmt.Errorf("could not select objects from metabase: %w", err)
|
return SelectRes{}, fmt.Errorf("select objects from metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return SelectRes{
|
return SelectRes{
|
||||||
|
|
|
@ -30,7 +30,7 @@ func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
|
||||||
return b.ForEach(func(k, _ []byte) error {
|
return b.ForEach(func(k, _ []byte) error {
|
||||||
err := addr.DecodeString(string(k))
|
err := addr.DecodeString(string(k))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse object address: %w", err)
|
return fmt.Errorf("parse object address: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return f(addr)
|
return f(addr)
|
||||||
|
|
|
@ -83,7 +83,7 @@ func (c *cache) closeStorage(ctx context.Context, shrink bool) error {
|
||||||
}
|
}
|
||||||
if !shrink {
|
if !shrink {
|
||||||
if err := c.fsTree.Close(ctx); err != nil {
|
if err := c.fsTree.Close(ctx); err != nil {
|
||||||
return fmt.Errorf("can't close write-cache storage: %w", err)
|
return fmt.Errorf("close write-cache storage: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -98,16 +98,16 @@ func (c *cache) closeStorage(ctx context.Context, shrink bool) error {
|
||||||
if errors.Is(err, errIterationCompleted) {
|
if errors.Is(err, errIterationCompleted) {
|
||||||
empty = false
|
empty = false
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("failed to check write-cache items: %w", err)
|
return fmt.Errorf("check write-cache items: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := c.fsTree.Close(ctx); err != nil {
|
if err := c.fsTree.Close(ctx); err != nil {
|
||||||
return fmt.Errorf("can't close write-cache storage: %w", err)
|
return fmt.Errorf("close write-cache storage: %w", err)
|
||||||
}
|
}
|
||||||
if empty {
|
if empty {
|
||||||
err := os.RemoveAll(c.path)
|
err := os.RemoveAll(c.path)
|
||||||
if err != nil && !os.IsNotExist(err) {
|
if err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("failed to remove write-cache files: %w", err)
|
return fmt.Errorf("remove write-cache files: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty)
|
c.log.Info(ctx, logs.WritecacheShrinkSkippedNotEmpty)
|
||||||
|
|
|
@ -31,10 +31,10 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
||||||
fstree.WithFileCounter(c.counter),
|
fstree.WithFileCounter(c.counter),
|
||||||
)
|
)
|
||||||
if err := c.fsTree.Open(mod); err != nil {
|
if err := c.fsTree.Open(mod); err != nil {
|
||||||
return fmt.Errorf("could not open FSTree: %w", err)
|
return fmt.Errorf("open FSTree: %w", err)
|
||||||
}
|
}
|
||||||
if err := c.fsTree.Init(); err != nil {
|
if err := c.fsTree.Init(); err != nil {
|
||||||
return fmt.Errorf("could not init FSTree: %w", err)
|
return fmt.Errorf("init FSTree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -25,11 +25,11 @@ func (c *cache) flushAndDropBBoltDB(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not check write-cache database existence: %w", err)
|
return fmt.Errorf("check write-cache database existence: %w", err)
|
||||||
}
|
}
|
||||||
db, err := OpenDB(c.path, true, os.OpenFile)
|
db, err := OpenDB(c.path, true, os.OpenFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not open write-cache database: %w", err)
|
return fmt.Errorf("open write-cache database: %w", err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
|
|
2
pkg/network/cache/multi.go
vendored
2
pkg/network/cache/multi.go
vendored
|
@ -155,7 +155,7 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
||||||
group.IterateAddresses(func(addr network.Address) bool {
|
group.IterateAddresses(func(addr network.Address) bool {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
firstErr = context.Canceled
|
firstErr = fmt.Errorf("try %v: %w", addr, context.Canceled)
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue