Compare commits

...
Sign in to create a new pull request.

81 commits

Author SHA1 Message Date
48930ec452 [#1703] cli: Allow reading RPC endpoint from config file
Allowed reading an RPC endpoint from a configuration file when
getting current epoch in the `object lock` and `bearer create`
commands.

Close #1703

Change-Id: Iea8509dff2893a02cb63f695d7f532eecd743ed8
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2025-04-14 15:40:27 +00:00
f37babdc54
[#1700] shard: Lock shard's mode mutex on close
To prevent race between GC handlers and close.

Change-Id: I06219230964f000f666a56158d3563c760518c3b
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-14 14:35:16 +03:00
fd37cea443
[#1700] engine: Drop unused block execution methods
`BlockExecution` and `ResumeExecution` were used only by unit test.
So drop them and simplify code.

Change-Id: Ib3de324617e8a27fc1f015542ac5e94df5c60a6e
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-14 14:35:15 +03:00
e80632884a
[#1700] config: Drop redundant check
Target config created on level above, so limiter is always nil.

Change-Id: I1896baae5b9ddeed339a7d2b022a9a886589d362
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-14 14:35:15 +03:00
5aaa3df533
[#1700] config: Move config struct to qos package
Change-Id: Ie642fff5cd1702cda00425628e11f3fd8c514798
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-14 14:35:14 +03:00
3be33b7117
[#1706] cli/playground: Mention 'help' in error message for invalid commands
Change-Id: Ica1112b907919a6d19fa1bf683f2a952c4c638e4
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-14 13:10:34 +03:00
29b4fbe451 [#1332] cli/playground: Add 'netmap-config' flag
Change-Id: I4342fb9a6da2a05c18ae4e0ad9f0c71550efc5ef
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-14 07:21:08 +00:00
0d36e93169 [#1332] cli/playground: Move command handler selection to separate function
Change-Id: I2dcbd85e61960c3cf141b815edab174e308ef858
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-13 11:49:45 +00:00
8e87cbee17 [#1689] ci: Move commit checker out of Jenkinsfile
Commit checker is now configured globally for all Gerrit repositories:
  TrueCloudLab/jenkins#16

This allows us to execute commit-checker independently from the rest of
CI suite and re-check commit message format without rerunning other
tests.

Change-Id: Ib8f899b856482a5dc5d03861171585415ff6b452
Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2025-04-12 15:42:45 +00:00
12fc7850dd [#1619] logger: Set tags for ir components
Change-Id: Ifab575bc2a3cd83c9001cd68fffaf94c91494043
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-04-11 17:27:27 +03:00
dfe2f9956a [#1619] logger: Filter entries by tags provided in config
Change-Id: Ia2a79d6cb2a5eb263fb2e6db3f9cf9f2a7d57118
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-04-11 17:27:27 +03:00
e06ecacf57
[#1705] engine: Use condition var for evacuation unit tests
To know exactly when the evacuation was completed,
a conditional variable was added.

Closes #1705

Change-Id: I86f6d7d2ad2b9759905b6b5e9341008cb74f5dfd
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-11 09:20:46 +03:00
64c1392513 [#1710] object: Sign response even if CloseAndRecv returns error
* Sign service wraps an error with status and sign a response even
  if error occurs from `CloseAndRecv` in `Put` and `Patch` methods.

Close #1710

Change-Id: I7e1d8fe00db53607fa6e04ebec9a29b87349f8a1
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-04-10 16:31:47 +03:00
dcfd895449 [#1710] object: Implement Unwrap() for errIncompletePut
* When sign service calls `SignResponse`, it tries to set v2 status
  to response by unwrapping an error to the possible depth. This wasn't
  applicable for `errIncompletePut` so far as it didn't implement
  `Unwrap()`. Thus, it wasn't able to find a correct status set in error.

Change-Id: I280c1806a008176854c55f13bf8688e5736ef941
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-04-10 16:31:47 +03:00
6730e27ae7
[#1712] adm: Drop rpc-endpoint flag from zombie scan
Morph addresses from config are used.

Change-Id: Id99f91defbbff442c308f30d219b9824b4c871de
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-10 10:06:31 +03:00
f7779adf71
[#1712] core: Extend object info string with EC header
Closes #1712

Change-Id: Ief4a960f7dece3359763113270d1ff5155f3f19e
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-10 10:06:31 +03:00
f93b96c601
[#1712] adm: Add maintenance zombie commands
Change-Id: I1b73e561a8daad67d0a8ffc0d293cbdd09aaab6b
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-10 10:06:30 +03:00
aed84b567c [#1689] linter: Bump 'golangci-lint' to v2.0.2
Change-Id: Ib546af43845014785f0debce429a37d62e616539
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 14:26:24 +00:00
fe29ed043a
[#1689] linter: Fix staticcheck warning: 'could use tagged switch on *'
Change-Id: Ia340ce1ccdd223eb87f7aefabfba62b7055f344d
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 16:55:35 +03:00
4f9d237042
[#1689] linter: Fix staticcheck warning: 'probably want to use time.Time.Equal instead'
Change-Id: Idb119d3f4f167c9e42ed48633d301185589553ed
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 16:55:35 +03:00
dfdeedfc6f
[#1689] linter: Fix staticcheck warning: 'could apply De Morgan's law'
Change-Id: Ife03172bad7d517dc99771250c3308a9fc0916b3
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 16:55:35 +03:00
2394ae6ce0
[#1689] linter: Fix staticcheck warning: 'could lift into loop condition'
Change-Id: I4ff3cda54861d857740203d6994872998a22d5d5
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 16:55:35 +03:00
c274bbeb7c
[#1689] linter: Fix staticcheck warning: 'methods on the same type should have the same receiver name'
Change-Id: I25e9432987f73061c1506a184a82065e37885861
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 16:55:35 +03:00
faec499b38
[#1689] linter: Fix staticcheck warning: 'variable naming format'
Change-Id: I8f8b63a6a5f9b6feb7c91f70fe8ac092575b145c
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-09 16:55:35 +03:00
46fd5e17b2
[#1332] cli/playground: Add help
Change-Id: I6160cfddf427b161619e4b96ceec8396b75c4d08
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-08 21:01:20 +03:00
17cba3387e
[#1332] cli/playground: Prevent prompt artifacts by writing to 'readline' stdout
Change-Id: I1c3cbb0b762f29c0995d3f6fc79bae5246ee7bc3
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-08 21:01:20 +03:00
766d9ec46b [#1693] cli/lens: Replace conditional panics with asserts
Change-Id: Id827da0cd9eef66efd806be6c9bc61044175a971
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-08 14:37:06 +03:00
0e1b01b15f [#1693] cli/adm: Replace conditional panics with asserts
Change-Id: I3a46f7ac6d9e4ff51bb490e6fcfc07957418f1a7
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-08 14:37:04 +03:00
4c03561aa2 [#1693] internal/assert: Add False and NoError checks
Change-Id: Ib3ab1671eeff8e8917673513477f158cadbb4287
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-08 14:34:53 +03:00
f4696e8964 [#1689] linter: Fix staticcheck warning: 'Use fmt.Fprintf(...) instead of WriteString(fmt.Sprintf(...))'
Change-Id: I253ab717885cb01b4a2e471147e883ee351be277
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-08 09:40:35 +00:00
b0ef737a74 [#1689] linter: Fix testifylint warning: 'len: use require.Len'
Change-Id: I7a08f09c169ac237647dcb20b0737f1c51c441ad
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-08 09:32:30 +00:00
6f7b6b65f3 [#1689] linter: Fix staticcheck warning: 'embedded field can be simplified'
Change-Id: I8f454f7d09973cdea096495c3949b88cdd01102e
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-08 09:32:24 +00:00
923f0acf8f [#1564] cli: Fix output of object nodes command
The object nodes command misleadingly reported the number of
"found data objects" as if it matched the actual expected amount,
which could be incorrect for EC objects.

Updated the output wording to explicitly distinguish between
currently available data objects and total objects per the EC
schema.

Change-Id: Ib36b89db58ae66d8978baf5a16b59435db9a068d
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-07 18:08:04 +03:00
9b5c1da40f
[#1679] linter: Bump 'golangci-lint' to v1.64.8
- Removed deprecated config option 'linters.govet.check-shadowing', replaced with enabling the 'shadow' linter.
- Removed usage of deprecated 'tenv' linter, replaced by 'usetesting'.

Change-Id: Ib1bd1ec83b0fd55a47e405b290bc2bc967b9389c
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-07 14:21:44 +03:00
b4b053cecd
[#1679] node: Fix 'gocognit' warning
Change-Id: I6e2a278af51869c05c306c2910ba85130e39532e
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-07 14:10:25 +03:00
0c5d74729c
[#1679] node: Fix 'revive' warning
Change-Id: I74ff6332b10f17a329c5d108d01d43002e92aafd
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-07 14:07:04 +03:00
c4f941a5f5
[#1689] client/netmap: Remove useless error-handling
No functional changes.

Change-Id: I3a53c992c3ce5e8c6db252abb09aa40626142a97
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-05 09:16:33 +03:00
d933609084
[#1689] client/netmap: Refactor Client.config()
There are problems with that code:
- explicit casts,
- `ErrConfigNotFound` which is not a part of a public API,
- hand-rolled assertions, even though neo-go already has everything we
  need.

So, remove the error, use `stackitem/Item.Try*()` methods for
conversions. Note, that readUint64Config() returns an error if the
parameter is missing. This is likely an error, but this behaviour is
preserved in this PR: `TryInteger()` returns error when applied to
`Null`. By contract, `TryBool()` returns false for `Null`, so this
PR introduces no functional changes.

Refs 82c7a50b8a/pkg/vm/stackitem/item.go (L418)

Change-Id: I445d28a7c6b5abb9a2bb97b57c0cc42d617e16f7
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-05 09:16:32 +03:00
3d771aa21c
[#1689] morph: Remove frostfsError type
It has no custom methods defined, only adds `frostfs error: `
prefix to the error message. The utility of this prefix is debatable,
failed invocations already have `invocation failed` prefix.

Change-Id: If25ebb3679497f3f10acde43b596c81d52351907
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-05 09:16:32 +03:00
52367dc9b2
[#1689] go.mod: Update sdk-go
Change-Id: I72052fe11e66e4c77f4aef6cb2c0f038aa7b0d1f
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-04 20:40:15 +03:00
979d4bb2ae [#1701] tree: Form $Tree:ID resource property for APE
* Make `verifyClient`, `checkAPE` receive `treeID` from request body;
* Make `newAPERequest` set `$Tree:ID` property
* Add unit-test to check if a rule for `$Tree:ID` works

Close #1701

Change-Id: I834fed366e8adfd4b5c07bf50aac09af6239991b
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-04-04 17:48:00 +03:00
fbc623f34e [#1701] go.mod: Bump policy-engine version
Change-Id: I7aa359bf235034d6459275d366a276d9930fa227
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-04-04 17:48:00 +03:00
5350632e01 [#1705] engine/test: Increase evacuation timeout
This test was flaky in CI probably because of runner load fluctuations.
Let's increase the timeout and see if the flakiness goes away.

(close #1705)

Change-Id: I76f96e3d6f4adb3d5de0e27b8ee6b47685236277
Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2025-04-04 14:41:56 +00:00
2938498b52
[#1689] adm: Fix NNS root availability check
After TrueCloudLab/frostfs-contract#117
we allow checking for root domain availability directly.
Before this commit, NNSRootRegistered() has always returned true, so the
actual root registration happened as a side-effect of the following
code, because NNS registers all parent domains, if they are missing.

Change-Id: Icf98f130e77d31b4af7b69697989183c1c8f6a56
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-04 15:33:52 +03:00
272128e61f
Revert "[#652] adm: Group independent stages in batches"
This reverts commit d00c606fee.

There are internal dependencies inside the last stage: first, we
register NNS root, only then register add records.
Revert for now, will revert back after more testing.

Change-Id: I760632b5628caf04849d4a64c714cf286051f357
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-04 14:11:43 +03:00
50dccff7c1 [#1633] morph/netmap: Merge node info in netmap with candidates list
Applicable for both cases: when node uses local cache for netmap and when it disabled.

Change-Id: I3050f537e20312a4b39e944aca763b77bd1e74c4
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-04-03 16:29:54 +03:00
634de97509
[#1704] metabase: Do not ignore errors by Delete
Change-Id: Ie7b89071a007f53f55879ff9e7e0c25d24ad5dbf
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-03 10:09:05 +03:00
2a6cdbdb72 [#1689] cli: Add split-header option for object patch command
* Make `split-header` option read binary- or JSON-encoded split-header;
* Use `PatchHeader` instead of `PatchAttributes`.

Change-Id: I50ae1bd93d4695657249dacbea981199a39e1a35
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-04-02 13:34:46 +00:00
11493d587b
[#579] config: Change config example to be compatible with YAML 1.2 standard
In accordance with the YAML 1.2 specification, octal numbers must begin with the 0o prefix.

Change-Id: Icb2e83a4aa75c1eb91decd0b7c9b146aaa9fb3e2
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2025-04-02 11:21:34 +03:00
b924ecb850 [#1689] object: Make patch streamer use ApplyHeaderPatch
Change-Id: I4fb94936621544f70ef4e08815c42efaa5ba846f
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-04-02 07:12:40 +00:00
e142d25fac
[#1700] gc: Wait for handlers on GC stopping
First wait for goroutine handles epoch events to not to get data race
on `gc.newEpochHandlers.cancelFunc`.

Then cancel handlers and wait for them.

Change-Id: I71f11f8526961f8356f582a95b10eb8340c0aedd
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 16:00:41 +03:00
bd1c18e117
[#1689] cli/tree: Copy dial options from the service code
There should be no `grpcs://` prefix in address and credentials should
be picked.

Change-Id: I58cdc98b079eac2c7db7dc088f4f131794a91b9f
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
b27f7d1d17
[#1689] treesvc: Use context dialer in synchronizeTree()
This dialer supports source-based routing and is already used in cache.

Change-Id: Ic7852edd2faea4e5d8667221e6f681cc82bb143a
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
3cd8080232
[#1689] treesvc: Fix dial options for TLS connections
There are two problems with the current approach:
1. For TLS connections we need different transport credentials.
2. grpc.NewClient() considers scheme from `URIAddr()` as a scheme for a
   resolver. `grpcs://` scheme doesn't exist, though, so the default one
   is picked. The default resolver (`dns://`) is in turn unable to parse the
   5edab9e554/internal/resolver/dns/dns_resolver.go (L405)
   The error  is `grpcs://192.168.198.248:8081:443: too many colons in address`.

Both problems don't exist in the SDK code, take it from there.

Change-Id: Ia1212050f539162a560796685efdc3f9cfbf80a0
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
a11b54ca15
[#1689] treesvc: Unify gRPC client creation for cache and sync
They connect to the same endpoints, the only difference is that
connection for synchronization is limited in lifetime and is closed
after the sync is finished. This is probably not intentional, as
synchronization was implemented before cache was introduced.
However, reusing dialTreeService() in sync.go has possible perfomance
implications, so is avoided for now.

Change-Id: I2e37befd783b4d873ff833969f932deded1195be
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
b112a92408
[#1689] treesvc: Create request after client is initialized
Make it easier to follow.

Change-Id: I40c4db77f015bb45cb25f16ce24e68188fc14380
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
19ca907223
[#1689] treesvc: Untie createConnection() from Service struct
Change-Id: I6212de4b81afe8c2516981a7bb2fea099c7df773
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-04-01 14:40:33 +03:00
f62d81e26a
[#1700] gc: Take mode mutex in locks handlers
Change-Id: I4408eae3aed936f85427b6246dcf727bd6813a0d
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 14:23:03 +03:00
27899598dc
[#1700] gc: Drop Event interface
There is only one event: new epoch.

Change-Id: I982f3650f7bc753ff2782393625452f0f8cdcc35
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 14:23:02 +03:00
bc6cc9ae2a
[#1700] engine: Print stacks on test request limiter
Change-Id: I4952769ca431d1049955823b41b99b0984b385fc
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 14:23:02 +03:00
6e1576cfdb [#1656] qos: Add tests for AdjustOutgoingIOTag Interceptors
Change-Id: If534e756b26cf7f202039d48ecdf554b4283728b
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-04-01 11:55:15 +03:00
a5bae6c5af
[#1699] qos: Allow to prohibit operations for IO tag
Change-Id: I2bee26885244e241d224860978b6de3526527e96
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 10:08:03 +03:00
5a13830a94
[#1699] mod: Bump frostfs-qos version
Change-Id: Ie5e708c0ca653596c6e3346aa286618868a5aee8
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-01 10:08:03 +03:00
dcb2b23a7d [#1656] qos: Add test for SetCriticalIOTag Interceptor
Change-Id: I4a55fcb84e6f65408a1c0120ac917e49e23354a1
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-31 18:21:48 +03:00
115aae7c34 [#1656] qos: Add tests for MaxActiveRPCLimiter Interceptors
Change-Id: Ib65890ae5aec34c34e15d4ec1f05952f74f1ad26
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-31 18:21:46 +03:00
12a0537a7a [#1689] ci: Add commit checker to Jenkinsfile
- Commit checker image is built from dco-go:
  TrueCloudLab/dco-go#14
- 'pull_request_target' branch is defined in Jenkins job:
  TrueCloudLab/jenkins#10
  TrueCloudLab/jenkins#11

Change-Id: Ib86c5749f9e084d736b868240c4b47014b02ba8d
Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2025-03-31 15:08:59 +00:00
30d4692c3e [#1640] go.mod: Bump version for frostfs-locode-db
Change-Id: Ic45ae77d6209c0097575fc8f89b076b22d50d149
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-31 10:29:41 +00:00
2254c8aff5 [#1689] go.mod: Bump SDK version
Change-Id: Ic946aa68c3d6da9e7d54363f8e9141c6547707d6
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-03-31 11:55:29 +03:00
d432bebef4 [#1689] go.mod: Bump frostfs-qos version
Change-Id: Iaa28da1a1e7b2f4ab7fd8ed661939eb38f4c7782
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-28 12:40:05 +00:00
d144abc977 [#1692] metabase: Remove useless count variable
It is always equal to `len(to)`.

Change-Id: Id7a4c26e9711216b78f96e6b2511efa0773e3471
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:16:01 +00:00
a2053870e2 [#1692] metabase: Use bucket cache in ListWithCursor()
No changes in speed, but unified approach:
```
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
                           │    master    │                 new                 │
                           │    sec/op    │    sec/op     vs base               │
ListWithCursor/1_item-8      6.067µ ±  8%   5.731µ ± 10%       ~ (p=0.052 n=10)
ListWithCursor/10_items-8    25.40µ ± 11%   26.12µ ± 13%       ~ (p=0.971 n=10)
ListWithCursor/100_items-8   210.7µ ±  9%   203.2µ ±  6%       ~ (p=0.280 n=10)
geomean                      31.90µ         31.22µ        -2.16%

                           │    master    │                  new                  │
                           │     B/op     │     B/op      vs base                 │
ListWithCursor/1_item-8      3.287Ki ± 0%   3.287Ki ± 0%       ~ (p=1.000 n=10) ¹
ListWithCursor/10_items-8    15.63Ki ± 0%   15.62Ki ± 0%       ~ (p=0.328 n=10)
ListWithCursor/100_items-8   138.1Ki ± 0%   138.1Ki ± 0%       ~ (p=0.340 n=10)
geomean                      19.21Ki        19.21Ki       -0.00%
¹ all samples are equal

                           │   master    │                 new                  │
                           │  allocs/op  │  allocs/op   vs base                 │
ListWithCursor/1_item-8       109.0 ± 0%    109.0 ± 0%       ~ (p=1.000 n=10) ¹
ListWithCursor/10_items-8     380.0 ± 0%    380.0 ± 0%       ~ (p=1.000 n=10) ¹
ListWithCursor/100_items-8   3.082k ± 0%   3.082k ± 0%       ~ (p=1.000 n=10) ¹
geomean                       503.5         503.5       +0.00%
¹ all samples are equal
```

Change-Id: Ic11673427615053656b2a60068a6d4dbd27af2cb
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:16:01 +00:00
d00c606fee [#652] adm: Group independent stages in batches
Each stage waits until transaction persists. This is needed to ensure
the next stage will see the result of the previous one. However, some of
the stages do not depend one on another, so we may execute them in
parallel.

`AwaitDisabled` flag is used to localize this batching on the code
level. We could've removed `AwaitTx()` from respective stages, but it
seems more error prone.

Close #652.

Change-Id: Ib9c6f6cd5e0db0f31aa1cda8e127b1fad5166336
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:15:21 +00:00
60446bb668 [#1689] adm/helper: Use proper nns bindings import
The one in `neo-go` is for another contract.

Change-Id: Ia1ac2da5e419b48801afdb26df72892d77344e0d
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:15:21 +00:00
bd8ab2d84a [#1689] adm: Remove useless switch in NNSIsAvailable()
After all the refactorings, there is no need to have custom behaviour
for local client.

Change-Id: I99e297cdeffff979524b3f89d3580ab5780e7681
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:15:21 +00:00
bce2f7bef0 [#1689] adm: Reuse neo.NewReader helper in transferNEOFinished()
Change-Id: I27980ed87436958cb4d27278e30e05da021d1506
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:10:37 +00:00
c2c05e2228 [#1689] adm: Reuse ReadOnlyInvoker in registerCandidateRange()
Change-Id: I544d10340825494b45a62700fa247404c18f746a
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:10:37 +00:00
0a38571a10 [#1689] adm: Simplify getCandidateRegisterPrice()
After all the refactoring, there is no more need to have custom branch
for the local client.

Change-Id: I274305b0c390578fb4583759135d3e7ce58873dc
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-03-28 12:10:31 +00:00
632bd8e38d [#1696] qos: Fix internal tag adjust
If request has no tag, but request's public key is netmap node's key or
one of allowed internal tag keys from config, then request must use
internal IO tag.

Change-Id: Iff93b626941a81b088d8999b3f2947f9501dcdf8
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-28 07:47:12 +00:00
3bbee1b554 [#1619] logger: Allow to set options for zap.Logger via logger.Prm
Change-Id: I8eed951c25d1ecf18b0aea62c6825be65a450085
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-27 15:41:37 +03:00
9358938222 [#1633] go.mod: Bump frostfs-sdk-go
Change-Id: I50c1a0d5b88e307402a5b1b2883bb9b9a357a2c7
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-26 15:03:17 +00:00
5470b205fd [#1619] gc: Fix metric frostfs_node.garbage_collector.marking_duration_seconds
Change-Id: I957f930d1babf179d0fb6de624a90f4fe9977862
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2025-03-26 14:14:26 +03:00
132 changed files with 3351 additions and 1148 deletions

2
.ci/Jenkinsfile vendored
View file

@ -79,5 +79,3 @@ async {
}
}
}
// TODO: dco check

View file

@ -1,101 +1,103 @@
# This file contains all available configuration options
# with their default values.
# options for analysis running
version: "2"
run:
# timeout for analysis, e.g. 30s, 5m, default is 1m
timeout: 20m
# include test files or not, default is true
tests: false
# output configuration options
output:
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
formats:
- format: tab
# all available settings of specific linters
linters-settings:
exhaustive:
# indicates that switch statements are to be considered exhaustive if a
# 'default' case is present, even if all enum members aren't listed in the
# switch
default-signifies-exhaustive: true
gci:
sections:
- standard
- default
custom-order: true
govet:
# report about shadowed variables
check-shadowing: false
staticcheck:
checks: ["all", "-SA1019"] # TODO Enable SA1019 after deprecated warning are fixed.
funlen:
lines: 80 # default 60
statements: 60 # default 40
gocognit:
min-complexity: 40 # default 30
importas:
no-unaliased: true
no-extra-aliases: false
alias:
pkg: git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object
alias: objectSDK
unused:
field-writes-are-uses: false
exported-fields-are-used: false
local-variables-are-used: false
custom:
truecloudlab-linters:
path: bin/linters/external_linters.so
original-url: git.frostfs.info/TrueCloudLab/linters.git
settings:
noliteral:
target-methods : ["reportFlushError", "reportError"]
disable-packages: ["codes", "err", "res","exec"]
constants-package: "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
tab:
path: stdout
colors: false
linters:
default: none
enable:
# mandatory linters
- govet
- revive
# some default golangci-lint linters
- errcheck
- gosimple
- godot
- ineffassign
- staticcheck
- typecheck
- unused
# extra linters
- bidichk
- durationcheck
- exhaustive
- containedctx
- contextcheck
- copyloopvar
- durationcheck
- errcheck
- exhaustive
- funlen
- gocognit
- godot
- importas
- ineffassign
- intrange
- misspell
- perfsprint
- predeclared
- protogetter
- reassign
- revive
- staticcheck
- testifylint
- truecloudlab-linters
- unconvert
- unparam
- unused
- usetesting
- whitespace
settings:
exhaustive:
default-signifies-exhaustive: true
funlen:
lines: 80
statements: 60
gocognit:
min-complexity: 40
importas:
alias:
- pkg: git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object
alias: objectSDK
no-unaliased: true
no-extra-aliases: false
staticcheck:
checks:
- all
- -QF1002
unused:
field-writes-are-uses: false
exported-fields-are-used: false
local-variables-are-used: false
custom:
truecloudlab-linters:
path: bin/linters/external_linters.so
original-url: git.frostfs.info/TrueCloudLab/linters.git
settings:
noliteral:
constants-package: git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs
disable-packages:
- codes
- err
- res
- exec
target-methods:
- reportFlushError
- reportError
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
paths:
- third_party$
- builtin$
- examples$
formatters:
enable:
- gci
- gofmt
- goimports
- misspell
- predeclared
- reassign
- whitespace
- containedctx
- funlen
- gocognit
- contextcheck
- importas
- truecloudlab-linters
- perfsprint
- testifylint
- protogetter
- intrange
- tenv
- unconvert
- unparam
disable-all: true
fast: false
settings:
gci:
sections:
- standard
- default
custom-order: true
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$

View file

@ -9,8 +9,8 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.23
LINT_VERSION ?= 1.62.2
TRUECLOUDLAB_LINT_VERSION ?= 0.0.8
LINT_VERSION ?= 2.0.2
TRUECLOUDLAB_LINT_VERSION ?= 0.0.10
PROTOC_VERSION ?= 25.0
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go)
PROTOC_OS_VERSION=osx-x86_64
@ -224,7 +224,7 @@ lint-install: $(BIN)
@@make -C $(TMP_DIR)/linters lib CGO_ENABLED=1 OUT_DIR=$(OUTPUT_LINT_DIR)
@rm -rf $(TMP_DIR)/linters
@rmdir $(TMP_DIR) 2>/dev/null || true
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install -trimpath github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
@CGO_ENABLED=1 GOBIN=$(LINT_DIR) go install -trimpath github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v$(LINT_VERSION)
# Run linters
lint:

View file

@ -0,0 +1,15 @@
package maintenance
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/maintenance/zombie"
"github.com/spf13/cobra"
)
var RootCmd = &cobra.Command{
Use: "maintenance",
Short: "Section for maintenance commands",
}
func init() {
RootCmd.AddCommand(zombie.Cmd)
}

View file

@ -0,0 +1,70 @@
package zombie
import (
"crypto/ecdsa"
"fmt"
"os"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
func getPrivateKey(cmd *cobra.Command, appCfg *config.Config) *ecdsa.PrivateKey {
keyDesc := viper.GetString(walletFlag)
if keyDesc == "" {
return &nodeconfig.Key(appCfg).PrivateKey
}
data, err := os.ReadFile(keyDesc)
commonCmd.ExitOnErr(cmd, "open wallet file: %w", err)
priv, err := keys.NewPrivateKeyFromBytes(data)
if err != nil {
w, err := wallet.NewWalletFromFile(keyDesc)
commonCmd.ExitOnErr(cmd, "provided key is incorrect, only wallet or binary key supported: %w", err)
return fromWallet(cmd, w, viper.GetString(addressFlag))
}
return &priv.PrivateKey
}
func fromWallet(cmd *cobra.Command, w *wallet.Wallet, addrStr string) *ecdsa.PrivateKey {
var (
addr util.Uint160
err error
)
if addrStr == "" {
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(addrStr)
commonCmd.ExitOnErr(cmd, "--address option must be specified and valid: %w", err)
}
acc := w.GetAccount(addr)
if acc == nil {
commonCmd.ExitOnErr(cmd, "--address option must be specified and valid: %w", fmt.Errorf("can't find wallet account for %s", addrStr))
}
pass, err := getPassword()
commonCmd.ExitOnErr(cmd, "invalid password for the encrypted key: %w", err)
commonCmd.ExitOnErr(cmd, "can't decrypt account: %w", acc.Decrypt(pass, keys.NEP2ScryptParams()))
return &acc.PrivateKey().PrivateKey
}
func getPassword() (string, error) {
// this check allows empty passwords
if viper.IsSet("password") {
return viper.GetString("password"), nil
}
return input.ReadPassword("Enter password > ")
}

View file

@ -0,0 +1,31 @@
package zombie
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
func list(cmd *cobra.Command, _ []string) {
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
appCfg := config.New(configFile, configDir, config.EnvPrefix)
storageEngine := newEngine(cmd, appCfg)
q := createQuarantine(cmd, storageEngine.DumpInfo())
var containerID *cid.ID
if cidStr, _ := cmd.Flags().GetString(cidFlag); cidStr != "" {
containerID = &cid.ID{}
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr))
}
commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(a oid.Address) error {
if containerID != nil && a.Container() != *containerID {
return nil
}
cmd.Println(a.EncodeToString())
return nil
}))
}

View file

@ -0,0 +1,46 @@
package zombie
import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"github.com/spf13/cobra"
)
func createMorphClient(cmd *cobra.Command, appCfg *config.Config) *client.Client {
addresses := morphconfig.RPCEndpoint(appCfg)
if len(addresses) == 0 {
commonCmd.ExitOnErr(cmd, "create morph client: %w", errors.New("no morph endpoints found"))
}
key := nodeconfig.Key(appCfg)
cli, err := client.New(cmd.Context(),
key,
client.WithDialTimeout(morphconfig.DialTimeout(appCfg)),
client.WithEndpoints(addresses...),
client.WithSwitchInterval(morphconfig.SwitchInterval(appCfg)),
)
commonCmd.ExitOnErr(cmd, "create morph client: %w", err)
return cli
}
func createContainerClient(cmd *cobra.Command, morph *client.Client) *cntClient.Client {
hs, err := morph.NNSContractAddress(client.NNSContainerContractName)
commonCmd.ExitOnErr(cmd, "resolve container contract hash: %w", err)
cc, err := cntClient.NewFromMorph(morph, hs, 0)
commonCmd.ExitOnErr(cmd, "create morph container client: %w", err)
return cc
}
func createNetmapClient(cmd *cobra.Command, morph *client.Client) *netmapClient.Client {
hs, err := morph.NNSContractAddress(client.NNSNetmapContractName)
commonCmd.ExitOnErr(cmd, "resolve netmap contract hash: %w", err)
cli, err := netmapClient.NewFromMorph(morph, hs, 0)
commonCmd.ExitOnErr(cmd, "create morph netmap client: %w", err)
return cli
}

View file

@ -0,0 +1,154 @@
package zombie
import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"strings"
"sync"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
type quarantine struct {
// mtx protects current field.
mtx sync.Mutex
current int
trees []*fstree.FSTree
}
func createQuarantine(cmd *cobra.Command, engineInfo engine.Info) *quarantine {
var paths []string
for _, sh := range engineInfo.Shards {
var storagePaths []string
for _, st := range sh.BlobStorInfo.SubStorages {
storagePaths = append(storagePaths, st.Path)
}
if len(storagePaths) == 0 {
continue
}
paths = append(paths, filepath.Join(commonPath(storagePaths), "quarantine"))
}
q, err := newQuarantine(paths)
commonCmd.ExitOnErr(cmd, "create quarantine: %w", err)
return q
}
func commonPath(paths []string) string {
if len(paths) == 0 {
return ""
}
if len(paths) == 1 {
return paths[0]
}
minLen := math.MaxInt
for _, p := range paths {
if len(p) < minLen {
minLen = len(p)
}
}
var sb strings.Builder
for i := range minLen {
for _, path := range paths[1:] {
if paths[0][i] != path[i] {
return sb.String()
}
}
sb.WriteByte(paths[0][i])
}
return sb.String()
}
func newQuarantine(paths []string) (*quarantine, error) {
var q quarantine
for i := range paths {
f := fstree.New(
fstree.WithDepth(1),
fstree.WithDirNameLen(1),
fstree.WithPath(paths[i]),
fstree.WithPerm(os.ModePerm),
)
if err := f.Open(mode.ComponentReadWrite); err != nil {
return nil, fmt.Errorf("open fstree %s: %w", paths[i], err)
}
if err := f.Init(); err != nil {
return nil, fmt.Errorf("init fstree %s: %w", paths[i], err)
}
q.trees = append(q.trees, f)
}
return &q, nil
}
func (q *quarantine) Get(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
for i := range q.trees {
res, err := q.trees[i].Get(ctx, common.GetPrm{Address: a})
if err != nil {
continue
}
return res.Object, nil
}
return nil, &apistatus.ObjectNotFound{}
}
func (q *quarantine) Delete(ctx context.Context, a oid.Address) error {
for i := range q.trees {
_, err := q.trees[i].Delete(ctx, common.DeletePrm{Address: a})
if err != nil {
continue
}
return nil
}
return &apistatus.ObjectNotFound{}
}
func (q *quarantine) Put(ctx context.Context, obj *objectSDK.Object) error {
data, err := obj.Marshal()
if err != nil {
return err
}
var prm common.PutPrm
prm.Address = objectcore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
q.mtx.Lock()
current := q.current
q.current = (q.current + 1) % len(q.trees)
q.mtx.Unlock()
_, err = q.trees[current].Put(ctx, prm)
return err
}
func (q *quarantine) Iterate(ctx context.Context, f func(oid.Address) error) error {
var prm common.IteratePrm
prm.Handler = func(elem common.IterationElement) error {
return f(elem.Address)
}
for i := range q.trees {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, err := q.trees[i].Iterate(ctx, prm)
if err != nil {
return err
}
}
return nil
}

View file

@ -0,0 +1,55 @@
package zombie
import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
func remove(cmd *cobra.Command, _ []string) {
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
appCfg := config.New(configFile, configDir, config.EnvPrefix)
storageEngine := newEngine(cmd, appCfg)
q := createQuarantine(cmd, storageEngine.DumpInfo())
var containerID cid.ID
cidStr, _ := cmd.Flags().GetString(cidFlag)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr))
var objectID *oid.ID
oidStr, _ := cmd.Flags().GetString(oidFlag)
if oidStr != "" {
objectID = &oid.ID{}
commonCmd.ExitOnErr(cmd, "decode object ID string: %w", objectID.DecodeString(oidStr))
}
if objectID != nil {
var addr oid.Address
addr.SetContainer(containerID)
addr.SetObject(*objectID)
removeObject(cmd, q, addr)
} else {
commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(addr oid.Address) error {
if addr.Container() != containerID {
return nil
}
removeObject(cmd, q, addr)
return nil
}))
}
}
func removeObject(cmd *cobra.Command, q *quarantine, addr oid.Address) {
err := q.Delete(cmd.Context(), addr)
if errors.Is(err, new(apistatus.ObjectNotFound)) {
return
}
commonCmd.ExitOnErr(cmd, "remove object from quarantine: %w", err)
}

View file

@ -0,0 +1,69 @@
package zombie
import (
"crypto/sha256"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
func restore(cmd *cobra.Command, _ []string) {
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
appCfg := config.New(configFile, configDir, config.EnvPrefix)
storageEngine := newEngine(cmd, appCfg)
q := createQuarantine(cmd, storageEngine.DumpInfo())
morphClient := createMorphClient(cmd, appCfg)
cnrCli := createContainerClient(cmd, morphClient)
var containerID cid.ID
cidStr, _ := cmd.Flags().GetString(cidFlag)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", containerID.DecodeString(cidStr))
var objectID *oid.ID
oidStr, _ := cmd.Flags().GetString(oidFlag)
if oidStr != "" {
objectID = &oid.ID{}
commonCmd.ExitOnErr(cmd, "decode object ID string: %w", objectID.DecodeString(oidStr))
}
if objectID != nil {
var addr oid.Address
addr.SetContainer(containerID)
addr.SetObject(*objectID)
restoreObject(cmd, storageEngine, q, addr, cnrCli)
} else {
commonCmd.ExitOnErr(cmd, "iterate over quarantine: %w", q.Iterate(cmd.Context(), func(addr oid.Address) error {
if addr.Container() != containerID {
return nil
}
restoreObject(cmd, storageEngine, q, addr, cnrCli)
return nil
}))
}
}
func restoreObject(cmd *cobra.Command, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address, cnrCli *cntClient.Client) {
obj, err := q.Get(cmd.Context(), addr)
commonCmd.ExitOnErr(cmd, "get object from quarantine: %w", err)
rawCID := make([]byte, sha256.Size)
cid := addr.Container()
cid.Encode(rawCID)
cnr, err := cnrCli.Get(cmd.Context(), rawCID)
commonCmd.ExitOnErr(cmd, "get container: %w", err)
putPrm := engine.PutPrm{
Object: obj,
IsIndexedContainer: containerCore.IsIndexedContainer(cnr.Value),
}
commonCmd.ExitOnErr(cmd, "put object to storage engine: %w", storageEngine.Put(cmd.Context(), putPrm))
commonCmd.ExitOnErr(cmd, "remove object from quarantine: %w", q.Delete(cmd.Context(), addr))
}

View file

@ -0,0 +1,123 @@
package zombie
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
const (
flagBatchSize = "batch-size"
flagBatchSizeUsage = "Objects iteration batch size"
cidFlag = "cid"
cidFlagUsage = "Container ID"
oidFlag = "oid"
oidFlagUsage = "Object ID"
walletFlag = "wallet"
walletFlagShorthand = "w"
walletFlagUsage = "Path to the wallet or binary key"
addressFlag = "address"
addressFlagUsage = "Address of wallet account"
moveFlag = "move"
moveFlagUsage = "Move objects from storage engine to quarantine"
)
var (
Cmd = &cobra.Command{
Use: "zombie",
Short: "Zombie objects related commands",
}
scanCmd = &cobra.Command{
Use: "scan",
Short: "Scan storage engine for zombie objects and move them to quarantine",
Long: "",
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
_ = viper.BindPFlag(walletFlag, cmd.Flags().Lookup(walletFlag))
_ = viper.BindPFlag(addressFlag, cmd.Flags().Lookup(addressFlag))
_ = viper.BindPFlag(flagBatchSize, cmd.Flags().Lookup(flagBatchSize))
_ = viper.BindPFlag(moveFlag, cmd.Flags().Lookup(moveFlag))
},
Run: scan,
}
listCmd = &cobra.Command{
Use: "list",
Short: "List zombie objects from quarantine",
Long: "",
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
_ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag))
},
Run: list,
}
restoreCmd = &cobra.Command{
Use: "restore",
Short: "Restore zombie objects from quarantine",
Long: "",
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
_ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag))
_ = viper.BindPFlag(oidFlag, cmd.Flags().Lookup(oidFlag))
},
Run: restore,
}
removeCmd = &cobra.Command{
Use: "remove",
Short: "Remove zombie objects from quarantine",
Long: "",
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(commonflags.ConfigFlag, cmd.Flags().Lookup(commonflags.ConfigFlag))
_ = viper.BindPFlag(commonflags.ConfigDirFlag, cmd.Flags().Lookup(commonflags.ConfigDirFlag))
_ = viper.BindPFlag(cidFlag, cmd.Flags().Lookup(cidFlag))
_ = viper.BindPFlag(oidFlag, cmd.Flags().Lookup(oidFlag))
},
Run: remove,
}
)
func init() {
initScanCmd()
initListCmd()
initRestoreCmd()
initRemoveCmd()
}
func initScanCmd() {
Cmd.AddCommand(scanCmd)
scanCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
scanCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
scanCmd.Flags().Uint32(flagBatchSize, 1000, flagBatchSizeUsage)
scanCmd.Flags().StringP(walletFlag, walletFlagShorthand, "", walletFlagUsage)
scanCmd.Flags().String(addressFlag, "", addressFlagUsage)
scanCmd.Flags().Bool(moveFlag, false, moveFlagUsage)
}
func initListCmd() {
Cmd.AddCommand(listCmd)
listCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
listCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
listCmd.Flags().String(cidFlag, "", cidFlagUsage)
}
func initRestoreCmd() {
Cmd.AddCommand(restoreCmd)
restoreCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
restoreCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
restoreCmd.Flags().String(cidFlag, "", cidFlagUsage)
restoreCmd.Flags().String(oidFlag, "", oidFlagUsage)
}
func initRemoveCmd() {
Cmd.AddCommand(removeCmd)
removeCmd.Flags().StringP(commonflags.ConfigFlag, commonflags.ConfigFlagShorthand, "", commonflags.ConfigFlagUsage)
removeCmd.Flags().String(commonflags.ConfigDirFlag, "", commonflags.ConfigDirFlagUsage)
removeCmd.Flags().String(cidFlag, "", cidFlagUsage)
removeCmd.Flags().String(oidFlag, "", oidFlagUsage)
}

View file

@ -0,0 +1,281 @@
package zombie
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
func scan(cmd *cobra.Command, _ []string) {
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
appCfg := config.New(configFile, configDir, config.EnvPrefix)
batchSize, _ := cmd.Flags().GetUint32(flagBatchSize)
if batchSize == 0 {
commonCmd.ExitOnErr(cmd, "invalid batch size: %w", errors.New("batch size must be positive value"))
}
move, _ := cmd.Flags().GetBool(moveFlag)
storageEngine := newEngine(cmd, appCfg)
morphClient := createMorphClient(cmd, appCfg)
cnrCli := createContainerClient(cmd, morphClient)
nmCli := createNetmapClient(cmd, morphClient)
q := createQuarantine(cmd, storageEngine.DumpInfo())
pk := getPrivateKey(cmd, appCfg)
epoch, err := nmCli.Epoch(cmd.Context())
commonCmd.ExitOnErr(cmd, "read epoch from morph: %w", err)
nm, err := nmCli.GetNetMapByEpoch(cmd.Context(), epoch)
commonCmd.ExitOnErr(cmd, "read netmap from morph: %w", err)
cmd.Printf("Epoch: %d\n", nm.Epoch())
cmd.Printf("Nodes in the netmap: %d\n", len(nm.Nodes()))
ps := &processStatus{
statusCount: make(map[status]uint64),
}
stopCh := make(chan struct{})
start := time.Now()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
tick := time.NewTicker(time.Second)
defer tick.Stop()
for {
select {
case <-cmd.Context().Done():
return
case <-stopCh:
return
case <-tick.C:
fmt.Printf("Objects processed: %d; Time elapsed: %s\n", ps.total(), time.Since(start))
}
}
}()
go func() {
defer wg.Done()
err = scanStorageEngine(cmd, batchSize, storageEngine, ps, appCfg, cnrCli, nmCli, q, pk, move)
close(stopCh)
}()
wg.Wait()
commonCmd.ExitOnErr(cmd, "scan storage engine for zombie objects: %w", err)
cmd.Println()
cmd.Println("Status description:")
cmd.Println("undefined -- nothing is clear")
cmd.Println("found -- object is found in cluster")
cmd.Println("quarantine -- object is not found in cluster")
cmd.Println()
for status, count := range ps.statusCount {
cmd.Printf("Status: %s, Count: %d\n", status, count)
}
}
type status string
const (
statusUndefined status = "undefined"
statusFound status = "found"
statusQuarantine status = "quarantine"
)
func checkAddr(ctx context.Context, cnrCli *cntClient.Client, nmCli *netmap.Client, cc *cache.ClientCache, obj object.Info) (status, error) {
rawCID := make([]byte, sha256.Size)
cid := obj.Address.Container()
cid.Encode(rawCID)
cnr, err := cnrCli.Get(ctx, rawCID)
if err != nil {
var errContainerNotFound *apistatus.ContainerNotFound
if errors.As(err, &errContainerNotFound) {
// Policer will deal with this object.
return statusFound, nil
}
return statusUndefined, fmt.Errorf("read container %s from morph: %w", cid, err)
}
nm, err := nmCli.NetMap(ctx)
if err != nil {
return statusUndefined, fmt.Errorf("read netmap from morph: %w", err)
}
nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), rawCID)
if err != nil {
// Not enough nodes, check all netmap nodes.
nodes = append([][]netmap.NodeInfo{}, nm.Nodes())
}
objID := obj.Address.Object()
cnrID := obj.Address.Container()
local := true
raw := false
if obj.ECInfo != nil {
objID = obj.ECInfo.ParentID
local = false
raw = true
}
prm := clientSDK.PrmObjectHead{
ObjectID: &objID,
ContainerID: &cnrID,
Local: local,
Raw: raw,
}
var ni clientCore.NodeInfo
for i := range nodes {
for j := range nodes[i] {
if err := clientCore.NodeInfoFromRawNetmapElement(&ni, netmapCore.Node(nodes[i][j])); err != nil {
return statusUndefined, fmt.Errorf("parse node info: %w", err)
}
c, err := cc.Get(ni)
if err != nil {
continue
}
res, err := c.ObjectHead(ctx, prm)
if err != nil {
var errECInfo *objectSDK.ECInfoError
if raw && errors.As(err, &errECInfo) {
return statusFound, nil
}
continue
}
if err := apistatus.ErrFromStatus(res.Status()); err != nil {
continue
}
return statusFound, nil
}
}
if cnr.Value.PlacementPolicy().NumberOfReplicas() == 1 && cnr.Value.PlacementPolicy().ReplicaDescriptor(0).NumberOfObjects() == 1 {
return statusFound, nil
}
return statusQuarantine, nil
}
func scanStorageEngine(cmd *cobra.Command, batchSize uint32, storageEngine *engine.StorageEngine, ps *processStatus,
appCfg *config.Config, cnrCli *cntClient.Client, nmCli *netmap.Client, q *quarantine, pk *ecdsa.PrivateKey, move bool,
) error {
cc := cache.NewSDKClientCache(cache.ClientCacheOpts{
DialTimeout: apiclientconfig.DialTimeout(appCfg),
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
Key: pk,
AllowExternal: apiclientconfig.AllowExternal(appCfg),
})
ctx := cmd.Context()
var cursor *engine.Cursor
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var prm engine.ListWithCursorPrm
prm.WithCursor(cursor)
prm.WithCount(batchSize)
res, err := storageEngine.ListWithCursor(ctx, prm)
if err != nil {
if errors.Is(err, engine.ErrEndOfListing) {
return nil
}
return fmt.Errorf("list with cursor: %w", err)
}
cursor = res.Cursor()
addrList := res.AddressList()
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(batchSize))
for i := range addrList {
addr := addrList[i]
eg.Go(func() error {
result, err := checkAddr(egCtx, cnrCli, nmCli, cc, addr)
if err != nil {
return fmt.Errorf("check object %s status: %w", addr.Address, err)
}
ps.add(result)
if !move && result == statusQuarantine {
cmd.Println(addr)
return nil
}
if result == statusQuarantine {
return moveToQuarantine(egCtx, storageEngine, q, addr.Address)
}
return nil
})
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("process objects batch: %w", err)
}
}
}
func moveToQuarantine(ctx context.Context, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address) error {
var getPrm engine.GetPrm
getPrm.WithAddress(addr)
res, err := storageEngine.Get(ctx, getPrm)
if err != nil {
return fmt.Errorf("get object %s from storage engine: %w", addr, err)
}
if err := q.Put(ctx, res.Object()); err != nil {
return fmt.Errorf("put object %s to quarantine: %w", addr, err)
}
var delPrm engine.DeletePrm
delPrm.WithForceRemoval()
delPrm.WithAddress(addr)
if err = storageEngine.Delete(ctx, delPrm); err != nil {
return fmt.Errorf("delete object %s from storage engine: %w", addr, err)
}
return nil
}
type processStatus struct {
guard sync.RWMutex
statusCount map[status]uint64
count uint64
}
func (s *processStatus) add(st status) {
s.guard.Lock()
defer s.guard.Unlock()
s.statusCount[st]++
s.count++
}
func (s *processStatus) total() uint64 {
s.guard.RLock()
defer s.guard.RUnlock()
return s.count
}

View file

@ -0,0 +1,203 @@
package zombie
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/panjf2000/ants/v2"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func newEngine(cmd *cobra.Command, c *config.Config) *engine.StorageEngine {
ngOpts := storageEngineOptions(c)
shardOpts := shardOptions(cmd, c)
e := engine.New(ngOpts...)
for _, opts := range shardOpts {
_, err := e.AddShard(cmd.Context(), opts...)
commonCmd.ExitOnErr(cmd, "iterate shards from config: %w", err)
}
commonCmd.ExitOnErr(cmd, "open storage engine: %w", e.Open(cmd.Context()))
commonCmd.ExitOnErr(cmd, "init storage engine: %w", e.Init(cmd.Context()))
return e
}
func storageEngineOptions(c *config.Config) []engine.Option {
return []engine.Option{
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)),
engine.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
engine.WithLowMemoryConsumption(engineconfig.EngineLowMemoryConsumption(c)),
}
}
func shardOptions(cmd *cobra.Command, c *config.Config) [][]shard.Option {
var result [][]shard.Option
err := engineconfig.IterateShards(c, false, func(sh *shardconfig.Config) error {
result = append(result, getShardOpts(cmd, c, sh))
return nil
})
commonCmd.ExitOnErr(cmd, "iterate shards from config: %w", err)
return result
}
func getShardOpts(cmd *cobra.Command, c *config.Config, sh *shardconfig.Config) []shard.Option {
wc, wcEnabled := getWriteCacheOpts(sh)
return []shard.Option{
shard.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
shard.WithRefillMetabase(sh.RefillMetabase()),
shard.WithRefillMetabaseWorkersCount(sh.RefillMetabaseWorkersCount()),
shard.WithMode(sh.Mode()),
shard.WithBlobStorOptions(getBlobstorOpts(cmd.Context(), sh)...),
shard.WithMetaBaseOptions(getMetabaseOpts(sh)...),
shard.WithPiloramaOptions(getPiloramaOpts(c, sh)...),
shard.WithWriteCache(wcEnabled),
shard.WithWriteCacheOptions(wc),
shard.WithRemoverBatchSize(sh.GC().RemoverBatchSize()),
shard.WithGCRemoverSleepInterval(sh.GC().RemoverSleepInterval()),
shard.WithExpiredCollectorBatchSize(sh.GC().ExpiredCollectorBatchSize()),
shard.WithExpiredCollectorWorkerCount(sh.GC().ExpiredCollectorWorkerCount()),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
commonCmd.ExitOnErr(cmd, "init GC pool: %w", err)
return pool
}),
shard.WithLimiter(qos.NewNoopLimiter()),
}
}
func getWriteCacheOpts(sh *shardconfig.Config) ([]writecache.Option, bool) {
if wc := sh.WriteCache(); wc != nil && wc.Enabled() {
var result []writecache.Option
result = append(result,
writecache.WithPath(wc.Path()),
writecache.WithFlushSizeLimit(wc.MaxFlushingObjectsSize()),
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
writecache.WithFlushWorkersCount(wc.WorkerCount()),
writecache.WithMaxCacheSize(wc.SizeLimit()),
writecache.WithMaxCacheCount(wc.CountLimit()),
writecache.WithNoSync(wc.NoSync()),
writecache.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
writecache.WithQoSLimiter(qos.NewNoopLimiter()),
)
return result, true
}
return nil, false
}
func getPiloramaOpts(c *config.Config, sh *shardconfig.Config) []pilorama.Option {
var piloramaOpts []pilorama.Option
if config.BoolSafe(c.Sub("tree"), "enabled") {
pr := sh.Pilorama()
piloramaOpts = append(piloramaOpts,
pilorama.WithPath(pr.Path()),
pilorama.WithPerm(pr.Perm()),
pilorama.WithNoSync(pr.NoSync()),
pilorama.WithMaxBatchSize(pr.MaxBatchSize()),
pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()),
)
}
return piloramaOpts
}
func getMetabaseOpts(sh *shardconfig.Config) []meta.Option {
return []meta.Option{
meta.WithPath(sh.Metabase().Path()),
meta.WithPermissions(sh.Metabase().BoltDB().Perm()),
meta.WithMaxBatchSize(sh.Metabase().BoltDB().MaxBatchSize()),
meta.WithMaxBatchDelay(sh.Metabase().BoltDB().MaxBatchDelay()),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
meta.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
meta.WithEpochState(&epochState{}),
}
}
func getBlobstorOpts(ctx context.Context, sh *shardconfig.Config) []blobstor.Option {
result := []blobstor.Option{
blobstor.WithCompressObjects(sh.Compress()),
blobstor.WithUncompressableContentTypes(sh.UncompressableContentTypes()),
blobstor.WithCompressibilityEstimate(sh.EstimateCompressibility()),
blobstor.WithCompressibilityEstimateThreshold(sh.EstimateCompressibilityThreshold()),
blobstor.WithStorages(getSubStorages(ctx, sh)),
blobstor.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
}
return result
}
func getSubStorages(ctx context.Context, sh *shardconfig.Config) []blobstor.SubStorage {
var ss []blobstor.SubStorage
for _, storage := range sh.BlobStor().Storages() {
switch storage.Type() {
case blobovniczatree.Type:
sub := blobovniczaconfig.From((*config.Config)(storage))
blobTreeOpts := []blobovniczatree.Option{
blobovniczatree.WithRootPath(storage.Path()),
blobovniczatree.WithPermissions(storage.Perm()),
blobovniczatree.WithBlobovniczaSize(sub.Size()),
blobovniczatree.WithBlobovniczaShallowDepth(sub.ShallowDepth()),
blobovniczatree.WithBlobovniczaShallowWidth(sub.ShallowWidth()),
blobovniczatree.WithOpenedCacheSize(sub.OpenedCacheSize()),
blobovniczatree.WithOpenedCacheTTL(sub.OpenedCacheTTL()),
blobovniczatree.WithOpenedCacheExpInterval(sub.OpenedCacheExpInterval()),
blobovniczatree.WithInitWorkerCount(sub.InitWorkerCount()),
blobovniczatree.WithWaitBeforeDropDB(sub.RebuildDropTimeout()),
blobovniczatree.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
blobovniczatree.WithObjectSizeLimit(sh.SmallSizeLimit()),
}
ss = append(ss, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < sh.SmallSizeLimit()
},
})
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storage))
fstreeOpts := []fstree.Option{
fstree.WithPath(storage.Path()),
fstree.WithPerm(storage.Perm()),
fstree.WithDepth(sub.Depth()),
fstree.WithNoSync(sub.NoSync()),
fstree.WithLogger(logger.NewLoggerWrapper(zap.NewNop())),
}
ss = append(ss, blobstor.SubStorage{
Storage: fstree.New(fstreeOpts...),
Policy: func(_ *objectSDK.Object, _ []byte) bool {
return true
},
})
default:
// should never happen, that has already
// been handled: when the config was read
}
}
return ss
}
type epochState struct{}
func (epochState) CurrentEpoch() uint64 {
return 0
}

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/state"
@ -161,9 +162,7 @@ func printAlphabetContractBalances(cmd *cobra.Command, c helper.Client, inv *inv
helper.GetAlphabetNNSDomain(i),
int64(nns.TXT))
}
if w.Err != nil {
panic(w.Err)
}
assert.NoError(w.Err)
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
if err != nil {
@ -226,9 +225,7 @@ func fetchBalances(c *invoker.Invoker, gasHash util.Uint160, accounts []accBalan
for i := range accounts {
emit.AppCall(w.BinWriter, gasHash, "balanceOf", callflag.ReadStates, accounts[i].scriptHash)
}
if w.Err != nil {
panic(w.Err)
}
assert.NoError(w.Err)
res, err := c.Run(w.Bytes())
if err != nil || res.State != vmstate.Halt.String() || len(res.Stack) != len(accounts) {

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/io"
@ -235,9 +236,7 @@ func restoreOrPutContainers(containers []Container, isOK func([]byte) bool, cmd
putContainer(bw, ch, cnt)
if bw.Err != nil {
panic(bw.Err)
}
assert.NoError(bw.Err)
if err := wCtx.SendConsensusTx(bw.Bytes()); err != nil {
return err

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
@ -120,9 +121,7 @@ func deployContractCmd(cmd *cobra.Command, args []string) error {
}
}
if writer.Err != nil {
panic(fmt.Errorf("BUG: can't create deployment script: %w", writer.Err))
}
assert.NoError(writer.Err, "can't create deployment script")
if err := c.SendCommitteeTx(writer.Bytes(), false); err != nil {
return err
@ -173,9 +172,8 @@ func registerNNS(nnsCs *state.Contract, c *helper.InitializeContext, zone string
domain, int64(nns.TXT), address.Uint160ToString(cs.Hash))
}
if bw.Err != nil {
panic(fmt.Errorf("BUG: can't create deployment script: %w", writer.Err))
} else if bw.Len() != start {
assert.NoError(bw.Err, "can't create deployment script")
if bw.Len() != start {
writer.WriteBytes(bw.Bytes())
emit.Opcodes(writer.BinWriter, opcode.LDSFLD0, opcode.PUSH1, opcode.PACK)
emit.AppCallNoArgs(writer.BinWriter, nnsCs.Hash, "setPrice", callflag.All)

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
@ -236,21 +237,17 @@ func fillContractVersion(cmd *cobra.Command, c helper.Client, infos []contractDu
} else {
sub.Reset()
emit.AppCall(sub.BinWriter, infos[i].hash, "version", callflag.NoneFlag)
if sub.Err != nil {
panic(fmt.Errorf("BUG: can't create version script: %w", bw.Err))
}
assert.NoError(sub.Err, "can't create version script")
script := sub.Bytes()
emit.Instruction(bw.BinWriter, opcode.TRY, []byte{byte(3 + len(script) + 2), 0})
bw.BinWriter.WriteBytes(script)
bw.WriteBytes(script)
emit.Instruction(bw.BinWriter, opcode.ENDTRY, []byte{2 + 1})
emit.Opcodes(bw.BinWriter, opcode.PUSH0)
}
}
emit.Opcodes(bw.BinWriter, opcode.NOP) // for the last ENDTRY target
if bw.Err != nil {
panic(fmt.Errorf("BUG: can't create version script: %w", bw.Err))
}
assert.NoError(bw.Err, "can't create version script")
res, err := c.InvokeScript(bw.Bytes(), nil)
if err != nil {

View file

@ -6,6 +6,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
nns2 "git.frostfs.info/TrueCloudLab/frostfs-contract/rpcclient/nns"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
@ -13,9 +14,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
nns2 "github.com/nspcc-dev/neo-go/pkg/rpcclient/nns"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -187,19 +186,9 @@ func NNSResolveKey(inv *invoker.Invoker, nnsHash util.Uint160, domain string) (*
}
func NNSIsAvailable(c Client, nnsHash util.Uint160, name string) (bool, error) {
switch c.(type) {
case *rpcclient.Client:
inv := invoker.New(c, nil)
reader := nns2.NewReader(inv, nnsHash)
return reader.IsAvailable(name)
default:
b, err := unwrap.Bool(InvokeFunction(c, nnsHash, "isAvailable", []any{name}, nil))
if err != nil {
return false, fmt.Errorf("`isAvailable`: invalid response: %w", err)
}
return b, nil
}
inv := invoker.New(c, nil)
reader := nns2.NewReader(inv, nnsHash)
return reader.IsAvailable(name)
}
func CheckNotaryEnabled(c Client) error {

View file

@ -13,6 +13,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"github.com/nspcc-dev/neo-go/pkg/core/state"
@ -21,6 +22,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/management"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/context"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
@ -28,7 +30,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -375,9 +376,7 @@ func (c *InitializeContext) sendMultiTx(script []byte, tryGroup bool, withConsen
}
act, err = actor.New(c.Client, signers)
} else {
if withConsensus {
panic("BUG: should never happen")
}
assert.False(withConsensus, "BUG: should never happen")
act, err = c.CommitteeAct, nil
}
if err != nil {
@ -411,11 +410,9 @@ func (c *InitializeContext) MultiSignAndSend(tx *transaction.Transaction, accTyp
func (c *InitializeContext) MultiSign(tx *transaction.Transaction, accType string) error {
version, err := c.Client.GetVersion()
if err != nil {
// error appears only if client
// has not been initialized
panic(err)
}
// error appears only if client
// has not been initialized
assert.NoError(err)
network := version.Protocol.Network
// Use parameter context to avoid dealing with signature order.
@ -447,12 +444,12 @@ func (c *InitializeContext) MultiSign(tx *transaction.Transaction, accType strin
for i := range tx.Signers {
if tx.Signers[i].Account == h {
assert.True(i <= len(tx.Scripts), "BUG: invalid signing order")
if i < len(tx.Scripts) {
tx.Scripts[i] = *w
} else if i == len(tx.Scripts) {
}
if i == len(tx.Scripts) {
tx.Scripts = append(tx.Scripts, *w)
} else {
panic("BUG: invalid signing order")
}
return nil
}
@ -510,9 +507,7 @@ func (c *InitializeContext) NNSRegisterDomainScript(nnsHash, expectedHash util.U
int64(constants.DefaultExpirationTime), constants.NNSTtlDefVal)
emit.Opcodes(bw.BinWriter, opcode.ASSERT)
if bw.Err != nil {
panic(bw.Err)
}
assert.NoError(bw.Err)
return bw.Bytes(), false, nil
}
@ -524,12 +519,8 @@ func (c *InitializeContext) NNSRegisterDomainScript(nnsHash, expectedHash util.U
}
func (c *InitializeContext) NNSRootRegistered(nnsHash util.Uint160, zone string) (bool, error) {
res, err := c.CommitteeAct.Call(nnsHash, "isAvailable", "name."+zone)
if err != nil {
return false, err
}
return res.State == vmstate.Halt.String(), nil
avail, err := unwrap.Bool(c.CommitteeAct.Call(nnsHash, "isAvailable", zone))
return !avail, err
}
func (c *InitializeContext) IsUpdated(ctrHash util.Uint160, cs *ContractState) bool {

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core"
@ -316,9 +317,7 @@ func (l *LocalClient) SendRawTransaction(tx *transaction.Transaction) (util.Uint
func (l *LocalClient) putTransactions() error {
// 1. Prepare new block.
lastBlock, err := l.bc.GetBlock(l.bc.CurrentBlockHash())
if err != nil {
panic(err)
}
assert.NoError(err)
defer func() { l.transactions = l.transactions[:0] }()
b := &block.Block{
@ -359,9 +358,7 @@ func InvokeFunction(c Client, h util.Uint160, method string, parameters []any, s
w := io.NewBufBinWriter()
emit.Array(w.BinWriter, parameters...)
emit.AppCallNoArgs(w.BinWriter, h, method, callflag.All)
if w.Err != nil {
panic(fmt.Sprintf("BUG: invalid parameters for '%s': %v", method, w.Err))
}
assert.True(w.Err == nil, fmt.Sprintf("BUG: invalid parameters for '%s': %v", method, w.Err))
return c.InvokeScript(w.Bytes(), signers)
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-contract/nns"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -111,9 +112,7 @@ func wrapRegisterScriptWithPrice(w *io.BufBinWriter, nnsHash util.Uint160, s []b
emit.Opcodes(w.BinWriter, opcode.LDSFLD0, opcode.PUSH1, opcode.PACK)
emit.AppCallNoArgs(w.BinWriter, nnsHash, "setPrice", callflag.All)
if w.Err != nil {
panic(fmt.Errorf("BUG: can't wrap register script: %w", w.Err))
}
assert.NoError(w.Err, "can't wrap register script")
}
func nnsRegisterDomain(c *helper.InitializeContext, nnsHash, expectedHash util.Uint160, domain string) error {

View file

@ -1,21 +1,18 @@
package initialize
import (
"errors"
"fmt"
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/constants"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph/helper"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/neo"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -30,7 +27,8 @@ const (
)
func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
regPrice, err := getCandidateRegisterPrice(c)
reader := neo.NewReader(c.ReadOnlyInvoker)
regPrice, err := reader.GetRegisterPrice()
if err != nil {
return fmt.Errorf("can't fetch registration price: %w", err)
}
@ -42,9 +40,7 @@ func registerCandidateRange(c *helper.InitializeContext, start, end int) error {
emit.Opcodes(w.BinWriter, opcode.ASSERT)
}
emit.AppCall(w.BinWriter, neo.Hash, "setRegisterPrice", callflag.States, regPrice)
if w.Err != nil {
panic(fmt.Sprintf("BUG: %v", w.Err))
}
assert.NoError(w.Err)
signers := []actor.SignerAccount{{
Signer: c.GetSigner(false, c.CommitteeAcc),
@ -116,7 +112,7 @@ func registerCandidates(c *helper.InitializeContext) error {
func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
neoHash := neo.Hash
ok, err := transferNEOFinished(c, neoHash)
ok, err := transferNEOFinished(c)
if ok || err != nil {
return err
}
@ -139,33 +135,8 @@ func transferNEOToAlphabetContracts(c *helper.InitializeContext) error {
return c.AwaitTx()
}
func transferNEOFinished(c *helper.InitializeContext, neoHash util.Uint160) (bool, error) {
r := nep17.NewReader(c.ReadOnlyInvoker, neoHash)
func transferNEOFinished(c *helper.InitializeContext) (bool, error) {
r := neo.NewReader(c.ReadOnlyInvoker)
bal, err := r.BalanceOf(c.CommitteeAcc.Contract.ScriptHash())
return bal.Cmp(big.NewInt(native.NEOTotalSupply)) == -1, err
}
var errGetPriceInvalid = errors.New("`getRegisterPrice`: invalid response")
func getCandidateRegisterPrice(c *helper.InitializeContext) (int64, error) {
switch c.Client.(type) {
case *rpcclient.Client:
inv := invoker.New(c.Client, nil)
reader := neo.NewReader(inv)
return reader.GetRegisterPrice()
default:
neoHash := neo.Hash
res, err := helper.InvokeFunction(c.Client, neoHash, "getRegisterPrice", nil, nil)
if err != nil {
return 0, err
}
if len(res.Stack) == 0 {
return 0, errGetPriceInvalid
}
bi, err := res.Stack[0].TryInteger()
if err != nil || !bi.IsInt64() {
return 0, errGetPriceInvalid
}
return bi.Int64(), nil
}
}

View file

@ -5,6 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/maintenance"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/modules/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
@ -41,6 +42,7 @@ func init() {
rootCmd.AddCommand(config.RootCmd)
rootCmd.AddCommand(morph.RootCmd)
rootCmd.AddCommand(metabase.RootCmd)
rootCmd.AddCommand(maintenance.RootCmd)
rootCmd.AddCommand(autocomplete.Command("frostfs-adm"))
rootCmd.AddCommand(gendoc.Command(rootCmd, gendoc.Options{}))

View file

@ -858,6 +858,8 @@ type PatchObjectPrm struct {
ReplaceAttribute bool
NewSplitHeader *objectSDK.SplitHeader
PayloadPatches []PayloadPatch
}
@ -888,7 +890,11 @@ func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) {
return nil, fmt.Errorf("init payload reading: %w", err)
}
if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) {
if patcher.PatchHeader(ctx, client.PatchHeaderPrm{
NewSplitHeader: prm.NewSplitHeader,
NewAttributes: prm.NewAttributes,
ReplaceAttributes: prm.ReplaceAttribute,
}) {
for _, pp := range prm.PayloadPatches {
payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm)
if err != nil {

View file

@ -44,6 +44,7 @@ is set to current epoch + n.
_ = viper.BindPFlag(commonflags.WalletPath, ff.Lookup(commonflags.WalletPath))
_ = viper.BindPFlag(commonflags.Account, ff.Lookup(commonflags.Account))
_ = viper.BindPFlag(commonflags.RPC, ff.Lookup(commonflags.RPC))
},
}
@ -81,7 +82,7 @@ func createToken(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "can't parse --"+notValidBeforeFlag+" flag: %w", err)
if iatRelative || expRelative || nvbRelative {
endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
endpoint := viper.GetString(commonflags.RPC)
if len(endpoint) == 0 {
commonCmd.ExitOnErr(cmd, "can't fetch current epoch: %w", fmt.Errorf("'%s' flag value must be specified", commonflags.RPC))
}

View file

@ -5,7 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"os"
"slices"
"strings"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
@ -19,8 +21,9 @@ import (
)
type policyPlaygroundREPL struct {
cmd *cobra.Command
nodes map[string]netmap.NodeInfo
cmd *cobra.Command
nodes map[string]netmap.NodeInfo
console *readline.Instance
}
func newPolicyPlaygroundREPL(cmd *cobra.Command) *policyPlaygroundREPL {
@ -40,7 +43,7 @@ func (repl *policyPlaygroundREPL) handleLs(args []string) error {
node.IterateAttributes(func(k, v string) {
attrs = append(attrs, fmt.Sprintf("%s:%q", k, v))
})
fmt.Printf("\t%2d: id=%s attrs={%v}\n", i, id, strings.Join(attrs, " "))
fmt.Fprintf(repl.console, "\t%2d: id=%s attrs={%v}\n", i, id, strings.Join(attrs, " "))
i++
}
return nil
@ -147,12 +150,29 @@ func (repl *policyPlaygroundREPL) handleEval(args []string) error {
for _, node := range ns {
ids = append(ids, hex.EncodeToString(node.PublicKey()))
}
fmt.Printf("\t%2d: %v\n", i+1, ids)
fmt.Fprintf(repl.console, "\t%2d: %v\n", i+1, ids)
}
return nil
}
func (repl *policyPlaygroundREPL) handleHelp(args []string) error {
if len(args) != 0 {
if _, ok := commands[args[0]]; !ok {
return fmt.Errorf("unknown command: %q", args[0])
}
fmt.Fprintln(repl.console, commands[args[0]].usage)
return nil
}
commandList := slices.Collect(maps.Keys(commands))
slices.Sort(commandList)
for _, command := range commandList {
fmt.Fprintf(repl.console, "%s: %s\n", command, commands[command].descriprion)
}
return nil
}
func (repl *policyPlaygroundREPL) netMap() netmap.NetMap {
var nm netmap.NetMap
var nodes []netmap.NodeInfo
@ -163,15 +183,104 @@ func (repl *policyPlaygroundREPL) netMap() netmap.NetMap {
return nm
}
var policyPlaygroundCompleter = readline.NewPrefixCompleter(
readline.PcItem("list"),
readline.PcItem("ls"),
readline.PcItem("add"),
readline.PcItem("load"),
readline.PcItem("remove"),
readline.PcItem("rm"),
readline.PcItem("eval"),
)
type commandDescription struct {
descriprion string
usage string
}
var commands = map[string]commandDescription{
"list": {
descriprion: "Display all nodes in the netmap",
usage: `Display all nodes in the netmap
Example of usage:
list
1: id=03ff65b6ae79134a4dce9d0d39d3851e9bab4ee97abf86e81e1c5bbc50cd2826ae attrs={Continent:"Europe" Country:"Poland"}
2: id=02ac920cd7df0b61b289072e6b946e2da4e1a31b9ab1c621bb475e30fa4ab102c3 attrs={Continent:"Antarctica" Country:"Heard Island"}
`,
},
"ls": {
descriprion: "Display all nodes in the netmap",
usage: `Display all nodes in the netmap
Example of usage:
ls
1: id=03ff65b6ae79134a4dce9d0d39d3851e9bab4ee97abf86e81e1c5bbc50cd2826ae attrs={Continent:"Europe" Country:"Poland"}
2: id=02ac920cd7df0b61b289072e6b946e2da4e1a31b9ab1c621bb475e30fa4ab102c3 attrs={Continent:"Antarctica" Country:"Heard Island"}
`,
},
"add": {
descriprion: "Add a new node: add <node-hash> attr=value",
usage: `Add a new node
Example of usage:
add 03ff65b6ae79134a4dce9d0d39d3851e9bab4ee97abf86e81e1c5bbc50cd2826ae continent:Europe country:Poland`,
},
"load": {
descriprion: "Load netmap from file: load <path>",
usage: `Load netmap from file
Example of usage:
load "netmap.json"
File format (netmap.json):
{
"03ff65b6ae79134a4dce9d0d39d3851e9bab4ee97abf86e81e1c5bbc50cd2826ae": {
"continent": "Europe",
"country": "Poland"
},
"02ac920cd7df0b61b289072e6b946e2da4e1a31b9ab1c621bb475e30fa4ab102c3": {
"continent": "Antarctica",
"country": "Heard Island"
}
}`,
},
"remove": {
descriprion: "Remove a node: remove <node-hash>",
usage: `Remove a node
Example of usage:
remove 03ff65b6ae79134a4dce9d0d39d3851e9bab4ee97abf86e81e1c5bbc50cd2826ae`,
},
"rm": {
descriprion: "Remove a node: rm <node-hash>",
usage: `Remove a node
Example of usage:
rm 03ff65b6ae79134a4dce9d0d39d3851e9bab4ee97abf86e81e1c5bbc50cd2826ae`,
},
"eval": {
descriprion: "Evaluate a policy: eval <policy>",
usage: `Evaluate a policy
Example of usage:
eval REP 2`,
},
"help": {
descriprion: "Show available commands",
},
}
func (repl *policyPlaygroundREPL) handleCommand(args []string) error {
if len(args) == 0 {
return nil
}
switch args[0] {
case "list", "ls":
return repl.handleLs(args[1:])
case "add":
return repl.handleAdd(args[1:])
case "load":
return repl.handleLoad(args[1:])
case "remove", "rm":
return repl.handleRemove(args[1:])
case "eval":
return repl.handleEval(args[1:])
case "help":
return repl.handleHelp(args[1:])
}
return fmt.Errorf("unknown command %q. See 'help' for assistance", args[0])
}
func (repl *policyPlaygroundREPL) run() error {
if len(viper.GetString(commonflags.RPC)) > 0 {
@ -190,24 +299,32 @@ func (repl *policyPlaygroundREPL) run() error {
}
}
cmdHandlers := map[string]func([]string) error{
"list": repl.handleLs,
"ls": repl.handleLs,
"add": repl.handleAdd,
"load": repl.handleLoad,
"remove": repl.handleRemove,
"rm": repl.handleRemove,
"eval": repl.handleEval,
if len(viper.GetString(netmapConfigPath)) > 0 {
err := repl.handleLoad([]string{viper.GetString(netmapConfigPath)})
commonCmd.ExitOnErr(repl.cmd, "load netmap config error: %w", err)
}
var cfgCompleter []readline.PrefixCompleterInterface
var helpSubItems []readline.PrefixCompleterInterface
for name := range commands {
if name != "help" {
cfgCompleter = append(cfgCompleter, readline.PcItem(name))
helpSubItems = append(helpSubItems, readline.PcItem(name))
}
}
cfgCompleter = append(cfgCompleter, readline.PcItem("help", helpSubItems...))
completer := readline.NewPrefixCompleter(cfgCompleter...)
rl, err := readline.NewEx(&readline.Config{
Prompt: "> ",
InterruptPrompt: "^C",
AutoComplete: policyPlaygroundCompleter,
AutoComplete: completer,
})
if err != nil {
return fmt.Errorf("error initializing readline: %w", err)
}
repl.console = rl
defer rl.Close()
var exit bool
@ -225,17 +342,8 @@ func (repl *policyPlaygroundREPL) run() error {
}
exit = false
parts := strings.Fields(line)
if len(parts) == 0 {
continue
}
cmd := parts[0]
if handler, exists := cmdHandlers[cmd]; exists {
if err := handler(parts[1:]); err != nil {
fmt.Printf("error: %v\n", err)
}
} else {
fmt.Printf("error: unknown command %q\n", cmd)
if err := repl.handleCommand(strings.Fields(line)); err != nil {
fmt.Fprintf(repl.console, "error: %v\n", err)
}
}
}
@ -251,6 +359,14 @@ If a wallet and endpoint is provided, the initial netmap data will be loaded fro
},
}
const (
netmapConfigPath = "netmap-config"
netmapConfigUsage = "Path to the netmap configuration file"
)
func initContainerPolicyPlaygroundCmd() {
commonflags.Init(policyPlaygroundCmd)
policyPlaygroundCmd.Flags().String(netmapConfigPath, "", netmapConfigUsage)
_ = viper.BindPFlag(netmapConfigPath, policyPlaygroundCmd.Flags().Lookup(netmapConfigPath))
}

View file

@ -296,7 +296,7 @@ func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatu
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
leftMinutes := int(leftSeconds / 60)
sb.WriteString(fmt.Sprintf(" Estimated time left: %d minutes.", leftMinutes))
fmt.Fprintf(sb, " Estimated time left: %d minutes.", leftMinutes)
}
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
@ -305,20 +305,20 @@ func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusR
hour := int(duration.Seconds() / 3600)
minute := int(duration.Seconds()/60) % 60
second := int(duration.Seconds()) % 60
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
fmt.Fprintf(sb, " Duration: %02d:%02d:%02d.", hour, minute, second)
}
}
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStartedAt() != nil {
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
fmt.Fprintf(sb, " Started at: %s UTC.", startedAt.Format(time.RFC3339))
}
}
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if len(resp.GetBody().GetErrorMessage()) > 0 {
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.GetBody().GetErrorMessage()))
fmt.Fprintf(sb, " Error: %s.", resp.GetBody().GetErrorMessage())
}
}
@ -332,7 +332,7 @@ func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusRes
default:
status = "undefined"
}
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
fmt.Fprintf(sb, " Status: %s.", status)
}
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
@ -350,14 +350,14 @@ func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusR
}
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d; evacuated %d trees out of %d, failed to evacuate: %d.",
fmt.Fprintf(sb, " Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d; evacuated %d trees out of %d, failed to evacuate: %d.",
resp.GetBody().GetEvacuatedObjects(),
resp.GetBody().GetTotalObjects(),
resp.GetBody().GetFailedObjects(),
resp.GetBody().GetSkippedObjects(),
resp.GetBody().GetEvacuatedTrees(),
resp.GetBody().GetTotalTrees(),
resp.GetBody().GetFailedTrees()))
resp.GetBody().GetFailedTrees())
}
func initControlEvacuationShardCmd() {

View file

@ -18,6 +18,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// object lock command.
@ -78,7 +79,7 @@ var objectLockCmd = &cobra.Command{
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
endpoint := viper.GetString(commonflags.RPC)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
commonCmd.ExitOnErr(cmd, "Request current epoch: %w", err)

View file

@ -48,6 +48,12 @@ type ecHeader struct {
parent oid.ID
}
type objectCounter struct {
sync.Mutex
total uint32
isECcounted bool
}
type objectPlacement struct {
requiredNodes []netmapSDK.NodeInfo
confirmedNodes []netmapSDK.NodeInfo
@ -56,6 +62,7 @@ type objectPlacement struct {
type objectNodesResult struct {
errors []error
placements map[oid.ID]objectPlacement
total uint32
}
type ObjNodesDataObject struct {
@ -106,18 +113,18 @@ func objectNodes(cmd *cobra.Command, _ []string) {
pk := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
objects := getPhyObjects(cmd, cnrID, objID, cli, pk)
objects, count := getPhyObjects(cmd, cnrID, objID, cli, pk)
placementPolicy, netmap := getPlacementPolicyAndNetmap(cmd, cnrID, cli)
result := getRequiredPlacement(cmd, objects, placementPolicy, netmap)
getActualPlacement(cmd, netmap, pk, objects, result)
getActualPlacement(cmd, netmap, pk, objects, count, result)
printPlacement(cmd, objID, objects, result)
}
func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) []phyObject {
func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) ([]phyObject, int) {
var addrObj oid.Address
addrObj.SetContainer(cnrID)
addrObj.SetObject(objID)
@ -145,7 +152,7 @@ func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.C
parent: res.Header().ECHeader().Parent(),
}
}
return []phyObject{obj}
return []phyObject{obj}, 1
}
var errSplitInfo *objectSDK.SplitInfoError
@ -155,29 +162,34 @@ func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.C
var ecInfoError *objectSDK.ECInfoError
if errors.As(err, &ecInfoError) {
return getECObjectChunks(cmd, cnrID, objID, ecInfoError)
return getECObjectChunks(cmd, cnrID, objID, ecInfoError), 1
}
commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
return nil
return nil, 0
}
func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []phyObject {
members := getCompexObjectMembers(cmd, cnrID, objID, cli, prmHead, errSplitInfo)
return flattenComplexMembersIfECContainer(cmd, cnrID, members, prmHead)
func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) ([]phyObject, int) {
members, total := getCompexObjectMembers(cmd, cnrID, objID, cli, prmHead, errSplitInfo)
return flattenComplexMembersIfECContainer(cmd, cnrID, members, prmHead), total
}
func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []oid.ID {
func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) ([]oid.ID, int) {
var total int
splitInfo := errSplitInfo.SplitInfo()
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok {
return members
if total = len(members); total > 0 {
total-- // linking object is not data object
}
return members, total
}
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnrID); ok {
return members
return members, len(members)
}
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
members := tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
return members, len(members)
}
func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, members []oid.ID, prmHead internalclient.HeadObjectPrm) []phyObject {
@ -383,8 +395,11 @@ func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placem
}
}
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, pk *ecdsa.PrivateKey, objects []phyObject, result *objectNodesResult) {
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, pk *ecdsa.PrivateKey, objects []phyObject, count int, result *objectNodesResult) {
resultMtx := &sync.Mutex{}
counter := &objectCounter{
total: uint32(count),
}
candidates := getNodesToCheckObjectExistance(cmd, netmap, result)
@ -401,7 +416,7 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, pk *ecdsa.
for _, object := range objects {
eg.Go(func() error {
stored, err := isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk)
stored, err := isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk, counter)
resultMtx.Lock()
defer resultMtx.Unlock()
if err == nil && stored {
@ -420,6 +435,7 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, pk *ecdsa.
}
commonCmd.ExitOnErr(cmd, "failed to get actual placement: %w", eg.Wait())
result.total = counter.total
}
func getNodesToCheckObjectExistance(cmd *cobra.Command, netmap *netmapSDK.NetMap, result *objectNodesResult) []netmapSDK.NodeInfo {
@ -478,7 +494,7 @@ func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.N
return cli, nil
}
func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) (bool, error) {
func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey, counter *objectCounter) (bool, error) {
var addrObj oid.Address
addrObj.SetContainer(cnrID)
addrObj.SetObject(objID)
@ -493,6 +509,14 @@ func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID,
res, err := internalclient.HeadObject(ctx, prmHead)
if err == nil && res != nil {
if res.Header().ECHeader() != nil {
counter.Lock()
defer counter.Unlock()
if !counter.isECcounted {
counter.total *= res.Header().ECHeader().Total()
}
counter.isECcounted = true
}
return true, nil
}
var notFound *apistatus.ObjectNotFound
@ -512,7 +536,8 @@ func printPlacement(cmd *cobra.Command, objID oid.ID, objects []phyObject, resul
}
func printObjectNodesAsText(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
fmt.Fprintf(cmd.OutOrStdout(), "Object %s stores payload in %d data objects:\n", objID.EncodeToString(), len(objects))
fmt.Fprintf(cmd.OutOrStdout(), "Object %s stores payload in %d data objects\n", objID.EncodeToString(), result.total)
fmt.Fprintf(cmd.OutOrStdout(), "Found %d:\n", len(objects))
for _, object := range objects {
fmt.Fprintf(cmd.OutOrStdout(), "- %s\n", object.objectID)

View file

@ -2,6 +2,7 @@ package object
import (
"fmt"
"os"
"strconv"
"strings"
@ -9,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -20,6 +22,7 @@ const (
replaceAttrsFlagName = "replace-attrs"
rangeFlagName = "range"
payloadFlagName = "payload"
splitHeaderFlagName = "split-header"
)
var objectPatchCmd = &cobra.Command{
@ -50,6 +53,7 @@ func initObjectPatchCmd() {
flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")
flags.String(splitHeaderFlagName, "", "Path to binary or JSON-encoded split header")
}
func patch(cmd *cobra.Command, _ []string) {
@ -84,6 +88,8 @@ func patch(cmd *cobra.Command, _ []string) {
prm.NewAttributes = newAttrs
prm.ReplaceAttribute = replaceAttrs
prm.NewSplitHeader = parseSplitHeaderBinaryOrJSON(cmd)
for i := range ranges {
prm.PayloadPatches = append(prm.PayloadPatches, internalclient.PayloadPatch{
Range: ranges[i],
@ -147,3 +153,22 @@ func patchPayloadPaths(cmd *cobra.Command) []string {
v, _ := cmd.Flags().GetStringSlice(payloadFlagName)
return v
}
func parseSplitHeaderBinaryOrJSON(cmd *cobra.Command) *objectSDK.SplitHeader {
path, _ := cmd.Flags().GetString(splitHeaderFlagName)
if path == "" {
return nil
}
data, err := os.ReadFile(path)
commonCmd.ExitOnErr(cmd, "read file error: %w", err)
splitHdrV2 := new(objectV2.SplitHeader)
err = splitHdrV2.Unmarshal(data)
if err != nil {
err = splitHdrV2.UnmarshalJSON(data)
commonCmd.ExitOnErr(cmd, "unmarshal error: %w", err)
}
return objectSDK.NewSplitHeaderFromV2(splitHdrV2)
}

View file

@ -154,7 +154,7 @@ func printECInfoErr(cmd *cobra.Command, err error) bool {
if ok {
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
toProto, _ := cmd.Flags().GetBool("proto")
if !(toJSON || toProto) {
if !toJSON && !toProto {
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
}
printECInfo(cmd, errECInfo.ECInfo())

View file

@ -2,17 +2,19 @@ package tree
import (
"context"
"crypto/tls"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
@ -31,6 +33,16 @@ func _client() (tree.TreeServiceClient, error) {
return nil, err
}
host, isTLS, err := client.ParseURI(netAddr.URIAddr())
if err != nil {
return nil, err
}
creds := insecure.NewCredentials()
if isTLS {
creds = credentials.NewTLS(&tls.Config{})
}
opts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
tracing.NewUnaryClientInterceptor(),
@ -40,13 +52,10 @@ func _client() (tree.TreeServiceClient, error) {
),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
grpc.WithDisableServiceConfig(),
grpc.WithTransportCredentials(creds),
}
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
cc, err := grpc.NewClient(host, opts...)
return tree.NewTreeServiceClient(cc), err
}

View file

@ -4,12 +4,14 @@ import (
"context"
"os"
"os/signal"
"strconv"
"syscall"
configViper "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/spf13/cast"
"github.com/spf13/viper"
"go.uber.org/zap"
)
@ -44,11 +46,30 @@ func reloadConfig() error {
if err != nil {
return err
}
log.Reload(logPrm)
err = logPrm.SetTags(loggerTags())
if err != nil {
return err
}
logger.UpdateLevelForTags(logPrm)
return nil
}
func loggerTags() [][]string {
var res [][]string
for i := 0; ; i++ {
var item []string
index := strconv.FormatInt(int64(i), 10)
names := cast.ToString(cfg.Get("logger.tags." + index + ".names"))
if names == "" {
break
}
item = append(item, names, cast.ToString(cfg.Get("logger.tags."+index+".level")))
res = append(res, item)
}
return res
}
func watchForSignal(ctx context.Context, cancel func()) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)

View file

@ -80,10 +80,14 @@ func main() {
exitErr(err)
logPrm.SamplingHook = metrics.LogMetrics().GetSamplingHook()
logPrm.PrependTimestamp = cfg.GetBool("logger.timestamp")
err = logPrm.SetTags(loggerTags())
exitErr(err)
log, err = logger.NewLogger(logPrm)
exitErr(err)
logger.UpdateLevelForTags(logPrm)
ctx, cancel := context.WithCancel(context.Background())
pprofCmp = newPprofComponent()

View file

@ -3,6 +3,8 @@ package common
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
)
type FilterResult byte
@ -71,11 +73,7 @@ func (fp FallbackParser) ToParser() Parser {
func (p Parser) ToFallbackParser() FallbackParser {
return func(key, value []byte) (SchemaEntry, Parser) {
entry, next, err := p(key, value)
if err != nil {
panic(fmt.Errorf(
"couldn't use that parser as a fallback parser, it returned an error: %w", err,
))
}
assert.NoError(err, "couldn't use that parser as a fallback parser")
return entry, next
}
}

View file

@ -53,17 +53,17 @@ func (f *InputFieldWithHistory) InputHandler() func(event *tcell.EventKey, setFo
f.historyPointer++
// Stop iterating over history.
if f.historyPointer == len(f.history) {
f.InputField.SetText(f.currentContent)
f.SetText(f.currentContent)
return
}
f.InputField.SetText(f.history[f.historyPointer])
f.SetText(f.history[f.historyPointer])
case tcell.KeyUp:
if len(f.history) == 0 {
return
}
// Start iterating over history.
if f.historyPointer == len(f.history) {
f.currentContent = f.InputField.GetText()
f.currentContent = f.GetText()
}
// End of history.
if f.historyPointer == 0 {
@ -71,7 +71,7 @@ func (f *InputFieldWithHistory) InputHandler() func(event *tcell.EventKey, setFo
}
// Iterate to least recent prompts.
f.historyPointer--
f.InputField.SetText(f.history[f.historyPointer])
f.SetText(f.history[f.historyPointer])
default:
f.InputField.InputHandler()(event, func(tview.Primitive) {})
}

View file

@ -8,6 +8,7 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal/schema/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
)
@ -94,9 +95,7 @@ func (v *RecordsView) Mount(ctx context.Context) error {
}
func (v *RecordsView) Unmount() {
if v.onUnmount == nil {
panic("try to unmount not mounted component")
}
assert.False(v.onUnmount == nil, "try to unmount not mounted component")
v.onUnmount()
v.onUnmount = nil
}

View file

@ -482,7 +482,7 @@ func (ui *UI) handleInputOnSearching(event *tcell.EventKey) {
ui.searchBar.InputHandler()(event, func(tview.Primitive) {})
}
ui.Box.MouseHandler()
ui.MouseHandler()
}
func (ui *UI) WithPrompt(prompt string) error {

View file

@ -14,7 +14,7 @@ import (
func initAPEManagerService(c *cfg) {
contractStorage := ape_contract.NewProxyVerificationContractStorage(
morph.NewSwitchRPCGuardedActor(c.cfgMorph.client),
c.shared.key,
c.key,
c.cfgMorph.proxyScriptHash,
c.cfgObject.cfgAccessPolicyEngine.policyContractHash)

View file

@ -1,20 +1,27 @@
package main
import (
"bytes"
"cmp"
"context"
"slices"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.uber.org/zap"
)
type netValueReader[K any, V any] func(ctx context.Context, cid K) (V, error)
@ -110,55 +117,6 @@ func (c *ttlNetCache[K, V]) remove(key K) {
hit = c.cache.Remove(key)
}
// entity that provides LRU cache interface.
type lruNetCache struct {
cache *lru.Cache[uint64, *netmapSDK.NetMap]
netRdr netValueReader[uint64, *netmapSDK.NetMap]
metrics cacheMetrics
}
// newNetworkLRUCache returns wrapper over netValueReader with LRU cache.
func newNetworkLRUCache(sz int, netRdr netValueReader[uint64, *netmapSDK.NetMap], metrics cacheMetrics) *lruNetCache {
cache, err := lru.New[uint64, *netmapSDK.NetMap](sz)
fatalOnErr(err)
return &lruNetCache{
cache: cache,
netRdr: netRdr,
metrics: metrics,
}
}
// reads value by the key.
//
// updates the value from the network on cache miss.
//
// returned value should not be modified.
func (c *lruNetCache) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
hit := false
startedAt := time.Now()
defer func() {
c.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
}()
val, ok := c.cache.Get(key)
if ok {
hit = true
return val, nil
}
val, err := c.netRdr(ctx, key)
if err != nil {
return nil, err
}
c.cache.Add(key, val)
return val, nil
}
// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
@ -200,20 +158,236 @@ func (s ttlContainerStorage) DeletionInfo(ctx context.Context, cnr cid.ID) (*con
type lruNetmapSource struct {
netState netmap.State
cache *lruNetCache
client rawSource
cache *simplelru.LRU[uint64, *atomic.Pointer[netmapSDK.NetMap]]
mtx sync.RWMutex
metrics cacheMetrics
log *logger.Logger
candidates atomic.Pointer[[]netmapSDK.NodeInfo]
}
func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
type rawSource interface {
GetCandidates(ctx context.Context) ([]netmapSDK.NodeInfo, error)
GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error)
}
func newCachedNetmapStorage(ctx context.Context, log *logger.Logger,
netState netmap.State, client rawSource, wg *sync.WaitGroup, d time.Duration,
) netmap.Source {
const netmapCacheSize = 10
lruNetmapCache := newNetworkLRUCache(netmapCacheSize, func(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
return v.GetNetMapByEpoch(ctx, key)
}, metrics.NewCacheMetrics("netmap"))
cache, err := simplelru.NewLRU[uint64, *atomic.Pointer[netmapSDK.NetMap]](netmapCacheSize, nil)
fatalOnErr(err)
return &lruNetmapSource{
netState: s,
cache: lruNetmapCache,
src := &lruNetmapSource{
netState: netState,
client: client,
cache: cache,
log: log,
metrics: metrics.NewCacheMetrics("netmap"),
}
wg.Add(1)
go func() {
defer wg.Done()
src.updateCandidates(ctx, d)
}()
return src
}
// updateCandidates routine to merge netmap in cache with candidates list.
func (s *lruNetmapSource) updateCandidates(ctx context.Context, d time.Duration) {
timer := time.NewTimer(d)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
newCandidates, err := s.client.GetCandidates(ctx)
if err != nil {
s.log.Debug(ctx, logs.FailedToUpdateNetmapCandidates, zap.Error(err))
timer.Reset(d)
break
}
if len(newCandidates) == 0 {
s.candidates.Store(&newCandidates)
timer.Reset(d)
break
}
slices.SortFunc(newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
return cmp.Compare(n1.Hash(), n2.Hash())
})
// Check once state changed
v := s.candidates.Load()
if v == nil {
s.candidates.Store(&newCandidates)
s.mergeCacheWithCandidates(newCandidates)
timer.Reset(d)
break
}
ret := slices.CompareFunc(*v, newCandidates, func(n1 netmapSDK.NodeInfo, n2 netmapSDK.NodeInfo) int {
if !bytes.Equal(n1.PublicKey(), n2.PublicKey()) ||
uint32(n1.Status()) != uint32(n2.Status()) ||
slices.Compare(n1.ExternalAddresses(), n2.ExternalAddresses()) != 0 {
return 1
}
var ne1 []string
n1.IterateNetworkEndpoints(func(s string) bool {
ne1 = append(ne1, s)
return false
})
var ne2 []string
n2.IterateNetworkEndpoints(func(s string) bool {
ne2 = append(ne2, s)
return false
})
return slices.Compare(ne1, ne2)
})
if ret != 0 {
s.candidates.Store(&newCandidates)
s.mergeCacheWithCandidates(newCandidates)
}
timer.Reset(d)
}
}
}
func (s *lruNetmapSource) mergeCacheWithCandidates(candidates []netmapSDK.NodeInfo) {
s.mtx.Lock()
tmp := s.cache.Values()
s.mtx.Unlock()
for _, pointer := range tmp {
nm := pointer.Load()
updates := getNetMapNodesToUpdate(nm, candidates)
if len(updates) > 0 {
nm = nm.Clone()
mergeNetmapWithCandidates(updates, nm)
pointer.Store(nm)
}
}
}
// reads value by the key.
//
// updates the value from the network on cache miss.
//
// returned value should not be modified.
func (s *lruNetmapSource) get(ctx context.Context, key uint64) (*netmapSDK.NetMap, error) {
hit := false
startedAt := time.Now()
defer func() {
s.metrics.AddMethodDuration("Get", time.Since(startedAt), hit)
}()
s.mtx.RLock()
val, ok := s.cache.Get(key)
s.mtx.RUnlock()
if ok {
hit = true
return val.Load(), nil
}
s.mtx.Lock()
defer s.mtx.Unlock()
val, ok = s.cache.Get(key)
if ok {
hit = true
return val.Load(), nil
}
nm, err := s.client.GetNetMapByEpoch(ctx, key)
if err != nil {
return nil, err
}
v := s.candidates.Load()
if v != nil {
updates := getNetMapNodesToUpdate(nm, *v)
if len(updates) > 0 {
mergeNetmapWithCandidates(updates, nm)
}
}
p := atomic.Pointer[netmapSDK.NetMap]{}
p.Store(nm)
s.cache.Add(key, &p)
return nm, nil
}
// mergeNetmapWithCandidates updates nodes state in the provided netmap with state in the list of candidates.
func mergeNetmapWithCandidates(updates []nodeToUpdate, nm *netmapSDK.NetMap) {
for _, v := range updates {
if v.status != netmapSDK.UnspecifiedState {
nm.Nodes()[v.netmapIndex].SetStatus(v.status)
}
if v.externalAddresses != nil {
nm.Nodes()[v.netmapIndex].SetExternalAddresses(v.externalAddresses...)
}
if v.endpoints != nil {
nm.Nodes()[v.netmapIndex].SetNetworkEndpoints(v.endpoints...)
}
}
}
type nodeToUpdate struct {
netmapIndex int
status netmapSDK.NodeState
externalAddresses []string
endpoints []string
}
// getNetMapNodesToUpdate checks for the changes between provided netmap and the list of candidates.
func getNetMapNodesToUpdate(nm *netmapSDK.NetMap, candidates []netmapSDK.NodeInfo) []nodeToUpdate {
var res []nodeToUpdate
for i := range nm.Nodes() {
for _, cnd := range candidates {
if bytes.Equal(nm.Nodes()[i].PublicKey(), cnd.PublicKey()) {
var tmp nodeToUpdate
var update bool
if cnd.Status() != nm.Nodes()[i].Status() &&
(cnd.Status() == netmapSDK.Online || cnd.Status() == netmapSDK.Maintenance) {
update = true
tmp.status = cnd.Status()
}
externalAddresses := cnd.ExternalAddresses()
if externalAddresses != nil &&
slices.Compare(externalAddresses, nm.Nodes()[i].ExternalAddresses()) != 0 {
update = true
tmp.externalAddresses = externalAddresses
}
nodeEndpoints := make([]string, 0, nm.Nodes()[i].NumberOfNetworkEndpoints())
nm.Nodes()[i].IterateNetworkEndpoints(func(s string) bool {
nodeEndpoints = append(nodeEndpoints, s)
return false
})
candidateEndpoints := make([]string, 0, cnd.NumberOfNetworkEndpoints())
cnd.IterateNetworkEndpoints(func(s string) bool {
candidateEndpoints = append(candidateEndpoints, s)
return false
})
if slices.Compare(nodeEndpoints, candidateEndpoints) != 0 {
update = true
tmp.endpoints = candidateEndpoints
}
if update {
tmp.netmapIndex = i
res = append(res, tmp)
}
break
}
}
}
return res
}
func (s *lruNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
@ -225,7 +399,7 @@ func (s *lruNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*
}
func (s *lruNetmapSource) getNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
val, err := s.cache.get(ctx, epoch)
val, err := s.get(ctx, epoch)
if err != nil {
return nil, err
}

View file

@ -3,9 +3,11 @@ package main
import (
"context"
"errors"
"sync"
"testing"
"time"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/stretchr/testify/require"
)
@ -59,3 +61,75 @@ func testNetValueReader(_ context.Context, key string) (time.Time, error) {
type noopCacheMetricts struct{}
func (m *noopCacheMetricts) AddMethodDuration(method string, d time.Duration, hit bool) {}
type rawSrc struct{}
func (r *rawSrc) GetCandidates(_ context.Context) ([]netmapSDK.NodeInfo, error) {
node0 := netmapSDK.NodeInfo{}
node0.SetPublicKey([]byte{byte(1)})
node0.SetStatus(netmapSDK.Online)
node0.SetExternalAddresses("1", "0")
node0.SetNetworkEndpoints("1", "0")
node1 := netmapSDK.NodeInfo{}
node1.SetPublicKey([]byte{byte(1)})
node1.SetStatus(netmapSDK.Online)
node1.SetExternalAddresses("1", "0")
node1.SetNetworkEndpoints("1", "0")
return []netmapSDK.NodeInfo{node0, node1}, nil
}
func (r *rawSrc) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
nm := netmapSDK.NetMap{}
nm.SetEpoch(1)
node0 := netmapSDK.NodeInfo{}
node0.SetPublicKey([]byte{byte(1)})
node0.SetStatus(netmapSDK.Maintenance)
node0.SetExternalAddresses("0")
node0.SetNetworkEndpoints("0")
node1 := netmapSDK.NodeInfo{}
node1.SetPublicKey([]byte{byte(1)})
node1.SetStatus(netmapSDK.Maintenance)
node1.SetExternalAddresses("0")
node1.SetNetworkEndpoints("0")
nm.SetNodes([]netmapSDK.NodeInfo{node0, node1})
return &nm, nil
}
type st struct{}
func (s *st) CurrentEpoch() uint64 {
return 1
}
func TestNetmapStorage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
cache := newCachedNetmapStorage(ctx, nil, &st{}, &rawSrc{}, &wg, time.Millisecond*50)
nm, err := cache.GetNetMapByEpoch(ctx, 1)
require.NoError(t, err)
require.True(t, nm.Nodes()[0].Status() == netmapSDK.Maintenance)
require.True(t, len(nm.Nodes()[0].ExternalAddresses()) == 1)
require.True(t, nm.Nodes()[0].NumberOfNetworkEndpoints() == 1)
require.Eventually(t, func() bool {
nm, err := cache.GetNetMapByEpoch(ctx, 1)
require.NoError(t, err)
for _, node := range nm.Nodes() {
if !(node.Status() == netmapSDK.Online && len(node.ExternalAddresses()) == 2 &&
node.NumberOfNetworkEndpoints() == 2) {
return false
}
}
return true
}, time.Second*5, time.Millisecond*10)
cancel()
wg.Wait()
}

View file

@ -108,6 +108,8 @@ type applicationConfiguration struct {
level string
destination string
timestamp bool
options []zap.Option
tags [][]string
}
ObjectCfg struct {
@ -232,6 +234,15 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.LoggerCfg.level = loggerconfig.Level(c)
a.LoggerCfg.destination = loggerconfig.Destination(c)
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
var opts []zap.Option
if loggerconfig.ToLokiConfig(c).Enabled {
opts = []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(c))
return lokiCore
})}
}
a.LoggerCfg.options = opts
a.LoggerCfg.tags = loggerconfig.Tags(c)
// Object
@ -374,14 +385,11 @@ func (a *applicationConfiguration) setGCConfig(target *shardCfg, source *shardco
}
func (a *applicationConfiguration) setLimiter(target *shardCfg, source *shardconfig.Config) error {
limitsConfig := source.Limits()
limitsConfig := source.Limits().ToConfig()
limiter, err := qos.NewLimiter(limitsConfig)
if err != nil {
return err
}
if target.limiter != nil {
target.limiter.Close()
}
target.limiter = limiter
return nil
}
@ -718,12 +726,7 @@ func initCfg(appCfg *config.Config) *cfg {
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
if loggerconfig.ToLokiConfig(appCfg).Enabled {
log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
return lokiCore
}))
}
logger.UpdateLevelForTags(logPrm)
c.internals = initInternals(appCfg, log)
@ -1090,6 +1093,12 @@ func (c *cfg) loggerPrm() (logger.Prm, error) {
return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
}
prm.PrependTimestamp = c.LoggerCfg.timestamp
prm.Options = c.LoggerCfg.options
err = prm.SetTags(c.LoggerCfg.tags)
if err != nil {
// not expected since validation should be performed before
return logger.Prm{}, errors.New("incorrect allowed tags format: " + c.LoggerCfg.destination)
}
return prm, nil
}
@ -1377,7 +1386,7 @@ func (c *cfg) getComponents(ctx context.Context) []dCmp {
if err != nil {
return err
}
c.log.Reload(prm)
logger.UpdateLevelForTags(prm)
return nil
}})
components = append(components, dCmp{"runtime", func() error {

View file

@ -11,10 +11,10 @@ import (
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/stretchr/testify/require"
)
@ -135,8 +135,8 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, mode.ReadOnly, sc.Mode())
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
readLimits := limits.Read()
writeLimits := limits.Write()
readLimits := limits.ToConfig().Read
writeLimits := limits.ToConfig().Write
require.Equal(t, 30*time.Second, readLimits.IdleTimeout)
require.Equal(t, int64(10_000), readLimits.MaxRunningOps)
require.Equal(t, int64(1_000), readLimits.MaxWaitingOps)
@ -144,7 +144,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, int64(1_000), writeLimits.MaxRunningOps)
require.Equal(t, int64(100), writeLimits.MaxWaitingOps)
require.ElementsMatch(t, readLimits.Tags,
[]limitsconfig.IOTagConfig{
[]qos.IOTagConfig{
{
Tag: "internal",
Weight: toPtr(20),
@ -168,13 +168,19 @@ func TestEngineSection(t *testing.T) {
LimitOps: toPtr(25000),
},
{
Tag: "policer",
Tag: "policer",
Weight: toPtr(5),
LimitOps: toPtr(25000),
Prohibited: true,
},
{
Tag: "treesync",
Weight: toPtr(5),
LimitOps: toPtr(25000),
LimitOps: toPtr(25),
},
})
require.ElementsMatch(t, writeLimits.Tags,
[]limitsconfig.IOTagConfig{
[]qos.IOTagConfig{
{
Tag: "internal",
Weight: toPtr(200),
@ -202,6 +208,11 @@ func TestEngineSection(t *testing.T) {
Weight: toPtr(50),
LimitOps: toPtr(2500),
},
{
Tag: "treesync",
Weight: toPtr(50),
LimitOps: toPtr(100),
},
})
case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
@ -258,14 +269,14 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, mode.ReadWrite, sc.Mode())
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
readLimits := limits.Read()
writeLimits := limits.Write()
require.Equal(t, limitsconfig.DefaultIdleTimeout, readLimits.IdleTimeout)
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxRunningOps)
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxWaitingOps)
require.Equal(t, limitsconfig.DefaultIdleTimeout, writeLimits.IdleTimeout)
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxRunningOps)
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxWaitingOps)
readLimits := limits.ToConfig().Read
writeLimits := limits.ToConfig().Write
require.Equal(t, qos.DefaultIdleTimeout, readLimits.IdleTimeout)
require.Equal(t, qos.NoLimit, readLimits.MaxRunningOps)
require.Equal(t, qos.NoLimit, readLimits.MaxWaitingOps)
require.Equal(t, qos.DefaultIdleTimeout, writeLimits.IdleTimeout)
require.Equal(t, qos.NoLimit, writeLimits.MaxRunningOps)
require.Equal(t, qos.NoLimit, writeLimits.MaxWaitingOps)
require.Equal(t, 0, len(readLimits.Tags))
require.Equal(t, 0, len(writeLimits.Tags))
}

View file

@ -1,19 +1,13 @@
package limits
import (
"math"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"github.com/spf13/cast"
)
const (
NoLimit int64 = math.MaxInt64
DefaultIdleTimeout = 5 * time.Minute
)
// From wraps config section into Config.
func From(c *config.Config) *Config {
return (*Config)(c)
@ -23,36 +17,43 @@ func From(c *config.Config) *Config {
// which provides access to Shard's limits configurations.
type Config config.Config
// Read returns the value of "read" limits config section.
func (x *Config) Read() OpConfig {
func (x *Config) ToConfig() qos.LimiterConfig {
result := qos.LimiterConfig{
Read: x.read(),
Write: x.write(),
}
panicOnErr(result.Validate())
return result
}
func (x *Config) read() qos.OpConfig {
return x.parse("read")
}
// Write returns the value of "write" limits config section.
func (x *Config) Write() OpConfig {
func (x *Config) write() qos.OpConfig {
return x.parse("write")
}
func (x *Config) parse(sub string) OpConfig {
func (x *Config) parse(sub string) qos.OpConfig {
c := (*config.Config)(x).Sub(sub)
var result OpConfig
var result qos.OpConfig
if s := config.Int(c, "max_waiting_ops"); s > 0 {
result.MaxWaitingOps = s
} else {
result.MaxWaitingOps = NoLimit
result.MaxWaitingOps = qos.NoLimit
}
if s := config.Int(c, "max_running_ops"); s > 0 {
result.MaxRunningOps = s
} else {
result.MaxRunningOps = NoLimit
result.MaxRunningOps = qos.NoLimit
}
if s := config.DurationSafe(c, "idle_timeout"); s > 0 {
result.IdleTimeout = s
} else {
result.IdleTimeout = DefaultIdleTimeout
result.IdleTimeout = qos.DefaultIdleTimeout
}
result.Tags = tags(c)
@ -60,42 +61,16 @@ func (x *Config) parse(sub string) OpConfig {
return result
}
type OpConfig struct {
// MaxWaitingOps returns the value of "max_waiting_ops" config parameter.
//
// Equals NoLimit if the value is not a positive number.
MaxWaitingOps int64
// MaxRunningOps returns the value of "max_running_ops" config parameter.
//
// Equals NoLimit if the value is not a positive number.
MaxRunningOps int64
// IdleTimeout returns the value of "idle_timeout" config parameter.
//
// Equals DefaultIdleTimeout if the value is not a valid duration.
IdleTimeout time.Duration
// Tags returns the value of "tags" config parameter.
//
// Equals nil if the value is not a valid tags config slice.
Tags []IOTagConfig
}
type IOTagConfig struct {
Tag string
Weight *float64
LimitOps *float64
ReservedOps *float64
}
func tags(c *config.Config) []IOTagConfig {
func tags(c *config.Config) []qos.IOTagConfig {
c = c.Sub("tags")
var result []IOTagConfig
var result []qos.IOTagConfig
for i := 0; ; i++ {
tag := config.String(c, strconv.Itoa(i)+".tag")
if tag == "" {
return result
}
var tagConfig IOTagConfig
var tagConfig qos.IOTagConfig
tagConfig.Tag = tag
v := c.Value(strconv.Itoa(i) + ".weight")
@ -119,6 +94,13 @@ func tags(c *config.Config) []IOTagConfig {
tagConfig.ReservedOps = &r
}
v = c.Value(strconv.Itoa(i) + ".prohibited")
if v != nil {
r, err := cast.ToBoolE(v)
panicOnErr(err)
tagConfig.Prohibited = r
}
result = append(result, tagConfig)
}
}

View file

@ -2,6 +2,7 @@ package loggerconfig
import (
"os"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
@ -60,6 +61,21 @@ func Timestamp(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "timestamp")
}
// Tags returns the value of "tags" config parameter from "logger" section.
func Tags(c *config.Config) [][]string {
var res [][]string
sub := c.Sub(subsection).Sub("tags")
for i := 0; ; i++ {
s := sub.Sub(strconv.FormatInt(int64(i), 10))
names := config.StringSafe(s, "names")
if names == "" {
break
}
res = append(res, []string{names, config.StringSafe(s, "level")})
}
return res
}
// ToLokiConfig extracts loki config.
func ToLokiConfig(c *config.Config) loki.Config {
hostname, _ := os.Hostname()

View file

@ -33,6 +33,9 @@ const (
// ContainerCacheSizeDefault represents the default size for the container cache.
ContainerCacheSizeDefault = 100
// PollCandidatesTimeoutDefault is a default poll timeout for netmap candidates.
PollCandidatesTimeoutDefault = 20 * time.Second
)
var errNoMorphEndpoints = errors.New("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")
@ -154,3 +157,17 @@ func FrostfsIDCacheSize(c *config.Config) uint32 {
}
return config.Uint32Safe(c.Sub(subsection), "frostfsid_cache_size")
}
// NetmapCandidatesPollInterval returns the value of "netmap.candidates.poll_interval" config parameter
// from "morph" section.
//
// Returns PollCandidatesTimeoutDefault if the value is not positive duration.
func NetmapCandidatesPollInterval(c *config.Config) time.Duration {
v := config.DurationSafe(c.Sub(subsection).
Sub("netmap").Sub("candidates"), "poll_interval")
if v > 0 {
return v
}
return PollCandidatesTimeoutDefault
}

View file

@ -32,7 +32,7 @@ func initContainerService(_ context.Context, c *cfg) {
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
fatalOnErr(err)
c.shared.cnrClient = wrap
c.cnrClient = wrap
cnrSrc := cntClient.AsContainerSource(wrap)
@ -47,7 +47,7 @@ func initContainerService(_ context.Context, c *cfg) {
frostfsIDSubjectProvider = newMorphFrostfsIDCache(frostfsIDSubjectProvider, int(cacheSize), c.cfgMorph.cacheTTL, metrics.NewCacheMetrics("frostfs_id"))
}
c.shared.frostfsidClient = frostfsIDSubjectProvider
c.frostfsidClient = frostfsIDSubjectProvider
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
@ -57,7 +57,7 @@ func initContainerService(_ context.Context, c *cfg) {
service := containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.frostfsidClient,
containerService.NewSplitterService(
c.cfgContainer.containerBatchSize, c.respSvc,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),

View file

@ -8,38 +8,38 @@ import (
func metricsComponent(c *cfg) (*httpComponent, bool) {
var updated bool
// check if it has been inited before
if c.dynamicConfiguration.metrics == nil {
c.dynamicConfiguration.metrics = new(httpComponent)
c.dynamicConfiguration.metrics.cfg = c
c.dynamicConfiguration.metrics.name = "metrics"
c.dynamicConfiguration.metrics.handler = metrics.Handler()
if c.metrics == nil {
c.metrics = new(httpComponent)
c.metrics.cfg = c
c.metrics.name = "metrics"
c.metrics.handler = metrics.Handler()
updated = true
}
// (re)init read configuration
enabled := metricsconfig.Enabled(c.appCfg)
if enabled != c.dynamicConfiguration.metrics.enabled {
c.dynamicConfiguration.metrics.enabled = enabled
if enabled != c.metrics.enabled {
c.metrics.enabled = enabled
updated = true
}
address := metricsconfig.Address(c.appCfg)
if address != c.dynamicConfiguration.metrics.address {
c.dynamicConfiguration.metrics.address = address
if address != c.metrics.address {
c.metrics.address = address
updated = true
}
dur := metricsconfig.ShutdownTimeout(c.appCfg)
if dur != c.dynamicConfiguration.metrics.shutdownDur {
c.dynamicConfiguration.metrics.shutdownDur = dur
if dur != c.metrics.shutdownDur {
c.metrics.shutdownDur = dur
updated = true
}
return c.dynamicConfiguration.metrics, updated
return c.metrics, updated
}
func enableMetricsSvc(c *cfg) {
c.shared.metricsSvc.Enable()
c.metricsSvc.Enable()
}
func disableMetricsSvc(c *cfg) {
c.shared.metricsSvc.Disable()
c.metricsSvc.Disable()
}

View file

@ -60,10 +60,11 @@ func (c *cfg) initMorphComponents(ctx context.Context) {
}
if c.cfgMorph.cacheTTL < 0 {
netmapSource = wrap
netmapSource = newRawNetmapStorage(wrap)
} else {
// use RPC node as source of netmap (with caching)
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
netmapSource = newCachedNetmapStorage(ctx, c.log, c.cfgNetmap.state, wrap, &c.wg,
morphconfig.NetmapCandidatesPollInterval(c.appCfg))
}
c.netMapSource = netmapSource

View file

@ -0,0 +1,55 @@
package main
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
type rawNetmapSource struct {
client *netmapClient.Client
}
func newRawNetmapStorage(client *netmapClient.Client) netmap.Source {
return &rawNetmapSource{
client: client,
}
}
func (s *rawNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmapSDK.NetMap, error) {
nm, err := s.client.GetNetMap(ctx, diff)
if err != nil {
return nil, err
}
candidates, err := s.client.GetCandidates(ctx)
if err != nil {
return nil, err
}
updates := getNetMapNodesToUpdate(nm, candidates)
if len(updates) > 0 {
mergeNetmapWithCandidates(updates, nm)
}
return nm, nil
}
func (s *rawNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmapSDK.NetMap, error) {
nm, err := s.client.GetNetMapByEpoch(ctx, epoch)
if err != nil {
return nil, err
}
candidates, err := s.client.GetCandidates(ctx)
if err != nil {
return nil, err
}
updates := getNetMapNodesToUpdate(nm, candidates)
if len(updates) > 0 {
mergeNetmapWithCandidates(updates, nm)
}
return nm, nil
}
func (s *rawNetmapSource) Epoch(ctx context.Context) (uint64, error) {
return s.client.Epoch(ctx)
}

View file

@ -186,9 +186,9 @@ func initObjectService(c *cfg) {
respSvc,
)
c.shared.metricsSvc = objectService.NewMetricCollector(
c.metricsSvc = objectService.NewMetricCollector(
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService)
qosService := objectService.NewQoSObjectService(c.metricsSvc, &c.cfgQoSService)
auditSvc := objectService.NewAuditService(qosService, c.log, c.audit)
server := objectTransportGRPC.New(auditSvc)
@ -432,7 +432,7 @@ func createAPEService(c *cfg, irFetcher *cachedIRFetcher, splitSvc *objectServic
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage(),
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
objectAPE.NewStorageEngineHeaderProvider(c.cfgObject.cfgLocalStorage.localStorage, c.cfgObject.getSvc),
c.shared.frostfsidClient,
c.frostfsidClient,
c.netMapSource,
c.cfgNetmap.state,
c.cfgObject.cnrSource,

View file

@ -18,33 +18,33 @@ func initProfilerService(ctx context.Context, c *cfg) {
func pprofComponent(c *cfg) (*httpComponent, bool) {
var updated bool
// check if it has been inited before
if c.dynamicConfiguration.pprof == nil {
c.dynamicConfiguration.pprof = new(httpComponent)
c.dynamicConfiguration.pprof.cfg = c
c.dynamicConfiguration.pprof.name = "pprof"
c.dynamicConfiguration.pprof.handler = httputil.Handler()
c.dynamicConfiguration.pprof.preReload = tuneProfilers
if c.pprof == nil {
c.pprof = new(httpComponent)
c.pprof.cfg = c
c.pprof.name = "pprof"
c.pprof.handler = httputil.Handler()
c.pprof.preReload = tuneProfilers
updated = true
}
// (re)init read configuration
enabled := profilerconfig.Enabled(c.appCfg)
if enabled != c.dynamicConfiguration.pprof.enabled {
c.dynamicConfiguration.pprof.enabled = enabled
if enabled != c.pprof.enabled {
c.pprof.enabled = enabled
updated = true
}
address := profilerconfig.Address(c.appCfg)
if address != c.dynamicConfiguration.pprof.address {
c.dynamicConfiguration.pprof.address = address
if address != c.pprof.address {
c.pprof.address = address
updated = true
}
dur := profilerconfig.ShutdownTimeout(c.appCfg)
if dur != c.dynamicConfiguration.pprof.shutdownDur {
c.dynamicConfiguration.pprof.shutdownDur = dur
if dur != c.pprof.shutdownDur {
c.pprof.shutdownDur = dur
updated = true
}
return c.dynamicConfiguration.pprof, updated
return c.pprof, updated
}
func tuneProfilers(c *cfg) {

View file

@ -43,6 +43,9 @@ func initQoSService(c *cfg) {
func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context {
rawTag, defined := qosTagging.IOTagFromContext(ctx)
if !defined {
if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
return qosTagging.ContextWithIOTag(ctx, qos.IOTagInternal.String())
}
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
ioTag, err := qos.FromRawString(rawTag)
@ -73,20 +76,8 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
case qos.IOTagInternal:
for _, pk := range s.allowedInternalPubs {
if bytes.Equal(pk, requestSignPublicKey) {
return ctx
}
}
nm, err := s.netmapSource.GetNetMap(ctx, 0)
if err != nil {
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
for _, node := range nm.Nodes() {
if bytes.Equal(node.PublicKey(), requestSignPublicKey) {
return ctx
}
if s.isInternalIOTagPublicKey(ctx, requestSignPublicKey) {
return ctx
}
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
@ -95,3 +86,23 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
}
func (s *cfgQoSService) isInternalIOTagPublicKey(ctx context.Context, publicKey []byte) bool {
for _, pk := range s.allowedInternalPubs {
if bytes.Equal(pk, publicKey) {
return true
}
}
nm, err := s.netmapSource.GetNetMap(ctx, 0)
if err != nil {
s.logger.Debug(ctx, logs.FailedToGetNetmapToAdjustIOTag, zap.Error(err))
return false
}
for _, node := range nm.Nodes() {
if bytes.Equal(node.PublicKey(), publicKey) {
return true
}
}
return false
}

View file

@ -0,0 +1,226 @@
package main
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
func TestQoSService_Client(t *testing.T) {
t.Parallel()
s, pk := testQoSServicePrepare(t)
t.Run("IO tag client defined", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagClient.String())
ctx = s.AdjustIncomingTag(ctx, pk.Request)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("no IO tag defined, signed with unknown key", func(t *testing.T) {
ctx := s.AdjustIncomingTag(context.Background(), pk.Request)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("no IO tag defined, signed with allowed critical key", func(t *testing.T) {
ctx := s.AdjustIncomingTag(context.Background(), pk.Critical)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("unknown IO tag, signed with unknown key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
ctx = s.AdjustIncomingTag(ctx, pk.Request)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("unknown IO tag, signed with allowed internal key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
ctx = s.AdjustIncomingTag(ctx, pk.Internal)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("unknown IO tag, signed with allowed critical key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
ctx = s.AdjustIncomingTag(ctx, pk.Critical)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("IO tag internal defined, signed with unknown key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
ctx = s.AdjustIncomingTag(ctx, pk.Request)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("IO tag internal defined, signed with allowed critical key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
ctx = s.AdjustIncomingTag(ctx, pk.Critical)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("IO tag critical defined, signed with unknown key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
ctx = s.AdjustIncomingTag(ctx, pk.Request)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("IO tag critical defined, signed with allowed internal key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
ctx = s.AdjustIncomingTag(ctx, pk.Internal)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
}
func TestQoSService_Internal(t *testing.T) {
t.Parallel()
s, pk := testQoSServicePrepare(t)
t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagInternal.String(), tag)
})
t.Run("IO tag internal defined, signed with allowed internal key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
ctx = s.AdjustIncomingTag(ctx, pk.Internal)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagInternal.String(), tag)
})
t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagInternal.String(), tag)
})
t.Run("no IO tag defined, signed with allowed internal key", func(t *testing.T) {
ctx := s.AdjustIncomingTag(context.Background(), pk.Internal)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagInternal.String(), tag)
})
}
func TestQoSService_Critical(t *testing.T) {
t.Parallel()
s, pk := testQoSServicePrepare(t)
t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagCritical.String(), tag)
})
t.Run("IO tag critical defined, signed with allowed critical key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
ctx = s.AdjustIncomingTag(ctx, pk.Critical)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagCritical.String(), tag)
})
}
func TestQoSService_NetmapGetError(t *testing.T) {
t.Parallel()
s, pk := testQoSServicePrepare(t)
s.netmapSource = &utilTesting.TestNetmapSource{}
t.Run("IO tag internal defined, signed with netmap key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagInternal.String())
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("IO tag critical defined, signed with netmap key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("no IO tag defined, signed with netmap key", func(t *testing.T) {
ctx := s.AdjustIncomingTag(context.Background(), pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
t.Run("unknown IO tag, signed with netmap key", func(t *testing.T) {
ctx := tagging.ContextWithIOTag(context.Background(), "some IO tag we don't know")
ctx = s.AdjustIncomingTag(ctx, pk.NetmapNode)
tag, ok := tagging.IOTagFromContext(ctx)
require.True(t, ok)
require.Equal(t, qos.IOTagClient.String(), tag)
})
}
func testQoSServicePrepare(t *testing.T) (*cfgQoSService, *testQoSServicePublicKeys) {
nmSigner, err := keys.NewPrivateKey()
require.NoError(t, err)
reqSigner, err := keys.NewPrivateKey()
require.NoError(t, err)
allowedCritSigner, err := keys.NewPrivateKey()
require.NoError(t, err)
allowedIntSigner, err := keys.NewPrivateKey()
require.NoError(t, err)
var node netmap.NodeInfo
node.SetPublicKey(nmSigner.PublicKey().Bytes())
nm := &netmap.NetMap{}
nm.SetEpoch(100)
nm.SetNodes([]netmap.NodeInfo{node})
return &cfgQoSService{
logger: test.NewLogger(t),
netmapSource: &utilTesting.TestNetmapSource{
Netmaps: map[uint64]*netmap.NetMap{
100: nm,
},
CurrentEpoch: 100,
},
allowedCriticalPubs: [][]byte{
allowedCritSigner.PublicKey().Bytes(),
},
allowedInternalPubs: [][]byte{
allowedIntSigner.PublicKey().Bytes(),
},
},
&testQoSServicePublicKeys{
NetmapNode: nmSigner.PublicKey().Bytes(),
Request: reqSigner.PublicKey().Bytes(),
Internal: allowedIntSigner.PublicKey().Bytes(),
Critical: allowedCritSigner.PublicKey().Bytes(),
}
}
type testQoSServicePublicKeys struct {
NetmapNode []byte
Request []byte
Internal []byte
Critical []byte
}

View file

@ -51,9 +51,9 @@ func initTreeService(c *cfg) {
c.treeService = tree.New(
tree.WithContainerSource(cnrSource{
src: c.cfgObject.cnrSource,
cli: c.shared.cnrClient,
cli: c.cnrClient,
}),
tree.WithFrostfsidSubjectProvider(c.shared.frostfsidClient),
tree.WithFrostfsidSubjectProvider(c.frostfsidClient),
tree.WithNetmapSource(c.netMapSource),
tree.WithPrivateKey(&c.key.PrivateKey),
tree.WithLogger(c.log),

View file

@ -30,6 +30,11 @@ func validateConfig(c *config.Config) error {
return fmt.Errorf("invalid logger destination: %w", err)
}
err = loggerPrm.SetTags(loggerconfig.Tags(c))
if err != nil {
return fmt.Errorf("invalid list of allowed tags: %w", err)
}
// shard configuration validation
shardNum := 0

View file

@ -1,5 +1,7 @@
FROSTFS_IR_LOGGER_LEVEL=info
FROSTFS_IR_LOGGER_TIMESTAMP=true
FROSTFS_IR_LOGGER_TAGS_0_NAMES="main, morph"
FROSTFS_IR_LOGGER_TAGS_0_LEVEL="debug"
FROSTFS_IR_WALLET_PATH=/path/to/wallet.json
FROSTFS_IR_WALLET_ADDRESS=NUHtW3eM6a4mmFCgyyr4rj4wygsTKB88XX

View file

@ -3,6 +3,9 @@
logger:
level: info # Logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
timestamp: true
tags:
- names: "main, morph" # Possible values: `main`, `morph`, `grpc_svc`, `ir`, `processor`.
level: debug
wallet:
path: /path/to/wallet.json # Path to NEP-6 NEO wallet file

View file

@ -180,6 +180,10 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_PROHIBITED=true
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_5_TAG=treesync
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_5_WEIGHT=5
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_5_LIMIT_OPS=25
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
@ -197,6 +201,9 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_LIMIT_OPS=2500
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_TAG=policer
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_LIMIT_OPS=2500
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_5_TAG=treesync
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_5_WEIGHT=50
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_5_LIMIT_OPS=100
## 1 shard
### Flag to refill Metabase from BlobStor

View file

@ -252,7 +252,13 @@
{
"tag": "policer",
"weight": 5,
"limit_ops": 25000
"limit_ops": 25000,
"prohibited": true
},
{
"tag": "treesync",
"weight": 5,
"limit_ops": 25
}
]
},
@ -287,6 +293,11 @@
"tag": "policer",
"weight": 50,
"limit_ops": 2500
},
{
"tag": "treesync",
"weight": 50,
"limit_ops": 100
}
]
}

View file

@ -95,6 +95,9 @@ morph:
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
ape_chain_cache_size: 100000
netmap:
candidates:
poll_interval: 20s
apiclient:
dial_timeout: 15s # timeout for FrostFS API client connection
@ -148,7 +151,7 @@ storage:
flush_worker_count: 30 # number of write-cache flusher threads
metabase:
perm: 0644 # permissions for metabase files(directories: +x for current user and group)
perm: 0o644 # permissions for metabase files(directories: +x for current user and group)
max_batch_size: 200
max_batch_delay: 20ms
@ -161,13 +164,13 @@ storage:
blobstor:
- size: 4m # approximate size limit of single blobovnicza instance, total size will be: size*width^(depth+1), bytes
perm: 0644 # permissions for blobstor files(directories: +x for current user and group)
perm: 0o644 # permissions for blobstor files(directories: +x for current user and group)
depth: 1 # max depth of object tree storage in key-value DB
width: 4 # max width of object tree storage in key-value DB
opened_cache_capacity: 50 # maximum number of opened database files
opened_cache_ttl: 5m # ttl for opened database file
opened_cache_exp_interval: 15s # cache cleanup interval for expired blobovnicza's
- perm: 0644 # permissions for blobstor files(directories: +x for current user and group)
- perm: 0o644 # permissions for blobstor files(directories: +x for current user and group)
depth: 5 # max depth of object tree storage in FS
gc:
@ -249,6 +252,10 @@ storage:
- tag: policer
weight: 5
limit_ops: 25000
prohibited: true
- tag: treesync
weight: 5
limit_ops: 25
write:
max_running_ops: 1000
max_waiting_ops: 100
@ -271,6 +278,9 @@ storage:
- tag: policer
weight: 50
limit_ops: 2500
- tag: treesync
weight: 50
limit_ops: 100
1:
writecache:
@ -290,7 +300,7 @@ storage:
pilorama:
path: tmp/1/blob/pilorama.db
no_sync: true # USE WITH CAUTION. Return to user before pages have been persisted.
perm: 0644 # permission to use for the database file and intermediate directories
perm: 0o644 # permission to use for the database file and intermediate directories
tracing:
enabled: true

View file

@ -148,15 +148,19 @@ morph:
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
switch_interval: 2m
netmap:
candidates:
poll_interval: 20s
```
| Parameter | Type | Default value | Description |
| ---------------------- | --------------------------------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. |
| `cache_ttl` | `duration` | Morph block time | Sidechain cache TTL value (min interval between similar calls).<br/>Negative value disables caching.<br/>Cached entities: containers, container lists, eACL tables. |
| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. |
| `switch_interval` | `duration` | `2m` | Time interval between the attempts to connect to the highest priority RPC node if the connection is not established yet. |
| `ape_chain_cache_size` | `int` | `10000` | Size of the morph cache for APE chains. |
| Parameter | Type | Default value | Description |
|-----------------------------------|-----------------------------------------------------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `dial_timeout` | `duration` | `5s` | Timeout for dialing connections to N3 RPCs. |
| `cache_ttl` | `duration` | Morph block time | Sidechain cache TTL value (min interval between similar calls).<br/>Negative value disables caching.<br/>Cached entities: containers, container lists, eACL tables. |
| `rpc_endpoint` | list of [endpoint descriptions](#rpc_endpoint-subsection) | | Array of endpoint descriptions. |
| `switch_interval` | `duration` | `2m` | Time interval between the attempts to connect to the highest priority RPC node if the connection is not established yet. |
| `ape_chain_cache_size` | `int` | `10000` | Size of the morph cache for APE chains. |
| `netmap.candidates.poll_interval` | `duration` | `20s` | Timeout to set up frequency of merge candidates to netmap with netmap in local cache. |
## `rpc_endpoint` subsection
| Parameter | Type | Default value | Description |
@ -209,7 +213,7 @@ blobstor:
width: 4
- type: fstree
path: /path/to/blobstor/blobovnicza
perm: 0644
perm: 0o644
size: 4194304
depth: 1
width: 4
@ -269,7 +273,7 @@ gc:
```yaml
metabase:
path: /path/to/meta.db
perm: 0644
perm: 0o644
max_batch_size: 200
max_batch_delay: 20ms
```
@ -359,6 +363,7 @@ limits:
| `tag.weight` | `float` | 0 (no weight) | Weight for queries with the specified tag. Weights must be specified for all tags or not specified for any one. |
| `tag.limit_ops` | `float` | 0 (no limit) | Operations per second rate limit for queries with the specified tag. |
| `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
| `tag.prohibited` | `bool` | false | If true, operations with this specified tag will be prohibited. |
# `node` section

8
go.mod
View file

@ -6,13 +6,13 @@ require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250404152210-6458c11e833d
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20250402100642-acd94d200f88
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/VictoriaMetrics/easyproto v0.1.4

16
go.sum
View file

@ -4,22 +4,22 @@ git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1 h1:k1Qw8dWUQczfo0eVXlhrq9
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1/go.mod h1:5fSm/l5xSjGWqsPUffSdboiGFUHa7y/1S0fvxzQowN8=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d h1:uJ/wvuMdepbkaV8XMS5uN9B0FQWMep0CttSuDZiDhq0=
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2 h1:AovQs7bea0fLnYfldCZB88FkUgRj0QaHkJEbcWfgzvY=
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.5.2/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248 h1:fluzML8BIIabd07LyPSjc0JAV2qymWkPiFaLrXdALLA=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250321063246-93b681a20248/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275 h1:WqWxCnCl2ekfjWja/CpGeF2rf4h0x199xhdnsm/j+E8=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250320142439-32079ad7c275/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47 h1:O2c3VOlaGZ862hf2ZPLBMdTG6vGJzhIgDvFEFGfntzU=
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250331080422-b5ed0b6eff47/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250404152210-6458c11e833d h1:ZLKDupw362Ciing7kdIZhDYGMyo2QZyJ6sS/8X9QWJ0=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250404152210-6458c11e833d/go.mod h1:2PWt5GwJTnhjHp+mankcfCeAJBMn7puxPm+RS+lliVk=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY=
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b h1:M50kdfrf/h8c3cz0bJ2AEUcbXvAlPFVC1Wp1WkfZ/8E=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b/go.mod h1:GZTk55RI4dKzsK6BCn5h2xxE28UHNfgoq/NJxW/LQ6A=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20250402100642-acd94d200f88 h1:V0a7ia84ZpSM2YxpJq1SKLQfeYmsqFWqcxwweBHJIzc=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20250402100642-acd94d200f88/go.mod h1:GZTk55RI4dKzsK6BCn5h2xxE28UHNfgoq/NJxW/LQ6A=
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=

View file

@ -1,9 +1,25 @@
package assert
import "strings"
import (
"fmt"
"strings"
)
func True(cond bool, details ...string) {
if !cond {
panic(strings.Join(details, " "))
}
}
func False(cond bool, details ...string) {
if cond {
panic(strings.Join(details, " "))
}
}
func NoError(err error, details ...string) {
if err != nil {
content := fmt.Sprintf("BUG: %v: %s", err, strings.Join(details, " "))
panic(content)
}
}

View file

@ -512,7 +512,8 @@ const (
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag"
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object"
FailedToUpdateNetmapCandidates = "update netmap candidates failed"
)

31
internal/qos/config.go Normal file
View file

@ -0,0 +1,31 @@
package qos
import (
"math"
"time"
)
const (
NoLimit int64 = math.MaxInt64
DefaultIdleTimeout = 5 * time.Minute
)
type LimiterConfig struct {
Read OpConfig
Write OpConfig
}
type OpConfig struct {
MaxWaitingOps int64
MaxRunningOps int64
IdleTimeout time.Duration
Tags []IOTagConfig
}
type IOTagConfig struct {
Tag string
Weight *float64
LimitOps *float64
ReservedOps *float64
Prohibited bool
}

219
internal/qos/grpc_test.go Normal file
View file

@ -0,0 +1,219 @@
package qos_test
import (
"context"
"errors"
"fmt"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
const (
okKey = "ok"
)
var (
errTest = errors.New("mock")
errWrongTag = errors.New("wrong tag")
errNoTag = errors.New("failed to get tag from context")
errResExhausted *apistatus.ResourceExhausted
tags = []qos.IOTag{qos.IOTagBackground, qos.IOTagWritecache, qos.IOTagPolicer, qos.IOTagTreeSync}
)
type mockGRPCServerStream struct {
grpc.ServerStream
ctx context.Context
}
func (m *mockGRPCServerStream) Context() context.Context {
return m.ctx
}
type limiter struct {
acquired bool
released bool
}
func (l *limiter) Acquire(key string) (limiting.ReleaseFunc, bool) {
l.acquired = true
if key != okKey {
return nil, false
}
return func() { l.released = true }, true
}
func unaryMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) error {
interceptor := qos.NewMaxActiveRPCLimiterUnaryServerInterceptor(func() limiting.Limiter { return lim })
handler := func(ctx context.Context, req any) (any, error) {
return nil, errTest
}
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{FullMethod: methodName}, handler)
return err
}
func streamMaxActiveRPCLimiter(ctx context.Context, lim *limiter, methodName string) error {
interceptor := qos.NewMaxActiveRPCLimiterStreamServerInterceptor(func() limiting.Limiter { return lim })
handler := func(srv any, stream grpc.ServerStream) error {
return errTest
}
err := interceptor(nil, &mockGRPCServerStream{ctx: ctx}, &grpc.StreamServerInfo{
FullMethod: methodName,
}, handler)
return err
}
func Test_MaxActiveRPCLimiter(t *testing.T) {
// UnaryServerInterceptor
t.Run("unary fail", func(t *testing.T) {
var lim limiter
err := unaryMaxActiveRPCLimiter(context.Background(), &lim, "")
require.ErrorAs(t, err, &errResExhausted)
require.True(t, lim.acquired)
require.False(t, lim.released)
})
t.Run("unary pass critical", func(t *testing.T) {
var lim limiter
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
err := unaryMaxActiveRPCLimiter(ctx, &lim, "")
require.ErrorIs(t, err, errTest)
require.False(t, lim.acquired)
require.False(t, lim.released)
})
t.Run("unary pass", func(t *testing.T) {
var lim limiter
err := unaryMaxActiveRPCLimiter(context.Background(), &lim, okKey)
require.ErrorIs(t, err, errTest)
require.True(t, lim.acquired)
require.True(t, lim.released)
})
// StreamServerInterceptor
t.Run("stream fail", func(t *testing.T) {
var lim limiter
err := streamMaxActiveRPCLimiter(context.Background(), &lim, "")
require.ErrorAs(t, err, &errResExhausted)
require.True(t, lim.acquired)
require.False(t, lim.released)
})
t.Run("stream pass critical", func(t *testing.T) {
var lim limiter
ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagCritical.String())
err := streamMaxActiveRPCLimiter(ctx, &lim, "")
require.ErrorIs(t, err, errTest)
require.False(t, lim.acquired)
require.False(t, lim.released)
})
t.Run("stream pass", func(t *testing.T) {
var lim limiter
err := streamMaxActiveRPCLimiter(context.Background(), &lim, okKey)
require.ErrorIs(t, err, errTest)
require.True(t, lim.acquired)
require.True(t, lim.released)
})
}
func TestSetCriticalIOTagUnaryServerInterceptor_Pass(t *testing.T) {
interceptor := qos.NewSetCriticalIOTagUnaryServerInterceptor()
called := false
handler := func(ctx context.Context, req any) (any, error) {
called = true
if tag, ok := tagging.IOTagFromContext(ctx); ok && tag == qos.IOTagCritical.String() {
return nil, nil
}
return nil, errWrongTag
}
_, err := interceptor(context.Background(), nil, nil, handler)
require.NoError(t, err)
require.True(t, called)
}
func TestAdjustOutgoingIOTagUnaryClientInterceptor(t *testing.T) {
interceptor := qos.NewAdjustOutgoingIOTagUnaryClientInterceptor()
// check context with no value
called := false
invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
called = true
if _, ok := tagging.IOTagFromContext(ctx); ok {
return fmt.Errorf("%v: expected no IO tags", errWrongTag)
}
return nil
}
require.NoError(t, interceptor(context.Background(), "", nil, nil, nil, invoker, nil))
require.True(t, called)
// check context for internal tag
targetTag := qos.IOTagInternal.String()
invoker = func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
raw, ok := tagging.IOTagFromContext(ctx)
if !ok {
return errNoTag
}
if raw != targetTag {
return errWrongTag
}
return nil
}
for _, tag := range tags {
ctx := tagging.ContextWithIOTag(context.Background(), tag.String())
require.NoError(t, interceptor(ctx, "", nil, nil, nil, invoker, nil))
}
// check context for client tag
ctx := tagging.ContextWithIOTag(context.Background(), "")
targetTag = qos.IOTagClient.String()
require.NoError(t, interceptor(ctx, "", nil, nil, nil, invoker, nil))
}
func TestAdjustOutgoingIOTagStreamClientInterceptor(t *testing.T) {
interceptor := qos.NewAdjustOutgoingIOTagStreamClientInterceptor()
// check context with no value
called := false
streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
called = true
if _, ok := tagging.IOTagFromContext(ctx); ok {
return nil, fmt.Errorf("%v: expected no IO tags", errWrongTag)
}
return nil, nil
}
_, err := interceptor(context.Background(), nil, nil, "", streamer, nil)
require.True(t, called)
require.NoError(t, err)
// check context for internal tag
targetTag := qos.IOTagInternal.String()
streamer = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
raw, ok := tagging.IOTagFromContext(ctx)
if !ok {
return nil, errNoTag
}
if raw != targetTag {
return nil, errWrongTag
}
return nil, nil
}
for _, tag := range tags {
ctx := tagging.ContextWithIOTag(context.Background(), tag.String())
_, err := interceptor(ctx, nil, nil, "", streamer, nil)
require.NoError(t, err)
}
// check context for client tag
ctx := tagging.ContextWithIOTag(context.Background(), "")
targetTag = qos.IOTagClient.String()
_, err = interceptor(ctx, nil, nil, "", streamer, nil)
require.NoError(t, err)
}

View file

@ -8,7 +8,6 @@ import (
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -37,15 +36,15 @@ type scheduler interface {
Close()
}
func NewLimiter(c *limits.Config) (Limiter, error) {
if err := validateConfig(c); err != nil {
func NewLimiter(c LimiterConfig) (Limiter, error) {
if err := c.Validate(); err != nil {
return nil, err
}
readScheduler, err := createScheduler(c.Read())
readScheduler, err := createScheduler(c.Read)
if err != nil {
return nil, fmt.Errorf("create read scheduler: %w", err)
}
writeScheduler, err := createScheduler(c.Write())
writeScheduler, err := createScheduler(c.Write)
if err != nil {
return nil, fmt.Errorf("create write scheduler: %w", err)
}
@ -63,8 +62,8 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
return l, nil
}
func createScheduler(config limits.OpConfig) (scheduler, error) {
if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
func createScheduler(config OpConfig) (scheduler, error) {
if len(config.Tags) == 0 && config.MaxWaitingOps == NoLimit {
return newSemaphoreScheduler(config.MaxRunningOps), nil
}
return scheduling.NewMClock(
@ -72,7 +71,7 @@ func createScheduler(config limits.OpConfig) (scheduler, error) {
converToSchedulingTags(config.Tags), config.IdleTimeout)
}
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
func converToSchedulingTags(limits []IOTagConfig) map[string]scheduling.TagInfo {
result := make(map[string]scheduling.TagInfo)
for _, tag := range []IOTag{IOTagBackground, IOTagClient, IOTagInternal, IOTagPolicer, IOTagTreeSync, IOTagWritecache} {
result[tag.String()] = scheduling.TagInfo{
@ -90,6 +89,7 @@ func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.T
if l.ReservedOps != nil && *l.ReservedOps != 0 {
v.ReservedIOPS = l.ReservedOps
}
v.Prohibited = l.Prohibited
result[l.Tag] = v
}
return result
@ -164,8 +164,7 @@ func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (R
rel, err := s.RequestArrival(ctx, tag)
stat.inProgress.Add(1)
if err != nil {
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
errors.Is(err, errSemaphoreLimitExceeded) {
if isResourceExhaustedErr(err) {
stat.resourceExhausted.Add(1)
return nil, &apistatus.ResourceExhausted{}
}
@ -234,3 +233,9 @@ func exportMetrics(metrics Metrics, stats map[string]*stat, shardID, operation s
metrics.SetOperationTagCounters(shardID, operation, tag, pending, inProgress, completed, resExh)
}
}
func isResourceExhaustedErr(err error) bool {
return errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
errors.Is(err, errSemaphoreLimitExceeded) ||
errors.Is(err, scheduling.ErrTagRequestsProhibited)
}

View file

@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"math"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
)
var errWeightsMustBeSpecified = errors.New("invalid weights: weights must be specified for all tags or not specified for any")
@ -14,17 +12,17 @@ type tagConfig struct {
Shares, Limit, Reserved *float64
}
func validateConfig(c *limits.Config) error {
if err := validateOpConfig(c.Read()); err != nil {
func (c *LimiterConfig) Validate() error {
if err := validateOpConfig(c.Read); err != nil {
return fmt.Errorf("limits 'read' section validation error: %w", err)
}
if err := validateOpConfig(c.Write()); err != nil {
if err := validateOpConfig(c.Write); err != nil {
return fmt.Errorf("limits 'write' section validation error: %w", err)
}
return nil
}
func validateOpConfig(c limits.OpConfig) error {
func validateOpConfig(c OpConfig) error {
if c.MaxRunningOps <= 0 {
return fmt.Errorf("invalid 'max_running_ops = %d': must be greater than zero", c.MaxRunningOps)
}
@ -40,7 +38,7 @@ func validateOpConfig(c limits.OpConfig) error {
return nil
}
func validateTags(configTags []limits.IOTagConfig) error {
func validateTags(configTags []IOTagConfig) error {
tags := map[IOTag]tagConfig{
IOTagBackground: {},
IOTagClient: {},

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
utilTesting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/testing"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -410,11 +411,11 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
},
),
WithNetmapSource(
&testNetmapSource{
netmaps: map[uint64]*netmap.NetMap{
&utilTesting.TestNetmapSource{
Netmaps: map[uint64]*netmap.NetMap{
curEpoch: currentEpochNM,
},
currentEpoch: curEpoch,
CurrentEpoch: curEpoch,
},
),
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@ -483,12 +484,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
},
),
WithNetmapSource(
&testNetmapSource{
netmaps: map[uint64]*netmap.NetMap{
&utilTesting.TestNetmapSource{
Netmaps: map[uint64]*netmap.NetMap{
curEpoch: currentEpochNM,
curEpoch - 1: previousEpochNM,
},
currentEpoch: curEpoch,
CurrentEpoch: curEpoch,
},
),
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@ -559,12 +560,12 @@ func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
},
),
WithNetmapSource(
&testNetmapSource{
netmaps: map[uint64]*netmap.NetMap{
&utilTesting.TestNetmapSource{
Netmaps: map[uint64]*netmap.NetMap{
curEpoch: currentEpochNM,
curEpoch - 1: previousEpochNM,
},
currentEpoch: curEpoch,
CurrentEpoch: curEpoch,
},
),
WithLogger(logger.NewLoggerWrapper(zaptest.NewLogger(t))),
@ -596,26 +597,3 @@ func (s *testContainerSource) Get(ctx context.Context, cnrID cid.ID) (*container
func (s *testContainerSource) DeletionInfo(context.Context, cid.ID) (*container.DelInfo, error) {
return nil, nil
}
type testNetmapSource struct {
netmaps map[uint64]*netmap.NetMap
currentEpoch uint64
}
func (s *testNetmapSource) GetNetMap(ctx context.Context, diff uint64) (*netmap.NetMap, error) {
if diff >= s.currentEpoch {
return nil, fmt.Errorf("invalid diff")
}
return s.GetNetMapByEpoch(ctx, s.currentEpoch-diff)
}
func (s *testNetmapSource) GetNetMapByEpoch(ctx context.Context, epoch uint64) (*netmap.NetMap, error) {
if nm, found := s.netmaps[epoch]; found {
return nm, nil
}
return nil, fmt.Errorf("netmap not found")
}
func (s *testNetmapSource) Epoch(ctx context.Context) (uint64, error) {
return s.currentEpoch, nil
}

View file

@ -13,6 +13,13 @@ type ECInfo struct {
Total uint32
}
func (v *ECInfo) String() string {
if v == nil {
return "<nil>"
}
return fmt.Sprintf("parent ID: %s, index: %d, total %d", v.ParentID, v.Index, v.Total)
}
// Info groups object address with its FrostFS
// object info.
type Info struct {
@ -23,5 +30,5 @@ type Info struct {
}
func (v Info) String() string {
return fmt.Sprintf("address: %s, type: %s, is linking: %t", v.Address, v.Type, v.IsLinkingObject)
return fmt.Sprintf("address: %s, type: %s, is linking: %t, EC header: %s", v.Address, v.Type, v.IsLinkingObject, v.ECInfo)
}

View file

@ -50,7 +50,7 @@ func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
var err error
s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log,
Log: s.log.WithTag(logger.TagProcessor),
Metrics: s.irMetrics,
PoolSize: poolSize,
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
@ -159,7 +159,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
} else {
// create governance processor
governanceProcessor, err := governance.New(&governance.Params{
Log: s.log,
Log: s.log.WithTag(logger.TagProcessor),
Metrics: s.irMetrics,
FrostFSClient: frostfsCli,
AlphabetState: s,
@ -225,7 +225,7 @@ func (s *Server) initAlphabetProcessor(ctx context.Context, cfg *viper.Viper) er
// create alphabet processor
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: s.log,
Log: s.log.WithTag(logger.TagProcessor),
Metrics: s.irMetrics,
PoolSize: poolSize,
AlphabetContracts: s.contracts.alphabet,
@ -247,7 +247,7 @@ func (s *Server) initContainerProcessor(ctx context.Context, cfg *viper.Viper, c
s.log.Debug(ctx, logs.ContainerContainerWorkerPool, zap.Int("size", poolSize))
// container processor
containerProcessor, err := cont.New(&cont.Params{
Log: s.log,
Log: s.log.WithTag(logger.TagProcessor),
Metrics: s.irMetrics,
PoolSize: poolSize,
AlphabetState: s,
@ -268,7 +268,7 @@ func (s *Server) initBalanceProcessor(ctx context.Context, cfg *viper.Viper, fro
s.log.Debug(ctx, logs.BalanceBalanceWorkerPool, zap.Int("size", poolSize))
// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: s.log,
Log: s.log.WithTag(logger.TagProcessor),
Metrics: s.irMetrics,
PoolSize: poolSize,
FrostFSClient: frostfsCli,
@ -291,7 +291,7 @@ func (s *Server) initFrostFSMainnetProcessor(ctx context.Context, cfg *viper.Vip
s.log.Debug(ctx, logs.FrostFSFrostfsWorkerPool, zap.Int("size", poolSize))
frostfsProcessor, err := frostfs.New(&frostfs.Params{
Log: s.log,
Log: s.log.WithTag(logger.TagProcessor),
Metrics: s.irMetrics,
PoolSize: poolSize,
FrostFSContract: s.contracts.frostfs,
@ -342,7 +342,7 @@ func (s *Server) initGRPCServer(ctx context.Context, cfg *viper.Viper, log *logg
controlSvc := controlsrv.NewAuditService(controlsrv.New(p, s.netmapClient, s.containerClient,
controlsrv.WithAllowedKeys(authKeys),
), log, audit)
), log.WithTag(logger.TagGrpcSvc), audit)
grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
@ -458,7 +458,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
}
morphChain := &chainParams{
log: s.log,
log: s.log.WithTag(logger.TagMorph),
cfg: cfg,
key: s.key,
name: morphPrefix,

View file

@ -339,7 +339,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
) (*Server, error) {
var err error
server := &Server{
log: log,
log: log.WithTag(logger.TagIr),
irMetrics: metrics,
cmode: cmode,
}

View file

@ -141,8 +141,8 @@ func (b *sharedDB) SystemPath() string {
return b.path
}
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
type levelDbManager struct {
// levelDBManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
type levelDBManager struct {
dbMtx *sync.RWMutex
databases map[uint64]*sharedDB
@ -157,8 +157,8 @@ type levelDbManager struct {
func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *levelDbManager {
result := &levelDbManager{
) *levelDBManager {
result := &levelDBManager{
databases: make(map[uint64]*sharedDB),
dbMtx: &sync.RWMutex{},
@ -173,7 +173,7 @@ func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath st
return result
}
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
func (m *levelDBManager) GetByIndex(idx uint64) *sharedDB {
res := m.getDBIfExists(idx)
if res != nil {
return res
@ -181,14 +181,14 @@ func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
return m.getOrCreateDB(idx)
}
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
func (m *levelDBManager) getDBIfExists(idx uint64) *sharedDB {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return m.databases[idx]
}
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
func (m *levelDBManager) getOrCreateDB(idx uint64) *sharedDB {
m.dbMtx.Lock()
defer m.dbMtx.Unlock()
@ -202,7 +202,7 @@ func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
return db
}
func (m *levelDbManager) hasAnyDB() bool {
func (m *levelDBManager) hasAnyDB() bool {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
@ -213,7 +213,7 @@ func (m *levelDbManager) hasAnyDB() bool {
//
// The blobovnicza opens at the first request, closes after the last request.
type dbManager struct {
levelToManager map[string]*levelDbManager
levelToManager map[string]*levelDBManager
levelToManagerGuard *sync.RWMutex
closedFlag *atomic.Bool
dbCounter *openDBCounter
@ -231,7 +231,7 @@ func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool,
options: options,
readOnly: readOnly,
metrics: metrics,
levelToManager: make(map[string]*levelDbManager),
levelToManager: make(map[string]*levelDBManager),
levelToManagerGuard: &sync.RWMutex{},
log: log,
closedFlag: &atomic.Bool{},
@ -266,7 +266,7 @@ func (m *dbManager) Close() {
m.dbCounter.WaitUntilAllClosed()
}
func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
func (m *dbManager) getLevelManager(lvlPath string) *levelDBManager {
result := m.getLevelManagerIfExists(lvlPath)
if result != nil {
return result
@ -274,14 +274,14 @@ func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
return m.getOrCreateLevelManager(lvlPath)
}
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager {
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDBManager {
m.levelToManagerGuard.RLock()
defer m.levelToManagerGuard.RUnlock()
return m.levelToManager[lvlPath]
}
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDBManager {
m.levelToManagerGuard.Lock()
defer m.levelToManagerGuard.Unlock()

View file

@ -328,7 +328,7 @@ func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobo
return nil
}
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDB *sharedDB) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
@ -341,7 +341,7 @@ func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB)
b.dbFilesGuard.Lock()
defer b.dbFilesGuard.Unlock()
if err := shDb.CloseAndRemoveFile(ctx); err != nil {
if err := shDB.CloseAndRemoveFile(ctx); err != nil {
return false, err
}
b.commondbManager.CleanResources(path)

View file

@ -153,5 +153,5 @@ func WithMetrics(m Metrics) Option {
}
func (b *BlobStor) Compressor() *compression.Config {
return &b.cfg.compression
return &b.compression
}

View file

@ -50,7 +50,7 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc)
_, err := s.Iterate(context.Background(), iterPrm)
require.NoError(t, err)
require.Equal(t, len(objects), len(seen))
require.Len(t, objects, len(seen))
for i := range objects {
d, ok := seen[objects[i].addr.String()]
require.True(t, ok)

View file

@ -74,7 +74,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(prm.cnr)
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
csRes, err := sh.ContainerSize(ctx, csPrm)
if err != nil {
e.reportShardError(ctx, sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr))
@ -119,7 +119,7 @@ func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes {
uniqueIDs := make(map[string]cid.ID)
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
res, err := sh.Shard.ListContainers(ctx, shard.ListContainersPrm{})
res, err := sh.ListContainers(ctx, shard.ListContainersPrm{})
if err != nil {
e.reportShardError(ctx, sh, "can't get list of containers", err)
return false

View file

@ -22,10 +22,6 @@ type shardInitError struct {
// Open opens all StorageEngine's components.
func (e *StorageEngine) Open(ctx context.Context) error {
return e.open(ctx)
}
func (e *StorageEngine) open(ctx context.Context) error {
e.mtx.Lock()
defer e.mtx.Unlock()
@ -77,7 +73,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
errCh := make(chan shardInitError, len(e.shards))
var eg errgroup.Group
if e.cfg.lowMem && e.anyShardRequiresRefill() {
if e.lowMem && e.anyShardRequiresRefill() {
eg.SetLimit(1)
}
@ -149,11 +145,11 @@ var errClosed = errors.New("storage engine is closed")
func (e *StorageEngine) Close(ctx context.Context) error {
close(e.closeCh)
defer e.wg.Wait()
return e.setBlockExecErr(ctx, errClosed)
return e.closeEngine(ctx)
}
// closes all shards. Never returns an error, shard errors are logged.
func (e *StorageEngine) close(ctx context.Context) error {
func (e *StorageEngine) closeAllShards(ctx context.Context) error {
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -176,70 +172,23 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error {
e.blockExec.mtx.RLock()
defer e.blockExec.mtx.RUnlock()
if e.blockExec.err != nil {
return e.blockExec.err
if e.blockExec.closed {
return errClosed
}
return op()
}
// sets the flag of blocking execution of all data operations according to err:
// - err != nil, then blocks the execution. If exec wasn't blocked, calls close method
// (if err == errClosed => additionally releases pools and does not allow to resume executions).
// - otherwise, resumes execution. If exec was blocked, calls open method.
//
// Can be called concurrently with exec. In this case it waits for all executions to complete.
func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
func (e *StorageEngine) closeEngine(ctx context.Context) error {
e.blockExec.mtx.Lock()
defer e.blockExec.mtx.Unlock()
prevErr := e.blockExec.err
wasClosed := errors.Is(prevErr, errClosed)
if wasClosed {
if e.blockExec.closed {
return errClosed
}
e.blockExec.err = err
if err == nil {
if prevErr != nil { // block -> ok
return e.open(ctx)
}
} else if prevErr == nil { // ok -> block
return e.close(ctx)
}
// otherwise do nothing
return nil
}
// BlockExecution blocks the execution of any data-related operation. All blocked ops will return err.
// To resume the execution, use ResumeExecution method.
//
// Сan be called regardless of the fact of the previous blocking. If execution wasn't blocked, releases all resources
// similar to Close. Can be called concurrently with Close and any data related method (waits for all executions
// to complete). Returns error if any Close has been called before.
//
// Must not be called concurrently with either Open or Init.
//
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
// for this.
func (e *StorageEngine) BlockExecution(err error) error {
return e.setBlockExecErr(context.Background(), err)
}
// ResumeExecution resumes the execution of any data-related operation.
// To block the execution, use BlockExecution method.
//
// Сan be called regardless of the fact of the previous blocking. If execution was blocked, prepares all resources
// similar to Open. Can be called concurrently with Close and any data related method (waits for all executions
// to complete). Returns error if any Close has been called before.
//
// Must not be called concurrently with either Open or Init.
func (e *StorageEngine) ResumeExecution() error {
return e.setBlockExecErr(context.Background(), nil)
e.blockExec.closed = true
return e.closeAllShards(ctx)
}
type ReConfiguration struct {

View file

@ -2,7 +2,6 @@ package engine
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
@ -12,17 +11,14 @@ import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
@ -163,42 +159,6 @@ func testEngineFailInitAndReload(t *testing.T, degradedMode bool, opts []shard.O
require.Equal(t, 1, shardCount)
}
func TestExecBlocks(t *testing.T) {
e := testNewEngine(t).setShardsNum(t, 2).prepare(t).engine // number doesn't matter in this test, 2 is several but not many
// put some object
obj := testutil.GenerateObjectWithCID(cidtest.ID())
addr := object.AddressOf(obj)
require.NoError(t, Put(context.Background(), e, obj, false))
// block executions
errBlock := errors.New("block exec err")
require.NoError(t, e.BlockExecution(errBlock))
// try to exec some op
_, err := Head(context.Background(), e, addr)
require.ErrorIs(t, err, errBlock)
// resume executions
require.NoError(t, e.ResumeExecution())
_, err = Head(context.Background(), e, addr) // can be any data-related op
require.NoError(t, err)
// close
require.NoError(t, e.Close(context.Background()))
// try exec after close
_, err = Head(context.Background(), e, addr)
require.Error(t, err)
// try to resume
require.Error(t, e.ResumeExecution())
}
func TestPersistentShardID(t *testing.T) {
dir := t.TempDir()

View file

@ -33,9 +33,8 @@ type StorageEngine struct {
wg sync.WaitGroup
blockExec struct {
mtx sync.RWMutex
err error
mtx sync.RWMutex
closed bool
}
evacuateLimiter *evacuationLimiter
}
@ -212,12 +211,18 @@ func New(opts ...Option) *StorageEngine {
opts[i](c)
}
evLimMtx := &sync.RWMutex{}
evLimCond := sync.NewCond(evLimMtx)
return &StorageEngine{
cfg: c,
shards: make(map[string]hashedShard),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{},
cfg: c,
shards: make(map[string]hashedShard),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{
guard: evLimMtx,
statusCond: evLimCond,
},
}
}

View file

@ -2,8 +2,11 @@ package engine
import (
"context"
"fmt"
"path/filepath"
"sync/atomic"
"runtime/debug"
"strings"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
@ -157,26 +160,74 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
var _ qos.Limiter = (*testQoSLimiter)(nil)
type testQoSLimiter struct {
t testing.TB
read atomic.Int64
write atomic.Int64
t testing.TB
quard sync.Mutex
id int64
readStacks map[int64][]byte
writeStacks map[int64][]byte
}
func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
func (t *testQoSLimiter) Close() {
require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
t.quard.Lock()
defer t.quard.Unlock()
var sb strings.Builder
var seqN int
for _, stack := range t.readStacks {
seqN++
sb.WriteString(fmt.Sprintf("%d\n read request stack after limiter close: %s\n", seqN, string(stack)))
}
for _, stack := range t.writeStacks {
seqN++
sb.WriteString(fmt.Sprintf("%d\n write request stack after limiter close: %s\n", seqN, string(stack)))
}
require.True(t.t, seqN == 0, sb.String())
}
func (t *testQoSLimiter) ReadRequest(context.Context) (qos.ReleaseFunc, error) {
t.read.Add(1)
return func() { t.read.Add(-1) }, nil
t.quard.Lock()
defer t.quard.Unlock()
stack := debug.Stack()
t.id++
id := t.id
if t.readStacks == nil {
t.readStacks = make(map[int64][]byte)
}
t.readStacks[id] = stack
return func() {
t.quard.Lock()
defer t.quard.Unlock()
delete(t.readStacks, id)
}, nil
}
func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error) {
t.write.Add(1)
return func() { t.write.Add(-1) }, nil
t.quard.Lock()
defer t.quard.Unlock()
stack := debug.Stack()
t.id++
id := t.id
if t.writeStacks == nil {
t.writeStacks = make(map[int64][]byte)
}
t.writeStacks[id] = stack
return func() {
t.quard.Lock()
defer t.quard.Unlock()
delete(t.writeStacks, id)
}, nil
}
func (t *testQoSLimiter) SetParentID(string) {}

View file

@ -95,8 +95,7 @@ func (s *EvacuationState) StartedAt() *time.Time {
if s == nil {
return nil
}
defaultTime := time.Time{}
if s.startedAt == defaultTime {
if s.startedAt.IsZero() {
return nil
}
return &s.startedAt
@ -106,8 +105,7 @@ func (s *EvacuationState) FinishedAt() *time.Time {
if s == nil {
return nil
}
defaultTime := time.Time{}
if s.finishedAt == defaultTime {
if s.finishedAt.IsZero() {
return nil
}
return &s.finishedAt
@ -141,7 +139,8 @@ type evacuationLimiter struct {
eg *errgroup.Group
cancel context.CancelFunc
guard sync.RWMutex
guard *sync.RWMutex
statusCond *sync.Cond // used in unit tests
}
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
@ -167,6 +166,7 @@ func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, res
startedAt: time.Now().UTC(),
result: result,
}
l.statusCond.Broadcast()
return l.eg, egCtx, nil
}
@ -182,6 +182,7 @@ func (l *evacuationLimiter) Complete(err error) {
l.state.processState = EvacuateProcessStateCompleted
l.state.errMessage = errMsq
l.state.finishedAt = time.Now().UTC()
l.statusCond.Broadcast()
l.eg = nil
}
@ -216,6 +217,7 @@ func (l *evacuationLimiter) ResetEvacuationStatus() error {
l.state = EvacuationState{}
l.eg = nil
l.cancel = nil
l.statusCond.Broadcast()
return nil
}

View file

@ -204,11 +204,10 @@ func TestEvacuateShardObjects(t *testing.T) {
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
var st *EvacuationState
var err error
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond)
e.evacuateLimiter.waitForCompleted()
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
return st
}
@ -817,3 +816,12 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}
func (l *evacuationLimiter) waitForCompleted() {
l.guard.Lock()
defer l.guard.Unlock()
for l.state.processState != EvacuateProcessStateCompleted {
l.statusCond.Wait()
}
}

View file

@ -227,7 +227,7 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
var outErr error
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
locked, err = h.Shard.IsLocked(ctx, addr)
locked, err = h.IsLocked(ctx, addr)
if err != nil {
e.reportShardError(ctx, h, "can't check object's lockers", err, zap.Stringer("address", addr))
outErr = err
@ -256,7 +256,7 @@ func (e *StorageEngine) GetLocks(ctx context.Context, addr oid.Address) ([]oid.I
var outErr error
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
locks, err := h.Shard.GetLocks(ctx, addr)
locks, err := h.GetLocks(ctx, addr)
if err != nil {
e.reportShardError(ctx, h, logs.EngineInterruptGettingLockers, err, zap.Stringer("address", addr))
outErr = err

View file

@ -84,17 +84,11 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
var siErr *objectSDK.SplitInfoError
var eiErr *objectSDK.ECInfoError
if errors.As(err, &eiErr) {
eclocked := []oid.ID{locked}
for _, chunk := range eiErr.ECInfo().Chunks {
var objID oid.ID
err = objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return false
}
eclocked = append(eclocked, objID)
eclocked, ok := e.checkECLocked(ctx, sh, idCnr, locker, locked, eiErr)
if !ok {
return false
}
err = sh.Lock(ctx, idCnr, locker, eclocked)
if err != nil {
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
@ -137,3 +131,18 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
})
return
}
func (e *StorageEngine) checkECLocked(ctx context.Context, sh hashedShard, idCnr cid.ID, locker, locked oid.ID, eiErr *objectSDK.ECInfoError) ([]oid.ID, bool) {
eclocked := []oid.ID{locked}
for _, chunk := range eiErr.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(chunk.ID)
if err != nil {
e.reportShardError(ctx, sh, "could not lock object in shard", err, zap.Stringer("container_id", idCnr),
zap.Stringer("locker_id", locker), zap.Stringer("locked_id", locked))
return nil, false
}
eclocked = append(eclocked, objID)
}
return eclocked, true
}

View file

@ -118,7 +118,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
return nil, fmt.Errorf("add %s shard: %w", sh.ID().String(), err)
}
e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())
e.metrics.SetMode(sh.ID().String(), sh.GetMode())
return sh.ID(), nil
}
@ -318,8 +318,6 @@ func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.M
// HandleNewEpoch notifies every shard about NewEpoch event.
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
ev := shard.EventNewEpoch(epoch)
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -327,7 +325,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
select {
case <-ctx.Done():
return
case sh.NotificationChannel() <- ev:
case sh.NotificationChannel() <- epoch:
default:
e.log.Debug(ctx, logs.ShardEventProcessingInProgress,
zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID()))

View file

@ -376,11 +376,12 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int {
return len(lst)
}
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt := tx.Bucket(item.name)
if bkt != nil {
_ = bkt.Delete(item.key) // ignore error, best effort there
return bkt.Delete(item.key)
}
return nil
}
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
@ -405,19 +406,16 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
// if list empty, remove the key from <list> bucket
if len(lst) == 0 {
_ = bkt.Delete(item.key) // ignore error, best effort there
return nil
return bkt.Delete(item.key)
}
// if list is not empty, then update it
encodedLst, err := encodeList(lst)
if err != nil {
return nil // ignore error, best effort there
return err
}
_ = bkt.Put(item.key, encodedLst) // ignore error, best effort there
return nil
return bkt.Put(item.key, encodedLst)
}
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
@ -480,35 +478,47 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
return ErrUnknownObjectType
}
delUniqueIndexItem(tx, namedBucketItem{
if err := delUniqueIndexItem(tx, namedBucketItem{
name: bucketName,
key: objKey,
})
}); err != nil {
return err
}
} else {
delUniqueIndexItem(tx, namedBucketItem{
if err := delUniqueIndexItem(tx, namedBucketItem{
name: parentBucketName(cnr, bucketName),
key: objKey,
})
}); err != nil {
return err
}
}
delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
if err := delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
name: smallBucketName(cnr, bucketName),
key: objKey,
})
delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
}); err != nil {
return err
}
if err := delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
name: rootBucketName(cnr, bucketName),
key: objKey,
})
}); err != nil {
return err
}
if expEpoch, ok := hasExpirationEpoch(obj); ok {
delUniqueIndexItem(tx, namedBucketItem{
if err := delUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
})
delUniqueIndexItem(tx, namedBucketItem{
}); err != nil {
return err
}
if err := delUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
key: objKey,
})
}); err != nil {
return err
}
}
return nil
@ -535,10 +545,12 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
// also drop EC parent root info if current EC chunk is the last one
if !hasAnyChunks {
delUniqueIndexItem(tx, namedBucketItem{
if err := delUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
})
}); err != nil {
return err
}
}
if ech.ParentSplitParentID() == nil {
@ -572,11 +584,10 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
}
// drop split info
delUniqueIndexItem(tx, namedBucketItem{
return delUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
})
return nil
}
func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {

View file

@ -139,8 +139,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
var containerID cid.ID
var offset []byte
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
bc := newBucketCache()
rawAddr := make([]byte, cidSize, addressKeySize)
@ -169,7 +168,7 @@ loop:
bkt := tx.Bucket(name)
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
result, count, cursor, threshold, currEpoch)
if err != nil {
return nil, nil, err
@ -204,9 +203,10 @@ loop:
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
// object to start selecting from. Ignores inhumed objects.
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
func selectNFromBucket(
bc *bucketCache,
bkt *bbolt.Bucket, // main bucket
objType objectSDK.Type, // type of the objects stored in the main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
to []objectcore.Info, // listing result
@ -219,7 +219,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
cursor = new(Cursor)
}
count := len(to)
c := bkt.Cursor()
k, v := c.First()
@ -231,7 +230,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
}
for ; k != nil; k, v = c.Next() {
if count >= limit {
if len(to) >= limit {
break
}
@ -241,6 +240,8 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
}
offset = k
graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
garbageBkt := getGarbageBucket(bc, bkt.Tx())
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
@ -251,7 +252,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
}
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) {
if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
continue
}
@ -273,7 +274,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
a.SetContainer(cnt)
a.SetObject(obj)
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
count++
}
return to, offset, cursor, nil

View file

@ -1582,12 +1582,12 @@ func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
func (t *boltForest) logFromBytes(lm *Move, data []byte) error {
lm.Child = binary.LittleEndian.Uint64(data)
lm.Parent = binary.LittleEndian.Uint64(data[8:])
return lm.Meta.FromBytes(data[16:])
return lm.FromBytes(data[16:])
}
func (t *boltForest) logToBytes(lm *Move) []byte {
w := io.NewBufBinWriter()
size := 8 + 8 + lm.Meta.Size() + 1
size := 8 + 8 + lm.Size() + 1
// if lm.HasOld {
// size += 8 + lm.Old.Meta.Size()
// }
@ -1595,7 +1595,7 @@ func (t *boltForest) logToBytes(lm *Move) []byte {
w.Grow(size)
w.WriteU64LE(lm.Child)
w.WriteU64LE(lm.Parent)
lm.Meta.EncodeBinary(w.BinWriter)
lm.EncodeBinary(w.BinWriter)
// w.WriteBool(lm.HasOld)
// if lm.HasOld {
// w.WriteU64LE(lm.Old.Parent)

View file

@ -177,7 +177,7 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
var res []NodeInfo
for _, nodeID := range nodeIDs {
children := s.tree.getChildren(nodeID)
children := s.getChildren(nodeID)
for _, childID := range children {
var found bool
for _, kv := range s.infoMap[childID].Meta.Items {
@ -222,7 +222,7 @@ func (f *memoryForest) TreeGetChildren(_ context.Context, cid cid.ID, treeID str
return nil, ErrTreeNotFound
}
children := s.tree.getChildren(nodeID)
children := s.getChildren(nodeID)
res := make([]NodeInfo, 0, len(children))
for _, childID := range children {
res = append(res, NodeInfo{

View file

@ -35,9 +35,9 @@ func newMemoryTree() *memoryTree {
// undo un-does op and changes s in-place.
func (s *memoryTree) undo(op *move) {
if op.HasOld {
s.tree.infoMap[op.Child] = op.Old
s.infoMap[op.Child] = op.Old
} else {
delete(s.tree.infoMap, op.Child)
delete(s.infoMap, op.Child)
}
}
@ -83,8 +83,8 @@ func (s *memoryTree) do(op *Move) move {
},
}
shouldPut := !s.tree.isAncestor(op.Child, op.Parent)
p, ok := s.tree.infoMap[op.Child]
shouldPut := !s.isAncestor(op.Child, op.Parent)
p, ok := s.infoMap[op.Child]
if ok {
lm.HasOld = true
lm.Old = p
@ -100,7 +100,7 @@ func (s *memoryTree) do(op *Move) move {
p.Meta = m
p.Parent = op.Parent
s.tree.infoMap[op.Child] = p
s.infoMap[op.Child] = p
return lm
}
@ -192,7 +192,7 @@ func (t tree) getByPath(attr string, path []string, latest bool) []Node {
}
var nodes []Node
var lastTs Timestamp
var lastTS Timestamp
children := t.getChildren(curNode)
for i := range children {
@ -200,7 +200,7 @@ func (t tree) getByPath(attr string, path []string, latest bool) []Node {
fileName := string(info.Meta.GetAttr(attr))
if fileName == path[len(path)-1] {
if latest {
if info.Meta.Time >= lastTs {
if info.Meta.Time >= lastTS {
nodes = append(nodes[:0], children[i])
}
} else {

View file

@ -108,19 +108,17 @@ func (s *Shard) Init(ctx context.Context) error {
s.updateMetrics(ctx)
s.gc = &gc{
gcCfg: &s.gcCfg,
remover: s.removeGarbage,
stopChannel: make(chan struct{}),
eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredMetrics,
},
gcCfg: &s.gcCfg,
remover: s.removeGarbage,
stopChannel: make(chan struct{}),
newEpochChan: make(chan uint64),
newEpochHandlers: &newEpochHandlers{
cancelFunc: func() {},
handlers: []newEpochHandler{
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredMetrics,
},
},
}
@ -216,8 +214,8 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
}
eg, egCtx := errgroup.WithContext(ctx)
if s.cfg.refillMetabaseWorkersCount > 0 {
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
if s.refillMetabaseWorkersCount > 0 {
eg.SetLimit(s.refillMetabaseWorkersCount)
}
var completedCount uint64
@ -365,6 +363,7 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
// Close releases all Shard's components.
func (s *Shard) Close(ctx context.Context) error {
unlock := s.lockExclusive()
if s.rb != nil {
s.rb.Stop(ctx, s.log)
}
@ -390,15 +389,19 @@ func (s *Shard) Close(ctx context.Context) error {
}
}
if s.opsLimiter != nil {
s.opsLimiter.Close()
}
unlock()
// GC waits for handlers and remover to complete. Handlers may try to lock shard's lock.
// So to prevent deadlock GC stopping is outside of exclusive lock.
// If Init/Open was unsuccessful gc can be nil.
if s.gc != nil {
s.gc.stop(ctx)
}
if s.opsLimiter != nil {
s.opsLimiter.Close()
}
return lastErr
}

View file

@ -33,41 +33,14 @@ type TombstoneSource interface {
IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool
}
// Event represents class of external events.
type Event interface {
typ() eventType
}
type newEpochHandler func(context.Context, uint64)
type eventType int
const (
_ eventType = iota
eventNewEpoch
)
type newEpoch struct {
epoch uint64
}
func (e newEpoch) typ() eventType {
return eventNewEpoch
}
// EventNewEpoch returns new epoch event.
func EventNewEpoch(e uint64) Event {
return newEpoch{
epoch: e,
}
}
type eventHandler func(context.Context, Event)
type eventHandlers struct {
type newEpochHandlers struct {
prevGroup sync.WaitGroup
cancelFunc context.CancelFunc
handlers []eventHandler
handlers []newEpochHandler
}
type gcRunResult struct {
@ -109,10 +82,10 @@ type gc struct {
remover func(context.Context) gcRunResult
// eventChan is used only for listening for the new epoch event.
// newEpochChan is used only for listening for the new epoch event.
// It is ok to keep opened, we are listening for context done when writing in it.
eventChan chan Event
mEventHandler map[eventType]*eventHandlers
newEpochChan chan uint64
newEpochHandlers *newEpochHandlers
}
type gcCfg struct {
@ -142,15 +115,7 @@ func defaultGCCfg() gcCfg {
}
func (gc *gc) init(ctx context.Context) {
sz := 0
for _, v := range gc.mEventHandler {
sz += len(v.handlers)
}
if sz > 0 {
gc.workerPool = gc.workerPoolInit(sz)
}
gc.workerPool = gc.workerPoolInit(len(gc.newEpochHandlers.handlers))
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
gc.wg.Add(2)
go gc.tickRemover(ctx)
@ -168,7 +133,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
case <-ctx.Done():
gc.log.Warn(ctx, logs.ShardStopEventListenerByContext)
return
case event, ok := <-gc.eventChan:
case event, ok := <-gc.newEpochChan:
if !ok {
gc.log.Warn(ctx, logs.ShardStopEventListenerByClosedEventChannel)
return
@ -179,38 +144,33 @@ func (gc *gc) listenEvents(ctx context.Context) {
}
}
func (gc *gc) handleEvent(ctx context.Context, event Event) {
v, ok := gc.mEventHandler[event.typ()]
if !ok {
return
}
v.cancelFunc()
v.prevGroup.Wait()
func (gc *gc) handleEvent(ctx context.Context, epoch uint64) {
gc.newEpochHandlers.cancelFunc()
gc.newEpochHandlers.prevGroup.Wait()
var runCtx context.Context
runCtx, v.cancelFunc = context.WithCancel(ctx)
runCtx, gc.newEpochHandlers.cancelFunc = context.WithCancel(ctx)
v.prevGroup.Add(len(v.handlers))
gc.newEpochHandlers.prevGroup.Add(len(gc.newEpochHandlers.handlers))
for i := range v.handlers {
for i := range gc.newEpochHandlers.handlers {
select {
case <-ctx.Done():
return
default:
}
h := v.handlers[i]
h := gc.newEpochHandlers.handlers[i]
err := gc.workerPool.Submit(func() {
defer v.prevGroup.Done()
h(runCtx, event)
defer gc.newEpochHandlers.prevGroup.Done()
h(runCtx, epoch)
})
if err != nil {
gc.log.Warn(ctx, logs.ShardCouldNotSubmitGCJobToWorkerPool,
zap.Error(err),
)
v.prevGroup.Done()
gc.newEpochHandlers.prevGroup.Done()
}
}
}
@ -267,6 +227,9 @@ func (gc *gc) stop(ctx context.Context) {
gc.log.Info(ctx, logs.ShardWaitingForGCWorkersToStop)
gc.wg.Wait()
gc.newEpochHandlers.cancelFunc()
gc.newEpochHandlers.prevGroup.Wait()
}
// iterates over metabase and deletes objects
@ -357,12 +320,12 @@ func (s *Shard) getGarbage(ctx context.Context) ([]oid.Address, error) {
}
func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
workerCount = max(minExpiredWorkers, s.gc.gcCfg.expiredCollectorWorkerCount)
batchSize = max(minExpiredBatchSize, s.gc.gcCfg.expiredCollectorBatchSize)
workerCount = max(minExpiredWorkers, s.gc.expiredCollectorWorkerCount)
batchSize = max(minExpiredBatchSize, s.gc.expiredCollectorBatchSize)
return
}
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) {
var err error
startedAt := time.Now()
@ -370,8 +333,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular)
}()
s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))
workersCount, batchSize := s.getExpiredObjectsParameters()
@ -380,7 +343,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
batch = append(batch, o.Address())
@ -486,7 +449,7 @@ func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeR
return s.metaBase.Inhume(ctx, inhumePrm)
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
var err error
startedAt := time.Now()
@ -494,7 +457,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone)
}()
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))
log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)
@ -527,7 +489,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return
}
release, err := s.opsLimiter.ReadRequest(ctx)
var release qos.ReleaseFunc
release, err = s.opsLimiter.ReadRequest(ctx)
if err != nil {
log.Error(ctx, logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock()
@ -565,7 +528,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
}
}
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
var err error
startedAt := time.Now()
@ -573,8 +536,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock)
}()
s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", epoch))
workersCount, batchSize := s.getExpiredObjectsParameters()
@ -584,14 +547,14 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
if o.Type() == objectSDK.TypeLock {
batch = append(batch, o.Address())
if len(batch) == batchSize {
expired := batch
errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
s.expiredLocksCallback(egCtx, epoch, expired)
return egCtx.Err()
})
batch = make([]oid.Address, 0, batchSize)
@ -605,7 +568,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
if len(batch) > 0 {
expired := batch
errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
s.expiredLocksCallback(egCtx, epoch, expired)
return egCtx.Err()
})
}
@ -704,7 +667,10 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
// HandleExpiredLocks unlocks all objects which were locked by lockers.
// If successful, marks lockers themselves as garbage.
func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
if s.GetMode().NoMetabase() {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return
}
@ -767,7 +733,10 @@ func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unloc
// HandleDeletedLocks unlocks all objects which were locked by lockers.
func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
if s.GetMode().NoMetabase() {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return
}
@ -784,17 +753,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
}
}
// NotificationChannel returns channel for shard events.
func (s *Shard) NotificationChannel() chan<- Event {
return s.gc.eventChan
// NotificationChannel returns channel for new epoch events.
func (s *Shard) NotificationChannel() chan<- uint64 {
return s.gc.newEpochChan
}
func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
func (s *Shard) collectExpiredMetrics(ctx context.Context, epoch uint64) {
ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
defer span.End()
epoch := e.(newEpoch).epoch
s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))

View file

@ -69,7 +69,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
require.NoError(t, err)
epoch.Value = 105
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
sh.gc.handleEvent(context.Background(), epoch.Value)
var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj))
@ -165,7 +165,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
sh.gc.handleEvent(context.Background(), epoch.Value)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")

View file

@ -45,7 +45,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
}
shardID := s.info.ID.String()
s.cfg.metricsWriter.SetShardID(shardID)
s.metricsWriter.SetShardID(shardID)
if s.writeCache != nil && s.writeCache.GetMetrics() != nil {
s.writeCache.GetMetrics().SetShardID(shardID)
}

View file

@ -218,7 +218,7 @@ func WithWriteCache(use bool) Option {
// hasWriteCache returns bool if write cache exists on shards.
func (s *Shard) hasWriteCache() bool {
return s.cfg.useWriteCache
return s.useWriteCache
}
// NeedRefillMetabase returns true if metabase is needed to be refilled.
@ -379,15 +379,15 @@ func WithLimiter(l qos.Limiter) Option {
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
s.cfg.info.Mode = s.GetMode()
s.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.info.BlobStorInfo = s.blobStor.DumpInfo()
s.info.Mode = s.GetMode()
if s.cfg.useWriteCache {
s.cfg.info.WriteCacheInfo = s.writeCache.DumpInfo()
if s.useWriteCache {
s.info.WriteCacheInfo = s.writeCache.DumpInfo()
}
if s.pilorama != nil {
s.cfg.info.PiloramaInfo = s.pilorama.DumpInfo()
s.info.PiloramaInfo = s.pilorama.DumpInfo()
}
}
@ -454,57 +454,57 @@ func (s *Shard) updateMetrics(ctx context.Context) {
s.setContainerObjectsCount(contID.EncodeToString(), logical, count.Logic)
s.setContainerObjectsCount(contID.EncodeToString(), user, count.User)
}
s.cfg.metricsWriter.SetMode(s.info.Mode)
s.metricsWriter.SetMode(s.info.Mode)
}
// incObjectCounter increment both physical and logical object
// counters.
func (s *Shard) incObjectCounter(cnrID cid.ID, isUser bool) {
s.cfg.metricsWriter.IncObjectCounter(physical)
s.cfg.metricsWriter.IncObjectCounter(logical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
s.metricsWriter.IncObjectCounter(physical)
s.metricsWriter.IncObjectCounter(logical)
s.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
s.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
if isUser {
s.cfg.metricsWriter.IncObjectCounter(user)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), user)
s.metricsWriter.IncObjectCounter(user)
s.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), user)
}
}
func (s *Shard) decObjectCounterBy(typ string, v uint64) {
if v > 0 {
s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v))
s.metricsWriter.AddToObjectCounter(typ, -int(v))
}
}
func (s *Shard) setObjectCounterBy(typ string, v uint64) {
if v > 0 {
s.cfg.metricsWriter.SetObjectCounter(typ, v)
s.metricsWriter.SetObjectCounter(typ, v)
}
}
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
for cnrID, count := range byCnr {
if count.Phy > 0 {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy)
s.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy)
}
if count.Logic > 0 {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic)
s.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic)
}
if count.User > 0 {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), user, count.User)
s.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), user, count.User)
}
}
}
func (s *Shard) addToContainerSize(cnr string, size int64) {
if size != 0 {
s.cfg.metricsWriter.AddToContainerSize(cnr, size)
s.metricsWriter.AddToContainerSize(cnr, size)
}
}
func (s *Shard) addToPayloadSize(size int64) {
if size != 0 {
s.cfg.metricsWriter.AddToPayloadSize(size)
s.metricsWriter.AddToPayloadSize(size)
}
}

View file

@ -151,20 +151,6 @@ func (e *notHaltStateError) Error() string {
)
}
// implementation of error interface for FrostFS-specific errors.
type frostfsError struct {
err error
}
func (e frostfsError) Error() string {
return fmt.Sprintf("frostfs error: %v", e.err)
}
// wraps FrostFS-specific error into frostfsError. Arg must not be nil.
func wrapFrostFSError(err error) error {
return frostfsError{err}
}
// Invoke invokes contract method by sending transaction into blockchain.
// Returns valid until block value.
// Supported args types: int64, string, util.Uint160, []byte and bool.
@ -228,7 +214,7 @@ func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int
if err != nil {
return err
} else if val.State != HaltState {
return wrapFrostFSError(&notHaltStateError{state: val.State, exception: val.FaultException})
return &notHaltStateError{state: val.State, exception: val.FaultException}
}
arr, sid, r, err := unwrap.ArrayAndSessionIterator(val, err)
@ -292,7 +278,7 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) (
}
if val.State != HaltState {
return nil, wrapFrostFSError(&notHaltStateError{state: val.State, exception: val.FaultException})
return nil, &notHaltStateError{state: val.State, exception: val.FaultException}
}
success = true

View file

@ -2,7 +2,6 @@ package netmap
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
@ -26,44 +25,24 @@ const (
// MaxObjectSize receives max object size configuration
// value through the Netmap contract call.
func (c *Client) MaxObjectSize(ctx context.Context) (uint64, error) {
objectSize, err := c.readUInt64Config(ctx, MaxObjectSizeConfig)
if err != nil {
return 0, err
}
return objectSize, nil
return c.readUInt64Config(ctx, MaxObjectSizeConfig)
}
// EpochDuration returns number of sidechain blocks per one FrostFS epoch.
func (c *Client) EpochDuration(ctx context.Context) (uint64, error) {
epochDuration, err := c.readUInt64Config(ctx, EpochDurationConfig)
if err != nil {
return 0, err
}
return epochDuration, nil
return c.readUInt64Config(ctx, EpochDurationConfig)
}
// ContainerFee returns fee paid by container owner to each alphabet node
// for container registration.
func (c *Client) ContainerFee(ctx context.Context) (uint64, error) {
fee, err := c.readUInt64Config(ctx, ContainerFeeConfig)
if err != nil {
return 0, err
}
return fee, nil
return c.readUInt64Config(ctx, ContainerFeeConfig)
}
// ContainerAliasFee returns additional fee paid by container owner to each
// alphabet node for container nice name registration.
func (c *Client) ContainerAliasFee(ctx context.Context) (uint64, error) {
fee, err := c.readUInt64Config(ctx, ContainerAliasFeeConfig)
if err != nil {
return 0, err
}
return fee, nil
return c.readUInt64Config(ctx, ContainerAliasFeeConfig)
}
// HomomorphicHashDisabled returns global configuration value of homomorphic hashing
@ -77,23 +56,13 @@ func (c *Client) HomomorphicHashDisabled(ctx context.Context) (bool, error) {
// InnerRingCandidateFee returns global configuration value of fee paid by
// node to be in inner ring candidates list.
func (c *Client) InnerRingCandidateFee(ctx context.Context) (uint64, error) {
fee, err := c.readUInt64Config(ctx, IrCandidateFeeConfig)
if err != nil {
return 0, err
}
return fee, nil
return c.readUInt64Config(ctx, IrCandidateFeeConfig)
}
// WithdrawFee returns global configuration value of fee paid by user to
// withdraw assets from FrostFS contract.
func (c *Client) WithdrawFee(ctx context.Context) (uint64, error) {
fee, err := c.readUInt64Config(ctx, WithdrawFeeConfig)
if err != nil {
return 0, err
}
return fee, nil
return c.readUInt64Config(ctx, WithdrawFeeConfig)
}
// MaintenanceModeAllowed reads admission of "maintenance" state from the
@ -106,29 +75,27 @@ func (c *Client) MaintenanceModeAllowed(ctx context.Context) (bool, error) {
}
func (c *Client) readUInt64Config(ctx context.Context, key string) (uint64, error) {
v, err := c.config(ctx, []byte(key), IntegerAssert)
v, err := c.config(ctx, []byte(key))
if err != nil {
return 0, fmt.Errorf("read netconfig value '%s': %w", key, err)
}
// IntegerAssert is guaranteed to return int64 if the error is nil.
return uint64(v.(int64)), nil
bi, err := v.TryInteger()
if err != nil {
return 0, err
}
return bi.Uint64(), nil
}
// reads boolean value by the given key from the FrostFS network configuration
// stored in the Sidechain. Returns false if key is not presented.
func (c *Client) readBoolConfig(ctx context.Context, key string) (bool, error) {
v, err := c.config(ctx, []byte(key), BoolAssert)
v, err := c.config(ctx, []byte(key))
if err != nil {
if errors.Is(err, ErrConfigNotFound) {
return false, nil
}
return false, fmt.Errorf("read netconfig value '%s': %w", key, err)
}
// BoolAssert is guaranteed to return bool if the error is nil.
return v.(bool), nil
return v.TryBool()
}
// SetConfigPrm groups parameters of SetConfig operation.
@ -277,15 +244,11 @@ func bytesToBool(val []byte) bool {
return false
}
// ErrConfigNotFound is returned when the requested key was not found
// in the network config (returned value is `Null`).
var ErrConfigNotFound = errors.New("config value not found")
// config performs the test invoke of get config value
// method of FrostFS Netmap contract.
//
// Returns ErrConfigNotFound if config key is not found in the contract.
func (c *Client) config(ctx context.Context, key []byte, assert func(stackitem.Item) (any, error)) (any, error) {
func (c *Client) config(ctx context.Context, key []byte) (stackitem.Item, error) {
prm := client.TestInvokePrm{}
prm.SetMethod(configMethod)
prm.SetArgs(key)
@ -301,26 +264,7 @@ func (c *Client) config(ctx context.Context, key []byte, assert func(stackitem.I
configMethod, ln)
}
if _, ok := items[0].(stackitem.Null); ok {
return nil, ErrConfigNotFound
}
return assert(items[0])
}
// IntegerAssert converts stack item to int64.
func IntegerAssert(item stackitem.Item) (any, error) {
return client.IntFromStackItem(item)
}
// StringAssert converts stack item to string.
func StringAssert(item stackitem.Item) (any, error) {
return client.StringFromStackItem(item)
}
// BoolAssert converts stack item to bool.
func BoolAssert(item stackitem.Item) (any, error) {
return client.BoolFromStackItem(item)
return items[0], nil
}
// iterateRecords iterates over all config records and passes them to f.

Some files were not shown because too many files have changed in this diff Show more