Compare commits

...

58 commits

Author SHA1 Message Date
a788d44773 [#1570] cli: Use array type for attributes parameters
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-01-28 11:15:30 +03:00
603015d029 [#1570] cli: Use array type for --range parameter to object hash
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-01-28 11:15:30 +03:00
30e14d50ef
[#1612] Makefile: Update golangci-lint
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-01-24 11:58:21 +03:00
951a7ee1c7 [#1605] policer: Do not mutate slice under iteration
Nothing wrong with it, besides being difficult to read.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-21 05:34:54 +00:00
0bcbeb26b2 [#1605] policer: Simplify processRepNodes() checks
Current flow is hard to reason about, #1601 is a notorious example of
accidental complexity.
1. Remove multiple nested ifs, use depth=1.
2. Process each status exactly once, hopefully preventing bugs like
   #1601.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-21 05:34:54 +00:00
c98357606b
[#1606] Use slices.Clone()/bytes.Clone() where possible
gopatch:
```
@@
var from, to expression
@@
+import "bytes"
-to := make([]byte, len(from))
-copy(to, from)
+to := bytes.Clone(from)

@@
var from, to expression
@@
+import "bytes"
-to = make([]byte, len(from))
-copy(to, from)
+to = bytes.Clone(from)

@@
var from, to, typ expression
@@
+import "slices"
-to := make([]typ, len(from))
-copy(to, from)
+to := slices.Clone(from)

@@
var from, to, typ expression
@@
+import "slices"
-to = make([]typ, len(from))
-copy(to, from)
+to = slices.Clone(from)
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-17 14:50:14 +03:00
80de5d70bf [#1593] node: Fix initialization of ape_chain cache
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-01-17 08:58:47 +00:00
57efa0bc8e
[#1604] policer: Properly handle maintenance nodes
Consider `REP 1 REP 1` placement (selects/filters are omitted).
The placement is `[1, 2], [1, 0]`. We are the 0-th node.
Node 1 is under maintenance, so we do not replicate object
on the node 2. In the second replication group node 1 is under maintenance,
but current caching logic considers it as "replica holder" and removes
local copy. Voilà, we have DL if the object is missing from the node 1.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-16 16:37:52 +03:00
26e0c82fb8
[#1604] policer/test: Add test for MAINTENANCE runtime status
The node can have MAINTENANCE status in the network map, but can also be
ONLINE while responding with MAINTENANCE. These are 2 different code
paths, let's test them separately.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-16 16:37:16 +03:00
4538ccb12a
[#1604] policer: Do not process the same node twice
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-16 16:37:16 +03:00
84e1599997
[#1604] policer: Remove one-line helpers
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-16 16:37:16 +03:00
5a270e2e61
[#1604] policer: Use status instead of bool value in node cache
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-16 16:37:16 +03:00
436d65d784 [#1591] Build and host OCI images on our own infra
Similar to TrueCloudLab/frostfs-s3-gw#587
this PR introduces a CI pipeline that builds Docker images and pushes them
to our selfhosted registry.

Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2025-01-16 07:46:53 +00:00
c3c034ecca [#1601] util: Correctly parse 'root' name for container resources
* Convert `root/*` to `//`;
* Add unit-test case for parses to check parsing correctness.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-01-15 12:13:02 +00:00
05fd999162
[#1600] fstree: Handle incomplete writes
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-14 14:52:35 +03:00
eff95bd632
[#1598] engine: Drop unnecessary result structs
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-14 11:15:21 +03:00
fb928616cc
[#1598] golangci: Enable unparam linter
To drop unnecessary parameters and return values.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-14 09:06:47 +03:00
4d5ae59a52
[#1598] golangci: Enable unconvert linters
To drop unnecessary conversions.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-14 09:06:40 +03:00
a9f27e074b [#1243] object: Look for X-Headers within origin before APE check
* X-Headers can be found in `origin` field of `MetaHeader` if the request
  has been forwarded from non-container node.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-01-13 12:07:27 +00:00
6c51f48aab [#1596] metrics: Create public aliases for internal engine metrics
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-01-13 10:05:01 +00:00
a2485637bb
[#1593] node/config_example: Add description of morph/cache_ttl=0 behavior
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-01-10 15:13:10 +03:00
09faca034c
[#1593] node: Fix initialization of frostfsid cache
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-01-10 15:01:36 +03:00
ceac1c8709
[#1594] dev: Remove unused parameter 'FROSTFS_MORPH_INACTIVITY_TIMEOUT'
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-01-09 20:52:24 +03:00
f7e75b13b0 [#1506] ape_manager: Await tx persist before returning response
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 12:04:21 +00:00
198aaebc94 [#1506] morph: Simplify WaitTxHalt() signature
Avoid dependency on `morph/client` package because of `InvokeRes`.
Make signature resemble `WaitAny()` method of `waiter.Waiter` from neo-go.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 12:04:21 +00:00
85af6bcd5c [#1506] ape: Use contract reader in ListMorphRuleChains()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 12:04:21 +00:00
8a658de0b2 [#1506] ape: Do not create cosigners slice on each contract invocation
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 12:04:21 +00:00
3900b92927
Revert "[#1492] metabase: Ensure Unmarshal() is called on a cloned slice"
This reverts commit 8ed7a676d5.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 14:34:20 +03:00
5ccb3394b4
[#1592] go.mod: Update sdk-go
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 14:34:16 +03:00
dc410fca90 [#1590] adm: Accept many accounts in proxy-* commands
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 07:51:54 +00:00
cddcd73f04 [#1590] adm: Make --account flag required in proxy-* commands
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-09 07:51:54 +00:00
d7fcc5ce30 [#1586] objsvc: Allow to send search response in multiple messages
Previously, `ln` was only set once, so search has really worked for
small number of objects.

Fix panic:
```
panic: runtime error: slice bounds out of range [:43690] with capacity 21238
goroutine 6859775 [running]:
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object.(*searchStreamMsgSizeCtrl).Send(0xc001eec8d0, 0xc005734000)
        git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/transport_splitter.go:173 +0x1f0
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2.(*streamWriter).WriteIDs(0xc000520320, {0xc00eb1a000, 0x4fd9c, 0x7fd6475a9a68?})
        git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2/streamer.go:28 +0x155
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*uniqueIDWriter).WriteIDs(0xc001386420, {0xc00eb1a000?, 0xc0013ea9c0?, 0x113eef3?})
        git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/util.go:62 +0x202
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*execCtx).writeIDList(0xc00011aa38?, {0xc00eb1a000?, 0xc001eec9f0?, 0xc0008f4380?})
        git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/exec.go:68 +0x91
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*execCtx).executeLocal(0xc0008f4380, {0x176c538, 0xc001eec9f0})
        git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/local.go:18 +0x16b
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-28 12:29:22 +00:00
c0221d76e6 [#1577] node/container: Fix typo
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-28 12:05:01 +03:00
242f0095d0 [#1577] container: Reduce iterations through container list
* Separated iteration through container ids from `ContainersOf()`
  so that it could be reused.
* When listing containers we used to iterate through the
  the whole list of containers twice: first when reading from
  a contract, then when sending them. Now we can send batches
  of containers when reading from the contract.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-27 15:30:26 +03:00
6fe34d266a [#1577] morph: Fix typo
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-27 14:03:19 +03:00
fa08bfa553
[#1583] metabase/test: Update TestLisObjectsWithCursor
Update this test following recent changes to ensure
that `(*DB).ListWithCursor` skips expired objects.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-12-26 14:39:50 +03:00
0da998ef50
[#1583] metabase: Skip expired objects in ListWithCursor
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-12-26 14:39:49 +03:00
e44782473a [#1512] object: Fix writePart for EC-container
* Immediatly return after `ObjectAlreadyRemoved` error.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-12-26 11:27:55 +00:00
9cd1bcef06 [#1512] object: Make raw PutSingle check status within response
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-12-26 11:27:55 +00:00
ca0a33ea0f [#465] objsvc: Set NETMAP_EPOCH xheader for auxiliary requests
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-26 09:17:58 +00:00
f6c5222952 [#1581] services/session: Use user.ID.EncodeToString() where possible
gopatch:
```
@@
var id expression
@@
-base58.Encode(id.WalletBytes())
+id.EncodeToString()
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-25 18:09:36 +00:00
ea868e09f8
[#1582] adm: Use int64 type and the default value for --till flag
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-25 14:22:28 +03:00
31d3d299bf
[#1582] adm: Unify promps for reading a password
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-25 14:22:28 +03:00
b5b4f78b49
[#1582] adm: Allow using the default account in deposit-notary
It has never worked, actually.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-25 14:22:28 +03:00
2832f44437 [#1531] metrics: Rename app_info metric
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-23 10:40:18 +00:00
7c3bcb0f44
[#1578] Makefile: Refill GAS with a single command in env-up
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-23 11:17:22 +03:00
e64871c3fd
[#1578] adm: Allow to transfer GAS to multiple recepients
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-23 11:17:22 +03:00
303cd35a01
[#1578] adm: Remove unnecessary comments in RefillGasCmd
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-23 11:17:22 +03:00
bb9ba1bce2
[#1578] adm: Remove bool flag from refillGas()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-23 11:17:22 +03:00
db03742d33
[#1578] adm: Reword help message for morph refill-gas
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-23 11:17:22 +03:00
148d68933b [#1573] node: Simplify bootstrapWithState()
After #1382 we have no need to use lambdas.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-20 08:17:05 +00:00
51ee132ea3
[#1342] network/cache: Add node address to error multiClient
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2024-12-18 19:27:35 +03:00
226dd25dd0 [#1568] pilorama: Replace "containerID" with "container ID" in the error message
It is "container ID" in every other place.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-18 15:52:26 +00:00
bd0197eaa8 [#1568] storage: Remove "could not/can't/failed to" from error messages
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-18 15:52:26 +00:00
e44b84c18c
[#1569] cli: Remove unnecessary variable after refactoring
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-18 10:17:04 +03:00
bed49e6ace
[#1569] cli: Make --range flag required in object hash
Previously, `object head` was used if no range was provided.
This is wrong on multiple levels:
1. We print an error if the checksum is missing in header,
   even though taking hash is possible.
2. We silently ignore --salt parameter.
3. `--range` is required for Object.RANGEHASH RPC, custom logic for one
   specific usecase has no value.

So we make it required and make CLI command follow more closely
the FrostFS API.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-18 10:17:04 +03:00
df05057ed4 [#1452] container: Add ListStream method
* Added new method for listing containers to container service.
  It opens stream and sends containers in batches.

* Added TransportSplitter wrapper around ExecutionService to
  split container ID list read from contract in parts that are
  smaller than grpc max message size. Batch size can be changed
  in node configuration file (as in example config file).

* Changed `container list` implementaion in cli: now ListStream
  is called by default. Old List is called only if ListStream
  is not implemented.

* Changed `internalclient.ListContainersPrm`.`Account` to
  `OwnerID` since `client.PrmContainerList`.`Account` was
  renamed to `OwnerID` in sdk.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-17 16:22:43 +03:00
b6c8ebf493 [#1453] container: Replace sort.Slice with slices.SortFunc
* Replaced `sort.Slice` with `slices.SortFunc` in
  `ListContainersRes.SortedIDList()` as it is a bit faster,
  according to 15102e6dfd.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-17 13:33:43 +03:00
141 changed files with 1371 additions and 836 deletions

View file

@ -0,0 +1,28 @@
name: OCI image
on:
push:
workflow_dispatch:
jobs:
image:
name: Build container images
runs-on: docker
container: git.frostfs.info/truecloudlab/env:oci-image-builder-bookworm
steps:
- name: Clone git repo
uses: actions/checkout@v3
- name: Build OCI image
run: make images
- name: Push image to OCI registry
run: |
echo "$REGISTRY_PASSWORD" \
| docker login --username truecloudlab --password-stdin git.frostfs.info
make push-images
if: >-
startsWith(github.ref, 'refs/tags/v') &&
(github.event_name == 'workflow_dispatch' || github.event_name == 'push')
env:
REGISTRY_PASSWORD: ${{secrets.FORGEJO_OCI_REGISTRY_PUSH_TOKEN}}

View file

@ -89,5 +89,7 @@ linters:
- protogetter - protogetter
- intrange - intrange
- tenv - tenv
- unconvert
- unparam
disable-all: true disable-all: true
fast: false fast: false

View file

@ -8,7 +8,7 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')" HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.22 GO_VERSION ?= 1.22
LINT_VERSION ?= 1.62.0 LINT_VERSION ?= 1.62.2
TRUECLOUDLAB_LINT_VERSION ?= 0.0.8 TRUECLOUDLAB_LINT_VERSION ?= 0.0.8
PROTOC_VERSION ?= 25.0 PROTOC_VERSION ?= 25.0
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go) PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go)
@ -139,6 +139,15 @@ images: image-storage image-ir image-cli image-adm
# Build dirty local Docker images # Build dirty local Docker images
dirty-images: image-dirty-storage image-dirty-ir image-dirty-cli image-dirty-adm dirty-images: image-dirty-storage image-dirty-ir image-dirty-cli image-dirty-adm
# Push FrostFS components' docker image to the registry
push-image-%:
@echo "⇒ Publish FrostFS $* docker image "
@docker push $(HUB_IMAGE)-$*:$(HUB_TAG)
# Push all Docker images to the registry
.PHONY: push-images
push-images: push-image-storage push-image-ir push-image-cli push-image-adm
# Run `make %` in Golang container # Run `make %` in Golang container
docker/%: docker/%:
docker run --rm -t \ docker run --rm -t \
@ -270,10 +279,12 @@ env-up: all
echo "Frostfs contracts not found"; exit 1; \ echo "Frostfs contracts not found"; exit 1; \
fi fi
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH} ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet01.json --gas 10.0 ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --gas 10.0 \
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet02.json --gas 10.0 --storage-wallet ./dev/storage/wallet01.json \
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet03.json --gas 10.0 --storage-wallet ./dev/storage/wallet02.json \
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet04.json --gas 10.0 --storage-wallet ./dev/storage/wallet03.json \
--storage-wallet ./dev/storage/wallet04.json
@if [ ! -f "$(LOCODE_DB_PATH)" ]; then \ @if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
make locode-download; \ make locode-download; \
fi fi

View file

@ -253,7 +253,7 @@ func frostfsidListNamespaces(cmd *cobra.Command, _ []string) {
reader := frostfsidrpclient.NewReader(inv, hash) reader := frostfsidrpclient.NewReader(inv, hash)
sessionID, it, err := reader.ListNamespaces() sessionID, it, err := reader.ListNamespaces()
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err) commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID) items, err := readIterator(inv, &it, sessionID)
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err) commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
namespaces, err := frostfsidclient.ParseNamespaces(items) namespaces, err := frostfsidclient.ParseNamespaces(items)
@ -305,7 +305,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
sessionID, it, err := reader.ListNamespaceSubjects(ns) sessionID, it, err := reader.ListNamespaceSubjects(ns)
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err) commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, iteratorBatchSize, sessionID)) subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, sessionID))
commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err) commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)
sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) }) sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
@ -319,7 +319,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
sessionID, it, err := reader.ListSubjects() sessionID, it, err := reader.ListSubjects()
commonCmd.ExitOnErr(cmd, "can't get subject: %w", err) commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID) items, err := readIterator(inv, &it, sessionID)
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err) commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
subj, err := frostfsidclient.ParseSubject(items) subj, err := frostfsidclient.ParseSubject(items)
@ -365,7 +365,7 @@ func frostfsidListGroups(cmd *cobra.Command, _ []string) {
sessionID, it, err := reader.ListGroups(ns) sessionID, it, err := reader.ListGroups(ns)
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err) commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID) items, err := readIterator(inv, &it, sessionID)
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err) commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
groups, err := frostfsidclient.ParseGroups(items) groups, err := frostfsidclient.ParseGroups(items)
commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err) commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
@ -415,7 +415,7 @@ func frostfsidListGroupSubjects(cmd *cobra.Command, _ []string) {
sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID)) sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err) commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID) items, err := readIterator(inv, &it, sessionID)
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err) commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err) subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
@ -492,17 +492,17 @@ func (f *frostfsidClient) sendWaitRes() (*state.AppExecResult, error) {
return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil) return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
} }
func readIterator(inv *invoker.Invoker, iter *result.Iterator, batchSize int, sessionID uuid.UUID) ([]stackitem.Item, error) { func readIterator(inv *invoker.Invoker, iter *result.Iterator, sessionID uuid.UUID) ([]stackitem.Item, error) {
var shouldStop bool var shouldStop bool
res := make([]stackitem.Item, 0) res := make([]stackitem.Item, 0)
for !shouldStop { for !shouldStop {
items, err := inv.TraverseIterator(sessionID, iter, batchSize) items, err := inv.TraverseIterator(sessionID, iter, iteratorBatchSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res = append(res, items...) res = append(res, items...)
shouldStop = len(items) < batchSize shouldStop = len(items) < iteratorBatchSize
} }
return res, nil return res, nil

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
@ -141,41 +140,11 @@ func addMultisigAccount(w *wallet.Wallet, m int, name, password string, pubs key
} }
func generateStorageCreds(cmd *cobra.Command, _ []string) error { func generateStorageCreds(cmd *cobra.Command, _ []string) error {
return refillGas(cmd, storageGasConfigFlag, true) walletPath, _ := cmd.Flags().GetString(commonflags.StorageWalletFlag)
} w, err := wallet.NewWallet(walletPath)
func refillGas(cmd *cobra.Command, gasFlag string, createWallet bool) (err error) {
// storage wallet path is not part of the config
storageWalletPath, _ := cmd.Flags().GetString(commonflags.StorageWalletFlag)
// wallet address is not part of the config
walletAddress, _ := cmd.Flags().GetString(walletAddressFlag)
var gasReceiver util.Uint160
if len(walletAddress) != 0 {
gasReceiver, err = address.StringToUint160(walletAddress)
if err != nil { if err != nil {
return fmt.Errorf("invalid wallet address %s: %w", walletAddress, err) return fmt.Errorf("create wallet: %w", err)
} }
} else {
if storageWalletPath == "" {
return fmt.Errorf("missing wallet path (use '--%s <out.json>')", commonflags.StorageWalletFlag)
}
var w *wallet.Wallet
if createWallet {
w, err = wallet.NewWallet(storageWalletPath)
} else {
w, err = wallet.NewWalletFromFile(storageWalletPath)
}
if err != nil {
return fmt.Errorf("can't create wallet: %w", err)
}
if createWallet {
var password string
label, _ := cmd.Flags().GetString(storageWalletLabelFlag) label, _ := cmd.Flags().GetString(storageWalletLabelFlag)
password, err := config.GetStoragePassword(viper.GetViper(), label) password, err := config.GetStoragePassword(viper.GetViper(), label)
@ -190,11 +159,10 @@ func refillGas(cmd *cobra.Command, gasFlag string, createWallet bool) (err error
if err := w.CreateAccount(label, password); err != nil { if err := w.CreateAccount(label, password); err != nil {
return fmt.Errorf("can't create account: %w", err) return fmt.Errorf("can't create account: %w", err)
} }
return refillGas(cmd, storageGasConfigFlag, w.Accounts[0].ScriptHash())
} }
gasReceiver = w.Accounts[0].Contract.ScriptHash() func refillGas(cmd *cobra.Command, gasFlag string, gasReceivers ...util.Uint160) (err error) {
}
gasStr := viper.GetString(gasFlag) gasStr := viper.GetString(gasFlag)
gasAmount, err := helper.ParseGASAmount(gasStr) gasAmount, err := helper.ParseGASAmount(gasStr)
@ -208,9 +176,11 @@ func refillGas(cmd *cobra.Command, gasFlag string, createWallet bool) (err error
} }
bw := io.NewBufBinWriter() bw := io.NewBufBinWriter()
for _, gasReceiver := range gasReceivers {
emit.AppCall(bw.BinWriter, gas.Hash, "transfer", callflag.All, emit.AppCall(bw.BinWriter, gas.Hash, "transfer", callflag.All,
wCtx.CommitteeAcc.Contract.ScriptHash(), gasReceiver, int64(gasAmount), nil) wCtx.CommitteeAcc.Contract.ScriptHash(), gasReceiver, int64(gasAmount), nil)
emit.Opcodes(bw.BinWriter, opcode.ASSERT) emit.Opcodes(bw.BinWriter, opcode.ASSERT)
}
if bw.Err != nil { if bw.Err != nil {
return fmt.Errorf("BUG: invalid transfer arguments: %w", bw.Err) return fmt.Errorf("BUG: invalid transfer arguments: %w", bw.Err)
} }

View file

@ -1,7 +1,12 @@
package generate package generate
import ( import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -33,7 +38,27 @@ var (
_ = viper.BindPFlag(commonflags.RefillGasAmountFlag, cmd.Flags().Lookup(commonflags.RefillGasAmountFlag)) _ = viper.BindPFlag(commonflags.RefillGasAmountFlag, cmd.Flags().Lookup(commonflags.RefillGasAmountFlag))
}, },
RunE: func(cmd *cobra.Command, _ []string) error { RunE: func(cmd *cobra.Command, _ []string) error {
return refillGas(cmd, commonflags.RefillGasAmountFlag, false) storageWalletPaths, _ := cmd.Flags().GetStringArray(commonflags.StorageWalletFlag)
walletAddresses, _ := cmd.Flags().GetStringArray(walletAddressFlag)
var gasReceivers []util.Uint160
for _, walletAddress := range walletAddresses {
addr, err := address.StringToUint160(walletAddress)
if err != nil {
return fmt.Errorf("invalid wallet address %s: %w", walletAddress, err)
}
gasReceivers = append(gasReceivers, addr)
}
for _, storageWalletPath := range storageWalletPaths {
w, err := wallet.NewWalletFromFile(storageWalletPath)
if err != nil {
return fmt.Errorf("can't create wallet: %w", err)
}
gasReceivers = append(gasReceivers, w.Accounts[0].Contract.ScriptHash())
}
return refillGas(cmd, commonflags.RefillGasAmountFlag, gasReceivers...)
}, },
} }
GenerateAlphabetCmd = &cobra.Command{ GenerateAlphabetCmd = &cobra.Command{
@ -50,10 +75,10 @@ var (
func initRefillGasCmd() { func initRefillGasCmd() {
RefillGasCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc) RefillGasCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
RefillGasCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc) RefillGasCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
RefillGasCmd.Flags().String(commonflags.StorageWalletFlag, "", "Path to storage node wallet") RefillGasCmd.Flags().StringArray(commonflags.StorageWalletFlag, nil, "Path to storage node wallet")
RefillGasCmd.Flags().String(walletAddressFlag, "", "Address of wallet") RefillGasCmd.Flags().StringArray(walletAddressFlag, nil, "Address of wallet")
RefillGasCmd.Flags().String(commonflags.RefillGasAmountFlag, "", "Additional amount of GAS to transfer") RefillGasCmd.Flags().String(commonflags.RefillGasAmountFlag, "", "Additional amount of GAS to transfer")
RefillGasCmd.MarkFlagsMutuallyExclusive(walletAddressFlag, commonflags.StorageWalletFlag) RefillGasCmd.MarkFlagsOneRequired(walletAddressFlag, commonflags.StorageWalletFlag)
} }
func initGenerateStorageCmd() { func initGenerateStorageCmd() {

View file

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
@ -41,7 +40,8 @@ func depositNotary(cmd *cobra.Command, _ []string) error {
} }
accHash := w.GetChangeAddress() accHash := w.GetChangeAddress()
if addr, err := cmd.Flags().GetString(walletAccountFlag); err == nil { addr, _ := cmd.Flags().GetString(walletAccountFlag)
if addr != "" {
accHash, err = address.StringToUint160(addr) accHash, err = address.StringToUint160(addr)
if err != nil { if err != nil {
return fmt.Errorf("invalid address: %s", addr) return fmt.Errorf("invalid address: %s", addr)
@ -73,17 +73,10 @@ func depositNotary(cmd *cobra.Command, _ []string) error {
return err return err
} }
till := int64(defaultNotaryDepositLifetime) till, _ := cmd.Flags().GetInt64(notaryDepositTillFlag)
tillStr, err := cmd.Flags().GetString(notaryDepositTillFlag) if till <= 0 {
if err != nil {
return err
}
if tillStr != "" {
till, err = strconv.ParseInt(tillStr, 10, 64)
if err != nil || till <= 0 {
return errInvalidNotaryDepositLifetime return errInvalidNotaryDepositLifetime
} }
}
return transferGas(cmd, acc, accHash, gasAmount, till) return transferGas(cmd, acc, accHash, gasAmount, till)
} }

View file

@ -20,7 +20,7 @@ func initDepositoryNotaryCmd() {
DepositCmd.Flags().String(commonflags.StorageWalletFlag, "", "Path to storage node wallet") DepositCmd.Flags().String(commonflags.StorageWalletFlag, "", "Path to storage node wallet")
DepositCmd.Flags().String(walletAccountFlag, "", "Wallet account address") DepositCmd.Flags().String(walletAccountFlag, "", "Wallet account address")
DepositCmd.Flags().String(commonflags.RefillGasAmountFlag, "", "Amount of GAS to deposit") DepositCmd.Flags().String(commonflags.RefillGasAmountFlag, "", "Amount of GAS to deposit")
DepositCmd.Flags().String(notaryDepositTillFlag, "", "Notary deposit duration in blocks") DepositCmd.Flags().Int64(notaryDepositTillFlag, defaultNotaryDepositLifetime, "Notary deposit duration in blocks")
} }
func init() { func init() {

View file

@ -20,23 +20,32 @@ const (
accountAddressFlag = "account" accountAddressFlag = "account"
) )
func addProxyAccount(cmd *cobra.Command, _ []string) { func parseAddresses(cmd *cobra.Command) []util.Uint160 {
acc, _ := cmd.Flags().GetString(accountAddressFlag) var addrs []util.Uint160
accs, _ := cmd.Flags().GetStringArray(accountAddressFlag)
for _, acc := range accs {
addr, err := address.StringToUint160(acc) addr, err := address.StringToUint160(acc)
commonCmd.ExitOnErr(cmd, "invalid account: %w", err) commonCmd.ExitOnErr(cmd, "invalid account: %w", err)
err = processAccount(cmd, addr, "addAccount")
addrs = append(addrs, addr)
}
return addrs
}
func addProxyAccount(cmd *cobra.Command, _ []string) {
addrs := parseAddresses(cmd)
err := processAccount(cmd, addrs, "addAccount")
commonCmd.ExitOnErr(cmd, "processing error: %w", err) commonCmd.ExitOnErr(cmd, "processing error: %w", err)
} }
func removeProxyAccount(cmd *cobra.Command, _ []string) { func removeProxyAccount(cmd *cobra.Command, _ []string) {
acc, _ := cmd.Flags().GetString(accountAddressFlag) addrs := parseAddresses(cmd)
addr, err := address.StringToUint160(acc) err := processAccount(cmd, addrs, "removeAccount")
commonCmd.ExitOnErr(cmd, "invalid account: %w", err)
err = processAccount(cmd, addr, "removeAccount")
commonCmd.ExitOnErr(cmd, "processing error: %w", err) commonCmd.ExitOnErr(cmd, "processing error: %w", err)
} }
func processAccount(cmd *cobra.Command, addr util.Uint160, method string) error { func processAccount(cmd *cobra.Command, addrs []util.Uint160, method string) error {
wCtx, err := helper.NewInitializeContext(cmd, viper.GetViper()) wCtx, err := helper.NewInitializeContext(cmd, viper.GetViper())
if err != nil { if err != nil {
return fmt.Errorf("can't initialize context: %w", err) return fmt.Errorf("can't initialize context: %w", err)
@ -54,7 +63,9 @@ func processAccount(cmd *cobra.Command, addr util.Uint160, method string) error
} }
bw := io.NewBufBinWriter() bw := io.NewBufBinWriter()
for _, addr := range addrs {
emit.AppCall(bw.BinWriter, proxyHash, method, callflag.All, addr) emit.AppCall(bw.BinWriter, proxyHash, method, callflag.All, addr)
}
if err := wCtx.SendConsensusTx(bw.Bytes()); err != nil { if err := wCtx.SendConsensusTx(bw.Bytes()); err != nil {
return err return err

View file

@ -29,13 +29,15 @@ var (
func initProxyAddAccount() { func initProxyAddAccount() {
AddAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc) AddAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
AddAccountCmd.Flags().String(accountAddressFlag, "", "Wallet address string") AddAccountCmd.Flags().StringArray(accountAddressFlag, nil, "Wallet address string")
_ = AddAccountCmd.MarkFlagRequired(accountAddressFlag)
AddAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc) AddAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
} }
func initProxyRemoveAccount() { func initProxyRemoveAccount() {
RemoveAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc) RemoveAccountCmd.Flags().StringP(commonflags.EndpointFlag, commonflags.EndpointFlagShort, "", commonflags.EndpointFlagDesc)
RemoveAccountCmd.Flags().String(accountAddressFlag, "", "Wallet address string") RemoveAccountCmd.Flags().StringArray(accountAddressFlag, nil, "Wallet address string")
_ = AddAccountCmd.MarkFlagRequired(accountAddressFlag)
RemoveAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc) RemoveAccountCmd.Flags().String(commonflags.AlphabetWalletsFlag, "", commonflags.AlphabetWalletsFlagDesc)
} }

View file

@ -11,6 +11,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"strconv" "strconv"
"strings" "strings"
"text/template" "text/template"
@ -105,7 +106,7 @@ func storageConfig(cmd *cobra.Command, args []string) {
fatalOnErr(errors.New("can't find account in wallet")) fatalOnErr(errors.New("can't find account in wallet"))
} }
c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Account password for %s: ", c.Wallet.Account)) c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Enter password for %s > ", c.Wallet.Account))
fatalOnErr(err) fatalOnErr(err)
err = acc.Decrypt(c.Wallet.Password, keys.NEP2ScryptParams()) err = acc.Decrypt(c.Wallet.Password, keys.NEP2ScryptParams())
@ -410,8 +411,7 @@ func initClient(rpc []string) *rpcclient.Client {
var c *rpcclient.Client var c *rpcclient.Client
var err error var err error
shuffled := make([]string, len(rpc)) shuffled := slices.Clone(rpc)
copy(shuffled, rpc)
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
for _, endpoint := range shuffled { for _, endpoint := range shuffled {

View file

@ -9,7 +9,6 @@ import (
"io" "io"
"os" "os"
"slices" "slices"
"sort"
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
@ -78,13 +77,31 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContain
// SortedIDList returns sorted list of identifiers of user's containers. // SortedIDList returns sorted list of identifiers of user's containers.
func (x ListContainersRes) SortedIDList() []cid.ID { func (x ListContainersRes) SortedIDList() []cid.ID {
list := x.cliRes.Containers() list := x.cliRes.Containers()
sort.Slice(list, func(i, j int) bool { slices.SortFunc(list, func(lhs, rhs cid.ID) int {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString())
return strings.Compare(lhs, rhs) < 0
}) })
return list return list
} }
func ListContainersStream(ctx context.Context, prm ListContainersPrm, processCnr func(id cid.ID) bool) (err error) {
cliPrm := &client.PrmContainerListStream{
XHeaders: prm.XHeaders,
OwnerID: prm.OwnerID,
Session: prm.Session,
}
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
if err != nil {
return fmt.Errorf("init container list: %w", err)
}
err = rdr.Iterate(processCnr)
if err != nil {
return fmt.Errorf("read container list: %w", err)
}
return
}
// PutContainerPrm groups parameters of PutContainer operation. // PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct { type PutContainerPrm struct {
Client *client.Client Client *client.Client

View file

@ -52,7 +52,7 @@ func genereateAPEOverride(cmd *cobra.Command, _ []string) {
outputPath, _ := cmd.Flags().GetString(outputFlag) outputPath, _ := cmd.Flags().GetString(outputFlag)
if outputPath != "" { if outputPath != "" {
err := os.WriteFile(outputPath, []byte(overrideMarshalled), 0o644) err := os.WriteFile(outputPath, overrideMarshalled, 0o644)
commonCmd.ExitOnErr(cmd, "dump error: %w", err) commonCmd.ExitOnErr(cmd, "dump error: %w", err)
} else { } else {
fmt.Print("\n") fmt.Print("\n")

View file

@ -6,8 +6,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
// flags of list command. // flags of list command.
@ -51,34 +54,52 @@ var listContainersCmd = &cobra.Command{
var prm internalclient.ListContainersPrm var prm internalclient.ListContainersPrm
prm.SetClient(cli) prm.SetClient(cli)
prm.Account = idUser prm.OwnerID = idUser
res, err := internalclient.ListContainers(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
prmGet := internalclient.GetContainerPrm{ prmGet := internalclient.GetContainerPrm{
Client: cli, Client: cli,
} }
var containerIDs []cid.ID
containerIDs := res.SortedIDList() err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) bool {
for _, cnrID := range containerIDs { printContainer(cmd, prmGet, id)
if flagVarListName == "" && !flagVarListPrintAttr { return false
cmd.Println(cnrID.String()) })
continue if err == nil {
return
} }
prmGet.ClientParams.ContainerID = &cnrID if e, ok := status.FromError(err); ok && e.Code() == codes.Unimplemented {
res, err := internalclient.ListContainers(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
containerIDs = res.SortedIDList()
} else {
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}
for _, cnrID := range containerIDs {
printContainer(cmd, prmGet, cnrID)
}
},
}
func printContainer(cmd *cobra.Command, prmGet internalclient.GetContainerPrm, id cid.ID) {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(id.String())
return
}
prmGet.ClientParams.ContainerID = &id
res, err := internalclient.GetContainer(cmd.Context(), prmGet) res, err := internalclient.GetContainer(cmd.Context(), prmGet)
if err != nil { if err != nil {
cmd.Printf(" failed to read attributes: %v\n", err) cmd.Printf(" failed to read attributes: %v\n", err)
continue return
} }
cnr := res.Container() cnr := res.Container()
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName { if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
continue return
} }
cmd.Println(cnrID.String()) cmd.Println(id.String())
if flagVarListPrintAttr { if flagVarListPrintAttr {
cnr.IterateUserAttributes(func(key, val string) { cnr.IterateUserAttributes(func(key, val string) {
@ -86,8 +107,6 @@ var listContainersCmd = &cobra.Command{
}) })
} }
} }
},
}
func initContainerListContainersCmd() { func initContainerListContainersCmd() {
commonflags.Init(listContainersCmd) commonflags.Init(listContainersCmd)

View file

@ -23,11 +23,11 @@ type policyPlaygroundREPL struct {
nodes map[string]netmap.NodeInfo nodes map[string]netmap.NodeInfo
} }
func newPolicyPlaygroundREPL(cmd *cobra.Command) (*policyPlaygroundREPL, error) { func newPolicyPlaygroundREPL(cmd *cobra.Command) *policyPlaygroundREPL {
return &policyPlaygroundREPL{ return &policyPlaygroundREPL{
cmd: cmd, cmd: cmd,
nodes: map[string]netmap.NodeInfo{}, nodes: map[string]netmap.NodeInfo{},
}, nil }
} }
func (repl *policyPlaygroundREPL) handleLs(args []string) error { func (repl *policyPlaygroundREPL) handleLs(args []string) error {
@ -246,8 +246,7 @@ var policyPlaygroundCmd = &cobra.Command{
Long: `A REPL for testing placement policies. Long: `A REPL for testing placement policies.
If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`, If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`,
Run: func(cmd *cobra.Command, _ []string) { Run: func(cmd *cobra.Command, _ []string) {
repl, err := newPolicyPlaygroundREPL(cmd) repl := newPolicyPlaygroundREPL(cmd)
commonCmd.ExitOnErr(cmd, "could not create policy playground: %w", err)
commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run()) commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run())
}, },
} }

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -42,7 +41,9 @@ func initObjectHashCmd() {
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage) flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
_ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag) _ = objectHashCmd.MarkFlagRequired(commonflags.OIDFlag)
flags.String("range", "", "Range to take hash from in the form offset1:length1,...") flags.StringSlice("range", nil, "Range to take hash from in the form offset1:length1,...")
_ = objectHashCmd.MarkFlagRequired("range")
flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'") flags.String("type", hashSha256, "Hash type. Either 'sha256' or 'tz'")
flags.String(getRangeHashSaltFlag, "", "Salt in hex format") flags.String(getRangeHashSaltFlag, "", "Salt in hex format")
} }
@ -66,36 +67,6 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
pk := key.GetOrGenerate(cmd) pk := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC) cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
tz := typ == hashTz
fullHash := len(ranges) == 0
if fullHash {
var headPrm internalclient.HeadObjectPrm
headPrm.SetClient(cli)
Prepare(cmd, &headPrm)
headPrm.SetAddress(objAddr)
// get hash of full payload through HEAD (may be user can do it through dedicated command?)
res, err := internalclient.HeadObject(cmd.Context(), headPrm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
var cs checksum.Checksum
var csSet bool
if tz {
cs, csSet = res.Header().PayloadHomomorphicHash()
} else {
cs, csSet = res.Header().PayloadChecksum()
}
if csSet {
cmd.Println(hex.EncodeToString(cs.Value()))
} else {
cmd.Println("Missing checksum in object header.")
}
return
}
var hashPrm internalclient.HashPayloadRangesPrm var hashPrm internalclient.HashPayloadRangesPrm
hashPrm.SetClient(cli) hashPrm.SetClient(cli)
Prepare(cmd, &hashPrm) Prepare(cmd, &hashPrm)
@ -104,7 +75,7 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
hashPrm.SetSalt(salt) hashPrm.SetSalt(salt)
hashPrm.SetRanges(ranges) hashPrm.SetRanges(ranges)
if tz { if typ == hashTz {
hashPrm.TZ() hashPrm.TZ()
} }

View file

@ -46,7 +46,7 @@ func initObjectPatchCmd() {
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage) flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag) _ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)
flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2") flags.StringSlice(newAttrsFlagName, nil, "New object attributes in form of Key1=Value1,Key2=Value2")
flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.") flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length") flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.") flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")
@ -99,11 +99,9 @@ func patch(cmd *cobra.Command, _ []string) {
} }
func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) { func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
var rawAttrs []string rawAttrs, err := cmd.Flags().GetStringSlice(newAttrsFlagName)
if err != nil {
raw := cmd.Flag(newAttrsFlagName).Value.String() return nil, err
if len(raw) != 0 {
rawAttrs = strings.Split(raw, ",")
} }
attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes

View file

@ -50,7 +50,7 @@ func initObjectPutCmd() {
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage) flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
flags.String("attributes", "", "User attributes in form of Key1=Value1,Key2=Value2") flags.StringSlice("attributes", nil, "User attributes in form of Key1=Value1,Key2=Value2")
flags.Bool("disable-filename", false, "Do not set well-known filename attribute") flags.Bool("disable-filename", false, "Do not set well-known filename attribute")
flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute") flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object") flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")
@ -214,11 +214,9 @@ func getAllObjectAttributes(cmd *cobra.Command) []objectSDK.Attribute {
} }
func parseObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) { func parseObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
var rawAttrs []string rawAttrs, err := cmd.Flags().GetStringSlice("attributes")
if err != nil {
raw := cmd.Flag("attributes").Value.String() return nil, err
if len(raw) != 0 {
rawAttrs = strings.Split(raw, ",")
} }
attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes

View file

@ -38,7 +38,7 @@ func initObjectRangeCmd() {
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage) flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag) _ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)
flags.String("range", "", "Range to take data from in the form offset:length") flags.StringSlice("range", nil, "Range to take data from in the form offset:length")
flags.String(fileFlag, "", "File to write object payload to. Default: stdout.") flags.String(fileFlag, "", "File to write object payload to. Default: stdout.")
flags.Bool(rawFlag, false, rawFlagDesc) flags.Bool(rawFlag, false, rawFlagDesc)
} }
@ -195,11 +195,10 @@ func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
} }
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) { func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
v := cmd.Flag("range").Value.String() vs, err := cmd.Flags().GetStringSlice("range")
if len(v) == 0 { if len(vs) == 0 || err != nil {
return nil, nil return nil, err
} }
vs := strings.Split(v, ",")
rs := make([]objectSDK.Range, len(vs)) rs := make([]objectSDK.Range, len(vs))
for i := range vs { for i := range vs {
before, after, found := strings.Cut(vs[i], rangeSep) before, after, found := strings.Cut(vs[i], rangeSep)

View file

@ -124,10 +124,7 @@ func (v *BucketsView) loadNodeChildren(
path := parentBucket.Path path := parentBucket.Path
parser := parentBucket.NextParser parser := parentBucket.NextParser
buffer, err := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize) buffer := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
if err != nil {
return err
}
for item := range buffer { for item := range buffer {
if item.err != nil { if item.err != nil {
@ -135,6 +132,7 @@ func (v *BucketsView) loadNodeChildren(
} }
bucket := item.val bucket := item.val
var err error
bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil) bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil)
if err != nil { if err != nil {
return err return err
@ -180,10 +178,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
defer cancel() defer cancel()
// Check the current bucket's nested buckets if exist // Check the current bucket's nested buckets if exist
bucketsBuffer, err := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize) bucketsBuffer := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
if err != nil {
return false, err
}
for item := range bucketsBuffer { for item := range bucketsBuffer {
if item.err != nil { if item.err != nil {
@ -191,6 +186,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
} }
b := item.val b := item.val
var err error
b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil) b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil)
if err != nil { if err != nil {
return false, err return false, err
@ -206,10 +202,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
} }
// Check the current bucket's nested records if exist // Check the current bucket's nested records if exist
recordsBuffer, err := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize) recordsBuffer := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
if err != nil {
return false, err
}
for item := range recordsBuffer { for item := range recordsBuffer {
if item.err != nil { if item.err != nil {
@ -217,6 +210,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
} }
r := item.val r := item.val
var err error
r.Entry, _, err = bucket.NextParser(r.Key, r.Value) r.Entry, _, err = bucket.NextParser(r.Key, r.Value)
if err != nil { if err != nil {
return false, err return false, err

View file

@ -35,7 +35,7 @@ func resolvePath(tx *bbolt.Tx, path [][]byte) (*bbolt.Bucket, error) {
func load[T any]( func load[T any](
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int, ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
filter func(key, value []byte) bool, transform func(key, value []byte) T, filter func(key, value []byte) bool, transform func(key, value []byte) T,
) (<-chan Item[T], error) { ) <-chan Item[T] {
buffer := make(chan Item[T], bufferSize) buffer := make(chan Item[T], bufferSize)
go func() { go func() {
@ -77,13 +77,13 @@ func load[T any](
} }
}() }()
return buffer, nil return buffer
} }
func LoadBuckets( func LoadBuckets(
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int, ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
) (<-chan Item[*Bucket], error) { ) <-chan Item[*Bucket] {
buffer, err := load( buffer := load(
ctx, db, path, bufferSize, ctx, db, path, bufferSize,
func(_, value []byte) bool { func(_, value []byte) bool {
return value == nil return value == nil
@ -98,17 +98,14 @@ func LoadBuckets(
} }
}, },
) )
if err != nil {
return nil, fmt.Errorf("can't start iterating bucket: %w", err)
}
return buffer, nil return buffer
} }
func LoadRecords( func LoadRecords(
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int, ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
) (<-chan Item[*Record], error) { ) <-chan Item[*Record] {
buffer, err := load( buffer := load(
ctx, db, path, bufferSize, ctx, db, path, bufferSize,
func(_, value []byte) bool { func(_, value []byte) bool {
return value != nil return value != nil
@ -124,11 +121,8 @@ func LoadRecords(
} }
}, },
) )
if err != nil {
return nil, fmt.Errorf("can't start iterating bucket: %w", err)
}
return buffer, nil return buffer
} }
// HasBuckets checks if a bucket has nested buckets. It relies on assumption // HasBuckets checks if a bucket has nested buckets. It relies on assumption
@ -137,24 +131,21 @@ func HasBuckets(ctx context.Context, db *bbolt.DB, path [][]byte) (bool, error)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
buffer, err := load( buffer := load(
ctx, db, path, 1, ctx, db, path, 1,
nil, nil,
func(_, value []byte) []byte { return value }, func(_, value []byte) []byte { return value },
) )
if err != nil {
return false, err
}
x, ok := <-buffer x, ok := <-buffer
if !ok { if !ok {
return false, nil return false, nil
} }
if x.err != nil { if x.err != nil {
return false, err return false, x.err
} }
if x.val != nil { if x.val != nil {
return false, err return false, nil
} }
return true, nil return true, nil
} }

View file

@ -62,10 +62,7 @@ func (v *RecordsView) Mount(ctx context.Context) error {
ctx, v.onUnmount = context.WithCancel(ctx) ctx, v.onUnmount = context.WithCancel(ctx)
tempBuffer, err := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize) tempBuffer := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
if err != nil {
return err
}
v.buffer = make(chan *Record, v.ui.loadBufferSize) v.buffer = make(chan *Record, v.ui.loadBufferSize)
go func() { go func() {
@ -73,11 +70,12 @@ func (v *RecordsView) Mount(ctx context.Context) error {
for item := range tempBuffer { for item := range tempBuffer {
if item.err != nil { if item.err != nil {
v.ui.stopOnError(err) v.ui.stopOnError(item.err)
break break
} }
record := item.val record := item.val
var err error
record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value) record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value)
if err != nil { if err != nil {
v.ui.stopOnError(err) v.ui.stopOnError(err)

View file

@ -19,6 +19,7 @@ func initAPEManagerService(c *cfg) {
c.cfgObject.cfgAccessPolicyEngine.policyContractHash) c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
execsvc := apemanager.New(c.cfgObject.cnrSource, contractStorage, execsvc := apemanager.New(c.cfgObject.cnrSource, contractStorage,
c.cfgMorph.client,
apemanager.WithLogger(c.log)) apemanager.WithLogger(c.log))
sigsvc := apemanager.NewSignService(&c.key.PrivateKey, execsvc) sigsvc := apemanager.NewSignService(&c.key.PrivateKey, execsvc)
auditSvc := apemanager.NewAuditService(sigsvc, c.log, c.audit) auditSvc := apemanager.NewAuditService(sigsvc, c.log, c.audit)

View file

@ -609,6 +609,7 @@ type cfgContainer struct {
parsers map[event.Type]event.NotificationParser parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers workerPool util.WorkerPool // pool for asynchronous handlers
containerBatchSize uint32
} }
type cfgFrostfsID struct { type cfgFrostfsID struct {
@ -697,8 +698,7 @@ func initCfg(appCfg *config.Config) *cfg {
netState.metrics = c.metricsCollector netState.metrics = c.metricsCollector
logPrm, err := c.loggerPrm() logPrm := c.loggerPrm()
fatalOnErr(err)
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook() logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
log, err := logger.NewLogger(logPrm) log, err := logger.NewLogger(logPrm)
fatalOnErr(err) fatalOnErr(err)
@ -853,7 +853,7 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
func initCfgGRPC() cfgGRPC { func initCfgGRPC() cfgGRPC {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
return cfgGRPC{ return cfgGRPC{
maxChunkSize: maxChunkSize, maxChunkSize: maxChunkSize,
@ -1058,7 +1058,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
return sh return sh
} }
func (c *cfg) loggerPrm() (*logger.Prm, error) { func (c *cfg) loggerPrm() *logger.Prm {
// check if it has been inited before // check if it has been inited before
if c.dynamicConfiguration.logger == nil { if c.dynamicConfiguration.logger == nil {
c.dynamicConfiguration.logger = new(logger.Prm) c.dynamicConfiguration.logger = new(logger.Prm)
@ -1077,7 +1077,7 @@ func (c *cfg) loggerPrm() (*logger.Prm, error) {
} }
c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
return c.dynamicConfiguration.logger, nil return c.dynamicConfiguration.logger
} }
func (c *cfg) LocalAddress() network.AddressGroup { func (c *cfg) LocalAddress() network.AddressGroup {
@ -1146,7 +1146,7 @@ func initAccessPolicyEngine(ctx context.Context, c *cfg) {
c.cfgObject.cfgAccessPolicyEngine.policyContractHash) c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
cacheSize := morphconfig.APEChainCacheSize(c.appCfg) cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
if cacheSize > 0 { if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL) morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
} }
@ -1219,9 +1219,9 @@ func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract // bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
// with the binary-encoded information from the current node's configuration. // with the binary-encoded information from the current node's configuration.
// The state is set using the provided setter which MUST NOT be nil. // The state is set using the provided setter which MUST NOT be nil.
func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.NodeInfo)) error { func (c *cfg) bootstrapWithState(ctx context.Context, state netmap.NodeState) error {
ni := c.cfgNodeInfo.localInfo ni := c.cfgNodeInfo.localInfo
stateSetter(&ni) ni.SetStatus(state)
prm := nmClient.AddPeerPrm{} prm := nmClient.AddPeerPrm{}
prm.SetNodeInfo(ni) prm.SetNodeInfo(ni)
@ -1231,9 +1231,7 @@ func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.N
// bootstrapOnline calls cfg.bootstrapWithState with "online" state. // bootstrapOnline calls cfg.bootstrapWithState with "online" state.
func bootstrapOnline(ctx context.Context, c *cfg) error { func bootstrapOnline(ctx context.Context, c *cfg) error {
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) { return c.bootstrapWithState(ctx, netmap.Online)
ni.SetStatus(netmap.Online)
})
} }
// bootstrap calls bootstrapWithState with: // bootstrap calls bootstrapWithState with:
@ -1244,9 +1242,7 @@ func (c *cfg) bootstrap(ctx context.Context) error {
st := c.cfgNetmap.state.controlNetmapStatus() st := c.cfgNetmap.state.controlNetmapStatus()
if st == control.NetmapStatus_MAINTENANCE { if st == control.NetmapStatus_MAINTENANCE {
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState) c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) { return c.bootstrapWithState(ctx, netmap.Maintenance)
ni.SetStatus(netmap.Maintenance)
})
} }
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState, c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
@ -1337,11 +1333,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
// Logger // Logger
logPrm, err := c.loggerPrm() logPrm := c.loggerPrm()
if err != nil {
c.log.Error(ctx, logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
return
}
components := c.getComponents(ctx, logPrm) components := c.getComponents(ctx, logPrm)

View file

@ -1,6 +1,7 @@
package config package config
import ( import (
"slices"
"strings" "strings"
configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config" configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
@ -52,6 +53,5 @@ func (x *Config) Value(name string) any {
// It supports only one level of nesting and is intended to be used // It supports only one level of nesting and is intended to be used
// to provide default values. // to provide default values.
func (x *Config) SetDefault(from *Config) { func (x *Config) SetDefault(from *Config) {
x.defaultPath = make([]string, len(from.path)) x.defaultPath = slices.Clone(from.path)
copy(x.defaultPath, from.path)
} }

View file

@ -0,0 +1,27 @@
package containerconfig
import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
const (
subsection = "container"
listStreamSubsection = "list_stream"
// ContainerBatchSizeDefault represents the maximum amount of containers to send via stream at once.
ContainerBatchSizeDefault = 1000
)
// ContainerBatchSize returns the value of "batch_size" config parameter
// from "list_stream" subsection of "container" section.
//
// Returns ContainerBatchSizeDefault if the value is missing or if
// the value is not positive integer.
func ContainerBatchSize(c *config.Config) uint32 {
if c.Sub(subsection).Sub(listStreamSubsection).Value("batch_size") == nil {
return ContainerBatchSizeDefault
}
size := config.Uint32Safe(c.Sub(subsection).Sub(listStreamSubsection), "batch_size")
if size == 0 {
return ContainerBatchSizeDefault
}
return size
}

View file

@ -0,0 +1,27 @@
package containerconfig_test
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"github.com/stretchr/testify/require"
)
func TestContainerSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
empty := configtest.EmptyConfig()
require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty))
})
const path = "../../../../config/example/node"
fileConfigTest := func(c *config.Config) {
require.Equal(t, uint32(500), containerconfig.ContainerBatchSize(c))
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}

View file

@ -198,7 +198,7 @@ func (l PersistentPolicyRulesConfig) Path() string {
// //
// Returns PermDefault if the value is not a positive number. // Returns PermDefault if the value is not a positive number.
func (l PersistentPolicyRulesConfig) Perm() fs.FileMode { func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
p := config.UintSafe((*config.Config)(l.cfg), "perm") p := config.UintSafe(l.cfg, "perm")
if p == 0 { if p == 0 {
p = PermDefault p = PermDefault
} }
@ -210,7 +210,7 @@ func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
// //
// Returns false if the value is not a boolean. // Returns false if the value is not a boolean.
func (l PersistentPolicyRulesConfig) NoSync() bool { func (l PersistentPolicyRulesConfig) NoSync() bool {
return config.BoolSafe((*config.Config)(l.cfg), "no_sync") return config.BoolSafe(l.cfg, "no_sync")
} }
// CompatibilityMode returns true if need to run node in compatibility with previous versions mode. // CompatibilityMode returns true if need to run node in compatibility with previous versions mode.

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"net" "net"
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
@ -42,11 +43,12 @@ func initContainerService(_ context.Context, c *cfg) {
fatalOnErr(err) fatalOnErr(err)
cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg) cacheSize := morphconfig.FrostfsIDCacheSize(c.appCfg)
if cacheSize > 0 { if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id")) frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
} }
c.shared.frostfsidClient = frostfsIDSubjectProvider c.shared.frostfsidClient = frostfsIDSubjectProvider
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides( defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(), c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) {
&c.key.PrivateKey, &c.key.PrivateKey,
containerService.NewAPEServer(defaultChainRouter, cnrRdr, containerService.NewAPEServer(defaultChainRouter, cnrRdr,
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), containerService.NewSplitterService(
c.cfgContainer.containerBatchSize, c.respSvc,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
), ),
) )
service = containerService.NewAuditService(service, c.log, c.audit) service = containerService.NewAuditService(service, c.log, c.audit)
@ -218,6 +222,7 @@ type morphContainerReader struct {
lister interface { lister interface {
ContainersOf(*user.ID) ([]cid.ID, error) ContainersOf(*user.ID) ([]cid.ID, error)
IterateContainersOf(*user.ID, func(cid.ID) error) error
} }
} }
@ -233,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
return x.lister.ContainersOf(id) return x.lister.ContainersOf(id)
} }
func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
return x.lister.IterateContainersOf(id, processCID)
}
type morphContainerWriter struct { type morphContainerWriter struct {
neoClient *cntClient.Client neoClient *cntClient.Client
} }

View file

@ -151,7 +151,7 @@ func makeNotaryDeposit(ctx context.Context, c *cfg) (util.Uint256, uint32, error
} }
func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error { func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256, vub uint32) error {
if err := c.cfgMorph.client.WaitTxHalt(ctx, client.InvokeRes{Hash: tx, VUB: vub}); err != nil { if err := c.cfgMorph.client.WaitTxHalt(ctx, vub, tx); err != nil {
return err return err
} }

View file

@ -86,7 +86,7 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
} }
} }
s.setControlNetmapStatus(control.NetmapStatus(ctrlNetSt)) s.setControlNetmapStatus(ctrlNetSt)
} }
// sets the current node state to the given value. Subsequent cfg.bootstrap // sets the current node state to the given value. Subsequent cfg.bootstrap
@ -423,7 +423,7 @@ func (c *cfg) updateNetMapState(ctx context.Context, stateSetter func(*nmClient.
if err != nil { if err != nil {
return err return err
} }
return c.cfgNetmap.wrapper.Morph().WaitTxHalt(ctx, res) return c.cfgNetmap.wrapper.Morph().WaitTxHalt(ctx, res.VUB, res.Hash)
} }
type netInfo struct { type netInfo struct {

View file

@ -215,8 +215,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
prm.MarkAsGarbage(addr) prm.MarkAsGarbage(addr)
prm.WithForceRemoval() prm.WithForceRemoval()
_, err := ls.Inhume(ctx, prm) return ls.Inhume(ctx, prm)
return err
} }
remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor) remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
@ -266,8 +265,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
var inhumePrm engine.InhumePrm var inhumePrm engine.InhumePrm
inhumePrm.MarkAsGarbage(addr) inhumePrm.MarkAsGarbage(addr)
_, err := ls.Inhume(ctx, inhumePrm) if err := ls.Inhume(ctx, inhumePrm); err != nil {
if err != nil {
c.log.Warn(ctx, logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage, c.log.Warn(ctx, logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
zap.Error(err), zap.Error(err),
) )
@ -476,8 +474,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
prm.WithTarget(tombstone, addrs...) prm.WithTarget(tombstone, addrs...)
_, err := e.engine.Inhume(ctx, prm) return e.engine.Inhume(ctx, prm)
return err
} }
func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error { func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {

View file

@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
FROSTFS_REPLICATOR_POOL_SIZE=10 FROSTFS_REPLICATOR_POOL_SIZE=10
# Container service section
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
# Object service section # Object service section
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100 FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200

View file

@ -124,6 +124,11 @@
"pool_size": 10, "pool_size": 10,
"put_timeout": "15s" "put_timeout": "15s"
}, },
"container": {
"list_stream": {
"batch_size": "500"
}
},
"object": { "object": {
"delete": { "delete": {
"tombstone_lifetime": 10 "tombstone_lifetime": 10

View file

@ -79,7 +79,8 @@ contracts: # side chain NEOFS contract script hashes; optional, override values
morph: morph:
dial_timeout: 30s # timeout for side chain NEO RPC client connection dial_timeout: 30s # timeout for side chain NEO RPC client connection
cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls). Negative value disables caching. cache_ttl: 15s # Sidechain cache TTL value (min interval between similar calls).
# Negative value disables caching. A zero value sets the default value.
# Default value: block time. It is recommended to have this value less or equal to block time. # Default value: block time. It is recommended to have this value less or equal to block time.
# Cached entities: containers, container lists, eACL tables. # Cached entities: containers, container lists, eACL tables.
container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache. container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
@ -108,6 +109,10 @@ replicator:
put_timeout: 15s # timeout for the Replicator PUT remote operation put_timeout: 15s # timeout for the Replicator PUT remote operation
pool_size: 10 # maximum amount of concurrent replications pool_size: 10 # maximum amount of concurrent replications
container:
list_stream:
batch_size: 500 # container_batch_size is the maximum amount of containers to send via stream at once
object: object:
delete: delete:
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs tombstone_lifetime: 10 # tombstone "local" lifetime in epochs

View file

@ -42,7 +42,6 @@
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s", "FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws", "FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0", "FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet01.json", "FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet01.json",
"FROSTFS_NODE_WALLET_PASSWORD":"", "FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080", "FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
@ -98,7 +97,6 @@
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s", "FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws", "FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0", "FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet02.json", "FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet02.json",
"FROSTFS_NODE_WALLET_PASSWORD":"", "FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8082", "FROSTFS_NODE_ADDRESSES":"127.0.0.1:8082",
@ -154,7 +152,6 @@
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s", "FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws", "FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0", "FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet03.json", "FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet03.json",
"FROSTFS_NODE_WALLET_PASSWORD":"", "FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8084", "FROSTFS_NODE_ADDRESSES":"127.0.0.1:8084",
@ -210,7 +207,6 @@
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s", "FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws", "FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0", "FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet04.json", "FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet04.json",
"FROSTFS_NODE_WALLET_PASSWORD":"", "FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8086", "FROSTFS_NODE_ADDRESSES":"127.0.0.1:8086",

View file

@ -95,19 +95,15 @@ $ git push origin ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION}
## Post-release ## Post-release
### Prepare and push images to a Docker Hub (if not automated) ### Prepare and push images to a Docker registry (automated)
Create Docker images for all applications and push them into Docker Hub Create Docker images for all applications and push them into container registry
(requires [organization](https://hub.docker.com/u/truecloudlab) privileges) (executed automatically in Forgejo Actions upon pushing a release tag):
```shell ```shell
$ git checkout ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION} $ git checkout ${FROSTFS_TAG_PREFIX}${FROSTFS_REVISION}
$ make images $ make images
$ docker push truecloudlab/frostfs-storage:${FROSTFS_REVISION} $ make push-images
$ docker push truecloudlab/frostfs-storage-testnet:${FROSTFS_REVISION}
$ docker push truecloudlab/frostfs-ir:${FROSTFS_REVISION}
$ docker push truecloudlab/frostfs-cli:${FROSTFS_REVISION}
$ docker push truecloudlab/frostfs-adm:${FROSTFS_REVISION}
``` ```
### Make a proper release (if not automated) ### Make a proper release (if not automated)

32
go.mod
View file

@ -8,7 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250109084609-328d214d2d76
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
@ -27,7 +27,7 @@ require (
github.com/klauspost/compress v1.17.4 github.com/klauspost/compress v1.17.4
github.com/mailru/easyjson v0.7.7 github.com/mailru/easyjson v0.7.7
github.com/mr-tron/base58 v1.2.0 github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1 github.com/multiformats/go-multiaddr v0.14.0
github.com/nspcc-dev/neo-go v0.106.3 github.com/nspcc-dev/neo-go v0.106.3
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0 github.com/panjf2000/ants/v2 v2.9.0
@ -40,15 +40,15 @@ require (
github.com/ssgreg/journald v1.0.0 github.com/ssgreg/journald v1.0.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.10 go.etcd.io/bbolt v1.3.10
go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/trace v1.28.0 go.opentelemetry.io/otel/trace v1.31.0
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sync v0.7.0 golang.org/x/sync v0.10.0
golang.org/x/sys v0.22.0 golang.org/x/sys v0.28.0
golang.org/x/term v0.21.0 golang.org/x/term v0.27.0
google.golang.org/grpc v1.66.2 google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.34.2 google.golang.org/protobuf v1.36.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
@ -119,15 +119,15 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.24.0 // indirect golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.26.0 // indirect golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.16.0 // indirect golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect rsc.io/tmplfunc v0.0.3 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -146,7 +146,6 @@ const (
ClientCantGetBlockchainHeight = "can't get blockchain height" ClientCantGetBlockchainHeight = "can't get blockchain height"
ClientCantGetBlockchainHeight243 = "can't get blockchain height" ClientCantGetBlockchainHeight243 = "can't get blockchain height"
EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool" EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool"
EventCouldNotStartListenToEvents = "could not start listen to events"
EventStopEventListenerByError = "stop event listener by error" EventStopEventListenerByError = "stop event listener by error"
EventStopEventListenerByContext = "stop event listener by context" EventStopEventListenerByContext = "stop event listener by context"
EventStopEventListenerByNotificationChannel = "stop event listener by notification channel" EventStopEventListenerByNotificationChannel = "stop event listener by notification channel"
@ -384,7 +383,6 @@ const (
FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown" FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown"
FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing" FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing"
FrostFSNodeConfigurationReading = "configuration reading" FrostFSNodeConfigurationReading = "configuration reading"
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
FrostFSNodeTracingConfigationUpdated = "tracing configation updated" FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update" FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration" FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"

View file

@ -12,6 +12,7 @@ type ApplicationInfo struct {
func NewApplicationInfo(version string) *ApplicationInfo { func NewApplicationInfo(version string) *ApplicationInfo {
appInfo := &ApplicationInfo{ appInfo := &ApplicationInfo{
versionValue: metrics.NewGaugeVec(prometheus.GaugeOpts{ versionValue: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "app_info", Name: "app_info",
Help: "General information about the application.", Help: "General information about the application.",
}, []string{"version"}), }, []string{"version"}),

View file

@ -31,9 +31,7 @@ type RPCActorProvider interface {
type ProxyVerificationContractStorage struct { type ProxyVerificationContractStorage struct {
rpcActorProvider RPCActorProvider rpcActorProvider RPCActorProvider
acc *wallet.Account cosigners []actor.SignerAccount
proxyScriptHash util.Uint160
policyScriptHash util.Uint160 policyScriptHash util.Uint160
} }
@ -41,12 +39,27 @@ type ProxyVerificationContractStorage struct {
var _ ProxyAdaptedContractStorage = (*ProxyVerificationContractStorage)(nil) var _ ProxyAdaptedContractStorage = (*ProxyVerificationContractStorage)(nil)
func NewProxyVerificationContractStorage(rpcActorProvider RPCActorProvider, key *keys.PrivateKey, proxyScriptHash, policyScriptHash util.Uint160) *ProxyVerificationContractStorage { func NewProxyVerificationContractStorage(rpcActorProvider RPCActorProvider, key *keys.PrivateKey, proxyScriptHash, policyScriptHash util.Uint160) *ProxyVerificationContractStorage {
acc := wallet.NewAccountFromPrivateKey(key)
return &ProxyVerificationContractStorage{ return &ProxyVerificationContractStorage{
rpcActorProvider: rpcActorProvider, rpcActorProvider: rpcActorProvider,
acc: wallet.NewAccountFromPrivateKey(key), cosigners: []actor.SignerAccount{
{
proxyScriptHash: proxyScriptHash, Signer: transaction.Signer{
Account: proxyScriptHash,
Scopes: transaction.CustomContracts,
AllowedContracts: []util.Uint160{policyScriptHash},
},
Account: notary.FakeContractAccount(proxyScriptHash),
},
{
Signer: transaction.Signer{
Account: acc.Contract.ScriptHash(),
Scopes: transaction.CalledByEntry,
},
Account: acc,
},
},
policyScriptHash: policyScriptHash, policyScriptHash: policyScriptHash,
} }
@ -64,7 +77,7 @@ func (n *contractStorageActorAdapter) GetRPCInvoker() invoker.RPCInvoke {
func (contractStorage *ProxyVerificationContractStorage) newContractStorageActor() (policy_morph.ContractStorageActor, error) { func (contractStorage *ProxyVerificationContractStorage) newContractStorageActor() (policy_morph.ContractStorageActor, error) {
rpcActor := contractStorage.rpcActorProvider.GetRPCActor() rpcActor := contractStorage.rpcActorProvider.GetRPCActor()
act, err := actor.New(rpcActor, cosigners(contractStorage.acc, contractStorage.proxyScriptHash, contractStorage.policyScriptHash)) act, err := actor.New(rpcActor, contractStorage.cosigners)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -98,31 +111,16 @@ func (contractStorage *ProxyVerificationContractStorage) RemoveMorphRuleChain(na
// ListMorphRuleChains lists morph rule chains from Policy contract using both Proxy contract and storage account as consigners. // ListMorphRuleChains lists morph rule chains from Policy contract using both Proxy contract and storage account as consigners.
func (contractStorage *ProxyVerificationContractStorage) ListMorphRuleChains(name chain.Name, target engine.Target) ([]*chain.Chain, error) { func (contractStorage *ProxyVerificationContractStorage) ListMorphRuleChains(name chain.Name, target engine.Target) ([]*chain.Chain, error) {
// contractStorageActor is reconstructed per each method invocation because RPCActor's (that is, basically, WSClient) connection may get invalidated, but rpcActor := contractStorage.rpcActorProvider.GetRPCActor()
// ProxyVerificationContractStorage does not manage reconnections. inv := &invokerAdapter{Invoker: invoker.New(rpcActor, nil), rpcInvoker: rpcActor}
contractStorageActor, err := contractStorage.newContractStorageActor() return policy_morph.NewContractStorageReader(inv, contractStorage.policyScriptHash).ListMorphRuleChains(name, target)
if err != nil {
return nil, err
}
return policy_morph.NewContractStorage(contractStorageActor, contractStorage.policyScriptHash).ListMorphRuleChains(name, target)
} }
func cosigners(acc *wallet.Account, proxyScriptHash, policyScriptHash util.Uint160) []actor.SignerAccount { type invokerAdapter struct {
return []actor.SignerAccount{ *invoker.Invoker
{ rpcInvoker invoker.RPCInvoke
Signer: transaction.Signer{
Account: proxyScriptHash,
Scopes: transaction.CustomContracts,
AllowedContracts: []util.Uint160{policyScriptHash},
},
Account: notary.FakeContractAccount(proxyScriptHash),
},
{
Signer: transaction.Signer{
Account: acc.Contract.ScriptHash(),
Scopes: transaction.CalledByEntry,
},
Account: acc,
},
} }
func (n *invokerAdapter) GetRPCInvoker() invoker.RPCInvoke {
return n.rpcInvoker
} }

View file

@ -38,10 +38,7 @@ import (
func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper, func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
alphaSync event.Handler, alphaSync event.Handler,
) error { ) error {
locodeValidator, err := s.newLocodeValidator(cfg) locodeValidator := s.newLocodeValidator(cfg)
if err != nil {
return err
}
netSettings := (*networkSettings)(s.netmapClient) netSettings := (*networkSettings)(s.netmapClient)
@ -51,6 +48,7 @@ func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
poolSize := cfg.GetInt("workers.netmap") poolSize := cfg.GetInt("workers.netmap")
s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize)) s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))
var err error
s.netmapProcessor, err = netmap.New(&netmap.Params{ s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log, Log: s.log,
Metrics: s.irMetrics, Metrics: s.irMetrics,

View file

@ -9,7 +9,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
) )
func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, error) { func (s *Server) newLocodeValidator(cfg *viper.Viper) netmap.NodeValidator {
locodeDB := locodebolt.New(locodebolt.Prm{ locodeDB := locodebolt.New(locodebolt.Prm{
Path: cfg.GetString("locode.db.path"), Path: cfg.GetString("locode.db.path"),
}, },
@ -21,7 +21,7 @@ func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, err
return irlocode.New(irlocode.Prm{ return irlocode.New(irlocode.Prm{
DB: (*locodeBoltDBWrapper)(locodeDB), DB: (*locodeBoltDBWrapper)(locodeDB),
}), nil })
} }
type locodeBoltEntryWrapper struct { type locodeBoltEntryWrapper struct {

View file

@ -129,7 +129,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
}) })
}) })
if err != nil { if err != nil {
return fmt.Errorf("can't determine DB size: %w", err) return fmt.Errorf("determine DB size: %w", err)
} }
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly { if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items)) b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
@ -140,7 +140,7 @@ func (b *Blobovnicza) initializeCounters(ctx context.Context) error {
return saveItemsCount(tx, items) return saveItemsCount(tx, items)
}); err != nil { }); err != nil {
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items)) b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err) return fmt.Errorf("save blobovnicza's size and items count: %w", err)
} }
b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items)) b.log.Debug(ctx, logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
} }

View file

@ -146,7 +146,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
if prm.ignoreErrors { if prm.ignoreErrors {
return nil return nil
} }
return fmt.Errorf("could not decode address key: %w", err) return fmt.Errorf("decode address key: %w", err)
} }
} }

View file

@ -19,7 +19,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var errObjectIsDeleteProtected = errors.New("object is delete protected") var (
errObjectIsDeleteProtected = errors.New("object is delete protected")
deleteRes = common.DeleteRes{}
)
// Delete deletes object from blobovnicza tree. // Delete deletes object from blobovnicza tree.
// //
@ -43,17 +46,17 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
defer span.End() defer span.End()
if b.readOnly { if b.readOnly {
return common.DeleteRes{}, common.ErrReadOnly return deleteRes, common.ErrReadOnly
} }
if b.rebuildGuard.TryRLock() { if b.rebuildGuard.TryRLock() {
defer b.rebuildGuard.RUnlock() defer b.rebuildGuard.RUnlock()
} else { } else {
return common.DeleteRes{}, errRebuildInProgress return deleteRes, errRebuildInProgress
} }
if b.deleteProtectedObjects.Contains(prm.Address) { if b.deleteProtectedObjects.Contains(prm.Address) {
return common.DeleteRes{}, errObjectIsDeleteProtected return deleteRes, errObjectIsDeleteProtected
} }
var bPrm blobovnicza.DeletePrm var bPrm blobovnicza.DeletePrm
@ -98,7 +101,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
if err == nil && !objectFound { if err == nil && !objectFound {
// not found in any blobovnicza // not found in any blobovnicza
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) return deleteRes, logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
success = err == nil success = err == nil
@ -112,7 +115,7 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
shBlz := b.getBlobovnicza(ctx, blzPath) shBlz := b.getBlobovnicza(ctx, blzPath)
blz, err := shBlz.Open(ctx) blz, err := shBlz.Open(ctx)
if err != nil { if err != nil {
return common.DeleteRes{}, err return deleteRes, err
} }
defer shBlz.Close(ctx) defer shBlz.Close(ctx)
@ -122,5 +125,5 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
// removes object from blobovnicza and returns common.DeleteRes. // removes object from blobovnicza and returns common.DeleteRes.
func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) { func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) {
_, err := blz.Delete(ctx, prm) _, err := blz.Delete(ctx, prm)
return common.DeleteRes{}, err return deleteRes, err
} }

View file

@ -115,13 +115,13 @@ func (b *Blobovniczas) getObject(ctx context.Context, blz *blobovnicza.Blobovnic
// decompress the data // decompress the data
data, err := b.compression.Decompress(res.Object()) data, err := b.compression.Decompress(res.Object())
if err != nil { if err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err) return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
} }
// unmarshal the object // unmarshal the object
obj := objectSDK.New() obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil { if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err) return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
} }
return common.GetRes{Object: obj, RawData: data}, nil return common.GetRes{Object: obj, RawData: data}, nil

View file

@ -130,13 +130,13 @@ func (b *Blobovniczas) getObjectRange(ctx context.Context, blz *blobovnicza.Blob
// decompress the data // decompress the data
data, err := b.compression.Decompress(res.Object()) data, err := b.compression.Decompress(res.Object())
if err != nil { if err != nil {
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err) return common.GetRangeRes{}, fmt.Errorf("decompress object data: %w", err)
} }
// unmarshal the object // unmarshal the object
obj := objectSDK.New() obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil { if err := obj.Unmarshal(data); err != nil {
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err) return common.GetRangeRes{}, fmt.Errorf("unmarshal the object: %w", err)
} }
from := prm.Range.GetOffset() from := prm.Range.GetOffset()

View file

@ -49,7 +49,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
zap.String("root_path", b.rootPath)) zap.String("root_path", b.rootPath))
return nil return nil
} }
return fmt.Errorf("could not decompress object data: %w", err) return fmt.Errorf("decompress object data: %w", err)
} }
if prm.Handler != nil { if prm.Handler != nil {
@ -82,7 +82,7 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
zap.String("root_path", b.rootPath)) zap.String("root_path", b.rootPath))
return false, nil return false, nil
} }
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err) return false, fmt.Errorf("open blobovnicza %s: %w", p, err)
} }
defer shBlz.Close(ctx) defer shBlz.Close(ctx)
@ -249,6 +249,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
} }
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
sysPath := filepath.Join(b.rootPath, path) sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath) entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode

View file

@ -69,10 +69,10 @@ func (b *sharedDB) Open(ctx context.Context) (*blobovnicza.Blobovnicza, error) {
)...) )...)
if err := blz.Open(ctx); err != nil { if err := blz.Open(ctx); err != nil {
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err) return nil, fmt.Errorf("open blobovnicza %s: %w", b.path, err)
} }
if err := blz.Init(ctx); err != nil { if err := blz.Init(ctx); err != nil {
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err) return nil, fmt.Errorf("init blobovnicza %s: %w", b.path, err)
} }
b.refCount++ b.refCount++
@ -127,7 +127,7 @@ func (b *sharedDB) CloseAndRemoveFile(ctx context.Context) error {
zap.String("id", b.path), zap.String("id", b.path),
zap.Error(err), zap.Error(err),
) )
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err) return fmt.Errorf("close blobovnicza (path = %s): %w", b.path, err)
} }
b.refCount = 0 b.refCount = 0

View file

@ -538,7 +538,7 @@ func (t *FSTree) countFiles() (uint64, uint64, error) {
}, },
) )
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) return 0, 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
} }
return count, size, nil return count, size, nil
@ -577,7 +577,7 @@ func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
}, },
) )
if err != nil { if err != nil {
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) return 0, fmt.Errorf("walk through %s directory: %w", t.RootPath, err)
} }
success = true success = true
return result, nil return result, nil

View file

@ -136,6 +136,6 @@ func (w *genericWriter) removeWithCounter(p string, size uint64) error {
if err := os.Remove(p); err != nil { if err := os.Remove(p); err != nil {
return err return err
} }
w.fileCounter.Dec(uint64(size)) w.fileCounter.Dec(size)
return nil return nil
} }

View file

@ -69,10 +69,13 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
if err != nil { if err != nil {
return err return err
} }
written := 0
tmpPath := "/proc/self/fd/" + strconv.FormatUint(uint64(fd), 10) tmpPath := "/proc/self/fd/" + strconv.FormatUint(uint64(fd), 10)
n, err := unix.Write(fd, data) n, err := unix.Write(fd, data)
if err == nil { for err == nil {
if n == len(data) { written += n
if written == len(data) {
err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW) err = unix.Linkat(unix.AT_FDCWD, tmpPath, unix.AT_FDCWD, p, unix.AT_SYMLINK_FOLLOW)
if err == nil { if err == nil {
w.fileCounter.Inc(uint64(len(data))) w.fileCounter.Inc(uint64(len(data)))
@ -80,9 +83,23 @@ func (w *linuxWriter) writeFile(p string, data []byte) error {
if errors.Is(err, unix.EEXIST) { if errors.Is(err, unix.EEXIST) {
err = nil err = nil
} }
} else { break
err = errors.New("incomplete write")
} }
// From man 2 write:
// https://www.man7.org/linux/man-pages/man2/write.2.html
//
// Note that a successful write() may transfer fewer than count
// bytes. Such partial writes can occur for various reasons; for
// example, because there was insufficient space on the disk device
// to write all of the requested bytes, or because a blocked write()
// to a socket, pipe, or similar was interrupted by a signal handler
// after it had transferred some, but before it had transferred all
// of the requested bytes. In the event of a partial write, the
// caller can make another write() call to transfer the remaining
// bytes. The subsequent call will either transfer further bytes or
// may result in an error (e.g., if the disk is now full).
n, err = unix.Write(fd, data[written:])
} }
errClose := unix.Close(fd) errClose := unix.Close(fd)
if err != nil { if err != nil {
@ -114,7 +131,7 @@ func (w *linuxWriter) removeFile(p string, size uint64) error {
return logicerr.Wrap(new(apistatus.ObjectNotFound)) return logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
if err == nil { if err == nil {
w.fileCounter.Dec(uint64(size)) w.fileCounter.Dec(size)
} }
return err return err
} }

View file

@ -0,0 +1,42 @@
//go:build linux && integration
package fstree
import (
"context"
"errors"
"os"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
)
func TestENOSPC(t *testing.T) {
dir, err := os.MkdirTemp(t.TempDir(), "ramdisk")
require.NoError(t, err)
f, err := os.CreateTemp(t.TempDir(), "ramdisk_*")
require.NoError(t, err)
err = unix.Mount(f.Name(), dir, "tmpfs", 0, "size=1M")
if errors.Is(err, unix.EPERM) {
t.Skipf("skip size tests: no permission to mount: %v", err)
return
}
require.NoError(t, err)
defer func() {
require.NoError(t, unix.Unmount(dir, 0))
}()
fst := New(WithPath(dir), WithDepth(1))
require.NoError(t, fst.Open(mode.ComponentReadWrite))
require.NoError(t, fst.Init())
_, err = fst.Put(context.Background(), common.PutPrm{
RawData: make([]byte, 10<<20),
})
require.ErrorIs(t, err, common.ErrNoSpace)
}

View file

@ -47,13 +47,13 @@ func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes,
// Decompress the data. // Decompress the data.
var err error var err error
if data, err = s.compression.Decompress(data); err != nil { if data, err = s.compression.Decompress(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err) return common.GetRes{}, fmt.Errorf("decompress object data: %w", err)
} }
// Unmarshal the SDK object. // Unmarshal the SDK object.
obj := objectSDK.New() obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil { if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err) return common.GetRes{}, fmt.Errorf("unmarshal the object: %w", err)
} }
return common.GetRes{Object: obj, RawData: data}, nil return common.GetRes{Object: obj, RawData: data}, nil
@ -133,11 +133,11 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
elem := common.IterationElement{ elem := common.IterationElement{
ObjectData: v, ObjectData: v,
} }
if err := elem.Address.DecodeString(string(k)); err != nil { if err := elem.Address.DecodeString(k); err != nil {
if req.IgnoreErrors { if req.IgnoreErrors {
continue continue
} }
return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err)) return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, k, err))
} }
var err error var err error
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil { if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {

View file

@ -27,7 +27,7 @@ func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
} }
} }
if err != nil { if err != nil {
return fmt.Errorf("can't set blobstor mode (old=%s, new=%s): %w", b.mode, m, err) return fmt.Errorf("set blobstor mode (old=%s, new=%s): %w", b.mode, m, err)
} }
b.mode = m b.mode = m

View file

@ -52,7 +52,7 @@ func (b *BlobStor) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, e
// marshal object // marshal object
data, err := prm.Object.Marshal() data, err := prm.Object.Marshal()
if err != nil { if err != nil {
return common.PutRes{}, fmt.Errorf("could not marshal the object: %w", err) return common.PutRes{}, fmt.Errorf("marshal the object: %w", err)
} }
prm.RawData = data prm.RawData = data
} }

View file

@ -48,8 +48,8 @@ func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm)
defer elapsed("ContainerSize", e.metrics.AddMethodDuration)() defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error { err = e.execIfNotBlocked(func() error {
res, err = e.containerSize(ctx, prm) res = e.containerSize(ctx, prm)
return err return nil
}) })
return return
@ -69,7 +69,7 @@ func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, er
return res.Size(), nil return res.Size(), nil
} }
func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) { func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
var csPrm shard.ContainerSizePrm var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr) csPrm.SetContainerID(prm.cnr)
@ -96,8 +96,8 @@ func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm)
defer elapsed("ListContainers", e.metrics.AddMethodDuration)() defer elapsed("ListContainers", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error { err = e.execIfNotBlocked(func() error {
res, err = e.listContainers(ctx) res = e.listContainers(ctx)
return err return nil
}) })
return return
@ -115,7 +115,7 @@ func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) {
return res.Containers(), nil return res.Containers(), nil
} }
func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) { func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes {
uniqueIDs := make(map[string]cid.ID) uniqueIDs := make(map[string]cid.ID)
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
@ -142,5 +142,5 @@ func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes,
return ListContainersRes{ return ListContainersRes{
containers: result, containers: result,
}, nil }
} }

View file

@ -95,7 +95,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
err := eg.Wait() err := eg.Wait()
close(errCh) close(errCh)
if err != nil { if err != nil {
return fmt.Errorf("failed to initialize shards: %w", err) return fmt.Errorf("initialize shards: %w", err)
} }
for res := range errCh { for res := range errCh {
@ -117,7 +117,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
continue continue
} }
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err) return fmt.Errorf("initialize shard %s: %w", res.id, res.err)
} }
} }
@ -320,7 +320,7 @@ loop:
for _, newID := range shardsToAdd { for _, newID := range shardsToAdd {
sh, err := e.createShard(ctx, rcfg.shards[newID]) sh, err := e.createShard(ctx, rcfg.shards[newID])
if err != nil { if err != nil {
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err) return fmt.Errorf("add new shard with '%s' metabase path: %w", newID, err)
} }
idStr := sh.ID().String() idStr := sh.ID().String()
@ -331,13 +331,13 @@ loop:
} }
if err != nil { if err != nil {
_ = sh.Close(ctx) _ = sh.Close(ctx)
return fmt.Errorf("could not init %s shard: %w", idStr, err) return fmt.Errorf("init %s shard: %w", idStr, err)
} }
err = e.addShard(sh) err = e.addShard(sh)
if err != nil { if err != nil {
_ = sh.Close(ctx) _ = sh.Close(ctx)
return fmt.Errorf("could not add %s shard: %w", idStr, err) return fmt.Errorf("add %s shard: %w", idStr, err)
} }
e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr)) e.log.Info(ctx, logs.EngineAddedNewShard, zap.String("id", idStr))

View file

@ -24,9 +24,6 @@ type DeletePrm struct {
forceRemoval bool forceRemoval bool
} }
// DeleteRes groups the resulting values of Delete operation.
type DeleteRes struct{}
// WithAddress is a Delete option to set the addresses of the objects to delete. // WithAddress is a Delete option to set the addresses of the objects to delete.
// //
// Option is required. // Option is required.
@ -51,7 +48,7 @@ func (p *DeletePrm) WithForceRemoval() {
// NOTE: Marks any object to be deleted (despite any prohibitions // NOTE: Marks any object to be deleted (despite any prohibitions
// on operations with that object) if WithForceRemoval option has // on operations with that object) if WithForceRemoval option has
// been provided. // been provided.
func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRes, err error) { func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Delete", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Delete",
trace.WithAttributes( trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()), attribute.String("address", prm.addr.EncodeToString()),
@ -60,15 +57,12 @@ func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRe
defer span.End() defer span.End()
defer elapsed("Delete", e.metrics.AddMethodDuration)() defer elapsed("Delete", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error { return e.execIfNotBlocked(func() error {
res, err = e.delete(ctx, prm) return e.delete(ctx, prm)
return err
}) })
return
} }
func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) error {
var locked struct { var locked struct {
is bool is bool
} }
@ -126,14 +120,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
}) })
if locked.is { if locked.is {
return DeleteRes{}, new(apistatus.ObjectLocked) return new(apistatus.ObjectLocked)
} }
if splitInfo != nil { if splitInfo != nil {
e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID()) e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID())
} }
return DeleteRes{}, nil return nil
} }
func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) { func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) {

View file

@ -70,8 +70,7 @@ func TestDeleteBigObject(t *testing.T) {
deletePrm.WithForceRemoval() deletePrm.WithForceRemoval()
deletePrm.WithAddress(addrParent) deletePrm.WithAddress(addrParent)
_, err := e.Delete(context.Background(), deletePrm) require.NoError(t, e.Delete(context.Background(), deletePrm))
require.NoError(t, err)
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true) checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true) checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
@ -141,8 +140,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
deletePrm.WithForceRemoval() deletePrm.WithForceRemoval()
deletePrm.WithAddress(addrParent) deletePrm.WithAddress(addrParent)
_, err := e.Delete(context.Background(), deletePrm) require.NoError(t, e.Delete(context.Background(), deletePrm))
require.NoError(t, err)
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true) checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true) checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
@ -153,7 +151,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
// delete physical // delete physical
var delPrm shard.DeletePrm var delPrm shard.DeletePrm
delPrm.SetAddresses(addrParent) delPrm.SetAddresses(addrParent)
_, err = s1.Delete(context.Background(), delPrm) _, err := s1.Delete(context.Background(), delPrm)
require.NoError(t, err) require.NoError(t, err)
delPrm.SetAddresses(addrLink) delPrm.SetAddresses(addrLink)

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"slices"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -255,8 +256,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
copyShards := func() []pooledShard { copyShards := func() []pooledShard {
mtx.RLock() mtx.RLock()
defer mtx.RUnlock() defer mtx.RUnlock()
t := make([]pooledShard, len(shards)) t := slices.Clone(shards)
copy(t, shards)
return t return t
} }
eg.Go(func() error { eg.Go(func() error {
@ -578,7 +578,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) { func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
if prm.TreeHandler == nil { if prm.TreeHandler == nil {
return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID()) return false, "", fmt.Errorf("evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
} }
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh) return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
@ -724,7 +724,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
shards := make([]pooledShard, 0, len(e.shards)) shards := make([]pooledShard, 0, len(e.shards))
for id := range e.shards { for id := range e.shards {
shards = append(shards, pooledShard{ shards = append(shards, pooledShard{
hashedShard: hashedShard(e.shards[id]), hashedShard: e.shards[id],
pool: e.shardPools[id], pool: e.shardPools[id],
}) })
} }

View file

@ -3,6 +3,7 @@ package engine
import ( import (
"context" "context"
"fmt" "fmt"
"slices"
"sync" "sync"
"time" "time"
@ -123,8 +124,7 @@ func (s *EvacuationState) DeepCopy() *EvacuationState {
if s == nil { if s == nil {
return nil return nil
} }
shardIDs := make([]string, len(s.shardIDs)) shardIDs := slices.Clone(s.shardIDs)
copy(shardIDs, s.shardIDs)
return &EvacuationState{ return &EvacuationState{
shardIDs: shardIDs, shardIDs: shardIDs,

View file

@ -27,9 +27,6 @@ type InhumePrm struct {
forceRemoval bool forceRemoval bool
} }
// InhumeRes encapsulates results of inhume operation.
type InhumeRes struct{}
// WithTarget sets a list of objects that should be inhumed and tombstone address // WithTarget sets a list of objects that should be inhumed and tombstone address
// as the reason for inhume operation. // as the reason for inhume operation.
// //
@ -67,23 +64,20 @@ var errInhumeFailure = errors.New("inhume operation failed")
// with that object) if WithForceRemoval option has been provided. // with that object) if WithForceRemoval option has been provided.
// //
// Returns an error if executions are blocked (see BlockExecution). // Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) { func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Inhume") ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Inhume")
defer span.End() defer span.End()
defer elapsed("Inhume", e.metrics.AddMethodDuration)() defer elapsed("Inhume", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error { return e.execIfNotBlocked(func() error {
res, err = e.inhume(ctx, prm) return e.inhume(ctx, prm)
return err
}) })
return
} }
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error {
addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval) addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
if err != nil { if err != nil {
return InhumeRes{}, err return err
} }
var shPrm shard.InhumePrm var shPrm shard.InhumePrm
@ -107,7 +101,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
zap.String("shard_id", shardID), zap.String("shard_id", shardID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
) )
return InhumeRes{}, errInhumeFailure return errInhumeFailure
} }
if _, err := sh.Inhume(ctx, shPrm); err != nil { if _, err := sh.Inhume(ctx, shPrm); err != nil {
@ -119,11 +113,11 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
default: default:
e.reportShardError(ctx, sh, "couldn't inhume object in shard", err) e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
} }
return InhumeRes{}, err return err
} }
} }
return InhumeRes{}, nil return nil
} }
// groupObjectsByShard groups objects based on the shard(s) they are stored on. // groupObjectsByShard groups objects based on the shard(s) they are stored on.

View file

@ -55,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
var inhumePrm InhumePrm var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent)) inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
addrs, err := Select(context.Background(), e, cnr, false, fs) addrs, err := Select(context.Background(), e, cnr, false, fs)
@ -85,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
var inhumePrm InhumePrm var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent)) inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
addrs, err := Select(context.Background(), e, cnr, false, fs) addrs, err := Select(context.Background(), e, cnr, false, fs)
@ -128,7 +128,7 @@ func TestStorageEngine_ECInhume(t *testing.T) {
var inhumePrm InhumePrm var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress) inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
var alreadyRemoved *apistatus.ObjectAlreadyRemoved var alreadyRemoved *apistatus.ObjectAlreadyRemoved
@ -173,7 +173,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
var prm InhumePrm var prm InhumePrm
prm.WithTarget(ts, object.AddressOf(obj)) prm.WithTarget(ts, object.AddressOf(obj))
_, err := engine.Inhume(context.Background(), prm) err := engine.Inhume(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
}) })
@ -182,7 +182,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
var prm InhumePrm var prm InhumePrm
prm.MarkAsGarbage(object.AddressOf(obj)) prm.MarkAsGarbage(object.AddressOf(obj))
_, err := engine.Inhume(context.Background(), prm) err := engine.Inhume(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -237,7 +237,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
prm.WithTarget(ts, addrs...) prm.WithTarget(ts, addrs...)
b.StartTimer() b.StartTimer()
_, err := engine.Inhume(context.Background(), prm) err := engine.Inhume(context.Background(), prm)
require.NoError(b, err) require.NoError(b, err)
b.StopTimer() b.StopTimer()
} }

View file

@ -114,7 +114,7 @@ func TestLockUserScenario(t *testing.T) {
inhumePrm.WithTarget(tombAddr, objAddr) inhumePrm.WithTarget(tombAddr, objAddr)
var objLockedErr *apistatus.ObjectLocked var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
// 4. // 4.
@ -127,7 +127,7 @@ func TestLockUserScenario(t *testing.T) {
inhumePrm.WithTarget(tombForLockAddr, lockerAddr) inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.ErrorIs(t, err, meta.ErrLockObjectRemoval) require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5. // 5.
@ -136,7 +136,7 @@ func TestLockUserScenario(t *testing.T) {
inhumePrm.WithTarget(tombAddr, objAddr) inhumePrm.WithTarget(tombAddr, objAddr)
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
return err == nil return err == nil
}, 30*time.Second, time.Second) }, 30*time.Second, time.Second)
} }
@ -200,7 +200,7 @@ func TestLockExpiration(t *testing.T) {
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj)) inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
// 3. // 3.
@ -212,7 +212,7 @@ func TestLockExpiration(t *testing.T) {
inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj)) inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
return err == nil return err == nil
}, 30*time.Second, time.Second) }, 30*time.Second, time.Second)
} }
@ -270,12 +270,12 @@ func TestLockForceRemoval(t *testing.T) {
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked var objLockedErr *apistatus.ObjectLocked
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
// 4. // 4.
@ -283,13 +283,12 @@ func TestLockForceRemoval(t *testing.T) {
deletePrm.WithAddress(objectcore.AddressOf(lock)) deletePrm.WithAddress(objectcore.AddressOf(lock))
deletePrm.WithForceRemoval() deletePrm.WithForceRemoval()
_, err = e.Delete(context.Background(), deletePrm) require.NoError(t, e.Delete(context.Background(), deletePrm))
require.NoError(t, err)
// 5. // 5.
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
_, err = e.Inhume(context.Background(), inhumePrm) err = e.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
} }

View file

@ -7,34 +7,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
) )
type MetricRegister interface { type (
AddMethodDuration(method string, d time.Duration) MetricRegister = metrics.EngineMetrics
GCMetrics = metrics.GCMetrics
SetObjectCounter(shardID, objectType string, v uint64) WriteCacheMetrics = metrics.WriteCacheMetrics
AddToObjectCounter(shardID, objectType string, delta int) NullBool = metrics.NullBool
)
SetMode(shardID string, mode mode.Mode)
AddToContainerSize(cnrID string, size int64)
DeleteContainerSize(cnrID string)
DeleteContainerCount(cnrID string)
AddToPayloadCounter(shardID string, size int64)
IncErrorCounter(shardID string)
ClearErrorCounter(shardID string)
DeleteShardMetrics(shardID string)
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics
}
func elapsed(method string, addFunc func(method string, d time.Duration)) func() { func elapsed(method string, addFunc func(method string, d time.Duration)) func() {
t := time.Now() t := time.Now()
@ -77,8 +55,8 @@ type (
var ( var (
_ MetricRegister = noopMetrics{} _ MetricRegister = noopMetrics{}
_ metrics.WriteCacheMetrics = noopWriteCacheMetrics{} _ WriteCacheMetrics = noopWriteCacheMetrics{}
_ metrics.GCMetrics = noopGCMetrics{} _ GCMetrics = noopGCMetrics{}
) )
func (noopMetrics) AddMethodDuration(string, time.Duration) {} func (noopMetrics) AddMethodDuration(string, time.Duration) {}
@ -99,8 +77,8 @@ func (noopMetrics) IncRefillObjectsCount(string, string, int, bool) {}
func (noopMetrics) SetRefillPercent(string, string, uint32) {} func (noopMetrics) SetRefillPercent(string, string, uint32) {}
func (noopMetrics) SetRefillStatus(string, string, string) {} func (noopMetrics) SetRefillStatus(string, string, string) {}
func (noopMetrics) SetEvacuationInProgress(string, bool) {} func (noopMetrics) SetEvacuationInProgress(string, bool) {}
func (noopMetrics) WriteCache() metrics.WriteCacheMetrics { return noopWriteCacheMetrics{} } func (noopMetrics) WriteCache() WriteCacheMetrics { return noopWriteCacheMetrics{} }
func (noopMetrics) GC() metrics.GCMetrics { return noopGCMetrics{} } func (noopMetrics) GC() GCMetrics { return noopGCMetrics{} }
func (noopWriteCacheMetrics) AddMethodDuration(string, string, string, string, bool, time.Duration) {} func (noopWriteCacheMetrics) AddMethodDuration(string, string, string, string, bool, time.Duration) {}
func (noopWriteCacheMetrics) SetActualCount(string, string, string, uint64) {} func (noopWriteCacheMetrics) SetActualCount(string, string, string, uint64) {}

View file

@ -54,19 +54,17 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe
defer elapsed("Select", e.metrics.AddMethodDuration)() defer elapsed("Select", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error { err = e.execIfNotBlocked(func() error {
res, err = e._select(ctx, prm) res = e._select(ctx, prm)
return err return nil
}) })
return return
} }
func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) { func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes {
addrList := make([]oid.Address, 0) addrList := make([]oid.Address, 0)
uniqueMap := make(map[string]struct{}) uniqueMap := make(map[string]struct{})
var outError error
var shPrm shard.SelectPrm var shPrm shard.SelectPrm
shPrm.SetContainerID(prm.cnr, prm.indexedContainer) shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
shPrm.SetFilters(prm.filters) shPrm.SetFilters(prm.filters)
@ -90,7 +88,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
return SelectRes{ return SelectRes{
addrList: addrList, addrList: addrList,
}, outError }
} }
// List returns `limit` available physically storage object addresses in engine. // List returns `limit` available physically storage object addresses in engine.
@ -100,14 +98,14 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) { func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
defer elapsed("List", e.metrics.AddMethodDuration)() defer elapsed("List", e.metrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error { err = e.execIfNotBlocked(func() error {
res, err = e.list(ctx, limit) res = e.list(ctx, limit)
return err return nil
}) })
return return
} }
func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) { func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes {
addrList := make([]oid.Address, 0, limit) addrList := make([]oid.Address, 0, limit)
uniqueMap := make(map[string]struct{}) uniqueMap := make(map[string]struct{})
ln := uint64(0) ln := uint64(0)
@ -136,7 +134,7 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, erro
return SelectRes{ return SelectRes{
addrList: addrList, addrList: addrList,
}, nil }
} }
// Select selects objects from local storage using provided filters. // Select selects objects from local storage using provided filters.

View file

@ -108,12 +108,12 @@ func (m *metricsWithID) SetEvacuationInProgress(value bool) {
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) { func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
sh, err := e.createShard(ctx, opts) sh, err := e.createShard(ctx, opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create a shard: %w", err) return nil, fmt.Errorf("create a shard: %w", err)
} }
err = e.addShard(sh) err = e.addShard(sh)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err) return nil, fmt.Errorf("add %s shard: %w", sh.ID().String(), err)
} }
e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode()) e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())
@ -124,7 +124,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) { func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
id, err := generateShardID() id, err := generateShardID()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err) return nil, fmt.Errorf("generate shard ID: %w", err)
} }
opts = e.appendMetrics(id, opts) opts = e.appendMetrics(id, opts)
@ -180,7 +180,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil { if err != nil {
return fmt.Errorf("could not create pool: %w", err) return fmt.Errorf("create pool: %w", err)
} }
strID := sh.ID().String() strID := sh.ID().String()
@ -272,7 +272,7 @@ func (e *StorageEngine) sortShards(objAddr interface{ EncodeToString() string })
h := hrw.StringHash(objAddr.EncodeToString()) h := hrw.StringHash(objAddr.EncodeToString())
shards := make([]hashedShard, 0, len(e.shards)) shards := make([]hashedShard, 0, len(e.shards))
for _, sh := range e.shards { for _, sh := range e.shards {
shards = append(shards, hashedShard(sh)) shards = append(shards, sh)
} }
hrw.SortHasherSliceByValue(shards, h) hrw.SortHasherSliceByValue(shards, h)
return shards return shards
@ -285,7 +285,7 @@ func (e *StorageEngine) unsortedShards() []hashedShard {
shards := make([]hashedShard, 0, len(e.shards)) shards := make([]hashedShard, 0, len(e.shards))
for _, sh := range e.shards { for _, sh := range e.shards {
shards = append(shards, hashedShard(sh)) shards = append(shards, sh)
} }
return shards return shards
@ -374,7 +374,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
zap.Error(err), zap.Error(err),
) )
multiErrGuard.Lock() multiErrGuard.Lock()
multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err)) multiErr = errors.Join(multiErr, fmt.Errorf("change shard (id:%s) mode to disabled: %w", sh.ID(), err))
multiErrGuard.Unlock() multiErrGuard.Unlock()
} }
@ -385,7 +385,7 @@ func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedS
zap.Error(err), zap.Error(err),
) )
multiErrGuard.Lock() multiErrGuard.Lock()
multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err)) multiErr = errors.Join(multiErr, fmt.Errorf("close removed shard (id:%s): %w", sh.ID(), err))
multiErrGuard.Unlock() multiErrGuard.Unlock()
} }
return nil return nil

View file

@ -56,7 +56,7 @@ func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
return result, err return result, err
} }
func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) { func (db *DB) ContainerSize(id cid.ID) (uint64, error) {
db.modeMtx.RLock() db.modeMtx.RLock()
defer db.modeMtx.RUnlock() defer db.modeMtx.RUnlock()
@ -64,21 +64,22 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
return 0, ErrDegradedMode return 0, ErrDegradedMode
} }
err = db.boltDB.View(func(tx *bbolt.Tx) error { var size uint64
size, err = db.containerSize(tx, id) err := db.boltDB.View(func(tx *bbolt.Tx) error {
size = db.containerSize(tx, id)
return err return nil
}) })
return size, metaerr.Wrap(err) return size, metaerr.Wrap(err)
} }
func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) { func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 {
containerVolume := tx.Bucket(containerVolumeBucketName) containerVolume := tx.Bucket(containerVolumeBucketName)
key := make([]byte, cidSize) key := make([]byte, cidSize)
id.Encode(key) id.Encode(key)
return parseContainerSize(containerVolume.Get(key)), nil return parseContainerSize(containerVolume.Get(key))
} }
func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool { func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {

View file

@ -54,7 +54,7 @@ func (db *DB) Open(ctx context.Context, m mode.Mode) error {
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error { func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
if err != nil { if err != nil {
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) return fmt.Errorf("create dir %s for metabase: %w", db.info.Path, err)
} }
db.log.Debug(ctx, logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path)) db.log.Debug(ctx, logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))
@ -73,7 +73,7 @@ func (db *DB) openBolt(ctx context.Context) error {
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
if err != nil { if err != nil {
return fmt.Errorf("can't open boltDB database: %w", err) return fmt.Errorf("open boltDB database: %w", err)
} }
db.boltDB.MaxBatchDelay = db.boltBatchDelay db.boltDB.MaxBatchDelay = db.boltBatchDelay
db.boltDB.MaxBatchSize = db.boltBatchSize db.boltDB.MaxBatchSize = db.boltBatchSize
@ -145,27 +145,27 @@ func (db *DB) init(reset bool) error {
if reset { if reset {
err := tx.DeleteBucket(name) err := tx.DeleteBucket(name)
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("could not delete static bucket %s: %w", k, err) return fmt.Errorf("delete static bucket %s: %w", k, err)
} }
} }
_, err := tx.CreateBucketIfNotExists(name) _, err := tx.CreateBucketIfNotExists(name)
if err != nil { if err != nil {
return fmt.Errorf("could not create static bucket %s: %w", k, err) return fmt.Errorf("create static bucket %s: %w", k, err)
} }
} }
for _, b := range deprecatedBuckets { for _, b := range deprecatedBuckets {
err := tx.DeleteBucket(b) err := tx.DeleteBucket(b)
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err) return fmt.Errorf("delete deprecated bucket %s: %w", string(b), err)
} }
} }
if !reset { // counters will be recalculated by refill metabase if !reset { // counters will be recalculated by refill metabase
err = syncCounter(tx, false) err = syncCounter(tx, false)
if err != nil { if err != nil {
return fmt.Errorf("could not sync object counter: %w", err) return fmt.Errorf("sync object counter: %w", err)
} }
return nil return nil

View file

@ -238,26 +238,26 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
} }
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil { if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err) return fmt.Errorf("increase phy object counter: %w", err)
} }
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil { if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err) return fmt.Errorf("increase logical object counter: %w", err)
} }
if isUserObject { if isUserObject {
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil { if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
return fmt.Errorf("could not increase user object counter: %w", err) return fmt.Errorf("increase user object counter: %w", err)
} }
} }
return db.incContainerObjectCounter(tx, cnrID, isUserObject) return db.incContainerObjectCounter(tx, cnrID, isUserObject)
} }
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
b := tx.Bucket(shardInfoBucket) b := tx.Bucket(shardInfoBucket)
if b == nil { if b == nil {
return nil return nil
} }
return db.updateShardObjectCounterBucket(b, typ, delta, inc) return db.updateShardObjectCounterBucket(b, typ, delta, false)
} }
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error { func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
@ -362,7 +362,7 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject
func syncCounter(tx *bbolt.Tx, force bool) error { func syncCounter(tx *bbolt.Tx, force bool) error {
shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket) shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket)
if err != nil { if err != nil {
return fmt.Errorf("could not get shard info bucket: %w", err) return fmt.Errorf("get shard info bucket: %w", err)
} }
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
len(shardInfoB.Get(objectLogicCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
@ -375,7 +375,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName) containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName)
if err != nil { if err != nil {
return fmt.Errorf("could not get container counter bucket: %w", err) return fmt.Errorf("get container counter bucket: %w", err)
} }
var addr oid.Address var addr oid.Address
@ -428,7 +428,7 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
return nil return nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("could not iterate objects: %w", err) return fmt.Errorf("iterate objects: %w", err)
} }
return setObjectCounters(counters, shardInfoB, containerCounterB) return setObjectCounters(counters, shardInfoB, containerCounterB)
@ -448,7 +448,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
value := containerCounterValue(count) value := containerCounterValue(count)
err := containerCounterB.Put(key, value) err := containerCounterB.Put(key, value)
if err != nil { if err != nil {
return fmt.Errorf("could not update phy container object counter: %w", err) return fmt.Errorf("update phy container object counter: %w", err)
} }
} }
phyData := make([]byte, 8) phyData := make([]byte, 8)
@ -456,7 +456,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
err := shardInfoB.Put(objectPhyCounterKey, phyData) err := shardInfoB.Put(objectPhyCounterKey, phyData)
if err != nil { if err != nil {
return fmt.Errorf("could not update phy object counter: %w", err) return fmt.Errorf("update phy object counter: %w", err)
} }
logData := make([]byte, 8) logData := make([]byte, 8)
@ -464,7 +464,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
err = shardInfoB.Put(objectLogicCounterKey, logData) err = shardInfoB.Put(objectLogicCounterKey, logData)
if err != nil { if err != nil {
return fmt.Errorf("could not update logic object counter: %w", err) return fmt.Errorf("update logic object counter: %w", err)
} }
userData := make([]byte, 8) userData := make([]byte, 8)
@ -472,7 +472,7 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
err = shardInfoB.Put(objectUserCounterKey, userData) err = shardInfoB.Put(objectUserCounterKey, userData)
if err != nil { if err != nil {
return fmt.Errorf("could not update user object counter: %w", err) return fmt.Errorf("update user object counter: %w", err)
} }
return nil return nil
@ -492,7 +492,7 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) {
} }
var cnrID cid.ID var cnrID cid.ID
if err := cnrID.Decode(buf); err != nil { if err := cnrID.Decode(buf); err != nil {
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err) return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
} }
return cnrID, nil return cnrID, nil
} }

View file

@ -161,28 +161,28 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error { func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
if res.phyCount > 0 { if res.phyCount > 0 {
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false) err := db.decShardObjectCounter(tx, phy, res.phyCount)
if err != nil { if err != nil {
return fmt.Errorf("could not decrease phy object counter: %w", err) return fmt.Errorf("decrease phy object counter: %w", err)
} }
} }
if res.logicCount > 0 { if res.logicCount > 0 {
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false) err := db.decShardObjectCounter(tx, logical, res.logicCount)
if err != nil { if err != nil {
return fmt.Errorf("could not decrease logical object counter: %w", err) return fmt.Errorf("decrease logical object counter: %w", err)
} }
} }
if res.userCount > 0 { if res.userCount > 0 {
err := db.updateShardObjectCounter(tx, user, res.userCount, false) err := db.decShardObjectCounter(tx, user, res.userCount)
if err != nil { if err != nil {
return fmt.Errorf("could not decrease user object counter: %w", err) return fmt.Errorf("decrease user object counter: %w", err)
} }
} }
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
return fmt.Errorf("could not decrease container object counter: %w", err) return fmt.Errorf("decrease container object counter: %w", err)
} }
return nil return nil
} }
@ -259,7 +259,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
if garbageBKT != nil { if garbageBKT != nil {
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
} }
} }
return deleteSingleResult{}, nil return deleteSingleResult{}, nil
@ -280,7 +280,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
if garbageBKT != nil { if garbageBKT != nil {
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) return deleteSingleResult{}, fmt.Errorf("remove from garbage bucket: %w", err)
} }
} }
@ -308,7 +308,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
// remove object // remove object
err = db.deleteObject(tx, obj, false) err = db.deleteObject(tx, obj, false)
if err != nil { if err != nil {
return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err) return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
} }
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil { if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
@ -335,12 +335,12 @@ func (db *DB) deleteObject(
err = updateListIndexes(tx, obj, delListIndexItem) err = updateListIndexes(tx, obj, delListIndexItem)
if err != nil { if err != nil {
return fmt.Errorf("can't remove list indexes: %w", err) return fmt.Errorf("remove list indexes: %w", err)
} }
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem) err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
if err != nil { if err != nil {
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err) return fmt.Errorf("remove fake bucket tree indexes: %w", err)
} }
if isParent { if isParent {
@ -351,7 +351,7 @@ func (db *DB) deleteObject(
addrKey := addressKey(object.AddressOf(obj), key) addrKey := addressKey(object.AddressOf(obj), key)
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return fmt.Errorf("could not remove from garbage bucket: %w", err) return fmt.Errorf("remove from garbage bucket: %w", err)
} }
} }
} }
@ -529,7 +529,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize)) addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err) return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
} }
} }
@ -567,7 +567,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize)) addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err) return fmt.Errorf("remove EC parent from garbage bucket: %w", err)
} }
} }

View file

@ -1,7 +1,6 @@
package meta package meta
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"time" "time"
@ -227,9 +226,9 @@ func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, e
splitInfo := objectSDK.NewSplitInfo() splitInfo := objectSDK.NewSplitInfo()
err := splitInfo.Unmarshal(bytes.Clone(rawSplitInfo)) err := splitInfo.Unmarshal(rawSplitInfo)
if err != nil { if err != nil {
return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err) return nil, fmt.Errorf("unmarshal split info from root index: %w", err)
} }
return splitInfo, nil return splitInfo, nil

View file

@ -1,7 +1,6 @@
package meta package meta
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"time" "time"
@ -112,7 +111,7 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
// check in primary index // check in primary index
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key) data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key)
if len(data) != 0 { if len(data) != 0 {
return obj, obj.Unmarshal(bytes.Clone(data)) return obj, obj.Unmarshal(data)
} }
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
@ -123,13 +122,13 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
// if not found then check in tombstone index // if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
if len(data) != 0 { if len(data) != 0 {
return obj, obj.Unmarshal(bytes.Clone(data)) return obj, obj.Unmarshal(data)
} }
// if not found then check in locker index // if not found then check in locker index
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key) data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
if len(data) != 0 { if len(data) != 0 {
return obj, obj.Unmarshal(bytes.Clone(data)) return obj, obj.Unmarshal(data)
} }
// if not found then check if object is a virtual // if not found then check if object is a virtual
@ -185,9 +184,9 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
child := objectSDK.New() child := objectSDK.New()
err = child.Unmarshal(bytes.Clone(data)) err = child.Unmarshal(data)
if err != nil { if err != nil {
return nil, fmt.Errorf("can't unmarshal child with parent: %w", err) return nil, fmt.Errorf("unmarshal child with parent: %w", err)
} }
par := child.Parent() par := child.Parent()
@ -219,7 +218,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key) objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
if len(objData) != 0 { if len(objData) != 0 {
obj := objectSDK.New() obj := objectSDK.New()
if err := obj.Unmarshal(bytes.Clone(objData)); err != nil { if err := obj.Unmarshal(objData); err != nil {
return err return err
} }
chunk := objectSDK.ECChunk{} chunk := objectSDK.ECChunk{}

View file

@ -177,7 +177,7 @@ type gcHandler struct {
func (g gcHandler) handleKV(k, _ []byte) error { func (g gcHandler) handleKV(k, _ []byte) error {
o, err := garbageFromKV(k) o, err := garbageFromKV(k)
if err != nil { if err != nil {
return fmt.Errorf("could not parse garbage object: %w", err) return fmt.Errorf("parse garbage object: %w", err)
} }
return g.h(o) return g.h(o)
@ -190,7 +190,7 @@ type graveyardHandler struct {
func (g graveyardHandler) handleKV(k, v []byte) error { func (g graveyardHandler) handleKV(k, v []byte) error {
o, err := graveFromKV(k, v) o, err := graveFromKV(k, v)
if err != nil { if err != nil {
return fmt.Errorf("could not parse grave: %w", err) return fmt.Errorf("parse grave: %w", err)
} }
return g.h(o) return g.h(o)
@ -240,7 +240,7 @@ func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address)
func garbageFromKV(k []byte) (res GarbageObject, err error) { func garbageFromKV(k []byte) (res GarbageObject, err error) {
err = decodeAddressFromKey(&res.addr, k) err = decodeAddressFromKey(&res.addr, k)
if err != nil { if err != nil {
err = fmt.Errorf("could not parse address: %w", err) err = fmt.Errorf("parse address: %w", err)
} }
return return

View file

@ -342,10 +342,10 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
} }
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil { if err := db.decShardObjectCounter(tx, logical, res.LogicInhumed()); err != nil {
return err return err
} }
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil { if err := db.decShardObjectCounter(tx, user, res.UserInhumed()); err != nil {
return err return err
} }
@ -373,7 +373,7 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
if data != nil { if data != nil {
err := targetBucket.Delete(tombKey) err := targetBucket.Delete(tombKey)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err) return nil, nil, fmt.Errorf("remove grave with tombstone key: %w", err)
} }
} }

View file

@ -1,7 +1,6 @@
package meta package meta
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"strconv" "strconv"
@ -130,7 +129,7 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) e
} }
return b.ForEach(func(k, v []byte) error { return b.ForEach(func(k, v []byte) error {
if oid.Decode(k) == nil && obj.Unmarshal(bytes.Clone(v)) == nil { if oid.Decode(k) == nil && obj.Unmarshal(v) == nil {
return f(cid, oid, obj) return f(cid, oid, obj)
} }

View file

@ -87,7 +87,8 @@ type CountAliveObjectsInContainerPrm struct {
} }
// ListWithCursor lists physical objects available in metabase starting from // ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects. // cursor. Includes objects of all types. Does not include inhumed and expired
// objects.
// Use cursor value from response for consecutive requests. // Use cursor value from response for consecutive requests.
// //
// Returns ErrEndOfListing if there are no more objects to return or count // Returns ErrEndOfListing if there are no more objects to return or count
@ -143,6 +144,8 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
rawAddr := make([]byte, cidSize, addressKeySize) rawAddr := make([]byte, cidSize, addressKeySize)
currEpoch := db.epochState.CurrentEpoch()
loop: loop:
for ; name != nil; name, _ = c.Next() { for ; name != nil; name, _ = c.Next() {
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
@ -167,7 +170,7 @@ loop:
if bkt != nil { if bkt != nil {
copy(rawAddr, cidRaw) copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold) result, count, cursor, threshold, currEpoch)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -185,8 +188,7 @@ loop:
if offset != nil { if offset != nil {
// new slice is much faster but less memory efficient // new slice is much faster but less memory efficient
// we need to copy, because offset exists during bbolt tx // we need to copy, because offset exists during bbolt tx
cursor.inBucketOffset = make([]byte, len(offset)) cursor.inBucketOffset = bytes.Clone(offset)
copy(cursor.inBucketOffset, offset)
} }
if len(result) == 0 { if len(result) == 0 {
@ -195,8 +197,7 @@ loop:
// new slice is much faster but less memory efficient // new slice is much faster but less memory efficient
// we need to copy, because bucketName exists during bbolt tx // we need to copy, because bucketName exists during bbolt tx
cursor.bucketName = make([]byte, len(bucketName)) cursor.bucketName = bytes.Clone(bucketName)
copy(cursor.bucketName, bucketName)
return result, cursor, nil return result, cursor, nil
} }
@ -212,6 +213,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
limit int, // stop listing at `limit` items in result limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately threshold bool, // ignore cursor and start immediately
currEpoch uint64,
) ([]objectcore.Info, []byte, *Cursor, error) { ) ([]objectcore.Info, []byte, *Cursor, error) {
if cursor == nil { if cursor == nil {
cursor = new(Cursor) cursor = new(Cursor)
@ -243,13 +245,19 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
continue continue
} }
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
if !objectLocked(bkt.Tx(), cnt, obj) && hasExpEpoch && expEpoch < currEpoch {
continue
}
var isLinkingObj bool var isLinkingObj bool
var ecInfo *objectcore.ECInfo var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular { if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(bytes.Clone(v)); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o) isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader() ecHeader := o.ECHeader()
if ecHeader != nil { if ecHeader != nil {
@ -413,7 +421,7 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, p
var ecInfo *objectcore.ECInfo var ecInfo *objectcore.ECInfo
if prm.ObjectType == objectSDK.TypeRegular { if prm.ObjectType == objectSDK.TypeRegular {
var o objectSDK.Object var o objectSDK.Object
if err := o.Unmarshal(bytes.Clone(v)); err != nil { if err := o.Unmarshal(v); err != nil {
return err return err
} }
isLinkingObj = isLinkObject(&o) isLinkingObj = isLinkObject(&o)

View file

@ -3,14 +3,17 @@ package meta_test
import ( import (
"context" "context"
"errors" "errors"
"strconv"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
@ -71,14 +74,16 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
func TestLisObjectsWithCursor(t *testing.T) { func TestLisObjectsWithCursor(t *testing.T) {
t.Parallel() t.Parallel()
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
const ( const (
currEpoch = 100
expEpoch = currEpoch - 1
containers = 5 containers = 5
total = containers * 4 // regular + ts + child + lock total = containers * 6 // regular + ts + child + lock + non-expired regular + locked expired
) )
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
defer func() { require.NoError(t, db.Close(context.Background())) }()
expected := make([]object.Info, 0, total) expected := make([]object.Info, 0, total)
// fill metabase with objects // fill metabase with objects
@ -127,6 +132,26 @@ func TestLisObjectsWithCursor(t *testing.T) {
err = putBig(db, child) err = putBig(db, child)
require.NoError(t, err) require.NoError(t, err)
expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) expected = append(expected, object.Info{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
// add expired object (do not include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
require.NoError(t, metaPut(db, obj, nil))
// add non-expired object (include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(currEpoch))
require.NoError(t, metaPut(db, obj, nil))
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
// add locked expired object (include into expected)
obj = testutil.GenerateObjectWithCID(containerID)
objID := oidtest.ID()
obj.SetID(objID)
testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
require.NoError(t, metaPut(db, obj, nil))
require.NoError(t, db.Lock(context.Background(), containerID, oidtest.ID(), []oid.ID{objID}))
expected = append(expected, object.Info{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
} }
t.Run("success with various count", func(t *testing.T) { t.Run("success with various count", func(t *testing.T) {

View file

@ -19,7 +19,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
if !db.mode.NoMetabase() { if !db.mode.NoMetabase() {
if err := db.Close(ctx); err != nil { if err := db.Close(ctx); err != nil {
return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err) return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
} }
} }
@ -31,7 +31,7 @@ func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
err = db.Init(ctx) err = db.Init(ctx)
} }
if err != nil { if err != nil {
return fmt.Errorf("can't set metabase mode (old=%s, new=%s): %w", db.mode, m, err) return fmt.Errorf("set metabase mode (old=%s, new=%s): %w", db.mode, m, err)
} }
} }

View file

@ -1,7 +1,6 @@
package meta package meta
import ( import (
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"errors" "errors"
@ -180,18 +179,18 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
err := putUniqueIndexes(tx, obj, si, id) err := putUniqueIndexes(tx, obj, si, id)
if err != nil { if err != nil {
return fmt.Errorf("can't put unique indexes: %w", err) return fmt.Errorf("put unique indexes: %w", err)
} }
err = updateListIndexes(tx, obj, putListIndexItem) err = updateListIndexes(tx, obj, putListIndexItem)
if err != nil { if err != nil {
return fmt.Errorf("can't put list indexes: %w", err) return fmt.Errorf("put list indexes: %w", err)
} }
if indexAttributes { if indexAttributes {
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem) err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
if err != nil { if err != nil {
return fmt.Errorf("can't put fake bucket tree indexes: %w", err) return fmt.Errorf("put fake bucket tree indexes: %w", err)
} }
} }
@ -250,7 +249,7 @@ func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, ad
} }
rawObject, err := obj.CutPayload().Marshal() rawObject, err := obj.CutPayload().Marshal()
if err != nil { if err != nil {
return fmt.Errorf("can't marshal object header: %w", err) return fmt.Errorf("marshal object header: %w", err)
} }
return putUniqueIndexItem(tx, namedBucketItem{ return putUniqueIndexItem(tx, namedBucketItem{
name: bucketName, name: bucketName,
@ -320,7 +319,7 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []
return si.Marshal() return si.Marshal()
default: default:
oldSI := objectSDK.NewSplitInfo() oldSI := objectSDK.NewSplitInfo()
if err := oldSI.Unmarshal(bytes.Clone(old)); err != nil { if err := oldSI.Unmarshal(old); err != nil {
return nil, err return nil, err
} }
si = util.MergeSplitInfo(si, oldSI) si = util.MergeSplitInfo(si, oldSI)
@ -475,7 +474,7 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error { func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
bkt, err := createBucketLikelyExists(tx, item.name) bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil { if err != nil {
return fmt.Errorf("can't create index %v: %w", item.name, err) return fmt.Errorf("create index %v: %w", item.name, err)
} }
data, err := update(bkt.Get(item.key), item.val) data, err := update(bkt.Get(item.key), item.val)
@ -492,12 +491,12 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt, err := createBucketLikelyExists(tx, item.name) bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil { if err != nil {
return fmt.Errorf("can't create index %v: %w", item.name, err) return fmt.Errorf("create index %v: %w", item.name, err)
} }
fkbtRoot, err := createBucketLikelyExists(bkt, item.key) fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
if err != nil { if err != nil {
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err) return fmt.Errorf("create fake bucket tree index %v: %w", item.key, err)
} }
return fkbtRoot.Put(item.val, zeroValue) return fkbtRoot.Put(item.val, zeroValue)
@ -506,19 +505,19 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt, err := createBucketLikelyExists(tx, item.name) bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil { if err != nil {
return fmt.Errorf("can't create index %v: %w", item.name, err) return fmt.Errorf("create index %v: %w", item.name, err)
} }
lst, err := decodeList(bkt.Get(item.key)) lst, err := decodeList(bkt.Get(item.key))
if err != nil { if err != nil {
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err) return fmt.Errorf("decode leaf list %v: %w", item.key, err)
} }
lst = append(lst, item.val) lst = append(lst, item.val)
encodedLst, err := encodeList(lst) encodedLst, err := encodeList(lst)
if err != nil { if err != nil {
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err) return fmt.Errorf("encode leaf list %v: %w", item.key, err)
} }
return bkt.Put(item.key, encodedLst) return bkt.Put(item.key, encodedLst)

View file

@ -565,7 +565,7 @@ func groupFilters(filters objectSDK.SearchFilters, useAttributeIndex bool) (filt
case v2object.FilterHeaderContainerID: // support deprecated field case v2object.FilterHeaderContainerID: // support deprecated field
err := res.cnr.DecodeString(filters[i].Value()) err := res.cnr.DecodeString(filters[i].Value())
if err != nil { if err != nil {
return filterGroup{}, fmt.Errorf("can't parse container id: %w", err) return filterGroup{}, fmt.Errorf("parse container id: %w", err)
} }
res.withCnrFilter = true res.withCnrFilter = true

View file

@ -32,13 +32,13 @@ func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error
} }
if err := db.openDB(ctx, mode); err != nil { if err := db.openDB(ctx, mode); err != nil {
return nil, fmt.Errorf("failed to open metabase: %w", err) return nil, fmt.Errorf("open metabase: %w", err)
} }
id, err := db.readShardID() id, err := db.readShardID()
if cErr := db.close(); cErr != nil { if cErr := db.close(); cErr != nil {
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr)) err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
} }
return id, metaerr.Wrap(err) return id, metaerr.Wrap(err)
@ -70,7 +70,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
} }
if err := db.openDB(ctx, mode); err != nil { if err := db.openDB(ctx, mode); err != nil {
return fmt.Errorf("failed to open metabase: %w", err) return fmt.Errorf("open metabase: %w", err)
} }
err := db.writeShardID(id) err := db.writeShardID(id)
@ -79,7 +79,7 @@ func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) err
} }
if cErr := db.close(); cErr != nil { if cErr := db.close(); cErr != nil {
err = errors.Join(err, fmt.Errorf("failed to close metabase: %w", cErr)) err = errors.Join(err, fmt.Errorf("close metabase: %w", cErr))
} }
return metaerr.Wrap(err) return metaerr.Wrap(err)

View file

@ -35,7 +35,7 @@ func (r StorageIDRes) StorageID() []byte {
// StorageID returns storage descriptor for objects from the blobstor. // StorageID returns storage descriptor for objects from the blobstor.
// It is put together with the object can makes get/delete operation faster. // It is put together with the object can makes get/delete operation faster.
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) { func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (StorageIDRes, error) {
var ( var (
startedAt = time.Now() startedAt = time.Now()
success = false success = false
@ -53,32 +53,32 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
db.modeMtx.RLock() db.modeMtx.RLock()
defer db.modeMtx.RUnlock() defer db.modeMtx.RUnlock()
var res StorageIDRes
if db.mode.NoMetabase() { if db.mode.NoMetabase() {
return res, ErrDegradedMode return res, ErrDegradedMode
} }
err = db.boltDB.View(func(tx *bbolt.Tx) error { err := db.boltDB.View(func(tx *bbolt.Tx) error {
res.id, err = db.storageID(tx, prm.addr) res.id = db.storageID(tx, prm.addr)
return nil
return err
}) })
success = err == nil success = err == nil
return res, metaerr.Wrap(err) return res, metaerr.Wrap(err)
} }
func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) { func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) []byte {
key := make([]byte, bucketKeySize) key := make([]byte, bucketKeySize)
smallBucket := tx.Bucket(smallBucketName(addr.Container(), key)) smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
if smallBucket == nil { if smallBucket == nil {
return nil, nil return nil
} }
storageID := smallBucket.Get(objectKey(addr.Object(), key)) storageID := smallBucket.Get(objectKey(addr.Object(), key))
if storageID == nil { if storageID == nil {
return nil, nil return nil
} }
return bytes.Clone(storageID), nil return bytes.Clone(storageID)
} }
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation. // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.

View file

@ -95,7 +95,7 @@ func compactDB(db *bbolt.DB) error {
NoSync: true, NoSync: true,
}) })
if err != nil { if err != nil {
return fmt.Errorf("can't open new metabase to compact: %w", err) return fmt.Errorf("open new metabase to compact: %w", err)
} }
if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil { if err := bbolt.Compact(dst, db, compactMaxTxSize); err != nil {
return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName))) return fmt.Errorf("compact metabase: %w", errors.Join(err, dst.Close(), os.Remove(tmpFileName)))
@ -292,7 +292,7 @@ func iterateExpirationAttributeKeyBucket(ctx context.Context, b *bbolt.Bucket, i
} }
expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64) expirationEpoch, err := strconv.ParseUint(string(attrValue), 10, 64)
if err != nil { if err != nil {
return fmt.Errorf("could not parse expiration epoch: %w", err) return fmt.Errorf("parse expiration epoch: %w", err)
} }
expirationEpochBucket := b.Bucket(attrValue) expirationEpochBucket := b.Bucket(attrValue)
attrKeyValueC := expirationEpochBucket.Cursor() attrKeyValueC := expirationEpochBucket.Cursor()
@ -399,7 +399,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
for _, key := range keys { for _, key := range keys {
attr, ok := attributeFromAttributeBucket(key) attr, ok := attributeFromAttributeBucket(key)
if !ok { if !ok {
return nil, fmt.Errorf("failed to parse attribute key from user attribute bucket key %s", hex.EncodeToString(key)) return nil, fmt.Errorf("parse attribute key from user attribute bucket key %s", hex.EncodeToString(key))
} }
if !IsAtrributeIndexed(attr) { if !IsAtrributeIndexed(attr) {
keysToDrop = append(keysToDrop, key) keysToDrop = append(keysToDrop, key)
@ -407,7 +407,7 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
} }
contID, ok := cidFromAttributeBucket(key) contID, ok := cidFromAttributeBucket(key)
if !ok { if !ok {
return nil, fmt.Errorf("failed to parse container ID from user attribute bucket key %s", hex.EncodeToString(key)) return nil, fmt.Errorf("parse container ID from user attribute bucket key %s", hex.EncodeToString(key))
} }
info, err := cs.Info(contID) info, err := cs.Info(contID)
if err != nil { if err != nil {

View file

@ -231,11 +231,11 @@ func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) {
epoch := binary.BigEndian.Uint64(key) epoch := binary.BigEndian.Uint64(key)
var cnr cid.ID var cnr cid.ID
if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil { if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err) return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (container ID): %w", err)
} }
var obj oid.ID var obj oid.ID
if err := obj.Decode(key[epochSize+cidSize:]); err != nil { if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err) return 0, cid.ID{}, oid.ID{}, fmt.Errorf("decode expiration epoch to object key (object ID): %w", err)
} }
return epoch, cnr, obj, nil return epoch, cnr, obj, nil
} }

View file

@ -67,7 +67,7 @@ func updateVersion(tx *bbolt.Tx, version uint64) error {
b, err := tx.CreateBucketIfNotExists(shardInfoBucket) b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
if err != nil { if err != nil {
return fmt.Errorf("can't create auxiliary bucket: %w", err) return fmt.Errorf("create auxiliary bucket: %w", err)
} }
return b.Put(versionKey, data) return b.Put(versionKey, data)
} }

View file

@ -106,7 +106,7 @@ func (t *boltForest) SetMode(ctx context.Context, m mode.Mode) error {
} }
} }
if err != nil { if err != nil {
return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", t.mode, m, err) return fmt.Errorf("set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
} }
t.mode = m t.mode = m
@ -128,7 +128,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
readOnly := m.ReadOnly() readOnly := m.ReadOnly()
err := util.MkdirAllX(filepath.Dir(t.path), t.perm) err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
if err != nil { if err != nil {
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err)) return metaerr.Wrap(fmt.Errorf("create dir %s for the pilorama: %w", t.path, err))
} }
opts := *bbolt.DefaultOptions opts := *bbolt.DefaultOptions
@ -139,7 +139,7 @@ func (t *boltForest) openBolt(m mode.Mode) error {
t.db, err = bbolt.Open(t.path, t.perm, &opts) t.db, err = bbolt.Open(t.path, t.perm, &opts)
if err != nil { if err != nil {
return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err)) return metaerr.Wrap(fmt.Errorf("open the pilorama DB: %w", err))
} }
t.db.MaxBatchSize = t.maxBatchSize t.db.MaxBatchSize = t.maxBatchSize
@ -419,10 +419,7 @@ func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID stri
return err return err
} }
i, node, err := t.getPathPrefix(bTree, attr, path) i, node := t.getPathPrefix(bTree, attr, path)
if err != nil {
return err
}
ts := t.getLatestTimestamp(bLog, d.Position, d.Size) ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
lm = make([]Move, len(path)-i+1) lm = make([]Move, len(path)-i+1)
@ -980,10 +977,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
b := treeRoot.Bucket(dataBucket) b := treeRoot.Bucket(dataBucket)
i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1]) i, curNodes := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
if err != nil {
return err
}
if i < len(path)-1 { if i < len(path)-1 {
return nil return nil
} }
@ -1360,7 +1354,7 @@ func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, err
return nil return nil
}) })
if err != nil { if err != nil {
return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err)) return nil, metaerr.Wrap(fmt.Errorf("list trees: %w", err))
} }
success = true success = true
return ids, nil return ids, nil
@ -1504,7 +1498,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
var contID cidSDK.ID var contID cidSDK.ID
if err := contID.Decode(k[:32]); err != nil { if err := contID.Decode(k[:32]); err != nil {
return fmt.Errorf("failed to decode containerID: %w", err) return fmt.Errorf("decode container ID: %w", err)
} }
res.Items = append(res.Items, ContainerIDTreeID{ res.Items = append(res.Items, ContainerIDTreeID{
CID: contID, CID: contID,
@ -1512,8 +1506,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
}) })
if len(res.Items) == batchSize { if len(res.Items) == batchSize {
res.NextPageToken = make([]byte, len(k)) res.NextPageToken = bytes.Clone(k)
copy(res.NextPageToken, k)
break break
} }
} }
@ -1526,7 +1519,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
return &res, nil return &res, nil
} }
func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) { func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node) {
c := bTree.Cursor() c := bTree.Cursor()
var curNodes []Node var curNodes []Node
@ -1549,14 +1542,14 @@ func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr strin
} }
if len(nextNodes) == 0 { if len(nextNodes) == 0 {
return i, curNodes, nil return i, curNodes
} }
} }
return len(path), nextNodes, nil return len(path), nextNodes
} }
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node) {
c := bTree.Cursor() c := bTree.Cursor()
var curNode Node var curNode Node
@ -1576,10 +1569,10 @@ loop:
childKey, value = c.Next() childKey, value = c.Next()
} }
return i, curNode, nil return i, curNode
} }
return len(path), curNode, nil return len(path), curNode
} }
func (t *boltForest) moveFromBytes(m *Move, data []byte) error { func (t *boltForest) moveFromBytes(m *Move, data []byte) error {

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"slices"
"sort" "sort"
"strings" "strings"
@ -84,8 +85,7 @@ func (f *memoryForest) TreeAddByPath(_ context.Context, d CIDDescriptor, treeID
s.operations = append(s.operations, op) s.operations = append(s.operations, op)
} }
mCopy := make([]KeyValue, len(m)) mCopy := slices.Clone(m)
copy(mCopy, m)
op := s.do(&Move{ op := s.do(&Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{

View file

@ -36,7 +36,7 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
size, err := s.metaBase.ContainerSize(prm.cnr) size, err := s.metaBase.ContainerSize(prm.cnr)
if err != nil { if err != nil {
return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err) return ContainerSizeRes{}, fmt.Errorf("get container size: %w", err)
} }
return ContainerSizeRes{ return ContainerSizeRes{
@ -71,7 +71,7 @@ func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (Cont
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID) counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil { if err != nil {
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err) return ContainerCountRes{}, fmt.Errorf("get container counters: %w", err)
} }
return ContainerCountRes{ return ContainerCountRes{

View file

@ -38,7 +38,7 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err
err = s.SetMode(ctx, mode.DegradedReadOnly) err = s.SetMode(ctx, mode.DegradedReadOnly)
if err != nil { if err != nil {
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly)) return fmt.Errorf("switch to mode %s", mode.DegradedReadOnly)
} }
return nil return nil
} }
@ -72,7 +72,7 @@ func (s *Shard) Open(ctx context.Context) error {
for j := i + 1; j < len(components); j++ { for j := i + 1; j < len(components); j++ {
if err := components[j].Open(ctx, m); err != nil { if err := components[j].Open(ctx, m); err != nil {
// Other components must be opened, fail. // Other components must be opened, fail.
return fmt.Errorf("could not open %T: %w", components[j], err) return fmt.Errorf("open %T: %w", components[j], err)
} }
} }
err = s.handleMetabaseFailure(ctx, "open", err) err = s.handleMetabaseFailure(ctx, "open", err)
@ -83,7 +83,7 @@ func (s *Shard) Open(ctx context.Context) error {
break break
} }
return fmt.Errorf("could not open %T: %w", component, err) return fmt.Errorf("open %T: %w", component, err)
} }
} }
return nil return nil
@ -184,7 +184,7 @@ func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
break break
} }
return fmt.Errorf("could not initialize %T: %w", component, err) return fmt.Errorf("initialize %T: %w", component, err)
} }
} }
return nil return nil
@ -205,7 +205,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
err := s.metaBase.Reset() err := s.metaBase.Reset()
if err != nil { if err != nil {
return fmt.Errorf("could not reset metabase: %w", err) return fmt.Errorf("reset metabase: %w", err)
} }
withCount := true withCount := true
@ -254,12 +254,12 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
err = errors.Join(egErr, itErr) err = errors.Join(egErr, itErr)
if err != nil { if err != nil {
return fmt.Errorf("could not put objects to the meta: %w", err) return fmt.Errorf("put objects to the meta: %w", err)
} }
err = s.metaBase.SyncCounters() err = s.metaBase.SyncCounters()
if err != nil { if err != nil {
return fmt.Errorf("could not sync object counters: %w", err) return fmt.Errorf("sync object counters: %w", err)
} }
success = true success = true
@ -318,7 +318,7 @@ func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address,
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error { func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
var lock objectSDK.Lock var lock objectSDK.Lock
if err := lock.Unmarshal(obj.Payload()); err != nil { if err := lock.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal lock content: %w", err) return fmt.Errorf("unmarshal lock content: %w", err)
} }
locked := make([]oid.ID, lock.NumberOfMembers()) locked := make([]oid.ID, lock.NumberOfMembers())
@ -328,7 +328,7 @@ func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) err
id, _ := obj.ID() id, _ := obj.ID()
err := s.metaBase.Lock(ctx, cnr, id, locked) err := s.metaBase.Lock(ctx, cnr, id, locked)
if err != nil { if err != nil {
return fmt.Errorf("could not lock objects: %w", err) return fmt.Errorf("lock objects: %w", err)
} }
return nil return nil
} }
@ -337,7 +337,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
tombstone := objectSDK.NewTombstone() tombstone := objectSDK.NewTombstone()
if err := tombstone.Unmarshal(obj.Payload()); err != nil { if err := tombstone.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal tombstone content: %w", err) return fmt.Errorf("unmarshal tombstone content: %w", err)
} }
tombAddr := object.AddressOf(obj) tombAddr := object.AddressOf(obj)
@ -358,7 +358,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
_, err := s.metaBase.Inhume(ctx, inhumePrm) _, err := s.metaBase.Inhume(ctx, inhumePrm)
if err != nil { if err != nil {
return fmt.Errorf("could not inhume objects: %w", err) return fmt.Errorf("inhume objects: %w", err)
} }
return nil return nil
} }

View file

@ -175,7 +175,7 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
mExRes, err := s.metaBase.StorageID(ctx, mPrm) mExRes, err := s.metaBase.StorageID(ctx, mPrm)
if err != nil { if err != nil {
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) return nil, true, fmt.Errorf("fetch blobovnicza id from metabase: %w", err)
} }
storageID := mExRes.StorageID() storageID := mExRes.StorageID()

View file

@ -36,7 +36,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
modeDegraded := s.GetMode().NoMetabase() modeDegraded := s.GetMode().NoMetabase()
if !modeDegraded { if !modeDegraded {
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil { if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
err = fmt.Errorf("failed to read shard id from metabase: %w", err) err = fmt.Errorf("read shard id from metabase: %w", err)
} }
} }
@ -64,7 +64,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
if len(idFromMetabase) == 0 && !modeDegraded { if len(idFromMetabase) == 0 && !modeDegraded {
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil { if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr)) err = errors.Join(err, fmt.Errorf("write shard id to metabase: %w", setErr))
} }
} }
return return

View file

@ -109,7 +109,7 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
lst, err := s.metaBase.Containers(ctx) lst, err := s.metaBase.Containers(ctx)
if err != nil { if err != nil {
return res, fmt.Errorf("can't list stored containers: %w", err) return res, fmt.Errorf("list stored containers: %w", err)
} }
filters := objectSDK.NewSearchFilters() filters := objectSDK.NewSearchFilters()
@ -149,7 +149,7 @@ func (s *Shard) ListContainers(ctx context.Context, _ ListContainersPrm) (ListCo
containers, err := s.metaBase.Containers(ctx) containers, err := s.metaBase.Containers(ctx)
if err != nil { if err != nil {
return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err) return ListContainersRes{}, fmt.Errorf("get list of containers: %w", err)
} }
return ListContainersRes{ return ListContainersRes{
@ -180,7 +180,7 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
metaPrm.SetCursor(prm.cursor) metaPrm.SetCursor(prm.cursor)
res, err := s.metaBase.ListWithCursor(ctx, metaPrm) res, err := s.metaBase.ListWithCursor(ctx, metaPrm)
if err != nil { if err != nil {
return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err) return ListWithCursorRes{}, fmt.Errorf("get list of objects: %w", err)
} }
return ListWithCursorRes{ return ListWithCursorRes{
@ -208,7 +208,7 @@ func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContai
metaPrm.Handler = prm.Handler metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm) err := s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil { if err != nil {
return fmt.Errorf("could not iterate over containers: %w", err) return fmt.Errorf("iterate over containers: %w", err)
} }
return nil return nil
@ -235,7 +235,7 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
metaPrm.Handler = prm.Handler metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil { if err != nil {
return fmt.Errorf("could not iterate over objects: %w", err) return fmt.Errorf("iterate over objects: %w", err)
} }
return nil return nil
@ -258,7 +258,7 @@ func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAlive
metaPrm.ContainerID = prm.ContainerID metaPrm.ContainerID = prm.ContainerID
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm) count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
if err != nil { if err != nil {
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err) return 0, fmt.Errorf("count alive objects in bucket: %w", err)
} }
return count, nil return count, nil

View file

@ -81,7 +81,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
res, err = s.blobStor.Put(ctx, putPrm) res, err = s.blobStor.Put(ctx, putPrm)
if err != nil { if err != nil {
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err) return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err)
} }
} }
@ -94,7 +94,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
if err != nil { if err != nil {
// may we need to handle this case in a special way // may we need to handle this case in a special way
// since the object has been successfully written to BlobStor // since the object has been successfully written to BlobStor
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) return PutRes{}, fmt.Errorf("put object to metabase: %w", err)
} }
if res.Inserted { if res.Inserted {

View file

@ -67,7 +67,7 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
mRes, err := s.metaBase.Select(ctx, selectPrm) mRes, err := s.metaBase.Select(ctx, selectPrm)
if err != nil { if err != nil {
return SelectRes{}, fmt.Errorf("could not select objects from metabase: %w", err) return SelectRes{}, fmt.Errorf("select objects from metabase: %w", err)
} }
return SelectRes{ return SelectRes{

View file

@ -94,7 +94,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
if err != nil { if err != nil {
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
return metaerr.Wrap(c.initCounters()) c.initCounters()
return nil
} }
// Init runs necessary services. // Init runs necessary services.

View file

@ -30,7 +30,7 @@ func IterateDB(db *bbolt.DB, f func(oid.Address) error) error {
return b.ForEach(func(k, _ []byte) error { return b.ForEach(func(k, _ []byte) error {
err := addr.DecodeString(string(k)) err := addr.DecodeString(string(k))
if err != nil { if err != nil {
return fmt.Errorf("could not parse object address: %w", err) return fmt.Errorf("parse object address: %w", err)
} }
return f(addr) return f(addr)

Some files were not shown because too many files have changed in this diff Show more