forked from TrueCloudLab/frostfs-node
Compare commits: fix/ape_lo ... master (58 commits)

Commit SHA1s:
a788d44773
603015d029
30e14d50ef
951a7ee1c7
0bcbeb26b2
c98357606b
80de5d70bf
57efa0bc8e
26e0c82fb8
4538ccb12a
84e1599997
5a270e2e61
436d65d784
c3c034ecca
05fd999162
eff95bd632
fb928616cc
4d5ae59a52
a9f27e074b
6c51f48aab
a2485637bb
09faca034c
ceac1c8709
f7e75b13b0
198aaebc94
85af6bcd5c
8a658de0b2
3900b92927
5ccb3394b4
dc410fca90
cddcd73f04
d7fcc5ce30
c0221d76e6
242f0095d0
6fe34d266a
fa08bfa553
0da998ef50
e44782473a
9cd1bcef06
ca0a33ea0f
f6c5222952
ea868e09f8
31d3d299bf
b5b4f78b49
2832f44437
7c3bcb0f44
e64871c3fd
303cd35a01
bb9ba1bce2
db03742d33
148d68933b
51ee132ea3
226dd25dd0
bd0197eaa8
e44b84c18c
bed49e6ace
df05057ed4
b6c8ebf493
141 changed files with 1371 additions and 836 deletions
.forgejo/workflows/oci-image.yml (new file, 28 lines)

@@ -0,0 +1,28 @@
+name: OCI image
+
+on:
+  push:
+  workflow_dispatch:
+
+jobs:
+  image:
+    name: Build container images
+    runs-on: docker
+    container: git.frostfs.info/truecloudlab/env:oci-image-builder-bookworm
+    steps:
+      - name: Clone git repo
+        uses: actions/checkout@v3
+
+      - name: Build OCI image
+        run: make images
+
+      - name: Push image to OCI registry
+        run: |
+          echo "$REGISTRY_PASSWORD" \
+            | docker login --username truecloudlab --password-stdin git.frostfs.info
+          make push-images
+        if: >-
+          startsWith(github.ref, 'refs/tags/v') &&
+          (github.event_name == 'workflow_dispatch' || github.event_name == 'push')
+        env:
+          REGISTRY_PASSWORD: ${{secrets.FORGEJO_OCI_REGISTRY_PUSH_TOKEN}}
@@ -89,5 +89,7 @@ linters:
     - protogetter
     - intrange
     - tenv
+    - unconvert
+    - unparam
   disable-all: true
   fast: false
Makefile (21 changed lines)

@@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
 HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"

 GO_VERSION ?= 1.22
-LINT_VERSION ?= 1.62.0
+LINT_VERSION ?= 1.62.2
 TRUECLOUDLAB_LINT_VERSION ?= 0.0.8
 PROTOC_VERSION ?= 25.0
 PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go)

@@ -139,6 +139,15 @@ images: image-storage image-ir image-cli image-adm
 # Build dirty local Docker images
 dirty-images: image-dirty-storage image-dirty-ir image-dirty-cli image-dirty-adm

+# Push FrostFS components' docker image to the registry
+push-image-%:
+    @echo "⇒ Publish FrostFS $* docker image "
+    @docker push $(HUB_IMAGE)-$*:$(HUB_TAG)
+
+# Push all Docker images to the registry
+.PHONY: push-images
+push-images: push-image-storage push-image-ir push-image-cli push-image-adm
+
 # Run `make %` in Golang container
 docker/%:
     docker run --rm -t \

@@ -270,10 +279,12 @@ env-up: all
         echo "Frostfs contracts not found"; exit 1; \
     fi
     ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
-    ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet01.json --gas 10.0
-    ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet02.json --gas 10.0
-    ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet03.json --gas 10.0
-    ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet04.json --gas 10.0
+    ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --gas 10.0 \
+        --storage-wallet ./dev/storage/wallet01.json \
+        --storage-wallet ./dev/storage/wallet02.json \
+        --storage-wallet ./dev/storage/wallet03.json \
+        --storage-wallet ./dev/storage/wallet04.json

     @if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
         make locode-download; \
     fi
@ -253,7 +253,7 @@ func frostfsidListNamespaces(cmd *cobra.Command, _ []string) {
|
||||||
reader := frostfsidrpclient.NewReader(inv, hash)
|
reader := frostfsidrpclient.NewReader(inv, hash)
|
||||||
sessionID, it, err := reader.ListNamespaces()
|
sessionID, it, err := reader.ListNamespaces()
|
||||||
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
||||||
|
|
||||||
namespaces, err := frostfsidclient.ParseNamespaces(items)
|
namespaces, err := frostfsidclient.ParseNamespaces(items)
|
||||||
|
@ -305,7 +305,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListNamespaceSubjects(ns)
|
sessionID, it, err := reader.ListNamespaceSubjects(ns)
|
||||||
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
||||||
|
|
||||||
subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, iteratorBatchSize, sessionID))
|
subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, sessionID))
|
||||||
commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)
|
||||||
|
|
||||||
sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
|
sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
|
||||||
|
@ -319,7 +319,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListSubjects()
|
sessionID, it, err := reader.ListSubjects()
|
||||||
commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)
|
||||||
|
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
||||||
|
|
||||||
subj, err := frostfsidclient.ParseSubject(items)
|
subj, err := frostfsidclient.ParseSubject(items)
|
||||||
|
@ -365,7 +365,7 @@ func frostfsidListGroups(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListGroups(ns)
|
sessionID, it, err := reader.ListGroups(ns)
|
||||||
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
||||||
|
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
||||||
groups, err := frostfsidclient.ParseGroups(items)
|
groups, err := frostfsidclient.ParseGroups(items)
|
||||||
commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
|
||||||
|
@ -415,7 +415,7 @@ func frostfsidListGroupSubjects(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
|
sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
|
||||||
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
||||||
|
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
||||||
|
|
||||||
subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
|
subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
|
||||||
|
@ -492,17 +492,17 @@ func (f *frostfsidClient) sendWaitRes() (*state.AppExecResult, error) {
|
||||||
return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
|
return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func readIterator(inv *invoker.Invoker, iter *result.Iterator, batchSize int, sessionID uuid.UUID) ([]stackitem.Item, error) {
|
func readIterator(inv *invoker.Invoker, iter *result.Iterator, sessionID uuid.UUID) ([]stackitem.Item, error) {
|
||||||
var shouldStop bool
|
var shouldStop bool
|
||||||
res := make([]stackitem.Item, 0)
|
res := make([]stackitem.Item, 0)
|
||||||
for !shouldStop {
|
for !shouldStop {
|
||||||
items, err := inv.TraverseIterator(sessionID, iter, batchSize)
|
items, err := inv.TraverseIterator(sessionID, iter, iteratorBatchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res = append(res, items...)
|
res = append(res, items...)
|
||||||
shouldStop = len(items) < batchSize
|
shouldStop = len(items) < iteratorBatchSize
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
|
|
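Review note: the refactor above moves the page size out of readIterator's parameter list and onto the package-level iteratorBatchSize constant, so every caller shrinks by one argument. The traversal keeps the usual session-iterator shape: pull a page, append it, and stop as soon as a page comes back short, which signals the iterator is exhausted. A minimal generic sketch of that drain pattern (names are illustrative, not from this repository):

// drainAll pulls pages of at most pageSize items from next until a
// short page signals exhaustion, mirroring readIterator's loop.
func drainAll[T any](next func(max int) ([]T, error), pageSize int) ([]T, error) {
    var out []T
    for {
        page, err := next(pageSize)
        if err != nil {
            return nil, err
        }
        out = append(out, page...)
        if len(page) < pageSize {
            return out, nil
        }
    }
}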
@@ -12,7 +12,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
-    "github.com/nspcc-dev/neo-go/pkg/encoding/address"
     "github.com/nspcc-dev/neo-go/pkg/io"
     "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
     "github.com/nspcc-dev/neo-go/pkg/smartcontract"

@@ -141,41 +140,11 @@ func addMultisigAccount(w *wallet.Wallet, m int, name, password string, pubs key
 }

 func generateStorageCreds(cmd *cobra.Command, _ []string) error {
-    return refillGas(cmd, storageGasConfigFlag, true)
-}
-
-func refillGas(cmd *cobra.Command, gasFlag string, createWallet bool) (err error) {
-    // storage wallet path is not part of the config
-    storageWalletPath, _ := cmd.Flags().GetString(commonflags.StorageWalletFlag)
-    // wallet address is not part of the config
-    walletAddress, _ := cmd.Flags().GetString(walletAddressFlag)
-
-    var gasReceiver util.Uint160
-
-    if len(walletAddress) != 0 {
-        gasReceiver, err = address.StringToUint160(walletAddress)
-        if err != nil {
-            return fmt.Errorf("invalid wallet address %s: %w", walletAddress, err)
-        }
-    } else {
-        if storageWalletPath == "" {
-            return fmt.Errorf("missing wallet path (use '--%s <out.json>')", commonflags.StorageWalletFlag)
-        }
-
-        var w *wallet.Wallet
-
-        if createWallet {
-            w, err = wallet.NewWallet(storageWalletPath)
-        } else {
-            w, err = wallet.NewWalletFromFile(storageWalletPath)
-        }
-
-        if err != nil {
-            return fmt.Errorf("can't create wallet: %w", err)
-        }
-
-        if createWallet {
-            var password string
-
+    walletPath, _ := cmd.Flags().GetString(commonflags.StorageWalletFlag)
+    w, err := wallet.NewWallet(walletPath)
+    if err != nil {
+        return fmt.Errorf("create wallet: %w", err)
+    }
     label, _ := cmd.Flags().GetString(storageWalletLabelFlag)
     password, err := config.GetStoragePassword(viper.GetViper(), label)

@@ -190,11 +159,10 @@ func refillGas(cmd *cobra.Command, gasFlag string, createWallet bool) (err error
     if err := w.CreateAccount(label, password); err != nil {
         return fmt.Errorf("can't create account: %w", err)
     }
+    return refillGas(cmd, storageGasConfigFlag, w.Accounts[0].ScriptHash())
 }

-    gasReceiver = w.Accounts[0].Contract.ScriptHash()
-}
-
+func refillGas(cmd *cobra.Command, gasFlag string, gasReceivers ...util.Uint160) (err error) {
     gasStr := viper.GetString(gasFlag)

     gasAmount, err := helper.ParseGASAmount(gasStr)

@@ -208,9 +176,11 @@ func refillGas(cmd *cobra.Command, gasFlag string, createWallet bool) (err error
     }

     bw := io.NewBufBinWriter()
+    for _, gasReceiver := range gasReceivers {
         emit.AppCall(bw.BinWriter, gas.Hash, "transfer", callflag.All,
             wCtx.CommitteeAcc.Contract.ScriptHash(), gasReceiver, int64(gasAmount), nil)
         emit.Opcodes(bw.BinWriter, opcode.ASSERT)
+    }
     if bw.Err != nil {
         return fmt.Errorf("BUG: invalid transfer arguments: %w", bw.Err)
     }
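Review note: together with the Makefile change above, these hunks fold what used to be one sidechain transaction per wallet into a single transaction carrying one GAS transfer per receiver, each followed by an ASSERT opcode so the whole batch fails atomically. A hypothetical call site for the new variadic signature (the receiver values are placeholders):

// Receivers are collected up front; one refillGas call then funds
// all of them in a single consensus transaction.
receivers := []util.Uint160{storageAcc, extraAcc} // placeholder values
if err := refillGas(cmd, commonflags.RefillGasAmountFlag, receivers...); err != nil {
    return err
}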
@@ -1,7 +1,12 @@
 package generate

 import (
+    "fmt"
+
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
+    "github.com/nspcc-dev/neo-go/pkg/encoding/address"
+    "github.com/nspcc-dev/neo-go/pkg/util"
+    "github.com/nspcc-dev/neo-go/pkg/wallet"
     "github.com/spf13/cobra"
     "github.com/spf13/viper"
 )

@@ -33,7 +38,27 @@ var (
             _ = viper.BindPFlag(commonflags.RefillGasAmountFlag, cmd.Flags().Lookup(commonflags.RefillGasAmountFlag))
         },
         RunE: func(cmd *cobra.Command, _ []string) error {
-            return refillGas(cmd, commonflags.RefillGasAmountFlag, false)
+            storageWalletPaths, _ := cmd.Flags().GetStringArray(commonflags.StorageWalletFlag)
+            walletAddresses, _ := cmd.Flags().GetStringArray(walletAddressFlag)
+
+            var gasReceivers []util.Uint160
+            for _, walletAddress := range walletAddresses {
+                addr, err := address.StringToUint160(walletAddress)
+                if err != nil {
+                    return fmt.Errorf("invalid wallet address %s: %w", walletAddress, err)
+                }
+
+                gasReceivers = append(gasReceivers, addr)
+            }
+            for _, storageWalletPath := range storageWalletPaths {
+                w, err := wallet.NewWalletFromFile(storageWalletPath)
+                if err != nil {
+                    return fmt.Errorf("can't create wallet: %w", err)
+                }
+
+                gasReceivers = append(gasReceivers, w.Accounts[0].Contract.ScriptHash())
+            }
+            return refillGas(cmd, commonflags.RefillGasAmountFlag, gasReceivers...)
         },
     }
     GenerateAlphabetCmd = &cobra.Command{

@@ -50,10 +75,10 @@ var (
 func initRefillGasCmd() {
     RefillGasCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
     RefillGasCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
-    RefillGasCmd.Flags().String(commonflags.StorageWalletFlag, "", "Path to storage node wallet")
-    RefillGasCmd.Flags().String(walletAddressFlag, "", "Address of wallet")
+    RefillGasCmd.Flags().StringArray(commonflags.StorageWalletFlag, nil, "Path to storage node wallet")
+    RefillGasCmd.Flags().StringArray(walletAddressFlag, nil, "Address of wallet")
     RefillGasCmd.Flags().String(commonflags.RefillGasAmountFlag, "", "Additional amount of GAS to transfer")
-    RefillGasCmd.MarkFlagsMutuallyExclusive(walletAddressFlag, commonflags.StorageWalletFlag)
+    RefillGasCmd.MarkFlagsOneRequired(walletAddressFlag, commonflags.StorageWalletFlag)
 }

 func initGenerateStorageCmd() {
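Review note: switching the flags from String to StringArray is what allows them to be repeated, as env-up now does with four --storage-wallet occurrences, and MarkFlagsOneRequired replaces mutual exclusion because wallet paths and addresses may now be mixed freely. A self-contained sketch of the repeated-flag behavior, using spf13/pflag (the flag library underneath cobra):

package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

func main() {
    fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
    // StringArray appends one element per occurrence and, unlike
    // StringSlice, never splits a value on commas.
    wallets := fs.StringArray("storage-wallet", nil, "path to storage node wallet (repeatable)")
    _ = fs.Parse([]string{
        "--storage-wallet", "wallet01.json",
        "--storage-wallet", "wallet02.json",
    })
    fmt.Println(*wallets) // [wallet01.json wallet02.json]
}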
@@ -4,7 +4,6 @@ import (
     "errors"
     "fmt"
     "math/big"
-    "strconv"

     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"

@@ -41,7 +40,8 @@ func depositNotary(cmd *cobra.Command, _ []string) error {
     }

     accHash := w.GetChangeAddress()
-    if addr, err := cmd.Flags().GetString(walletAccountFlag); err == nil {
+    addr, _ := cmd.Flags().GetString(walletAccountFlag)
+    if addr != "" {
         accHash, err = address.StringToUint160(addr)
         if err != nil {
             return fmt.Errorf("invalid address: %s", addr)

@@ -73,17 +73,10 @@ func depositNotary(cmd *cobra.Command, _ []string) error {
         return err
     }

-    till := int64(defaultNotaryDepositLifetime)
-    tillStr, err := cmd.Flags().GetString(notaryDepositTillFlag)
-    if err != nil {
-        return err
-    }
-    if tillStr != "" {
-        till, err = strconv.ParseInt(tillStr, 10, 64)
-        if err != nil || till <= 0 {
-            return errInvalidNotaryDepositLifetime
-        }
+    till, _ := cmd.Flags().GetInt64(notaryDepositTillFlag)
+    if till <= 0 {
+        return errInvalidNotaryDepositLifetime
     }

     return transferGas(cmd, acc, accHash, gasAmount, till)
 }

@@ -20,7 +20,7 @@ func initDepositoryNotaryCmd() {
     DepositCmd.Flags().String(commonflags.StorageWalletFlag, "", "Path to storage node wallet")
     DepositCmd.Flags().String(walletAccountFlag, "", "Wallet account address")
     DepositCmd.Flags().String(commonflags.RefillGasAmountFlag, "", "Amount of GAS to deposit")
-    DepositCmd.Flags().String(notaryDepositTillFlag, "", "Notary deposit duration in blocks")
+    DepositCmd.Flags().Int64(notaryDepositTillFlag, defaultNotaryDepositLifetime, "Notary deposit duration in blocks")
 }

 func init() {
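Review note: registering the flag as Int64 with defaultNotaryDepositLifetime as its default moves syntactic validation into the flag parser — a non-numeric value now fails at flag-parse time with a usage error instead of inside the command — so only the positivity check on the deposit lifetime remains in depositNotary.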
@@ -20,23 +20,32 @@ const (
     accountAddressFlag = "account"
 )

-func addProxyAccount(cmd *cobra.Command, _ []string) {
-    acc, _ := cmd.Flags().GetString(accountAddressFlag)
-    addr, err := address.StringToUint160(acc)
-    commonCmd.ExitOnErr(cmd, "invalid account: %w", err)
-    err = processAccount(cmd, addr, "addAccount")
+func parseAddresses(cmd *cobra.Command) []util.Uint160 {
+    var addrs []util.Uint160
+
+    accs, _ := cmd.Flags().GetStringArray(accountAddressFlag)
+    for _, acc := range accs {
+        addr, err := address.StringToUint160(acc)
+        commonCmd.ExitOnErr(cmd, "invalid account: %w", err)
+
+        addrs = append(addrs, addr)
+    }
+    return addrs
+}
+
+func addProxyAccount(cmd *cobra.Command, _ []string) {
+    addrs := parseAddresses(cmd)
+    err := processAccount(cmd, addrs, "addAccount")
     commonCmd.ExitOnErr(cmd, "processing error: %w", err)
 }

 func removeProxyAccount(cmd *cobra.Command, _ []string) {
-    acc, _ := cmd.Flags().GetString(accountAddressFlag)
-    addr, err := address.StringToUint160(acc)
-    commonCmd.ExitOnErr(cmd, "invalid account: %w", err)
-    err = processAccount(cmd, addr, "removeAccount")
+    addrs := parseAddresses(cmd)
+    err := processAccount(cmd, addrs, "removeAccount")
     commonCmd.ExitOnErr(cmd, "processing error: %w", err)
 }

-func processAccount(cmd *cobra.Command, addr util.Uint160, method string) error {
+func processAccount(cmd *cobra.Command, addrs []util.Uint160, method string) error {
     wCtx, err := helper.NewInitializeContext(cmd, viper.GetViper())
     if err != nil {
         return fmt.Errorf("can't initialize context: %w", err)

@@ -54,7 +63,9 @@ func processAccount(cmd *cobra.Command, addr util.Uint160, method string) error
     }

     bw := io.NewBufBinWriter()
+    for _, addr := range addrs {
         emit.AppCall(bw.BinWriter, proxyHash, method, callflag.All, addr)
+    }

     if err := wCtx.SendConsensusTx(bw.Bytes()); err != nil {
         return err

@@ -29,13 +29,15 @@ var (

 func initProxyAddAccount() {
     AddAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
-    AddAccountCmd.Flags().String(accountAddressFlag, "", "Wallet address string")
+    AddAccountCmd.Flags().StringArray(accountAddressFlag, nil, "Wallet address string")
+    _ = AddAccountCmd.MarkFlagRequired(accountAddressFlag)
     AddAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
 }

 func initProxyRemoveAccount() {
     RemoveAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
-    RemoveAccountCmd.Flags().String(accountAddressFlag, "", "Wallet address string")
+    RemoveAccountCmd.Flags().StringArray(accountAddressFlag, nil, "Wallet address string")
+    _ = AddAccountCmd.MarkFlagRequired(accountAddressFlag)
     RemoveAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
 }
@@ -11,6 +11,7 @@ import (
     "net/url"
     "os"
     "path/filepath"
+    "slices"
     "strconv"
     "strings"
     "text/template"

@@ -105,7 +106,7 @@ func storageConfig(cmd *cobra.Command, args []string) {
         fatalOnErr(errors.New("can't find account in wallet"))
     }

-    c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Account password for %s: ", c.Wallet.Account))
+    c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Enter password for %s > ", c.Wallet.Account))
     fatalOnErr(err)

     err = acc.Decrypt(c.Wallet.Password, keys.NEP2ScryptParams())

@@ -410,8 +411,7 @@ func initClient(rpc []string) *rpcclient.Client {
     var c *rpcclient.Client
     var err error

-    shuffled := make([]string, len(rpc))
-    copy(shuffled, rpc)
+    shuffled := slices.Clone(rpc)
     rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })

     for _, endpoint := range shuffled {
@@ -9,7 +9,6 @@ import (
     "io"
     "os"
     "slices"
-    "sort"
     "strings"

     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"

@@ -78,13 +77,31 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContainersRes, err error) {
 // SortedIDList returns sorted list of identifiers of user's containers.
 func (x ListContainersRes) SortedIDList() []cid.ID {
     list := x.cliRes.Containers()
-    sort.Slice(list, func(i, j int) bool {
-        lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
-        return strings.Compare(lhs, rhs) < 0
+    slices.SortFunc(list, func(lhs, rhs cid.ID) int {
+        return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString())
     })
     return list
 }

+func ListContainersStream(ctx context.Context, prm ListContainersPrm, processCnr func(id cid.ID) bool) (err error) {
+    cliPrm := &client.PrmContainerListStream{
+        XHeaders: prm.XHeaders,
+        OwnerID:  prm.OwnerID,
+        Session:  prm.Session,
+    }
+    rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
+    if err != nil {
+        return fmt.Errorf("init container list: %w", err)
+    }
+
+    err = rdr.Iterate(processCnr)
+    if err != nil {
+        return fmt.Errorf("read container list: %w", err)
+    }
+
+    return
+}
+
 // PutContainerPrm groups parameters of PutContainer operation.
 type PutContainerPrm struct {
     Client *client.Client
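Review note: ListContainersStream hands each received container ID to a caller-supplied callback; judging by the CLI usage below, returning false keeps the stream going, so the boolean acts as a stop signal. A hypothetical caller that prints every ID without stopping early (prm is assumed to be an already-populated ListContainersPrm):

err := internalclient.ListContainersStream(ctx, prm, func(id cid.ID) bool {
    fmt.Println(id.EncodeToString())
    return false // keep reading the stream
})
if err != nil {
    return err
}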
@@ -52,7 +52,7 @@ func genereateAPEOverride(cmd *cobra.Command, _ []string) {

     outputPath, _ := cmd.Flags().GetString(outputFlag)
     if outputPath != "" {
-        err := os.WriteFile(outputPath, []byte(overrideMarshalled), 0o644)
+        err := os.WriteFile(outputPath, overrideMarshalled, 0o644)
         commonCmd.ExitOnErr(cmd, "dump error: %w", err)
     } else {
         fmt.Print("\n")
@@ -6,8 +6,11 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
     commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
     containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
+    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
     "github.com/spf13/cobra"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
 )

 // flags of list command.

@@ -51,34 +54,52 @@ var listContainersCmd = &cobra.Command{

         var prm internalclient.ListContainersPrm
         prm.SetClient(cli)
-        prm.Account = idUser
-
-        res, err := internalclient.ListContainers(cmd.Context(), prm)
-        commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
+        prm.OwnerID = idUser

         prmGet := internalclient.GetContainerPrm{
             Client: cli,
         }
+        var containerIDs []cid.ID

-        containerIDs := res.SortedIDList()
-        for _, cnrID := range containerIDs {
-            if flagVarListName == "" && !flagVarListPrintAttr {
-                cmd.Println(cnrID.String())
-                continue
+        err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) bool {
+            printContainer(cmd, prmGet, id)
+            return false
+        })
+        if err == nil {
+            return
         }

-            prmGet.ClientParams.ContainerID = &cnrID
+        if e, ok := status.FromError(err); ok && e.Code() == codes.Unimplemented {
+            res, err := internalclient.ListContainers(cmd.Context(), prm)
+            commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
+            containerIDs = res.SortedIDList()
+        } else {
+            commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
+        }
+
+        for _, cnrID := range containerIDs {
+            printContainer(cmd, prmGet, cnrID)
+        }
+    },
+}
+
+func printContainer(cmd *cobra.Command, prmGet internalclient.GetContainerPrm, id cid.ID) {
+    if flagVarListName == "" && !flagVarListPrintAttr {
+        cmd.Println(id.String())
+        return
+    }
+
+    prmGet.ClientParams.ContainerID = &id
     res, err := internalclient.GetContainer(cmd.Context(), prmGet)
     if err != nil {
         cmd.Printf("  failed to read attributes: %v\n", err)
-        continue
+        return
     }

     cnr := res.Container()
     if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
-        continue
+        return
     }
-    cmd.Println(cnrID.String())
+    cmd.Println(id.String())

     if flagVarListPrintAttr {
         cnr.IterateUserAttributes(func(key, val string) {

@@ -86,8 +107,6 @@ var listContainersCmd = &cobra.Command{
         })
     }
 }
-    },
-}

 func initContainerListContainersCmd() {
     commonflags.Init(listContainersCmd)
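Review note: the command now prefers the streaming RPC and falls back to the unary ListContainers call only when the server reports gRPC code Unimplemented, which keeps a new CLI working against older nodes while any other failure stays fatal. The compatibility check reduced to a generic sketch (tryStream and tryUnary are placeholders):

// Fall back only on Unimplemented; propagate every other error.
if err := tryStream(ctx); err != nil {
    if s, ok := status.FromError(err); ok && s.Code() == codes.Unimplemented {
        return tryUnary(ctx)
    }
    return err
}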
@@ -23,11 +23,11 @@ type policyPlaygroundREPL struct {
     nodes map[string]netmap.NodeInfo
 }

-func newPolicyPlaygroundREPL(cmd *cobra.Command) (*policyPlaygroundREPL, error) {
+func newPolicyPlaygroundREPL(cmd *cobra.Command) *policyPlaygroundREPL {
     return &policyPlaygroundREPL{
         cmd:   cmd,
         nodes: map[string]netmap.NodeInfo{},
-    }, nil
+    }
 }

 func (repl *policyPlaygroundREPL) handleLs(args []string) error {

@@ -246,8 +246,7 @@ var policyPlaygroundCmd = &cobra.Command{
     Long: `A REPL for testing placement policies.
If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`,
     Run: func(cmd *cobra.Command, _ []string) {
-        repl, err := newPolicyPlaygroundREPL(cmd)
-        commonCmd.ExitOnErr(cmd, "could not create policy playground: %w", err)
+        repl := newPolicyPlaygroundREPL(cmd)
         commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run())
     },
 }
@@ -9,7 +9,6 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
     commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
-    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
     cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    "github.com/spf13/cobra"

@@ -42,7 +41,9 @@ func initObjectHashCmd() {
     flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
     _ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag)

-    flags.String("range", "", "Range to take hash from in the form offset1:length1,...")
+    flags.StringSlice("range", nil, "Range to take hash from in the form offset1:length1,...")
+    _ = objectHashCmd.MarkFlagRequired("range")
+
     flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'")
     flags.String(getRangeHashSaltFlag, "", "Salt in hex format")
 }

@@ -66,36 +67,6 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
     pk := key.GetOrGenerate(cmd)
     cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)

-    tz := typ == hashTz
-    fullHash := len(ranges) == 0
-    if fullHash {
-        var headPrm internalclient.HeadObjectPrm
-        headPrm.SetClient(cli)
-        Prepare(cmd, &headPrm)
-        headPrm.SetAddress(objAddr)
-
-        // get hash of full payload through HEAD (may be user can do it through dedicated command?)
-        res, err := internalclient.HeadObject(cmd.Context(), headPrm)
-        commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
-
-        var cs checksum.Checksum
-        var csSet bool
-
-        if tz {
-            cs, csSet = res.Header().PayloadHomomorphicHash()
-        } else {
-            cs, csSet = res.Header().PayloadChecksum()
-        }
-
-        if csSet {
-            cmd.Println(hex.EncodeToString(cs.Value()))
-        } else {
-            cmd.Println("Missing checksum in object header.")
-        }
-
-        return
-    }
-
     var hashPrm internalclient.HashPayloadRangesPrm
     hashPrm.SetClient(cli)
     Prepare(cmd, &hashPrm)

@@ -104,7 +75,7 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
     hashPrm.SetSalt(salt)
     hashPrm.SetRanges(ranges)

-    if tz {
+    if typ == hashTz {
         hashPrm.TZ()
     }
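Review note: marking --range as required is what lets the whole HEAD-based branch go. Previously an empty range list meant "hash the full payload", served from the checksum in the object header; with at least one range always present, that branch became unreachable dead code, and the command now always hashes explicit payload ranges.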
@@ -46,7 +46,7 @@ func initObjectPatchCmd() {
     flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
     _ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)

-    flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2")
+    flags.StringSlice(newAttrsFlagName, nil, "New object attributes in form of Key1=Value1,Key2=Value2")
     flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
     flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
     flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")

@@ -99,11 +99,9 @@ func patch(cmd *cobra.Command, _ []string) {
 }

 func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
-    var rawAttrs []string
-
-    raw := cmd.Flag(newAttrsFlagName).Value.String()
-    if len(raw) != 0 {
-        rawAttrs = strings.Split(raw, ",")
+    rawAttrs, err := cmd.Flags().GetStringSlice(newAttrsFlagName)
+    if err != nil {
+        return nil, err
     }

     attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
@@ -50,7 +50,7 @@ func initObjectPutCmd() {

     flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)

-    flags.String("attributes", "", "User attributes in form of Key1=Value1,Key2=Value2")
+    flags.StringSlice("attributes", nil, "User attributes in form of Key1=Value1,Key2=Value2")
     flags.Bool("disable-filename", false, "Do not set well-known filename attribute")
     flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
     flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")

@@ -214,11 +214,9 @@ func getAllObjectAttributes(cmd *cobra.Command) []objectSDK.Attribute {
 }

 func parseObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
-    var rawAttrs []string
-
-    raw := cmd.Flag("attributes").Value.String()
-    if len(raw) != 0 {
-        rawAttrs = strings.Split(raw, ",")
+    rawAttrs, err := cmd.Flags().GetStringSlice("attributes")
+    if err != nil {
+        return nil, err
     }

     attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
@@ -38,7 +38,7 @@ func initObjectRangeCmd() {
     flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
     _ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)

-    flags.String("range", "", "Range to take data from in the form offset:length")
+    flags.StringSlice("range", nil, "Range to take data from in the form offset:length")
     flags.String(fileFlag, "", "File to write object payload to. Default: stdout.")
     flags.Bool(rawFlag, false, rawFlagDesc)
 }

@@ -195,11 +195,10 @@ func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
 }

 func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
-    v := cmd.Flag("range").Value.String()
-    if len(v) == 0 {
-        return nil, nil
+    vs, err := cmd.Flags().GetStringSlice("range")
+    if len(vs) == 0 || err != nil {
+        return nil, err
     }
-    vs := strings.Split(v, ",")
     rs := make([]objectSDK.Range, len(vs))
     for i := range vs {
         before, after, found := strings.Cut(vs[i], rangeSep)
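Review note: these object commands use StringSlice rather than StringArray because their documented syntax is comma-separated (offset1:length1,...); pflag's StringSlice splits each value on commas and also accepts repeated flags, so both spellings parse to the same list. A self-contained sketch:

package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

func main() {
    fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
    ranges := fs.StringSlice("range", nil, "offset:length, comma-separated or repeated")
    // "--range 0:10,20:5" and "--range 0:10 --range 20:5" are equivalent.
    _ = fs.Parse([]string{"--range", "0:10,20:5"})
    fmt.Println(*ranges) // [0:10 20:5]
}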
@@ -124,10 +124,7 @@ func (v *BucketsView) loadNodeChildren(
     path := parentBucket.Path
     parser := parentBucket.NextParser

-    buffer, err := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
-    if err != nil {
-        return err
-    }
+    buffer := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)

     for item := range buffer {
         if item.err != nil {

@@ -135,6 +132,7 @@ func (v *BucketsView) loadNodeChildren(
         }
         bucket := item.val

+        var err error
         bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil)
         if err != nil {
             return err

@@ -180,10 +178,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
     defer cancel()

     // Check the current bucket's nested buckets if exist
-    bucketsBuffer, err := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
-    if err != nil {
-        return false, err
-    }
+    bucketsBuffer := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)

     for item := range bucketsBuffer {
         if item.err != nil {

@@ -191,6 +186,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
         }
         b := item.val

+        var err error
         b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil)
         if err != nil {
             return false, err

@@ -206,10 +202,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
     }

     // Check the current bucket's nested records if exist
-    recordsBuffer, err := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
-    if err != nil {
-        return false, err
-    }
+    recordsBuffer := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)

     for item := range recordsBuffer {
         if item.err != nil {

@@ -217,6 +210,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
         }
         r := item.val

+        var err error
         r.Entry, _, err = bucket.NextParser(r.Key, r.Value)
         if err != nil {
             return false, err
@@ -35,7 +35,7 @@ func resolvePath(tx *bbolt.Tx, path [][]byte) (*bbolt.Bucket, error) {
 func load[T any](
     ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
     filter func(key, value []byte) bool, transform func(key, value []byte) T,
-) (<-chan Item[T], error) {
+) <-chan Item[T] {
     buffer := make(chan Item[T], bufferSize)

     go func() {

@@ -77,13 +77,13 @@ func load[T any](
         }
     }()

-    return buffer, nil
+    return buffer
 }

 func LoadBuckets(
     ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
-) (<-chan Item[*Bucket], error) {
-    buffer, err := load(
+) <-chan Item[*Bucket] {
+    buffer := load(
         ctx, db, path, bufferSize,
         func(_, value []byte) bool {
             return value == nil

@@ -98,17 +98,14 @@ func LoadBuckets(
             }
         },
     )
-    if err != nil {
-        return nil, fmt.Errorf("can't start iterating bucket: %w", err)
-    }
-
-    return buffer, nil
+    return buffer
 }

 func LoadRecords(
     ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
-) (<-chan Item[*Record], error) {
-    buffer, err := load(
+) <-chan Item[*Record] {
+    buffer := load(
         ctx, db, path, bufferSize,
         func(_, value []byte) bool {
             return value != nil

@@ -124,11 +121,8 @@ func LoadRecords(
             }
         },
     )
-    if err != nil {
-        return nil, fmt.Errorf("can't start iterating bucket: %w", err)
-    }
-
-    return buffer, nil
+    return buffer
 }

 // HasBuckets checks if a bucket has nested buckets. It relies on assumption

@@ -137,24 +131,21 @@ func HasBuckets(ctx context.Context, db *bbolt.DB, path [][]byte) (bool, error)
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()

-    buffer, err := load(
+    buffer := load(
         ctx, db, path, 1,
         nil,
         func(_, value []byte) []byte { return value },
     )
-    if err != nil {
-        return false, err
-    }

     x, ok := <-buffer
     if !ok {
         return false, nil
     }
     if x.err != nil {
-        return false, err
+        return false, x.err
     }
     if x.val != nil {
-        return false, err
+        return false, nil
     }
     return true, nil
 }
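Review note: after this change the loaders cannot fail before their goroutine starts, so failures travel in-band: every Item[T] carries either a value or an error, and consumers check item.err on each receive, as the views above now do (this also fixes HasBuckets, which previously returned the stale outer err instead of x.err). The pattern in miniature, with illustrative names:

package main

import "fmt"

// Item pairs a value with an error so one channel can carry both.
type Item[T any] struct {
    val T
    err error
}

func produce[T any](vals []T, fail error) <-chan Item[T] {
    out := make(chan Item[T], len(vals)+1)
    go func() {
        defer close(out)
        for _, v := range vals {
            out <- Item[T]{val: v}
        }
        if fail != nil {
            out <- Item[T]{err: fail} // error delivered in-band
        }
    }()
    return out
}

func main() {
    for item := range produce([]int{1, 2}, fmt.Errorf("boom")) {
        if item.err != nil {
            fmt.Println("error:", item.err)
            break
        }
        fmt.Println("value:", item.val)
    }
}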
@@ -62,10 +62,7 @@ func (v *RecordsView) Mount(ctx context.Context) error {

     ctx, v.onUnmount = context.WithCancel(ctx)

-    tempBuffer, err := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
-    if err != nil {
-        return err
-    }
+    tempBuffer := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)

     v.buffer = make(chan *Record, v.ui.loadBufferSize)
     go func() {

@@ -73,11 +70,12 @@ func (v *RecordsView) Mount(ctx context.Context) error {

         for item := range tempBuffer {
             if item.err != nil {
-                v.ui.stopOnError(err)
+                v.ui.stopOnError(item.err)
                 break
             }
             record := item.val

+            var err error
             record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value)
             if err != nil {
                 v.ui.stopOnError(err)
@@ -19,6 +19,7 @@ func initAPEManagerService(c *cfg) {
         c.cfgObject.cfgAccessPolicyEngine.policyContractHash)

     execsvc := apemanager.New(c.cfgObject.cnrSource, contractStorage,
+        c.cfgMorph.client,
         apemanager.WithLogger(c.log))
     sigsvc := apemanager.NewSignService(&c.key.PrivateKey, execsvc)
     auditSvc := apemanager.NewAuditService(sigsvc, c.log, c.audit)
@@ -609,6 +609,7 @@ type cfgContainer struct {
 	parsers     map[event.Type]event.NotificationParser
 	subscribers map[event.Type][]event.Handler
 	workerPool  util.WorkerPool // pool for asynchronous handlers
+	containerBatchSize uint32
 }

 type cfgFrostfsID struct {
@@ -697,8 +698,7 @@ func initCfg(appCfg *config.Config) *cfg {

 	netState.metrics = c.metricsCollector

-	logPrm, err := c.loggerPrm()
-	fatalOnErr(err)
+	logPrm := c.loggerPrm()
 	logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
 	log, err := logger.NewLogger(logPrm)
 	fatalOnErr(err)
@@ -853,7 +853,7 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {

 func initCfgGRPC() cfgGRPC {
 	maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
-	maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
+	maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes

 	return cfgGRPC{
 		maxChunkSize: maxChunkSize,
@@ -1058,7 +1058,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	return sh
 }

-func (c *cfg) loggerPrm() (*logger.Prm, error) {
+func (c *cfg) loggerPrm() *logger.Prm {
 	// check if it has been inited before
 	if c.dynamicConfiguration.logger == nil {
 		c.dynamicConfiguration.logger = new(logger.Prm)
@@ -1077,7 +1077,7 @@ func (c *cfg) loggerPrm() (*logger.Prm, error) {
 	}
 	c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp

-	return c.dynamicConfiguration.logger, nil
+	return c.dynamicConfiguration.logger
 }

 func (c *cfg) LocalAddress() network.AddressGroup {
@@ -1146,7 +1146,7 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
 		c.cfgObject.cfgAccessPolicyEngine.policyContractHash)

 	cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
-	if cacheSize > 0 {
+	if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
 		morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
 	}

@@ -1219,9 +1219,9 @@ func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
 // bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
 // with the binary-encoded information from the current node's configuration.
 // The state is set using the provided setter which MUST NOT be nil.
-func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.NodeInfo)) error {
+func (c *cfg) bootstrapWithState(ctx context.Context, state netmap.NodeState) error {
 	ni := c.cfgNodeInfo.localInfo
-	stateSetter(&ni)
+	ni.SetStatus(state)

 	prm := nmClient.AddPeerPrm{}
 	prm.SetNodeInfo(ni)
@@ -1231,9 +1231,7 @@ func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.N

 // bootstrapOnline calls cfg.bootstrapWithState with "online" state.
 func bootstrapOnline(ctx context.Context, c *cfg) error {
-	return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
-		ni.SetStatus(netmap.Online)
-	})
+	return c.bootstrapWithState(ctx, netmap.Online)
 }

 // bootstrap calls bootstrapWithState with:
@@ -1244,9 +1242,7 @@ func (c *cfg) bootstrap(ctx context.Context) error {
 	st := c.cfgNetmap.state.controlNetmapStatus()
 	if st == control.NetmapStatus_MAINTENANCE {
 		c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
-		return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
-			ni.SetStatus(netmap.Maintenance)
-		})
+		return c.bootstrapWithState(ctx, netmap.Maintenance)
 	}

 	c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
@@ -1337,11 +1333,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {

 	// Logger

-	logPrm, err := c.loggerPrm()
-	if err != nil {
-		c.log.Error(ctx, logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
-		return
-	}
+	logPrm := c.loggerPrm()

 	components := c.getComponents(ctx, logPrm)

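Aside: the `bootstrapWithState` hunks above swap a setter callback for a plain state value, which collapses each call site from a three-line closure to a single argument. A minimal sketch of the difference, using illustrative names rather than the repository's netmap types:

```go
package main

import "fmt"

type NodeState int

const (
	Online NodeState = iota
	Maintenance
)

type NodeInfo struct{ state NodeState }

func (ni *NodeInfo) SetStatus(s NodeState) { ni.state = s }

// Before: each caller passes a closure that mutates the node info.
func bootstrapWithSetter(set func(*NodeInfo)) NodeInfo {
	var ni NodeInfo
	set(&ni)
	return ni
}

// After: the callee applies the state itself; callers pass a constant.
func bootstrapWithState(s NodeState) NodeInfo {
	var ni NodeInfo
	ni.SetStatus(s)
	return ni
}

func main() {
	fmt.Println(bootstrapWithSetter(func(ni *NodeInfo) { ni.SetStatus(Online) }).state)
	fmt.Println(bootstrapWithState(Maintenance).state)
}
```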
@@ -1,6 +1,7 @@
 package config

 import (
+	"slices"
 	"strings"

 	configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
@@ -52,6 +53,5 @@ func (x *Config) Value(name string) any {
 // It supports only one level of nesting and is intended to be used
 // to provide default values.
 func (x *Config) SetDefault(from *Config) {
-	x.defaultPath = make([]string, len(from.path))
-	copy(x.defaultPath, from.path)
+	x.defaultPath = slices.Clone(from.path)
 }

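Aside: `slices.Clone` (standard library since Go 1.21) is a drop-in replacement for the make-and-copy pair removed above; a quick check of the equivalence:

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	path := []string{"container", "list_stream"}

	// Old form: allocate a slice of equal length, then copy.
	dst := make([]string, len(path))
	copy(dst, path)

	// New form: one call, same contents, independent backing array.
	clone := slices.Clone(path)
	clone[0] = "changed"

	fmt.Println(slices.Equal(dst, path)) // true
	fmt.Println(path[0])                 // "container": the original is untouched
}
```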
27
cmd/frostfs-node/config/container/container.go
Normal file
@@ -0,0 +1,27 @@
+package containerconfig
+
+import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
+
+const (
+	subsection           = "container"
+	listStreamSubsection = "list_stream"
+
+	// ContainerBatchSizeDefault represents the maximum amount of containers to send via stream at once.
+	ContainerBatchSizeDefault = 1000
+)
+
+// ContainerBatchSize returns the value of "batch_size" config parameter
+// from "list_stream" subsection of "container" section.
+//
+// Returns ContainerBatchSizeDefault if the value is missing or if
+// the value is not positive integer.
+func ContainerBatchSize(c *config.Config) uint32 {
+	if c.Sub(subsection).Sub(listStreamSubsection).Value("batch_size") == nil {
+		return ContainerBatchSizeDefault
+	}
+	size := config.Uint32Safe(c.Sub(subsection).Sub(listStreamSubsection), "batch_size")
+	if size == 0 {
+		return ContainerBatchSizeDefault
+	}
+	return size
+}
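Aside: the defaulting rule of `ContainerBatchSize` is easy to misread, so here is a condensed, standalone sketch of it (the config plumbing is replaced by a nullable argument for brevity):

```go
package main

import "fmt"

const containerBatchSizeDefault = 1000

// containerBatchSize mirrors the rule above: a missing value or a value
// that is not a positive integer falls back to the default.
func containerBatchSize(raw *uint32) uint32 {
	if raw == nil || *raw == 0 {
		return containerBatchSizeDefault
	}
	return *raw
}

func main() {
	v := uint32(500)
	zero := uint32(0)
	fmt.Println(containerBatchSize(nil))   // 1000: parameter missing
	fmt.Println(containerBatchSize(&zero)) // 1000: not a positive integer
	fmt.Println(containerBatchSize(&v))    // 500
}
```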
27
cmd/frostfs-node/config/container/container_test.go
Normal file
@@ -0,0 +1,27 @@
+package containerconfig_test
+
+import (
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
+	containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
+	configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
+	"github.com/stretchr/testify/require"
+)
+
+func TestContainerSection(t *testing.T) {
+	t.Run("defaults", func(t *testing.T) {
+		empty := configtest.EmptyConfig()
+		require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty))
+	})
+
+	const path = "../../../../config/example/node"
+	fileConfigTest := func(c *config.Config) {
+		require.Equal(t, uint32(500), containerconfig.ContainerBatchSize(c))
+	}
+
+	configtest.ForEachFileType(path, fileConfigTest)
+	t.Run("ENV", func(t *testing.T) {
+		configtest.ForEnvFileType(t, path, fileConfigTest)
+	})
+}
@@ -198,7 +198,7 @@ func (l PersistentPolicyRulesConfig) Path() string {
 //
 // Returns PermDefault if the value is not a positive number.
 func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
-	p := config.UintSafe((*config.Config)(l.cfg), "perm")
+	p := config.UintSafe(l.cfg, "perm")
 	if p == 0 {
 		p = PermDefault
 	}
@@ -210,7 +210,7 @@ func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
 //
 // Returns false if the value is not a boolean.
 func (l PersistentPolicyRulesConfig) NoSync() bool {
-	return config.BoolSafe((*config.Config)(l.cfg), "no_sync")
+	return config.BoolSafe(l.cfg, "no_sync")
 }

 // CompatibilityMode returns true if need to run node in compatibility with previous versions mode.

@@ -5,6 +5,7 @@ import (
 	"context"
 	"net"

+	containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
 	morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
@@ -42,11 +43,12 @@ func initContainerService(_ context.Context, c *cfg) {
 	fatalOnErr(err)

 	cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
-	if cacheSize > 0 {
+	if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
 		frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
 	}

 	c.shared.frostfsidClient = frostfsIDSubjectProvider
+	c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)

 	defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
 		c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
@@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) {
 		&c.key.PrivateKey,
 		containerService.NewAPEServer(defaultChainRouter, cnrRdr,
 			newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
-			containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
+			containerService.NewSplitterService(
+				c.cfgContainer.containerBatchSize, c.respSvc,
+				containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
 		),
 	)
 	service = containerService.NewAuditService(service, c.log, c.audit)
@@ -218,6 +222,7 @@ type morphContainerReader struct {

 	lister interface {
 		ContainersOf(*user.ID) ([]cid.ID, error)
+		IterateContainersOf(*user.ID, func(cid.ID) error) error
 	}
 }
@@ -233,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
 	return x.lister.ContainersOf(id)
 }

+func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
+	return x.lister.IterateContainersOf(id, processCID)
+}
+
 type morphContainerWriter struct {
 	neoClient *cntClient.Client
 }

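Aside: the `NewSplitterService` wrapper introduced above sits in front of the execution service and cuts a full container listing into stream messages of at most `containerBatchSize` entries. A generic sketch of that batching pattern (names are illustrative, not the repository's API):

```go
package main

import "fmt"

// sendInBatches emits ids in chunks of batchSize, one send per chunk.
func sendInBatches(ids []string, batchSize int, send func([]string)) {
	for start := 0; start < len(ids); start += batchSize {
		end := min(start+batchSize, len(ids))
		send(ids[start:end])
	}
}

func main() {
	ids := []string{"c1", "c2", "c3", "c4", "c5", "c6", "c7"}
	sendInBatches(ids, 3, func(batch []string) {
		fmt.Println("stream message:", batch)
	})
}
```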
@@ -151,7 +151,7 @@ func makeNotaryDeposit(ctx context.Context, c *cfg) (util.Uint256, uint32, error
 }

 func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error {
-	if err := c.cfgMorph.client.WaitTxHalt(ctx, client.InvokeRes{Hash: tx, VUB: vub}); err != nil {
+	if err := c.cfgMorph.client.WaitTxHalt(ctx, vub, tx); err != nil {
 		return err
 	}

@@ -86,7 +86,7 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
 		}
 	}

-	s.setControlNetmapStatus(control.NetmapStatus(ctrlNetSt))
+	s.setControlNetmapStatus(ctrlNetSt)
 }

 // sets the current node state to the given value. Subsequent cfg.bootstrap
@@ -423,7 +423,7 @@ func (c *cfg) updateNetMapState(ctx context.Context, stateSetter func(*nmClient.
 	if err != nil {
 		return err
 	}
-	return c.cfgNetmap.wrapper.Morph().WaitTxHalt(ctx, res)
+	return c.cfgNetmap.wrapper.Morph().WaitTxHalt(ctx, res.VUB, res.Hash)
 }

 type netInfo struct {

@@ -215,8 +215,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
 			prm.MarkAsGarbage(addr)
 			prm.WithForceRemoval()

-			_, err := ls.Inhume(ctx, prm)
-			return err
+			return ls.Inhume(ctx, prm)
 		}

 	remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
@@ -266,8 +265,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
 			var inhumePrm engine.InhumePrm
 			inhumePrm.MarkAsGarbage(addr)

-			_, err := ls.Inhume(ctx, inhumePrm)
-			if err != nil {
+			if err := ls.Inhume(ctx, inhumePrm); err != nil {
 				c.log.Warn(ctx, logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
 					zap.Error(err),
 				)
@@ -476,8 +474,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad

 	prm.WithTarget(tombstone, addrs...)

-	_, err := e.engine.Inhume(ctx, prm)
-	return err
+	return e.engine.Inhume(ctx, prm)
 }

 func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {

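Aside: the three `Inhume` call sites above all follow from one signature change: the engine's `Inhume` now returns only an error, because its result struct carried no data. A generic sketch of this return-type narrowing (illustrative names, not the repository's API):

```go
package main

import "fmt"

type InhumeRes struct{} // empty result: callers had nothing to read from it

// Before: callers were forced into `_, err := inhumeOld(...)`.
func inhumeOld() (InhumeRes, error) { return InhumeRes{}, nil }

// After: the error can be returned or checked directly.
func inhumeNew() error { return nil }

func main() {
	_, err := inhumeOld()
	fmt.Println(err)
	fmt.Println(inhumeNew())
}
```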
@@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s
 FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
 FROSTFS_REPLICATOR_POOL_SIZE=10

+# Container service section
+FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
+
 # Object service section
 FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200

@@ -124,6 +124,11 @@
     "pool_size": 10,
     "put_timeout": "15s"
   },
+  "container": {
+    "list_stream": {
+      "batch_size": "500"
+    }
+  },
   "object": {
     "delete": {
       "tombstone_lifetime": 10

@@ -79,7 +79,8 @@ contracts: # side chain NEOFS contract script hashes; optional, override values

 morph:
   dial_timeout: 30s # timeout for side chain NEO RPC client connection
-  cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls). Negative value disables caching.
+  cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls).
+  # Negative value disables caching. A zero value sets the default value.
   # Default value: block time. It is recommended to have this value less or equal to block time.
   # Cached entities: containers, container lists, eACL tables.
   container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
@@ -108,6 +109,10 @@ replicator:
   put_timeout: 15s # timeout for the Replicator PUT remote operation
   pool_size: 10 # maximum amount of concurrent replications

+container:
+  list_stream:
+    batch_size: 500 # container_batch_size is the maximum amount of containers to send via stream at once
+
 object:
   delete:
     tombstone_lifetime: 10 # tombstone "local" lifetime in epochs

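Aside: the reworded `cache_ttl` comment and the two `cacheSize > 0 && c.cfgMorph.cacheTTL > 0` guards in the Go hunks describe the same rule. A small sketch of it (hypothetical helper, not the repository's API):

```go
package main

import (
	"fmt"
	"time"
)

// cacheEnabled reflects the documented rule: caching needs both a positive
// capacity and a positive TTL; a negative TTL disables caching outright.
func cacheEnabled(cacheSize uint32, cacheTTL time.Duration) bool {
	return cacheSize > 0 && cacheTTL > 0
}

func main() {
	fmt.Println(cacheEnabled(100, 15*time.Second)) // true
	fmt.Println(cacheEnabled(100, -time.Second))   // false: caching disabled
	fmt.Println(cacheEnabled(0, 15*time.Second))   // false: no capacity
}
```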
@@ -42,7 +42,6 @@
 				"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
-				"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
 				"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet01.json",
 				"FROSTFS_NODE_WALLET_PASSWORD":"",
 				"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
@@ -98,7 +97,6 @@
 				"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
-				"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
 				"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet02.json",
 				"FROSTFS_NODE_WALLET_PASSWORD":"",
 				"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8082",
@@ -154,7 +152,6 @@
 				"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
-				"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
 				"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet03.json",
 				"FROSTFS_NODE_WALLET_PASSWORD":"",
 				"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8084",
@@ -210,7 +207,6 @@
 				"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
 				"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
-				"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
 				"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet04.json",
 				"FROSTFS_NODE_WALLET_PASSWORD":"",
 				"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8086",

@@ -95,19 +95,15 @@ $ git push origin ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION}

 ## Post-release

-### Prepare and push images to a Docker Hub (if not automated)
+### Prepare and push images to a Docker registry (automated)

-Create Docker images for all applications and push them into Docker Hub
-(requires [organization](https://hub.docker.com/u/truecloudlab) privileges)
+Create Docker images for all applications and push them into container registry
+(executed automatically in Forgejo Actions upon pushing a release tag):

 ```shell
 $ git checkout ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION}
 $ make images
-$ docker push truecloudlab/frostfs-storage:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-storage-testnet:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-ir:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-cli:${FROSTFS_REVISION}
-$ docker push truecloudlab/frostfs-adm:${FROSTFS_REVISION}
+$ make push-images
 ```

 ### Make a proper release (if not automated)

32
go.mod
@@ -8,7 +8,7 @@ require (
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
 	git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
-	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d
+	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250109084609-328d214d2d76
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
 	git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
@@ -27,7 +27,7 @@ require (
 	github.com/klauspost/compress v1.17.4
 	github.com/mailru/easyjson v0.7.7
 	github.com/mr-tron/base58 v1.2.0
-	github.com/multiformats/go-multiaddr v0.12.1
+	github.com/multiformats/go-multiaddr v0.14.0
 	github.com/nspcc-dev/neo-go v0.106.3
 	github.com/olekukonko/tablewriter v0.0.5
 	github.com/panjf2000/ants/v2 v2.9.0
@@ -40,15 +40,15 @@ require (
 	github.com/ssgreg/journald v1.0.0
 	github.com/stretchr/testify v1.9.0
 	go.etcd.io/bbolt v1.3.10
-	go.opentelemetry.io/otel v1.28.0
-	go.opentelemetry.io/otel/trace v1.28.0
+	go.opentelemetry.io/otel v1.31.0
+	go.opentelemetry.io/otel/trace v1.31.0
 	go.uber.org/zap v1.27.0
 	golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
-	golang.org/x/sync v0.7.0
-	golang.org/x/sys v0.22.0
-	golang.org/x/term v0.21.0
-	google.golang.org/grpc v1.66.2
-	google.golang.org/protobuf v1.34.2
+	golang.org/x/sync v0.10.0
+	golang.org/x/sys v0.28.0
+	golang.org/x/term v0.27.0
+	google.golang.org/grpc v1.69.2
+	google.golang.org/protobuf v1.36.1
 	gopkg.in/yaml.v3 v3.0.1
 )
@@ -119,15 +119,15 @@ require (
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
 	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
-	go.opentelemetry.io/otel/metric v1.28.0 // indirect
-	go.opentelemetry.io/otel/sdk v1.28.0 // indirect
+	go.opentelemetry.io/otel/metric v1.31.0 // indirect
+	go.opentelemetry.io/otel/sdk v1.31.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.3.1 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
-	golang.org/x/crypto v0.24.0 // indirect
-	golang.org/x/net v0.26.0 // indirect
-	golang.org/x/text v0.16.0 // indirect
-	google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
+	golang.org/x/crypto v0.31.0 // indirect
+	golang.org/x/net v0.30.0 // indirect
+	golang.org/x/text v0.21.0 // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	lukechampine.com/blake3 v1.2.1 // indirect
 	rsc.io/tmplfunc v0.0.3 // indirect

BIN
go.sum
Binary file not shown.
@@ -146,7 +146,6 @@ const (
 	ClientCantGetBlockchainHeight = "can't get blockchain height"
 	ClientCantGetBlockchainHeight243 = "can't get blockchain height"
 	EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool"
-	EventCouldNotStartListenToEvents = "could not start listen to events"
 	EventStopEventListenerByError = "stop event listener by error"
 	EventStopEventListenerByContext = "stop event listener by context"
 	EventStopEventListenerByNotificationChannel = "stop event listener by notification channel"
@@ -384,7 +383,6 @@ const (
 	FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown"
 	FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing"
 	FrostFSNodeConfigurationReading = "configuration reading"
-	FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
 	FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
 	FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
 	FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"

@@ -12,6 +12,7 @@ type ApplicationInfo struct {
 func NewApplicationInfo(version string) *ApplicationInfo {
 	appInfo := &ApplicationInfo{
 		versionValue: metrics.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
 			Name:      "app_info",
 			Help:      "General information about the application.",
 		}, []string{"version"}),

@@ -31,9 +31,7 @@ type RPCActorProvider interface {
 type ProxyVerificationContractStorage struct {
 	rpcActorProvider RPCActorProvider

-	acc *wallet.Account
-
-	proxyScriptHash util.Uint160
+	cosigners []actor.SignerAccount

 	policyScriptHash util.Uint160
 }
@@ -41,12 +39,27 @@ type ProxyVerificationContractStorage struct {
 var _ ProxyAdaptedContractStorage = (*ProxyVerificationContractStorage)(nil)

 func NewProxyVerificationContractStorage(rpcActorProvider RPCActorProvider, key *keys.PrivateKey, proxyScriptHash, policyScriptHash util.Uint160) *ProxyVerificationContractStorage {
+	acc := wallet.NewAccountFromPrivateKey(key)
 	return &ProxyVerificationContractStorage{
 		rpcActorProvider: rpcActorProvider,

-		acc: wallet.NewAccountFromPrivateKey(key),
-
-		proxyScriptHash: proxyScriptHash,
+		cosigners: []actor.SignerAccount{
+			{
+				Signer: transaction.Signer{
+					Account:          proxyScriptHash,
+					Scopes:           transaction.CustomContracts,
+					AllowedContracts: []util.Uint160{policyScriptHash},
+				},
+				Account: notary.FakeContractAccount(proxyScriptHash),
+			},
+			{
+				Signer: transaction.Signer{
+					Account: acc.Contract.ScriptHash(),
+					Scopes:  transaction.CalledByEntry,
+				},
+				Account: acc,
+			},
+		},

 		policyScriptHash: policyScriptHash,
 	}
@@ -64,7 +77,7 @@ func (n *contractStorageActorAdapter) GetRPCInvoker() invoker.RPCInvoke {

 func (contractStorage *ProxyVerificationContractStorage) newContractStorageActor() (policy_morph.ContractStorageActor, error) {
 	rpcActor := contractStorage.rpcActorProvider.GetRPCActor()
-	act, err := actor.New(rpcActor, cosigners(contractStorage.acc, contractStorage.proxyScriptHash, contractStorage.policyScriptHash))
+	act, err := actor.New(rpcActor, contractStorage.cosigners)
 	if err != nil {
 		return nil, err
 	}
@@ -98,31 +111,16 @@ func (contractStorage *ProxyVerificationContractStorage) RemoveMorphRuleChain(na

 // ListMorphRuleChains lists morph rule chains from Policy contract using both Proxy contract and storage account as consigners.
 func (contractStorage *ProxyVerificationContractStorage) ListMorphRuleChains(name chain.Name, target engine.Target) ([]*chain.Chain, error) {
-	// contractStorageActor is reconstructed per each method invocation because RPCActor's (that is, basically, WSClient) connection may get invalidated, but
-	// ProxyVerificationContractStorage does not manage reconnections.
-	contractStorageActor, err := contractStorage.newContractStorageActor()
-	if err != nil {
-		return nil, err
-	}
-	return policy_morph.NewContractStorage(contractStorageActor, contractStorage.policyScriptHash).ListMorphRuleChains(name, target)
+	rpcActor := contractStorage.rpcActorProvider.GetRPCActor()
+	inv := &invokerAdapter{Invoker: invoker.New(rpcActor, nil), rpcInvoker: rpcActor}
+	return policy_morph.NewContractStorageReader(inv, contractStorage.policyScriptHash).ListMorphRuleChains(name, target)
 }

-func cosigners(acc *wallet.Account, proxyScriptHash, policyScriptHash util.Uint160) []actor.SignerAccount {
-	return []actor.SignerAccount{
-		{
-			Signer: transaction.Signer{
-				Account:          proxyScriptHash,
-				Scopes:           transaction.CustomContracts,
-				AllowedContracts: []util.Uint160{policyScriptHash},
-			},
-			Account: notary.FakeContractAccount(proxyScriptHash),
-		},
-		{
-			Signer: transaction.Signer{
-				Account: acc.Contract.ScriptHash(),
-				Scopes:  transaction.CalledByEntry,
-			},
-			Account: acc,
-		},
-	}
+type invokerAdapter struct {
+	*invoker.Invoker
+	rpcInvoker invoker.RPCInvoke
 }

+func (n *invokerAdapter) GetRPCInvoker() invoker.RPCInvoke {
+	return n.rpcInvoker
+}

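Aside: the storage refactoring above moves the cosigner list from a per-call helper into a field built once in the constructor. The general shape of that change, reduced to a runnable sketch with illustrative types rather than the neo-go ones:

```go
package main

import "fmt"

type signerAccount struct{ account string }

type contractStorage struct {
	cosigners []signerAccount // assembled once, reused by every invocation
}

func newContractStorage(walletAccount, proxyHash string) *contractStorage {
	return &contractStorage{
		cosigners: []signerAccount{{proxyHash}, {walletAccount}},
	}
}

func (s *contractStorage) invoke() {
	fmt.Println("invoking with", len(s.cosigners), "cosigners")
}

func main() {
	s := newContractStorage("acc", "proxy")
	s.invoke() // no per-call signer construction
	s.invoke()
}
```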
@@ -38,10 +38,7 @@ import (
 func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
 	alphaSync event.Handler,
 ) error {
-	locodeValidator, err := s.newLocodeValidator(cfg)
-	if err != nil {
-		return err
-	}
+	locodeValidator := s.newLocodeValidator(cfg)

 	netSettings := (*networkSettings)(s.netmapClient)

@@ -51,6 +48,7 @@ func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
 	poolSize := cfg.GetInt("workers.netmap")
 	s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))

+	var err error
 	s.netmapProcessor, err = netmap.New(&netmap.Params{
 		Log:     s.log,
 		Metrics: s.irMetrics,

@@ -9,7 +9,7 @@ import (
 	"github.com/spf13/viper"
 )

-func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, error) {
+func (s *Server) newLocodeValidator(cfg *viper.Viper) netmap.NodeValidator {
 	locodeDB := locodebolt.New(locodebolt.Prm{
 		Path: cfg.GetString("locode.db.path"),
 	},
@@ -21,7 +21,7 @@ func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, err

 	return irlocode.New(irlocode.Prm{
 		DB: (*locodeBoltDBWrapper)(locodeDB),
-	}), nil
+	})
 }

 type locodeBoltEntryWrapper struct {

@@ -129,7 +129,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
 		})
 	})
 	if err != nil {
-		return fmt.Errorf("can't determine DB size: %w", err)
+		return fmt.Errorf("determine DB size: %w", err)
 	}
 	if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
 		b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
@@ -140,7 +140,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
 			return saveItemsCount(tx, items)
 		}); err != nil {
 			b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
-			return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
+			return fmt.Errorf("save blobovnicza's size and items count: %w", err)
 		}
 		b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
 	}
@@ -146,7 +146,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
 			if prm.ignoreErrors {
 				return nil
 			}
-			return fmt.Errorf("could not decode address key: %w", err)
+			return fmt.Errorf("decode address key: %w", err)
 		}
 	}

@@ -19,7 +19,10 @@ import (
 	"go.uber.org/zap"
 )

-var errObjectIsDeleteProtected = errors.New("object is delete protected")
+var (
+	errObjectIsDeleteProtected = errors.New("object is delete protected")
+	deleteRes                  = common.DeleteRes{}
+)

 // Delete deletes object from blobovnicza tree.
 //
@@ -43,17 +46,17 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
 	defer span.End()

 	if b.readOnly {
-		return common.DeleteRes{}, common.ErrReadOnly
+		return deleteRes, common.ErrReadOnly
 	}

 	if b.rebuildGuard.TryRLock() {
 		defer b.rebuildGuard.RUnlock()
 	} else {
-		return common.DeleteRes{}, errRebuildInProgress
+		return deleteRes, errRebuildInProgress
 	}

 	if b.deleteProtectedObjects.Contains(prm.Address) {
-		return common.DeleteRes{}, errObjectIsDeleteProtected
+		return deleteRes, errObjectIsDeleteProtected
 	}

 	var bPrm blobovnicza.DeletePrm
@@ -98,7 +101,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co

 	if err == nil && !objectFound {
 		// not found in any blobovnicza
-		return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+		return deleteRes, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
 	success = err == nil

@@ -112,7 +115,7 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
 	shBlz := b.getBlobovnicza(ctx, blzPath)
 	blz, err := shBlz.Open(ctx)
 	if err != nil {
-		return common.DeleteRes{}, err
+		return deleteRes, err
 	}
 	defer shBlz.Close(ctx)

@@ -122,5 +125,5 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
 // removes object from blobovnicza and returns common.DeleteRes.
 func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) {
 	_, err := blz.Delete(ctx, prm)
-	return common.DeleteRes{}, err
+	return deleteRes, err
 }

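Aside: `deleteRes` above is a package-level zero value of an empty result struct, shared by every return site. A tiny sketch of the idiom:

```go
package main

import "errors"

type DeleteRes struct{}

// One shared zero value replaces `DeleteRes{}` at each return; for an
// empty struct this is a readability change, not a performance one.
var deleteRes = DeleteRes{}

var errReadOnly = errors.New("read-only mode")

func del(readOnly bool) (DeleteRes, error) {
	if readOnly {
		return deleteRes, errReadOnly
	}
	return deleteRes, nil
}

func main() {
	if _, err := del(true); err != nil {
		println(err.Error())
	}
}
```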
@@ -115,13 +115,13 @@ func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnic
 	// decompress the data
 	data, err := b.compression.Decompress(res.Object())
 	if err != nil {
-		return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
+		return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
 	}

 	// unmarshal the object
 	obj := objectSDK.New()
 	if err := obj.Unmarshal(data); err != nil {
-		return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
+		return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
 	}

 	return common.GetRes{Object: obj, RawData: data}, nil
@@ -130,13 +130,13 @@ func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blob
 	// decompress the data
 	data, err := b.compression.Decompress(res.Object())
 	if err != nil {
-		return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
+		return common.GetRangeRes{}, fmt.Errorf("decompress object data: %w", err)
 	}

 	// unmarshal the object
 	obj := objectSDK.New()
 	if err := obj.Unmarshal(data); err != nil {
-		return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
+		return common.GetRangeRes{}, fmt.Errorf("unmarshal the object: %w", err)
 	}

 	from := prm.Range.GetOffset()

@@ -49,7 +49,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
 					zap.String("root_path", b.rootPath))
 				return nil
 			}
-			return fmt.Errorf("could not decompress object data: %w", err)
+			return fmt.Errorf("decompress object data: %w", err)
 		}

 		if prm.Handler != nil {
@@ -82,7 +82,7 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
 				zap.String("root_path", b.rootPath))
 			return false, nil
 		}
-		return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
+		return false, fmt.Errorf("open blobovnicza %s: %w", p, err)
 	}
 	defer shBlz.Close(ctx)

@@ -249,6 +249,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
 }

 func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
+	select {
+	case <-ctx.Done():
+		return false, ctx.Err()
+	default:
+	}
+
 	sysPath := filepath.Join(b.rootPath, path)
 	entries, err := os.ReadDir(sysPath)
 	if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode

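Aside: the `select` added to `iterateSordedDBPathsInternal` is the standard non-blocking cancellation check: each recursion step polls the context before touching the filesystem, so a cancelled caller stops the walk promptly. A minimal demonstration:

```go
package main

import (
	"context"
	"fmt"
)

func walk(ctx context.Context, depth int) error {
	// Non-blocking poll: return immediately once the context is done.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	if depth == 0 {
		return nil
	}
	return walk(ctx, depth-1)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()                        // already cancelled:
	fmt.Println(walk(ctx, 100000)) // the walk aborts at the first step
}
```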
@@ -69,10 +69,10 @@ func (b *sharedDB) Open(ctx context.Context) (*blobovnicza.Blobovnicza, error) {
 	)...)

 	if err := blz.Open(ctx); err != nil {
-		return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
+		return nil, fmt.Errorf("open blobovnicza %s: %w", b.path, err)
 	}
 	if err := blz.Init(ctx); err != nil {
-		return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
+		return nil, fmt.Errorf("init blobovnicza %s: %w", b.path, err)
 	}

 	b.refCount++
@@ -127,7 +127,7 @@ func (b *sharedDB) CloseAndRemoveFile(ctx context.Context) error {
 			zap.String("id", b.path),
 			zap.Error(err),
 		)
-		return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
+		return fmt.Errorf("close blobovnicza (path = %s): %w", b.path, err)
 	}

 	b.refCount = 0

@@ -538,7 +538,7 @@ func (t *FSTree) countFiles() (uint64, uint64, error) {
 		},
 	)
 	if err != nil {
-		return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
+		return 0, 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
 	}

 	return count, size, nil
@@ -577,7 +577,7 @@ func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
 		},
 	)
 	if err != nil {
-		return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
+		return 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
 	}
 	success = true
 	return result, nil

@@ -136,6 +136,6 @@ func (w *genericWriter) removeWithCounter(p string, size uint64) error {
 	if err := os.Remove(p); err != nil {
 		return err
 	}
-	w.fileCounter.Dec(uint64(size))
+	w.fileCounter.Dec(size)
 	return nil
 }

@@ -69,10 +69,13 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
 	if err != nil {
 		return err
 	}
+	written := 0
 	tmpPath := "/proc/self/fd/" + strconv.FormatUint(uint64(fd), 10)
 	n, err := unix.Write(fd, data)
-	if err == nil {
-		if n == len(data) {
+	for err == nil {
+		written += n
+
+		if written == len(data) {
 			err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
 			if err == nil {
 				w.fileCounter.Inc(uint64(len(data)))
@@ -80,9 +83,23 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
 			if errors.Is(err, unix.EEXIST) {
 				err = nil
 			}
-		} else {
-			err = errors.New("incomplete write")
+			break
 		}
+
+		// From man 2 write:
+		// https://www.man7.org/linux/man-pages/man2/write.2.html
+		//
+		// Note that a successful write() may transfer fewer than count
+		// bytes. Such partial writes can occur for various reasons; for
+		// example, because there was insufficient space on the disk device
+		// to write all of the requested bytes, or because a blocked write()
+		// to a socket, pipe, or similar was interrupted by a signal handler
+		// after it had transferred some, but before it had transferred all
+		// of the requested bytes. In the event of a partial write, the
+		// caller can make another write() call to transfer the remaining
+		// bytes. The subsequent call will either transfer further bytes or
+		// may result in an error (e.g., if the disk is now full).
+		n, err = unix.Write(fd, data[written:])
 	}
 	errClose := unix.Close(fd)
 	if err != nil {
@@ -114,7 +131,7 @@ func (w *linuxWriter) removeFile(p string, size uint64) error {
 		return logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
 	if err == nil {
-		w.fileCounter.Dec(uint64(size))
+		w.fileCounter.Dec(size)
 	}
 	return err
 }

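Aside: the loop added to `writeFile` handles the partial-write case the quoted man page describes: `unix.Write` may transfer fewer bytes than requested without failing, so the writer must resume from the last offset. The retry logic in isolation, as a standalone sketch:

```go
//go:build linux

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// writeAll keeps calling unix.Write until all bytes are transferred or an
// error is returned; a short write simply advances the offset and retries.
func writeAll(fd int, data []byte) error {
	written := 0
	for written < len(data) {
		n, err := unix.Write(fd, data[written:])
		if err != nil {
			return err
		}
		written += n
	}
	return nil
}

func main() {
	fd, err := unix.Open("/tmp/writeall.demo", unix.O_CREAT|unix.O_WRONLY|unix.O_TRUNC, 0o644)
	if err != nil {
		panic(err)
	}
	defer unix.Close(fd)

	if err := writeAll(fd, []byte("payload")); err != nil {
		panic(err)
	}
	fmt.Println("written in full")
}
```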
@@ -0,0 +1,42 @@
+//go:build linux && integration
+
+package fstree
+
+import (
+	"context"
+	"errors"
+	"os"
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sys/unix"
+)
+
+func TestENOSPC(t *testing.T) {
+	dir, err := os.MkdirTemp(t.TempDir(), "ramdisk")
+	require.NoError(t, err)
+
+	f, err := os.CreateTemp(t.TempDir(), "ramdisk_*")
+	require.NoError(t, err)
+
+	err = unix.Mount(f.Name(), dir, "tmpfs", 0, "size=1M")
+	if errors.Is(err, unix.EPERM) {
+		t.Skipf("skip size tests: no permission to mount: %v", err)
+		return
+	}
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, unix.Unmount(dir, 0))
+	}()
+
+	fst := New(WithPath(dir), WithDepth(1))
+	require.NoError(t, fst.Open(mode.ComponentReadWrite))
+	require.NoError(t, fst.Init())
+
+	_, err = fst.Put(context.Background(), common.PutPrm{
+		RawData: make([]byte, 10<<20),
+	})
+	require.ErrorIs(t, err, common.ErrNoSpace)
+}
@@ -47,13 +47,13 @@ func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes,
 	// Decompress the data.
 	var err error
 	if data, err = s.compression.Decompress(data); err != nil {
-		return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
+		return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
 	}

 	// Unmarshal the SDK object.
 	obj := objectSDK.New()
 	if err := obj.Unmarshal(data); err != nil {
-		return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
+		return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
 	}

 	return common.GetRes{Object: obj, RawData: data}, nil
@@ -133,11 +133,11 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
 		elem := common.IterationElement{
 			ObjectData: v,
 		}
-		if err := elem.Address.DecodeString(string(k)); err != nil {
+		if err := elem.Address.DecodeString(k); err != nil {
 			if req.IgnoreErrors {
 				continue
 			}
-			return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err))
+			return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, k, err))
 		}
 		var err error
 		if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {

@@ -27,7 +27,7 @@ func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
 		}
 	}
 	if err != nil {
-		return fmt.Errorf("can't set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
+		return fmt.Errorf("set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
 	}

 	b.mode = m
@@ -52,7 +52,7 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
 		// marshal object
 		data, err := prm.Object.Marshal()
 		if err != nil {
-			return common.PutRes{}, fmt.Errorf("could not marshal the object: %w", err)
+			return common.PutRes{}, fmt.Errorf("marshal the object: %w", err)
 		}
 		prm.RawData = data
 	}

@@ -48,8 +48,8 @@ func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm)
 defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()

 err = e.execIfNotBlocked(func() error {
-res, err = e.containerSize(ctx, prm)
-return err
+res = e.containerSize(ctx, prm)
+return nil
 })

 return

@@ -69,7 +69,7 @@ func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, er
 return res.Size(), nil
 }

-func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
+func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes) {
 e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
 var csPrm shard.ContainerSizePrm
 csPrm.SetContainerID(prm.cnr)

@@ -96,8 +96,8 @@ func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm)
 defer elapsed("ListContainers", e.metrics.AddMethodDuration)()

 err = e.execIfNotBlocked(func() error {
-res, err = e.listContainers(ctx)
-return err
+res = e.listContainers(ctx)
+return nil
 })

 return

@@ -115,7 +115,7 @@ func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) {
 return res.Containers(), nil
 }

-func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) {
+func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes {
 uniqueIDs := make(map[string]cid.ID)

 e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {

@@ -142,5 +142,5 @@ func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes,

 return ListContainersRes{
 containers: result,
-}, nil
+}
 }
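The ContainerSize and ListContainers hunks follow one pattern: internal helpers that only aggregate per-shard results cannot fail, so their error return is removed and the execIfNotBlocked closure returns nil. A simplified sketch of that guard, assuming a mutex-plus-error scheme similar to the engine's (field and method names below are illustrative, not the engine's actual ones):

// Simplified sketch of the execIfNotBlocked pattern used above.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type engine struct {
	mtx      sync.RWMutex
	blockErr error // non-nil while executions are blocked
}

func (e *engine) execIfNotBlocked(op func() error) error {
	e.mtx.RLock()
	defer e.mtx.RUnlock()
	if e.blockErr != nil {
		return e.blockErr
	}
	return op()
}

// containerCount cannot fail, so the closure always returns nil and the
// only possible error is the "blocked" one.
func (e *engine) ContainerCount() (res int, err error) {
	err = e.execIfNotBlocked(func() error {
		res = 42 // the real engine aggregates over shards here
		return nil
	})
	return
}

func main() {
	e := &engine{}
	fmt.Println(e.ContainerCount()) // 42 <nil>
	e.blockErr = errors.New("engine blocked")
	fmt.Println(e.ContainerCount()) // 0 engine blocked
}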
@@ -95,7 +95,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
 err := eg.Wait()
 close(errCh)
 if err != nil {
-return fmt.Errorf("failed to initialize shards: %w", err)
+return fmt.Errorf("initialize shards: %w", err)
 }

 for res := range errCh {

@@ -117,7 +117,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {

 continue
 }
-return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
+return fmt.Errorf("initialize shard %s: %w", res.id, res.err)
 }
 }

@@ -320,7 +320,7 @@ loop:
 for _, newID := range shardsToAdd {
 sh, err := e.createShard(ctx, rcfg.shards[newID])
 if err != nil {
-return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
+return fmt.Errorf("add new shard with '%s' metabase path: %w", newID, err)
 }

 idStr := sh.ID().String()

@@ -331,13 +331,13 @@ loop:
 }
 if err != nil {
 _ = sh.Close(ctx)
-return fmt.Errorf("could not init %s shard: %w", idStr, err)
+return fmt.Errorf("init %s shard: %w", idStr, err)
 }

 err = e.addShard(sh)
 if err != nil {
 _ = sh.Close(ctx)
-return fmt.Errorf("could not add %s shard: %w", idStr, err)
+return fmt.Errorf("add %s shard: %w", idStr, err)
 }

 e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr))
@@ -24,9 +24,6 @@ type DeletePrm struct {
 forceRemoval bool
 }

-// DeleteRes groups the resulting values of Delete operation.
-type DeleteRes struct{}
-
 // WithAddress is a Delete option to set the addresses of the objects to delete.
 //
 // Option is required.

@@ -51,7 +48,7 @@ func (p *DeletePrm) WithForceRemoval() {
 // NOTE: Marks any object to be deleted (despite any prohibitions
 // on operations with that object) if WithForceRemoval option has
 // been provided.
-func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRes, err error) {
+func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) error {
 ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Delete",
 trace.WithAttributes(
 attribute.String("address", prm.addr.EncodeToString()),

@@ -60,15 +57,12 @@ func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRe
 defer span.End()
 defer elapsed("Delete", e.metrics.AddMethodDuration)()

-err = e.execIfNotBlocked(func() error {
-res, err = e.delete(ctx, prm)
-return err
+return e.execIfNotBlocked(func() error {
+return e.delete(ctx, prm)
 })
-
-return
 }

-func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
+func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) error {
 var locked struct {
 is bool
 }

@@ -126,14 +120,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
 })

 if locked.is {
-return DeleteRes{}, new(apistatus.ObjectLocked)
+return new(apistatus.ObjectLocked)
 }

 if splitInfo != nil {
 e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID())
 }

-return DeleteRes{}, nil
+return nil
 }

 func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) {
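Since DeleteRes carried no fields, Delete now returns a bare error and callers collapse the two-step result check into one expression, as the test hunks below show. A small illustration of the signature change on a stand-in type:

// A minimal sketch of the API shape change: a method whose result struct
// carries no fields can return just error. All names are illustrative.
package main

import "fmt"

type store struct{}

// Before the cleanup the signature would have been:
//   func (s *store) Delete(addr string) (DeleteRes, error)
// with DeleteRes an empty struct that every caller discarded.
func (s *store) Delete(addr string) error {
	if addr == "" {
		return fmt.Errorf("delete %q: empty address", addr)
	}
	return nil
}

func main() {
	s := &store{}
	fmt.Println(s.Delete("cnr/obj")) // <nil>
	fmt.Println(s.Delete(""))        // delete "": empty address
}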
@@ -70,8 +70,7 @@ func TestDeleteBigObject(t *testing.T) {
 deletePrm.WithForceRemoval()
 deletePrm.WithAddress(addrParent)

-_, err := e.Delete(context.Background(), deletePrm)
-require.NoError(t, err)
+require.NoError(t, e.Delete(context.Background(), deletePrm))

 checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
 checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)

@@ -141,8 +140,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
 deletePrm.WithForceRemoval()
 deletePrm.WithAddress(addrParent)

-_, err := e.Delete(context.Background(), deletePrm)
-require.NoError(t, err)
+require.NoError(t, e.Delete(context.Background(), deletePrm))

 checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
 checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)

@@ -153,7 +151,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
 // delete physical
 var delPrm shard.DeletePrm
 delPrm.SetAddresses(addrParent)
-_, err = s1.Delete(context.Background(), delPrm)
+_, err := s1.Delete(context.Background(), delPrm)
 require.NoError(t, err)

 delPrm.SetAddresses(addrLink)
@@ -4,6 +4,7 @@ import (
 "context"
 "errors"
 "fmt"
+"slices"
 "strings"
 "sync"
 "sync/atomic"

@@ -255,8 +256,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
 copyShards := func() []pooledShard {
 mtx.RLock()
 defer mtx.RUnlock()
-t := make([]pooledShard, len(shards))
-copy(t, shards)
+t := slices.Clone(shards)
 return t
 }
 eg.Go(func() error {

@@ -578,7 +578,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree

 func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
 if prm.TreeHandler == nil {
-return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
+return false, "", fmt.Errorf("evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
 }

 return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)

@@ -724,7 +724,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
 shards := make([]pooledShard, 0, len(e.shards))
 for id := range e.shards {
 shards = append(shards, pooledShard{
-hashedShard: hashedShard(e.shards[id]),
+hashedShard: e.shards[id],
 pool: e.shardPools[id],
 })
 }

@@ -3,6 +3,7 @@ package engine
 import (
 "context"
 "fmt"
+"slices"
 "sync"
 "time"


@@ -123,8 +124,7 @@ func (s *EvacuationState) DeepCopy() *EvacuationState {
 if s == nil {
 return nil
 }
-shardIDs := make([]string, len(s.shardIDs))
-copy(shardIDs, s.shardIDs)
+shardIDs := slices.Clone(s.shardIDs)

 return &EvacuationState{
 shardIDs: shardIDs,
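Both files replace the make-plus-copy idiom with slices.Clone from the standard library (Go 1.21+). For a non-nil source the two are equivalent (Clone of a nil slice returns nil, while make+copy yields an empty non-nil slice); a runnable comparison:

// slices.Clone is equivalent to the make+copy idiom it replaces here.
package main

import (
	"fmt"
	"slices"
)

func main() {
	src := []string{"shard-1", "shard-2"}

	// Old idiom:
	dst := make([]string, len(src))
	copy(dst, src)

	// New idiom, same result for non-nil input:
	dst2 := slices.Clone(src)

	dst2[0] = "changed"
	fmt.Println(src, dst, dst2) // src is unaffected by either copy
}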
@@ -27,9 +27,6 @@ type InhumePrm struct {
 forceRemoval bool
 }

-// InhumeRes encapsulates results of inhume operation.
-type InhumeRes struct{}
-
 // WithTarget sets a list of objects that should be inhumed and tombstone address
 // as the reason for inhume operation.
 //

@@ -67,23 +64,20 @@ var errInhumeFailure = errors.New("inhume operation failed")
 // with that object) if WithForceRemoval option has been provided.
 //
 // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
+func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) error {
 ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Inhume")
 defer span.End()
 defer elapsed("Inhume", e.metrics.AddMethodDuration)()

-err = e.execIfNotBlocked(func() error {
-res, err = e.inhume(ctx, prm)
-return err
+return e.execIfNotBlocked(func() error {
+return e.inhume(ctx, prm)
 })
-
-return
 }

-func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
+func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error {
 addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
 if err != nil {
-return InhumeRes{}, err
+return err
 }

 var shPrm shard.InhumePrm

@@ -107,7 +101,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
 zap.String("shard_id", shardID),
 zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
 )
-return InhumeRes{}, errInhumeFailure
+return errInhumeFailure
 }

 if _, err := sh.Inhume(ctx, shPrm); err != nil {

@@ -119,11 +113,11 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
 default:
 e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
 }
-return InhumeRes{}, err
+return err
 }
 }

-return InhumeRes{}, nil
+return nil
 }

 // groupObjectsByShard groups objects based on the shard(s) they are stored on.
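Inhume gets the same treatment as Delete: the empty InhumeRes type is removed and the wrapper returns the guarded call directly. Note that the engine reports domain conditions as typed status errors such as apistatus.ObjectLocked, which the tests below match with errors.As; a sketch of that pattern with a stand-in error type:

// Typed status errors let callers branch on the failure kind via
// errors.As. ObjectLocked here is a stand-in, not the SDK type.
package main

import (
	"errors"
	"fmt"
)

type ObjectLocked struct{}

func (*ObjectLocked) Error() string { return "object is locked" }

func inhume(locked bool) error {
	if locked {
		return new(ObjectLocked)
	}
	return nil
}

func main() {
	err := inhume(true)
	var locked *ObjectLocked
	fmt.Println(errors.As(err, &locked)) // true: caller can branch on it
}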
@@ -55,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
 var inhumePrm InhumePrm
 inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))

-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.NoError(t, err)

 addrs, err := Select(context.Background(), e, cnr, false, fs)

@@ -85,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
 var inhumePrm InhumePrm
 inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))

-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.NoError(t, err)

 addrs, err := Select(context.Background(), e, cnr, false, fs)

@@ -128,7 +128,7 @@ func TestStorageEngine_ECInhume(t *testing.T) {

 var inhumePrm InhumePrm
 inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.NoError(t, err)

 var alreadyRemoved *apistatus.ObjectAlreadyRemoved

@@ -173,7 +173,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {

 var prm InhumePrm
 prm.WithTarget(ts, object.AddressOf(obj))
-_, err := engine.Inhume(context.Background(), prm)
+err := engine.Inhume(context.Background(), prm)
 require.NoError(t, err)
 })


@@ -182,7 +182,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {

 var prm InhumePrm
 prm.MarkAsGarbage(object.AddressOf(obj))
-_, err := engine.Inhume(context.Background(), prm)
+err := engine.Inhume(context.Background(), prm)
 require.NoError(t, err)
 })
 }

@@ -237,7 +237,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
 prm.WithTarget(ts, addrs...)

 b.StartTimer()
-_, err := engine.Inhume(context.Background(), prm)
+err := engine.Inhume(context.Background(), prm)
 require.NoError(b, err)
 b.StopTimer()
 }
@@ -114,7 +114,7 @@ func TestLockUserScenario(t *testing.T) {
 inhumePrm.WithTarget(tombAddr, objAddr)

 var objLockedErr *apistatus.ObjectLocked
-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.ErrorAs(t, err, &objLockedErr)

 // 4.

@@ -127,7 +127,7 @@ func TestLockUserScenario(t *testing.T) {

 inhumePrm.WithTarget(tombForLockAddr, lockerAddr)

-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.ErrorIs(t, err, meta.ErrLockObjectRemoval)

 // 5.

@@ -136,7 +136,7 @@ func TestLockUserScenario(t *testing.T) {
 inhumePrm.WithTarget(tombAddr, objAddr)

 require.Eventually(t, func() bool {
-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 return err == nil
 }, 30*time.Second, time.Second)
 }

@@ -200,7 +200,7 @@ func TestLockExpiration(t *testing.T) {
 inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))

 var objLockedErr *apistatus.ObjectLocked
-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.ErrorAs(t, err, &objLockedErr)

 // 3.

@@ -212,7 +212,7 @@ func TestLockExpiration(t *testing.T) {
 inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))

 require.Eventually(t, func() bool {
-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 return err == nil
 }, 30*time.Second, time.Second)
 }

@@ -270,12 +270,12 @@ func TestLockForceRemoval(t *testing.T) {
 inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))

 var objLockedErr *apistatus.ObjectLocked
-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.ErrorAs(t, err, &objLockedErr)

 inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))

-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.ErrorAs(t, err, &objLockedErr)

 // 4.

@@ -283,13 +283,12 @@ func TestLockForceRemoval(t *testing.T) {
 deletePrm.WithAddress(objectcore.AddressOf(lock))
 deletePrm.WithForceRemoval()

-_, err = e.Delete(context.Background(), deletePrm)
-require.NoError(t, err)
+require.NoError(t, e.Delete(context.Background(), deletePrm))

 // 5.
 inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))

-_, err = e.Inhume(context.Background(), inhumePrm)
+err = e.Inhume(context.Background(), inhumePrm)
 require.NoError(t, err)
 }
@@ -7,34 +7,12 @@ import (
 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 )

-type MetricRegister interface {
-AddMethodDuration(method string, d time.Duration)
-
-SetObjectCounter(shardID, objectType string, v uint64)
-AddToObjectCounter(shardID, objectType string, delta int)
-
-SetMode(shardID string, mode mode.Mode)
-
-AddToContainerSize(cnrID string, size int64)
-DeleteContainerSize(cnrID string)
-DeleteContainerCount(cnrID string)
-AddToPayloadCounter(shardID string, size int64)
-IncErrorCounter(shardID string)
-ClearErrorCounter(shardID string)
-DeleteShardMetrics(shardID string)
-
-SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
-IncContainerObjectCounter(shardID, contID, objectType string)
-SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
-
-IncRefillObjectsCount(shardID, path string, size int, success bool)
-SetRefillPercent(shardID, path string, percent uint32)
-SetRefillStatus(shardID, path, status string)
-SetEvacuationInProgress(shardID string, value bool)
-
-WriteCache() metrics.WriteCacheMetrics
-GC() metrics.GCMetrics
-}
+type (
+MetricRegister = metrics.EngineMetrics
+GCMetrics = metrics.GCMetrics
+WriteCacheMetrics = metrics.WriteCacheMetrics
+NullBool = metrics.NullBool
+)

 func elapsed(method string, addFunc func(method string, d time.Duration)) func() {
 t := time.Now()

@@ -77,8 +55,8 @@ type (

 var (
 _ MetricRegister = noopMetrics{}
-_ metrics.WriteCacheMetrics = noopWriteCacheMetrics{}
-_ metrics.GCMetrics = noopGCMetrics{}
+_ WriteCacheMetrics = noopWriteCacheMetrics{}
+_ GCMetrics = noopGCMetrics{}
 )

 func (noopMetrics) AddMethodDuration(string, time.Duration) {}

@@ -99,8 +77,8 @@ func (noopMetrics) IncRefillObjectsCount(string, string, int, bool) {}
 func (noopMetrics) SetRefillPercent(string, string, uint32) {}
 func (noopMetrics) SetRefillStatus(string, string, string) {}
 func (noopMetrics) SetEvacuationInProgress(string, bool) {}
-func (noopMetrics) WriteCache() metrics.WriteCacheMetrics { return noopWriteCacheMetrics{} }
-func (noopMetrics) GC() metrics.GCMetrics { return noopGCMetrics{} }
+func (noopMetrics) WriteCache() WriteCacheMetrics { return noopWriteCacheMetrics{} }
+func (noopMetrics) GC() GCMetrics { return noopGCMetrics{} }

 func (noopWriteCacheMetrics) AddMethodDuration(string, string, string, string, bool, time.Duration) {}
 func (noopWriteCacheMetrics) SetActualCount(string, string, string, uint64) {}
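The hand-maintained MetricRegister interface is replaced by aliases to the shared metrics package, so the engine and the metrics package now name the same types and the noop implementations can be declared against the local names. A compact demonstration of Go type aliases (the interface here is invented):

// Type aliases (with '=') make two names denote the same type, so code
// written against the package-local name keeps compiling after the
// definition moves; contrast with a defined type, which is distinct.
package main

import "fmt"

type gcMetrics interface{ AddDeletedCount(n int) }

// Alias: identical type, no conversion needed anywhere.
type GCMetrics = gcMetrics

type noopGC struct{}

func (noopGC) AddDeletedCount(int) {}

func main() {
	var m GCMetrics = noopGC{} // satisfies the aliased interface directly
	m.AddDeletedCount(1)
	fmt.Println("ok")
}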
@@ -54,19 +54,17 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe
 defer elapsed("Select", e.metrics.AddMethodDuration)()

 err = e.execIfNotBlocked(func() error {
-res, err = e._select(ctx, prm)
-return err
+res = e._select(ctx, prm)
+return nil
 })

 return
 }

-func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
+func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes {
 addrList := make([]oid.Address, 0)
 uniqueMap := make(map[string]struct{})

-var outError error
-
 var shPrm shard.SelectPrm
 shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
 shPrm.SetFilters(prm.filters)

@@ -90,7 +88,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,

 return SelectRes{
 addrList: addrList,
-}, outError
+}
 }

 // List returns `limit` available physically storage object addresses in engine.

@@ -100,14 +98,14 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
 func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
 defer elapsed("List", e.metrics.AddMethodDuration)()
 err = e.execIfNotBlocked(func() error {
-res, err = e.list(ctx, limit)
-return err
+res = e.list(ctx, limit)
+return nil
 })

 return
 }

-func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) {
+func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes {
 addrList := make([]oid.Address, 0, limit)
 uniqueMap := make(map[string]struct{})
 ln := uint64(0)

@@ -136,7 +134,7 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, erro

 return SelectRes{
 addrList: addrList,
-}, nil
+}
 }

 // Select selects objects from local storage using provided filters.
@@ -108,12 +108,12 @@ func (m *metricsWithID) SetEvacuationInProgress(value bool) {
 func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
 sh, err := e.createShard(ctx, opts)
 if err != nil {
-return nil, fmt.Errorf("could not create a shard: %w", err)
+return nil, fmt.Errorf("create a shard: %w", err)
 }

 err = e.addShard(sh)
 if err != nil {
-return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err)
+return nil, fmt.Errorf("add %s shard: %w", sh.ID().String(), err)
 }

 e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())

@@ -124,7 +124,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
 func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
 id, err := generateShardID()
 if err != nil {
-return nil, fmt.Errorf("could not generate shard ID: %w", err)
+return nil, fmt.Errorf("generate shard ID: %w", err)
 }

 opts = e.appendMetrics(id, opts)

@@ -180,7 +180,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {

 pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
 if err != nil {
-return fmt.Errorf("could not create pool: %w", err)
+return fmt.Errorf("create pool: %w", err)
 }

 strID := sh.ID().String()

@@ -272,7 +272,7 @@ func (e *StorageEngine) sortShards(objAddr interface{ EncodeToString() string })
 h := hrw.StringHash(objAddr.EncodeToString())
 shards := make([]hashedShard, 0, len(e.shards))
 for _, sh := range e.shards {
-shards = append(shards, hashedShard(sh))
+shards = append(shards, sh)
 }
 hrw.SortHasherSliceByValue(shards, h)
 return shards

@@ -285,7 +285,7 @@ func (e *StorageEngine) unsortedShards() []hashedShard {
 shards := make([]hashedShard, 0, len(e.shards))

 for _, sh := range e.shards {
-shards = append(shards, hashedShard(sh))
+shards = append(shards, sh)
 }

 return shards

@@ -374,7 +374,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
 zap.Error(err),
 )
 multiErrGuard.Lock()
-multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err))
+multiErr = errors.Join(multiErr, fmt.Errorf("change shard (id:%s) mode to disabled: %w", sh.ID(), err))
 multiErrGuard.Unlock()
 }


@@ -385,7 +385,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
 zap.Error(err),
 )
 multiErrGuard.Lock()
-multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err))
+multiErr = errors.Join(multiErr, fmt.Errorf("close removed shard (id:%s): %w", sh.ID(), err))
 multiErrGuard.Unlock()
 }
 return nil
@@ -56,7 +56,7 @@ func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
 return result, err
 }

-func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
+func (db *DB) ContainerSize(id cid.ID) (uint64, error) {
 db.modeMtx.RLock()
 defer db.modeMtx.RUnlock()


@@ -64,21 +64,22 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
 return 0, ErrDegradedMode
 }

-err = db.boltDB.View(func(tx *bbolt.Tx) error {
-size, err = db.containerSize(tx, id)
+var size uint64
+err := db.boltDB.View(func(tx *bbolt.Tx) error {
+size = db.containerSize(tx, id)

-return err
+return nil
 })

 return size, metaerr.Wrap(err)
 }

-func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) {
+func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 {
 containerVolume := tx.Bucket(containerVolumeBucketName)
 key := make([]byte, cidSize)
 id.Encode(key)

-return parseContainerSize(containerVolume.Get(key)), nil
+return parseContainerSize(containerVolume.Get(key))
 }

 func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
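ContainerSize now captures the result in a closure variable and containerSize returns just the value: a missing key simply parses as zero, so the only error left is the transaction's own. A runnable sketch of the same read pattern with go.etcd.io/bbolt (bucket and key names are made up):

// Capture the result outside db.View and return nil from the closure
// when the read itself cannot fail.
package main

import (
	"encoding/binary"
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

func containerSize(tx *bolt.Tx, key []byte) uint64 {
	b := tx.Bucket([]byte("containerVolume"))
	if b == nil {
		return 0
	}
	v := b.Get(key) // nil when absent; treated as zero size
	if len(v) < 8 {
		return 0
	}
	return binary.LittleEndian.Uint64(v)
}

func main() {
	db, err := bolt.Open("/tmp/meta-example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var size uint64
	err = db.View(func(tx *bolt.Tx) error {
		size = containerSize(tx, []byte("cid"))
		return nil // the lookup cannot fail; only tx errors surface
	})
	fmt.Println(size, err)
}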
@@ -54,7 +54,7 @@ func (db *DB) Open(ctx context.Context, m mode.Mode) error {
 func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
 err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
 if err != nil {
-return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
+return fmt.Errorf("create dir %s for metabase: %w", db.info.Path, err)
 }

 db.log.Debug(ctx, logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))

@@ -73,7 +73,7 @@ func (db *DB) openBolt(ctx context.Context) error {

 db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
 if err != nil {
-return fmt.Errorf("can't open boltDB database: %w", err)
+return fmt.Errorf("open boltDB database: %w", err)
 }
 db.boltDB.MaxBatchDelay = db.boltBatchDelay
 db.boltDB.MaxBatchSize = db.boltBatchSize

@@ -145,27 +145,27 @@ func (db *DB) init(reset bool) error {
 if reset {
 err := tx.DeleteBucket(name)
 if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
-return fmt.Errorf("could not delete static bucket %s: %w", k, err)
+return fmt.Errorf("delete static bucket %s: %w", k, err)
 }
 }

 _, err := tx.CreateBucketIfNotExists(name)
 if err != nil {
-return fmt.Errorf("could not create static bucket %s: %w", k, err)
+return fmt.Errorf("create static bucket %s: %w", k, err)
 }
 }

 for _, b := range deprecatedBuckets {
 err := tx.DeleteBucket(b)
 if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
-return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
+return fmt.Errorf("delete deprecated bucket %s: %w", string(b), err)
 }
 }

 if !reset { // counters will be recalculated by refill metabase
 err = syncCounter(tx, false)
 if err != nil {
-return fmt.Errorf("could not sync object counter: %w", err)
+return fmt.Errorf("sync object counter: %w", err)
 }

 return nil
@@ -238,26 +238,26 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
 }

 if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
-return fmt.Errorf("could not increase phy object counter: %w", err)
+return fmt.Errorf("increase phy object counter: %w", err)
 }
 if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
-return fmt.Errorf("could not increase logical object counter: %w", err)
+return fmt.Errorf("increase logical object counter: %w", err)
 }
 if isUserObject {
 if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
-return fmt.Errorf("could not increase user object counter: %w", err)
+return fmt.Errorf("increase user object counter: %w", err)
 }
 }
 return db.incContainerObjectCounter(tx, cnrID, isUserObject)
 }

-func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
+func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
 b := tx.Bucket(shardInfoBucket)
 if b == nil {
 return nil
 }

-return db.updateShardObjectCounterBucket(b, typ, delta, inc)
+return db.updateShardObjectCounterBucket(b, typ, delta, false)
 }

 func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
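updateShardObjectCounter was only ever called with inc=false, so the flag is baked in and the function renamed to decShardObjectCounter; this looks like a lint-driven cleanup of a parameter that is constant at every call site, though that rationale is an inference. The refactor pattern in miniature:

// When a boolean parameter is false at every call site, bake it in and
// give the function a precise name. Names here are illustrative.
package main

import "fmt"

func updateCounter(c *int, delta int, inc bool) {
	if inc {
		*c += delta
	} else {
		*c -= delta
	}
}

// After: callers only ever decremented, so expose exactly that.
func decCounter(c *int, delta int) {
	updateCounter(c, delta, false)
}

func main() {
	n := 10
	decCounter(&n, 3)
	fmt.Println(n) // 7
}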
@@ -362,7 +362,7 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject
 func syncCounter(tx *bbolt.Tx, force bool) error {
 shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket)
 if err != nil {
-return fmt.Errorf("could not get shard info bucket: %w", err)
+return fmt.Errorf("get shard info bucket: %w", err)
 }
 shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
 len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&

@@ -375,7 +375,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {

 containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName)
 if err != nil {
-return fmt.Errorf("could not get container counter bucket: %w", err)
+return fmt.Errorf("get container counter bucket: %w", err)
 }

 var addr oid.Address

@@ -428,7 +428,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
 return nil
 })
 if err != nil {
-return fmt.Errorf("could not iterate objects: %w", err)
+return fmt.Errorf("iterate objects: %w", err)
 }

 return setObjectCounters(counters, shardInfoB, containerCounterB)

@@ -448,7 +448,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
 value := containerCounterValue(count)
 err := containerCounterB.Put(key, value)
 if err != nil {
-return fmt.Errorf("could not update phy container object counter: %w", err)
+return fmt.Errorf("update phy container object counter: %w", err)
 }
 }
 phyData := make([]byte, 8)

@@ -456,7 +456,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container

 err := shardInfoB.Put(objectPhyCounterKey, phyData)
 if err != nil {
-return fmt.Errorf("could not update phy object counter: %w", err)
+return fmt.Errorf("update phy object counter: %w", err)
 }

 logData := make([]byte, 8)

@@ -464,7 +464,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container

 err = shardInfoB.Put(objectLogicCounterKey, logData)
 if err != nil {
-return fmt.Errorf("could not update logic object counter: %w", err)
+return fmt.Errorf("update logic object counter: %w", err)
 }

 userData := make([]byte, 8)

@@ -472,7 +472,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container

 err = shardInfoB.Put(objectUserCounterKey, userData)
 if err != nil {
-return fmt.Errorf("could not update user object counter: %w", err)
+return fmt.Errorf("update user object counter: %w", err)
 }

 return nil

@@ -492,7 +492,7 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) {
 }
 var cnrID cid.ID
 if err := cnrID.Decode(buf); err != nil {
-return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
+return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
 }
 return cnrID, nil
 }
@@ -161,28 +161,28 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)

 func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
 if res.phyCount > 0 {
-err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
+err := db.decShardObjectCounter(tx, phy, res.phyCount)
 if err != nil {
-return fmt.Errorf("could not decrease phy object counter: %w", err)
+return fmt.Errorf("decrease phy object counter: %w", err)
 }
 }

 if res.logicCount > 0 {
-err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
+err := db.decShardObjectCounter(tx, logical, res.logicCount)
 if err != nil {
-return fmt.Errorf("could not decrease logical object counter: %w", err)
+return fmt.Errorf("decrease logical object counter: %w", err)
 }
 }

 if res.userCount > 0 {
-err := db.updateShardObjectCounter(tx, user, res.userCount, false)
+err := db.decShardObjectCounter(tx, user, res.userCount)
 if err != nil {
-return fmt.Errorf("could not decrease user object counter: %w", err)
+return fmt.Errorf("decrease user object counter: %w", err)
 }
 }

 if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
-return fmt.Errorf("could not decrease container object counter: %w", err)
+return fmt.Errorf("decrease container object counter: %w", err)
 }
 return nil
 }

@@ -259,7 +259,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 if garbageBKT != nil {
 err := garbageBKT.Delete(addrKey)
 if err != nil {
-return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
+return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
 }
 }
 return deleteSingleResult{}, nil

@@ -280,7 +280,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 if garbageBKT != nil {
 err := garbageBKT.Delete(addrKey)
 if err != nil {
-return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
+return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
 }
 }


@@ -308,7 +308,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 // remove object
 err = db.deleteObject(tx, obj, false)
 if err != nil {
-return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
+return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
 }

 if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {

@@ -335,12 +335,12 @@ func (db *DB) deleteObject(

 err = updateListIndexes(tx, obj, delListIndexItem)
 if err != nil {
-return fmt.Errorf("can't remove list indexes: %w", err)
+return fmt.Errorf("remove list indexes: %w", err)
 }

 err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
 if err != nil {
-return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
+return fmt.Errorf("remove fake bucket tree indexes: %w", err)
 }

 if isParent {

@@ -351,7 +351,7 @@ func (db *DB) deleteObject(
 addrKey := addressKey(object.AddressOf(obj), key)
 err := garbageBKT.Delete(addrKey)
 if err != nil {
-return fmt.Errorf("could not remove from garbage bucket: %w", err)
+return fmt.Errorf("remove from garbage bucket: %w", err)
 }
 }
 }

@@ -529,7 +529,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
 addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
 err := garbageBKT.Delete(addrKey)
 if err != nil {
-return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
+return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
 }
 }


@@ -567,7 +567,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
 addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
 err := garbageBKT.Delete(addrKey)
 if err != nil {
-return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
+return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
 }
 }
@@ -1,7 +1,6 @@
 package meta

 import (
-"bytes"
 "context"
 "fmt"
 "time"

@@ -227,9 +226,9 @@ func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, e

 splitInfo := objectSDK.NewSplitInfo()

-err := splitInfo.Unmarshal(bytes.Clone(rawSplitInfo))
+err := splitInfo.Unmarshal(rawSplitInfo)
 if err != nil {
-return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err)
+return nil, fmt.Errorf("unmarshal split info from root index: %w", err)
 }

 return splitInfo, nil

@@ -1,7 +1,6 @@
 package meta

 import (
-"bytes"
 "context"
 "fmt"
 "time"

@@ -112,7 +111,7 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
 // check in primary index
 data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key)
 if len(data) != 0 {
-return obj, obj.Unmarshal(bytes.Clone(data))
+return obj, obj.Unmarshal(data)
 }

 data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)

@@ -123,13 +122,13 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
 // if not found then check in tombstone index
 data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
 if len(data) != 0 {
-return obj, obj.Unmarshal(bytes.Clone(data))
+return obj, obj.Unmarshal(data)
 }

 // if not found then check in locker index
 data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
 if len(data) != 0 {
-return obj, obj.Unmarshal(bytes.Clone(data))
+return obj, obj.Unmarshal(data)
 }

 // if not found then check if object is a virtual

@@ -185,9 +184,9 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD

 child := objectSDK.New()

-err = child.Unmarshal(bytes.Clone(data))
+err = child.Unmarshal(data)
 if err != nil {
-return nil, fmt.Errorf("can't unmarshal child with parent: %w", err)
+return nil, fmt.Errorf("unmarshal child with parent: %w", err)
 }

 par := child.Parent()

@@ -219,7 +218,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
 objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
 if len(objData) != 0 {
 obj := objectSDK.New()
-if err := obj.Unmarshal(bytes.Clone(objData)); err != nil {
+if err := obj.Unmarshal(objData); err != nil {
 return err
 }
 chunk := objectSDK.ECChunk{}
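Several metabase readers stop cloning bbolt values before unmarshalling. bbolt value slices are only valid inside the transaction, so this is safe only if Unmarshal copies whatever it retains; presumably the updated SDK guarantees that, which is an assumption here. An illustrative decoder showing why a copying Unmarshal makes the defensive clone redundant:

// A decoder that copies its input into freshly allocated fields does not
// alias the bbolt page memory, so callers need no defensive clone.
// This object type is illustrative, not the SDK's.
package main

import "fmt"

type object struct{ payload []byte }

// Unmarshal copies data, so the result stays valid after the source
// buffer (e.g. a bbolt value) is invalidated at transaction end.
func (o *object) Unmarshal(data []byte) error {
	o.payload = append([]byte(nil), data...) // explicit copy
	return nil
}

func main() {
	buf := []byte("page-backed bytes")
	var o object
	_ = o.Unmarshal(buf)
	for i := range buf {
		buf[i] = 0 // simulate bbolt reusing the page
	}
	fmt.Println(string(o.payload)) // still intact
}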
@@ -177,7 +177,7 @@ type gcHandler struct {
 func (g gcHandler) handleKV(k, _ []byte) error {
 o, err := garbageFromKV(k)
 if err != nil {
-return fmt.Errorf("could not parse garbage object: %w", err)
+return fmt.Errorf("parse garbage object: %w", err)
 }

 return g.h(o)

@@ -190,7 +190,7 @@ type graveyardHandler struct {
 func (g graveyardHandler) handleKV(k, v []byte) error {
 o, err := graveFromKV(k, v)
 if err != nil {
-return fmt.Errorf("could not parse grave: %w", err)
+return fmt.Errorf("parse grave: %w", err)
 }

 return g.h(o)

@@ -240,7 +240,7 @@ func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address)
 func garbageFromKV(k []byte) (res GarbageObject, err error) {
 err = decodeAddressFromKey(&res.addr, k)
 if err != nil {
-err = fmt.Errorf("could not parse address: %w", err)
+err = fmt.Errorf("parse address: %w", err)
 }

 return

@@ -342,10 +342,10 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 }

 func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
-if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
+if err := db.decShardObjectCounter(tx, logical, res.LogicInhumed()); err != nil {
 return err
 }
-if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
+if err := db.decShardObjectCounter(tx, user, res.UserInhumed()); err != nil {
 return err
 }


@@ -373,7 +373,7 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
 if data != nil {
 err := targetBucket.Delete(tombKey)
 if err != nil {
-return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
+return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
 }
 }


@@ -1,7 +1,6 @@
 package meta

 import (
-"bytes"
 "context"
 "errors"
 "strconv"

@@ -130,7 +129,7 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) e
 }

 return b.ForEach(func(k, v []byte) error {
-if oid.Decode(k) == nil && obj.Unmarshal(bytes.Clone(v)) == nil {
+if oid.Decode(k) == nil && obj.Unmarshal(v) == nil {
 return f(cid, oid, obj)
 }
@@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct {
 }
 
 // ListWithCursor lists physical objects available in metabase starting from
-// cursor. Includes objects of all types. Does not include inhumed objects.
+// cursor. Includes objects of all types. Does not include inhumed and expired
+// objects.
 // Use cursor value from response for consecutive requests.
 //
 // Returns ErrEndOfListing if there are no more objects to return or count

@@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
 
 	rawAddr := make([]byte, cidSize, addressKeySize)
 
+	currEpoch := db.epochState.CurrentEpoch()
+
 loop:
 	for ; name != nil; name, _ = c.Next() {
 		cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)

@@ -167,7 +170,7 @@ loop:
 		if bkt != nil {
 			copy(rawAddr, cidRaw)
 			result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
-				result, count, cursor, threshold)
+				result, count, cursor, threshold, currEpoch)
 			if err != nil {
 				return nil, nil, err
 			}

@@ -185,8 +188,7 @@ loop:
 	if offset != nil {
 		// new slice is much faster but less memory efficient
 		// we need to copy, because offset exists during bbolt tx
-		cursor.inBucketOffset = make([]byte, len(offset))
-		copy(cursor.inBucketOffset, offset)
+		cursor.inBucketOffset = bytes.Clone(offset)
 	}
 
 	if len(result) == 0 {

@@ -195,8 +197,7 @@ loop:
 
 		// new slice is much faster but less memory efficient
 		// we need to copy, because bucketName exists during bbolt tx
-		cursor.bucketName = make([]byte, len(bucketName))
-		copy(cursor.bucketName, bucketName)
+		cursor.bucketName = bytes.Clone(bucketName)
 
 	return result, cursor, nil
 }
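The cursor hunks above replace the make-plus-copy idiom with bytes.Clone. The copy itself is still mandatory: byte slices handed out by bbolt point into memory that is only valid while the transaction is open, exactly as the retained comments say. A hedged sketch of the pattern with plain bbolt (bucket and key names are placeholders):

package main

import (
	"bytes"
	"log"

	"go.etcd.io/bbolt"
)

func main() {
	db, err := bbolt.Open("demo.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var saved []byte
	err = db.View(func(tx *bbolt.Tx) error {
		if b := tx.Bucket([]byte("bucket")); b != nil {
			// Get returns memory owned by the transaction; clone it
			// if the value must outlive the View call.
			saved = bytes.Clone(b.Get([]byte("key")))
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("value after tx: %q", saved)
}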
@@ -212,6 +213,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 	limit int, // stop listing at `limit` items in result
 	cursor *Cursor, // start from cursor object
 	threshold bool, // ignore cursor and start immediately
+	currEpoch uint64,
 ) ([]objectcore.Info, []byte, *Cursor, error) {
 	if cursor == nil {
 		cursor = new(Cursor)

@@ -243,13 +245,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 			continue
 		}
 
+		var o objectSDK.Object
+		if err := o.Unmarshal(v); err != nil {
+			return nil, nil, nil, err
+		}
+
+		expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
+		if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch {
+			continue
+		}
+
 		var isLinkingObj bool
 		var ecInfo *objectcore.ECInfo
 		if objType == objectSDK.TypeRegular {
-			var o objectSDK.Object
-			if err := o.Unmarshal(bytes.Clone(v)); err != nil {
-				return nil, nil, nil, err
-			}
 			isLinkingObj = isLinkObject(&o)
 			ecHeader := o.ECHeader()
 			if ecHeader != nil {

@@ -413,7 +421,7 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, p
 	var ecInfo *objectcore.ECInfo
 	if prm.ObjectType == objectSDK.TypeRegular {
 		var o objectSDK.Object
-		if err := o.Unmarshal(bytes.Clone(v)); err != nil {
+		if err := o.Unmarshal(v); err != nil {
 			return err
 		}
 		isLinkingObj = isLinkObject(&o)
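The new filter in selectNFromBucket skips an object only when it both carries an expiration epoch in the past and is not kept alive by a lock; an object expiring exactly at the current epoch is still listed. Reduced to the bare decision (a toy restatement, not the repository's helpers):

package main

import "fmt"

// skipExpired mirrors the listing filter added above.
func skipExpired(locked, hasExpEpoch bool, expEpoch, currEpoch uint64) bool {
	return !locked && hasExpEpoch && expEpoch < currEpoch
}

func main() {
	const currEpoch = 100
	fmt.Println(skipExpired(false, true, 99, currEpoch))  // true: expired and unlocked
	fmt.Println(skipExpired(true, true, 99, currEpoch))   // false: expired but locked
	fmt.Println(skipExpired(false, true, 100, currEpoch)) // false: expires at the current epoch
	fmt.Println(skipExpired(false, false, 0, currEpoch))  // false: no expiration attribute
}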
@@ -3,14 +3,17 @@ package meta_test
 import (
 	"context"
 	"errors"
+	"strconv"
 	"testing"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/bbolt"

@@ -71,14 +74,16 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
 func TestLisObjectsWithCursor(t *testing.T) {
 	t.Parallel()
 
-	db := newDB(t)
-	defer func() { require.NoError(t, db.Close(context.Background())) }()
-
 	const (
+		currEpoch  = 100
+		expEpoch   = currEpoch - 1
 		containers = 5
-		total      = containers * 4 // regular + ts + child + lock
+		total      = containers * 6 // regular + ts + child + lock + non-expired regular + locked expired
 	)
 
+	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
+	defer func() { require.NoError(t, db.Close(context.Background())) }()
+
 	expected := make([]object.Info, 0, total)
 
 	// fill metabase with objects

@@ -127,6 +132,26 @@ func TestLisObjectsWithCursor(t *testing.T) {
 		err = putBig(db, child)
 		require.NoError(t, err)
 		expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
+
+		// add expired object (do not include into expected)
+		obj = testutil.GenerateObjectWithCID(containerID)
+		testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
+		require.NoError(t, metaPut(db, obj, nil))
+
+		// add non-expired object (include into expected)
+		obj = testutil.GenerateObjectWithCID(containerID)
+		testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(currEpoch))
+		require.NoError(t, metaPut(db, obj, nil))
+		expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
+
+		// add locked expired object (include into expected)
+		obj = testutil.GenerateObjectWithCID(containerID)
+		objID := oidtest.ID()
+		obj.SetID(objID)
+		testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
+		require.NoError(t, metaPut(db, obj, nil))
+		require.NoError(t, db.Lock(context.Background(), containerID, oidtest.ID(), []oid.ID{objID}))
+		expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
 	}
 
 	t.Run("success with various count", func(t *testing.T) {
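The test now constructs the metabase with meta.WithEpochState(epochState{currEpoch}), and the listing code reads db.epochState.CurrentEpoch(), so the option evidently injects a current-epoch source. The epochState stub is defined elsewhere in the test package; judging from its use here it is presumably no more than this (an assumption, not the actual definition):

// epochState is assumed to satisfy the interface behind
// meta.WithEpochState; the listing code only needs CurrentEpoch.
type epochState struct {
	e uint64
}

func (s epochState) CurrentEpoch() uint64 {
	return s.e
}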
@@ -19,7 +19,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
 
 	if !db.mode.NoMetabase() {
 		if err := db.Close(ctx); err != nil {
-			return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
+			return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
 		}
 	}
 
@@ -31,7 +31,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
 		err = db.Init(ctx)
 	}
 	if err != nil {
-		return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
+		return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
 	}
 }
 
@@ -1,7 +1,6 @@
 package meta
 
 import (
-	"bytes"
 	"context"
 	"encoding/binary"
 	"errors"

@@ -180,18 +179,18 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
 
 	err := putUniqueIndexes(tx, obj, si, id)
 	if err != nil {
-		return fmt.Errorf("can't put unique indexes: %w", err)
+		return fmt.Errorf("put unique indexes: %w", err)
 	}
 
 	err = updateListIndexes(tx, obj, putListIndexItem)
 	if err != nil {
-		return fmt.Errorf("can't put list indexes: %w", err)
+		return fmt.Errorf("put list indexes: %w", err)
 	}
 
 	if indexAttributes {
 		err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
 		if err != nil {
-			return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
+			return fmt.Errorf("put fake bucket tree indexes: %w", err)
 		}
 	}
 
@@ -250,7 +249,7 @@ func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, ad
 	}
 	rawObject, err := obj.CutPayload().Marshal()
 	if err != nil {
-		return fmt.Errorf("can't marshal object header: %w", err)
+		return fmt.Errorf("marshal object header: %w", err)
 	}
 	return putUniqueIndexItem(tx, namedBucketItem{
 		name: bucketName,

@@ -320,7 +319,7 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []
 		return si.Marshal()
 	default:
 		oldSI := objectSDK.NewSplitInfo()
-		if err := oldSI.Unmarshal(bytes.Clone(old)); err != nil {
+		if err := oldSI.Unmarshal(old); err != nil {
 			return nil, err
 		}
 		si = util.MergeSplitInfo(si, oldSI)

@@ -475,7 +474,7 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
 func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
 	bkt, err := createBucketLikelyExists(tx, item.name)
 	if err != nil {
-		return fmt.Errorf("can't create index %v: %w", item.name, err)
+		return fmt.Errorf("create index %v: %w", item.name, err)
 	}
 
 	data, err := update(bkt.Get(item.key), item.val)

@@ -492,12 +491,12 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
 func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
 	bkt, err := createBucketLikelyExists(tx, item.name)
 	if err != nil {
-		return fmt.Errorf("can't create index %v: %w", item.name, err)
+		return fmt.Errorf("create index %v: %w", item.name, err)
 	}
 
 	fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
 	if err != nil {
-		return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
+		return fmt.Errorf("create fake bucket tree index %v: %w", item.key, err)
 	}
 
 	return fkbtRoot.Put(item.val, zeroValue)

@@ -506,19 +505,19 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
 func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
 	bkt, err := createBucketLikelyExists(tx, item.name)
 	if err != nil {
-		return fmt.Errorf("can't create index %v: %w", item.name, err)
+		return fmt.Errorf("create index %v: %w", item.name, err)
 	}
 
 	lst, err := decodeList(bkt.Get(item.key))
 	if err != nil {
-		return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
+		return fmt.Errorf("decode leaf list %v: %w", item.key, err)
 	}
 
 	lst = append(lst, item.val)
 
 	encodedLst, err := encodeList(lst)
 	if err != nil {
-		return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
+		return fmt.Errorf("encode leaf list %v: %w", item.key, err)
 	}
 
 	return bkt.Put(item.key, encodedLst)
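Several hunks in this file route bucket creation through createBucketLikelyExists[T bucketContainer]. The helper's body is outside this diff, but since it is called with both a *bbolt.Tx and a *bbolt.Bucket, the constraint is presumably their shared CreateBucketIfNotExists method, roughly (a sketch under that assumption):

package sketch

import "go.etcd.io/bbolt"

// bucketContainer presumably abstracts over *bbolt.Tx and *bbolt.Bucket,
// both of which expose CreateBucketIfNotExists with this signature.
type bucketContainer interface {
	CreateBucketIfNotExists([]byte) (*bbolt.Bucket, error)
}

func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Bucket, error) {
	// A single call that is cheap when the bucket already exists
	// (the common case the name hints at) and creates it when it does not.
	return tx.CreateBucketIfNotExists(name)
}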
@@ -565,7 +565,7 @@ func groupFilters(filters objectSDK.SearchFilters, useAttributeIndex bool) (filt
 		case v2object.FilterHeaderContainerID: // support deprecated field
 			err := res.cnr.DecodeString(filters[i].Value())
 			if err != nil {
-				return filterGroup{}, fmt.Errorf("can't parse container id: %w", err)
+				return filterGroup{}, fmt.Errorf("parse container id: %w", err)
 			}
 
 			res.withCnrFilter = true
@@ -32,13 +32,13 @@ func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error
 	}
 
 	if err := db.openDB(ctx, mode); err != nil {
-		return nil, fmt.Errorf("failed to open metabase: %w", err)
+		return nil, fmt.Errorf("open metabase: %w", err)
 	}
 
 	id, err := db.readShardID()
 
 	if cErr := db.close(); cErr != nil {
-		err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
+		err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
 	}
 
 	return id, metaerr.Wrap(err)

@@ -70,7 +70,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
 	}
 
 	if err := db.openDB(ctx, mode); err != nil {
-		return fmt.Errorf("failed to open metabase: %w", err)
+		return fmt.Errorf("open metabase: %w", err)
 	}
 
 	err := db.writeShardID(id)

@@ -79,7 +79,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
 	}
 
 	if cErr := db.close(); cErr != nil {
-		err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr))
+		err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
 	}
 
 	return metaerr.Wrap(err)
@@ -35,7 +35,7 @@ func (r StorageIDRes) StorageID() []byte {
 
 // StorageID returns storage descriptor for objects from the blobstor.
 // It is put together with the object can makes get/delete operation faster.
-func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
+func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (StorageIDRes, error) {
 	var (
 		startedAt = time.Now()
 		success   = false

@@ -53,32 +53,32 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
 	db.modeMtx.RLock()
 	defer db.modeMtx.RUnlock()
 
+	var res StorageIDRes
 	if db.mode.NoMetabase() {
 		return res, ErrDegradedMode
 	}
 
-	err = db.boltDB.View(func(tx *bbolt.Tx) error {
-		res.id, err = db.storageID(tx, prm.addr)
-		return err
+	err := db.boltDB.View(func(tx *bbolt.Tx) error {
+		res.id = db.storageID(tx, prm.addr)
+		return nil
 	})
 	success = err == nil
 	return res, metaerr.Wrap(err)
 }
 
-func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
+func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) []byte {
 	key := make([]byte, bucketKeySize)
 	smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
 	if smallBucket == nil {
-		return nil, nil
+		return nil
 	}
 
 	storageID := smallBucket.Get(objectKey(addr.Object(), key))
 	if storageID == nil {
-		return nil, nil
+		return nil
 	}
 
-	return bytes.Clone(storageID), nil
+	return bytes.Clone(storageID)
 }
 
 // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
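The storageID refactor is the inverse of defensive error plumbing: the lookup can no longer fail (bucket and key lookups return data or nil), so the error result is removed, the View closure returns nil unconditionally, and only transaction-level failures surface. In miniature (hypothetical names, same shape):

package sketch

import (
	"bytes"

	"go.etcd.io/bbolt"
)

// lookup is a pure read: a missing bucket and a missing key both come
// back as nil, and no error result is carried around just to stay nil.
func lookup(b *bbolt.Bucket, key []byte) []byte {
	if b == nil {
		return nil
	}
	// bytes.Clone(nil) is nil, so the missing-key case needs no branch;
	// cloning is required because the value dies with the transaction.
	return bytes.Clone(b.Get(key))
}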
@@ -95,7 +95,7 @@ func compactDB(db *bbolt.DB) error {
 		NoSync: true,
 	})
 	if err != nil {
-		return fmt.Errorf("can't open new metabase to compact: %w", err)
+		return fmt.Errorf("open new metabase to compact: %w", err)
 	}
 	if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
 		return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))

@@ -292,7 +292,7 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
 	}
 	expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
 	if err != nil {
-		return fmt.Errorf("could not parse expiration epoch: %w", err)
+		return fmt.Errorf("parse expiration epoch: %w", err)
 	}
 	expirationEpochBucket := b.Bucket(attrValue)
 	attrKeyValueC := expirationEpochBucket.Cursor()

@@ -399,7 +399,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
 	for _, key := range keys {
 		attr, ok := attributeFromAttributeBucket(key)
 		if !ok {
-			return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
+			return nil, fmt.Errorf("parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
 		}
 		if !IsAtrributeIndexed(attr) {
 			keysToDrop = append(keysToDrop, key)

@@ -407,7 +407,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
 		}
 		contID, ok := cidFromAttributeBucket(key)
 		if !ok {
-			return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
+			return nil, fmt.Errorf("parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
 		}
 		info, err := cs.Info(contID)
 		if err != nil {
@@ -231,11 +231,11 @@ func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) {
 	epoch := binary.BigEndian.Uint64(key)
 	var cnr cid.ID
 	if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
-		return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err)
+		return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (container ID): %w", err)
 	}
 	var obj oid.ID
 	if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
-		return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err)
+		return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (object ID): %w", err)
 	}
 	return epoch, cnr, obj, nil
 }
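parseExpirationEpochKey reads a fixed-layout key: an 8-byte big-endian epoch, a 32-byte container ID, then a 32-byte object ID. Big-endian matters because bbolt orders keys bytewise, so these keys sort by expiration epoch. The assumed inverse, building such a key (sizes taken from the parser above):

package sketch

import "encoding/binary"

const (
	epochSize = 8  // big-endian uint64
	cidSize   = 32 // container ID
	oidSize   = 32 // object ID
)

// buildExpirationEpochKey is the assumed inverse of parseExpirationEpochKey:
// epoch first, so bbolt's bytewise key order doubles as epoch order.
func buildExpirationEpochKey(epoch uint64, cnr, obj [32]byte) []byte {
	key := make([]byte, epochSize+cidSize+oidSize)
	binary.BigEndian.PutUint64(key, epoch)
	copy(key[epochSize:], cnr[:])
	copy(key[epochSize+cidSize:], obj[:])
	return key
}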
@@ -67,7 +67,7 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
 
 	b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
 	if err != nil {
-		return fmt.Errorf("can't create auxiliary bucket: %w", err)
+		return fmt.Errorf("create auxiliary bucket: %w", err)
 	}
 	return b.Put(versionKey, data)
 }
@@ -106,7 +106,7 @@ func (t *boltForest) SetMode(ctx context.Context, m mode.Mode) error {
 		}
 	}
 	if err != nil {
-		return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
+		return fmt.Errorf("set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
 	}
 
 	t.mode = m

@@ -128,7 +128,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
 	readOnly := m.ReadOnly()
 	err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
 	if err != nil {
-		return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
+		return metaerr.Wrap(fmt.Errorf("create dir %s for the pilorama: %w", t.path, err))
 	}
 
 	opts := *bbolt.DefaultOptions

@@ -139,7 +139,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
 
 	t.db, err = bbolt.Open(t.path, t.perm, &opts)
 	if err != nil {
-		return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err))
+		return metaerr.Wrap(fmt.Errorf("open the pilorama DB: %w", err))
 	}
 
 	t.db.MaxBatchSize = t.maxBatchSize

@@ -419,10 +419,7 @@ func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID stri
 			return err
 		}
 
-		i, node, err := t.getPathPrefix(bTree, attr, path)
-		if err != nil {
-			return err
-		}
+		i, node := t.getPathPrefix(bTree, attr, path)
 
 		ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
 		lm = make([]Move, len(path)-i+1)

@@ -980,10 +977,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
 
 		b := treeRoot.Bucket(dataBucket)
 
-		i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
-		if err != nil {
-			return err
-		}
+		i, curNodes := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
 		if i < len(path)-1 {
 			return nil
 		}

@@ -1360,7 +1354,7 @@ func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, err
 		return nil
 	})
 	if err != nil {
-		return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err))
+		return nil, metaerr.Wrap(fmt.Errorf("list trees: %w", err))
 	}
 	success = true
 	return ids, nil

@@ -1504,7 +1498,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
 
 			var contID cidSDK.ID
 			if err := contID.Decode(k[:32]); err != nil {
-				return fmt.Errorf("failed to decode containerID: %w", err)
+				return fmt.Errorf("decode container ID: %w", err)
 			}
 			res.Items = append(res.Items, ContainerIDTreeID{
 				CID: contID,

@@ -1512,8 +1506,7 @@
 			})
 
 			if len(res.Items) == batchSize {
-				res.NextPageToken = make([]byte, len(k))
-				copy(res.NextPageToken, k)
+				res.NextPageToken = bytes.Clone(k)
 				break
 			}
 		}

@@ -1526,7 +1519,7 @@
 	return &res, nil
 }
 
-func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) {
+func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node) {
 	c := bTree.Cursor()
 
 	var curNodes []Node

@@ -1549,14 +1542,14 @@ func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr strin
 		}
 
 		if len(nextNodes) == 0 {
-			return i, curNodes, nil
+			return i, curNodes
 		}
 	}
 
-	return len(path), nextNodes, nil
+	return len(path), nextNodes
 }
 
-func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
+func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node) {
 	c := bTree.Cursor()
 
 	var curNode Node

@@ -1576,10 +1569,10 @@ loop:
 			childKey, value = c.Next()
 		}
 
-		return i, curNode, nil
+		return i, curNode
 	}
 
-	return len(path), curNode, nil
+	return len(path), curNode
 }
 
 func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
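getPathPrefix and getPathPrefixMultiTraversal never returned a non-nil error, so the refactor deletes the error result together with the dead `if err != nil` blocks at every call site — the kind of cleanup the unparam linter flags. The same change in miniature:

package sketch

// Before: func find(xs []int, v int) (int, error) — callers had to
// handle an error that was always nil.
//
// After: the signature states the truth; the search cannot fail.
func find(xs []int, v int) int {
	for i, x := range xs {
		if x == v {
			return i
		}
	}
	return -1 // not found: callers check the index, not an error
}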
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 	"sort"
 	"strings"
 
@@ -84,8 +85,7 @@ func (f *memoryForest) TreeAddByPath(_ context.Context, d CIDDescriptor, treeID
 		s.operations = append(s.operations, op)
 	}
 
-	mCopy := make([]KeyValue, len(m))
-	copy(mCopy, m)
+	mCopy := slices.Clone(m)
 	op := s.do(&Move{
 		Parent: node,
 		Meta: Meta{
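slices.Clone (standard library since Go 1.21) replaces the two-line make-plus-copy idiom with one call; like bytes.Clone it performs a shallow copy, so element values are copied but anything they point to is shared. A small demonstration with a stand-in KeyValue type (the pilorama's own type may differ):

package main

import (
	"fmt"
	"slices"
)

type KeyValue struct{ Key, Value string }

func main() {
	m := []KeyValue{{"a", "1"}, {"b", "2"}}

	mCopy := slices.Clone(m) // equivalent to make+copy
	mCopy[0].Value = "changed"

	// The backing arrays are independent: the original is untouched.
	fmt.Println(m[0].Value, mCopy[0].Value) // 1 changed
}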
@@ -36,7 +36,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
 
 	size, err := s.metaBase.ContainerSize(prm.cnr)
 	if err != nil {
-		return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err)
+		return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
 	}
 
 	return ContainerSizeRes{

@@ -71,7 +71,7 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
 
 	counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
 	if err != nil {
-		return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
+		return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
 	}
 
 	return ContainerCountRes{
@@ -38,7 +38,7 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err
 
 	err = s.SetMode(ctx, mode.DegradedReadOnly)
 	if err != nil {
-		return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
+		return fmt.Errorf("switch to mode %s", mode.DegradedReadOnly)
 	}
 	return nil
 }

@@ -72,7 +72,7 @@ func (s *Shard) Open(ctx context.Context) error {
 				for j := i + 1; j < len(components); j++ {
 					if err := components[j].Open(ctx, m); err != nil {
 						// Other components must be opened, fail.
-						return fmt.Errorf("could not open %T: %w", components[j], err)
+						return fmt.Errorf("open %T: %w", components[j], err)
 					}
 				}
 				err = s.handleMetabaseFailure(ctx, "open", err)

@@ -83,7 +83,7 @@ func (s *Shard) Open(ctx context.Context) error {
 				break
 			}
 
-			return fmt.Errorf("could not open %T: %w", component, err)
+			return fmt.Errorf("open %T: %w", component, err)
 		}
 	}
 	return nil

@@ -184,7 +184,7 @@ func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
 				break
 			}
 
-			return fmt.Errorf("could not initialize %T: %w", component, err)
+			return fmt.Errorf("initialize %T: %w", component, err)
 		}
 	}
 	return nil

@@ -205,7 +205,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
 
 	err := s.metaBase.Reset()
 	if err != nil {
-		return fmt.Errorf("could not reset metabase: %w", err)
+		return fmt.Errorf("reset metabase: %w", err)
 	}
 
 	withCount := true

@@ -254,12 +254,12 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
 
 	err = errors.Join(egErr, itErr)
 	if err != nil {
-		return fmt.Errorf("could not put objects to the meta: %w", err)
+		return fmt.Errorf("put objects to the meta: %w", err)
 	}
 
 	err = s.metaBase.SyncCounters()
 	if err != nil {
-		return fmt.Errorf("could not sync object counters: %w", err)
+		return fmt.Errorf("sync object counters: %w", err)
 	}
 
 	success = true

@@ -318,7 +318,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
 func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
 	var lock objectSDK.Lock
 	if err := lock.Unmarshal(obj.Payload()); err != nil {
-		return fmt.Errorf("could not unmarshal lock content: %w", err)
+		return fmt.Errorf("unmarshal lock content: %w", err)
 	}
 
 	locked := make([]oid.ID, lock.NumberOfMembers())

@@ -328,7 +328,7 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
 	id, _ := obj.ID()
 	err := s.metaBase.Lock(ctx, cnr, id, locked)
 	if err != nil {
-		return fmt.Errorf("could not lock objects: %w", err)
+		return fmt.Errorf("lock objects: %w", err)
 	}
 	return nil
 }

@@ -337,7 +337,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
 	tombstone := objectSDK.NewTombstone()
 
 	if err := tombstone.Unmarshal(obj.Payload()); err != nil {
-		return fmt.Errorf("could not unmarshal tombstone content: %w", err)
+		return fmt.Errorf("unmarshal tombstone content: %w", err)
 	}
 
 	tombAddr := object.AddressOf(obj)

@@ -358,7 +358,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
 
 	_, err := s.metaBase.Inhume(ctx, inhumePrm)
 	if err != nil {
-		return fmt.Errorf("could not inhume objects: %w", err)
+		return fmt.Errorf("inhume objects: %w", err)
 	}
 	return nil
 }
@@ -175,7 +175,7 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
 
 	mExRes, err := s.metaBase.StorageID(ctx, mPrm)
 	if err != nil {
-		return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
+		return nil, true, fmt.Errorf("fetch blobovnicza id from metabase: %w", err)
 	}
 
 	storageID := mExRes.StorageID()
@@ -36,7 +36,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 	modeDegraded := s.GetMode().NoMetabase()
 	if !modeDegraded {
 		if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
-			err = fmt.Errorf("failed to read shard id from metabase: %w", err)
+			err = fmt.Errorf("read shard id from metabase: %w", err)
 		}
 	}
 
@@ -64,7 +64,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 
 	if len(idFromMetabase) == 0 && !modeDegraded {
 		if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
-			err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
+			err = errors.Join(err, fmt.Errorf("write shard id to metabase: %w", setErr))
 		}
 	}
 	return
@@ -109,7 +109,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
 
 	lst, err := s.metaBase.Containers(ctx)
 	if err != nil {
-		return res, fmt.Errorf("can't list stored containers: %w", err)
+		return res, fmt.Errorf("list stored containers: %w", err)
 	}
 
 	filters := objectSDK.NewSearchFilters()

@@ -149,7 +149,7 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
 
 	containers, err := s.metaBase.Containers(ctx)
 	if err != nil {
-		return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err)
+		return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
 	}
 
 	return ListContainersRes{

@@ -180,7 +180,7 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
 	metaPrm.SetCursor(prm.cursor)
 	res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
 	if err != nil {
-		return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err)
+		return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err)
 	}
 
 	return ListWithCursorRes{

@@ -208,7 +208,7 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
 	metaPrm.Handler = prm.Handler
 	err := s.metaBase.IterateOverContainers(ctx, metaPrm)
 	if err != nil {
-		return fmt.Errorf("could not iterate over containers: %w", err)
+		return fmt.Errorf("iterate over containers: %w", err)
 	}
 
 	return nil

@@ -235,7 +235,7 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
 	metaPrm.Handler = prm.Handler
 	err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
 	if err != nil {
-		return fmt.Errorf("could not iterate over objects: %w", err)
+		return fmt.Errorf("iterate over objects: %w", err)
 	}
 
 	return nil

@@ -258,7 +258,7 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
 	metaPrm.ContainerID = prm.ContainerID
 	count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
 	if err != nil {
-		return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
+		return 0, fmt.Errorf("count alive objects in bucket: %w", err)
 	}
 
 	return count, nil
@@ -81,7 +81,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
 
 		res, err = s.blobStor.Put(ctx, putPrm)
 		if err != nil {
-			return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
+			return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err)
 		}
 	}
 
@@ -94,7 +94,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
 		if err != nil {
 			// may we need to handle this case in a special way
 			// since the object has been successfully written to BlobStor
-			return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
+			return PutRes{}, fmt.Errorf("put object to metabase: %w", err)
 		}
 
 		if res.Inserted {
@@ -67,7 +67,7 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
 
 	mRes, err := s.metaBase.Select(ctx, selectPrm)
 	if err != nil {
-		return SelectRes{}, fmt.Errorf("could not select objects from metabase: %w", err)
+		return SelectRes{}, fmt.Errorf("select objects from metabase: %w", err)
 	}
 
 	return SelectRes{
@@ -94,7 +94,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
 	if err != nil {
 		return metaerr.Wrap(err)
 	}
-	return metaerr.Wrap(c.initCounters())
+	c.initCounters()
+	return nil
 }
 
 // Init runs necessary services.
@@ -30,7 +30,7 @@ func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
 	return b.ForEach(func(k, _ []byte) error {
 		err := addr.DecodeString(string(k))
 		if err != nil {
-			return fmt.Errorf("could not parse object address: %w", err)
+			return fmt.Errorf("parse object address: %w", err)
 		}
 
 		return f(addr)
Some files were not shown because too many files have changed in this diff.