diff --git a/.forgejo/ISSUE_TEMPLATE/bug_report.md b/.forgejo/ISSUE_TEMPLATE/bug_report.md deleted file mode 100644 index fb16999..0000000 --- a/.forgejo/ISSUE_TEMPLATE/bug_report.md +++ /dev/null @@ -1,46 +0,0 @@ ---- -name: Bug report -about: Create a report to help us improve -title: '' -labels: community, triage, bug -assignees: '' - ---- - - - -## Expected Behavior - - -## Current Behavior - - -## Possible Solution - - -## Steps to Reproduce (for bugs) - - -1. - -## Context - - -## Regression - - -## Your Environment - -* Version used: -* Server setup and configuration: -* Operating System and version (`uname -a`): diff --git a/.forgejo/ISSUE_TEMPLATE/config.yml b/.forgejo/ISSUE_TEMPLATE/config.yml deleted file mode 100644 index 3ba13e0..0000000 --- a/.forgejo/ISSUE_TEMPLATE/config.yml +++ /dev/null @@ -1 +0,0 @@ -blank_issues_enabled: false diff --git a/.forgejo/ISSUE_TEMPLATE/feature_request.md b/.forgejo/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index 5beeb06..0000000 --- a/.forgejo/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,28 +0,0 @@ ---- -name: Feature request -about: Suggest an idea for this project -title: '' -labels: community, triage -assignees: '' - ---- - -## Is your feature request related to a problem? Please describe. - - -## Describe the solution you'd like - - -## Describe alternatives you've considered - - -## Additional context - - -## Don't forget to add labels! -- component label (`neofs-adm`, `neofs-storage`, ...) -- issue type (`enhancement`, `refactor`, ...) -- `goodfirstissue`, `helpwanted` if needed -- does this issue belong to an epic? -- priority (`P0`-`P4`) if already triaged -- quarter label (`202XQY`) if possible diff --git a/.forgejo/logo.svg b/.forgejo/logo.svg deleted file mode 100644 index 148c359..0000000 --- a/.forgejo/logo.svg +++ /dev/null @@ -1,70 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/.forgejo/workflows/dco.yml b/.forgejo/workflows/dco.yml deleted file mode 100644 index 7c5af84..0000000 --- a/.forgejo/workflows/dco.yml +++ /dev/null @@ -1,21 +0,0 @@ -name: DCO action -on: [pull_request] - -jobs: - dco: - name: DCO - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - - name: Setup Go - uses: actions/setup-go@v3 - with: - go-version: '1.22' - - - name: Run commit format checker - uses: https://git.frostfs.info/TrueCloudLab/dco-go@v3 - with: - from: 'origin/${{ github.event.pull_request.base.ref }}' diff --git a/.forgejo/workflows/pre-commit.yml b/.forgejo/workflows/pre-commit.yml deleted file mode 100644 index b27e7a3..0000000 --- a/.forgejo/workflows/pre-commit.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: Pre-commit hooks - -on: - pull_request: - push: - branches: - - master - -jobs: - precommit: - name: Pre-commit - env: - # Skip pre-commit hooks which are executed by other actions. - SKIP: make-lint,go-staticcheck-repo-mod,go-unit-tests,gofumpt - runs-on: ubuntu-22.04 - # If we use actions/setup-python from either Github or Gitea, - # the line above fails with a cryptic error about not being able to find python. - # So install everything manually. - steps: - - uses: actions/checkout@v3 - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: 1.23 - - name: Set up Python - run: | - apt update - apt install -y pre-commit - - name: Run pre-commit - run: pre-commit run --color=always --hook-stage manual --all-files diff --git a/.forgejo/workflows/tests.yml b/.forgejo/workflows/tests.yml deleted file mode 100644 index 4f1bebe..0000000 --- a/.forgejo/workflows/tests.yml +++ /dev/null @@ -1,116 +0,0 @@ -name: Tests and linters - -on: - pull_request: - push: - branches: - - master - -jobs: - lint: - name: Lint - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: '1.23' - cache: true - - - name: Install linters - run: make lint-install - - - name: Run linters - run: make lint - - tests: - name: Tests - runs-on: ubuntu-latest - strategy: - matrix: - go_versions: [ '1.22', '1.23' ] - fail-fast: false - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: '${{ matrix.go_versions }}' - cache: true - - - name: Run tests - run: make test - - tests-race: - name: Tests with -race - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: '1.22' - cache: true - - - name: Run tests - run: go test ./... -count=1 -race - - staticcheck: - name: Staticcheck - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: '1.23' - cache: true - - - name: Install staticcheck - run: make staticcheck-install - - - name: Run staticcheck - run: make staticcheck-run - - gopls: - name: gopls check - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: '1.22' - cache: true - - - name: Install gopls - run: make gopls-install - - - name: Run gopls - run: make gopls-run - - fumpt: - name: Run gofumpt - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: '1.23' - cache: true - - - name: Install gofumpt - run: make fumpt-install - - - name: Run gofumpt - run: | - make fumpt - git diff --exit-code --quiet diff --git a/.forgejo/workflows/vulncheck.yml b/.forgejo/workflows/vulncheck.yml deleted file mode 100644 index 140434d..0000000 --- a/.forgejo/workflows/vulncheck.yml +++ /dev/null @@ -1,28 +0,0 @@ -name: Vulncheck - -on: - pull_request: - push: - branches: - - master - -jobs: - vulncheck: - name: Vulncheck - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - - name: Setup Go - uses: actions/setup-go@v3 - with: - go-version: '1.23' - check-latest: true - - - name: Install govulncheck - run: go install golang.org/x/vuln/cmd/govulncheck@latest - - - name: Run govulncheck - run: govulncheck ./... diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index c7a3f7a..0000000 --- a/.gitattributes +++ /dev/null @@ -1,2 +0,0 @@ -/**/*.pb.go -diff -merge -/**/*.pb.go linguist-generated=true diff --git a/.gitignore b/.gitignore deleted file mode 100644 index bfdd2f7..0000000 --- a/.gitignore +++ /dev/null @@ -1,22 +0,0 @@ -# IDE -.idea -.vscode - -# Vendoring -vendor - -# tempfiles -.DS_Store -*~ -.cache - -temp -tmp - -# binary -bin/ -release/ - -# coverage -coverage.txt -coverage.html diff --git a/.golangci.yml b/.golangci.yml deleted file mode 100644 index b0499c7..0000000 --- a/.golangci.yml +++ /dev/null @@ -1,75 +0,0 @@ -# This file contains all available configuration options -# with their default values. - -# options for analysis running -run: - # timeout for analysis, e.g. 30s, 5m, default is 1m - timeout: 5m - - # include test files or not, default is true - tests: false - -# output configuration options -output: - # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" - formats: - - format: tab - -# all available settings of specific linters -linters-settings: - exhaustive: - # indicates that switch statements are to be considered exhaustive if a - # 'default' case is present, even if all enum members aren't listed in the - # switch - default-signifies-exhaustive: true - funlen: - lines: 60 # default 60 - statements: 40 # default 40 - gocognit: - min-complexity: 30 # default 30 - importas: - no-unaliased: true - no-extra-aliases: false - unused: - field-writes-are-uses: false - exported-fields-are-used: false - local-variables-are-used: false - -linters: - enable: - # mandatory linters - - govet - - revive - - # some default golangci-lint linters - - errcheck - - gosimple - - godot - - ineffassign - - staticcheck - - typecheck - - unused - - # extra linters - - bidichk - - durationcheck - - exhaustive - - copyloopvar - - gofmt - - goimports - - misspell - - predeclared - - reassign - - whitespace - - containedctx - - funlen - - gocognit - - contextcheck - - importas - - perfsprint - - testifylint - - protogetter - - intrange - - tenv - disable-all: true - fast: false diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml deleted file mode 100644 index 6bd9629..0000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,45 +0,0 @@ -ci: - autofix_prs: false - -repos: - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 - hooks: - - id: check-added-large-files - - id: check-case-conflict - - id: check-executables-have-shebangs - - id: check-shebang-scripts-are-executable - - id: check-merge-conflict - - id: check-json - - id: check-xml - - id: check-yaml - - id: trailing-whitespace - args: [--markdown-linebreak-ext=md] - - id: end-of-file-fixer - exclude: ".key$" - - - repo: https://github.com/shellcheck-py/shellcheck-py - rev: v0.9.0.2 - hooks: - - id: shellcheck - - - repo: local - hooks: - - id: go-unit-tests - name: go unit tests - entry: make test GOFLAGS='' - pass_filenames: false - types: [go] - language: system - - id: golangci-lint - name: golangci-lint check - entry: make lint - pass_filenames: false - types: [go] - language: system - - id: gofumpt - name: gofumpt check - entry: make fumpt - pass_filenames: false - types: [go] - language: system diff --git a/CODEOWNERS b/CODEOWNERS deleted file mode 100644 index d19c96a..0000000 --- a/CODEOWNERS +++ /dev/null @@ -1,3 +0,0 @@ -.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers -.forgejo/.* @potyarkin -Makefile @potyarkin diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 8dada3e..0000000 --- a/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "{}" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright {yyyy} {name of copyright owner} - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/Makefile b/Makefile deleted file mode 100755 index 4fe0061..0000000 --- a/Makefile +++ /dev/null @@ -1,107 +0,0 @@ -#!/usr/bin/make -f -SHELL = bash - -BIN = bin -TMP_DIR := .cache - -SOURCES = $(shell find . -type f -name "*.go" -print) - -LINT_VERSION ?= 1.63.4 -LINT_DIR ?= $(abspath $(BIN))/golangci-lint -LINT_VERSION_DIR = $(LINT_DIR)/$(LINT_VERSION) - -STATICCHECK_VERSION ?= 2024.1.1 -STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck -STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION) - -GOPLS_VERSION ?= v0.17.1 -GOPLS_DIR ?= $(abspath $(BIN))/gopls -GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION) -GOPLS_TEMP_FILE := $(shell mktemp) - -GOFUMPT_VERSION ?= v0.7.0 -GOFUMPT_DIR ?= $(abspath $(BIN))/gofumpt -GOFUMPT_VERSION_DIR ?= $(GOFUMPT_DIR)/$(GOFUMPT_VERSION) - -# Run all code formatters -fmts: fumpt imports - -# Reformat imports -imports: - @echo "⇒ Processing goimports check" - @goimports -w . - -# Run Unit Test with go test -test: GOFLAGS ?= "-count=1" -test: - @echo "⇒ Running go test" - @GOFLAGS="$(GOFLAGS)" go test ./... - -# Activate pre-commit hooks -pre-commit: - pre-commit install -t pre-commit -t commit-msg - -# Deactivate pre-commit hooks -unpre-commit: - pre-commit uninstall -t pre-commit -t commit-msg - -pre-commit-run: - @pre-commit run -a --hook-stage manual - -# Install linters -lint-install: - @rm -rf $(LINT_DIR) - @mkdir -p $(LINT_DIR) - @CGO_ENABLED=1 GOBIN=$(LINT_VERSION_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION) - -# Run linters -lint: - @if [ ! -d "$(LINT_VERSION_DIR)" ]; then \ - make lint-install; \ - fi - $(LINT_VERSION_DIR)/golangci-lint run - -# Install staticcheck -staticcheck-install: - @rm -rf $(STATICCHECK_DIR) - @mkdir -p $(STATICCHECK_DIR) - @GOBIN=$(STATICCHECK_VERSION_DIR) go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION) - -# Run staticcheck -staticcheck-run: - @if [ ! -d "$(STATICCHECK_VERSION_DIR)" ]; then \ - make staticcheck-install; \ - fi - @$(STATICCHECK_VERSION_DIR)/staticcheck ./... - -# Install gopls -gopls-install: - @rm -rf $(GOPLS_DIR) - @mkdir -p $(GOPLS_DIR) - @GOBIN=$(GOPLS_VERSION_DIR) go install golang.org/x/tools/gopls@$(GOPLS_VERSION) - -# Run gopls -gopls-run: - @if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \ - make gopls-install; \ - fi - $(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE) - @if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \ - cat $(GOPLS_TEMP_FILE); \ - exit 1; \ - fi - rm $(GOPLS_TEMP_FILE) - -# Install gofumpt -fumpt-install: - @rm -rf $(GOFUMPT_DIR) - @mkdir -p $(GOFUMPT_DIR) - @GOBIN=$(GOFUMPT_VERSION_DIR) go install mvdan.cc/gofumpt@$(GOFUMPT_VERSION) - -# Run gofumpt -fumpt: - @if [ ! -d "$(GOFUMPT_VERSION_DIR)" ]; then \ - make fumpt-install; \ - fi - @echo "⇒ Processing gofumpt check" - $(GOFUMPT_VERSION_DIR)/gofumpt -l -w . diff --git a/README.md b/README.md index 36d5fb8..7463f9e 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,3 @@ -# Quality of Service (QoS) Go libraries for FrostFS object storage +# WIP area: this repo is just a fork! -See package documentation -at [pkg.go.dev](https://pkg.go.dev/git.frostfs.info/TrueCloudLab/frostfs-qos) - -## License and copyright - -Copyright 2023-2025 FrostFS contributors - -``` - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -``` +Useful things may be published only in [other branches](../../../branches) diff --git a/go.mod b/go.mod deleted file mode 100644 index 35f0342..0000000 --- a/go.mod +++ /dev/null @@ -1,21 +0,0 @@ -module git.frostfs.info/TrueCloudLab/frostfs-qos - -go 1.22 - -require ( - github.com/stretchr/testify v1.9.0 - google.golang.org/grpc v1.69.2 -) - -require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/net v0.30.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.19.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect - google.golang.org/protobuf v1.35.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) - -require golang.org/x/sync v0.10.0 diff --git a/go.sum b/go.sum deleted file mode 100644 index 7f2985d..0000000 --- a/go.sum +++ /dev/null @@ -1,54 +0,0 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/assert/cond.go b/internal/assert/cond.go deleted file mode 100644 index 4a1b201..0000000 --- a/internal/assert/cond.go +++ /dev/null @@ -1,9 +0,0 @@ -package assert - -import "strings" - -func Cond(cond bool, details ...string) { - if !cond { - panic(strings.Join(details, " ")) - } -} diff --git a/limiting/limiter.go b/limiting/limiter.go deleted file mode 100644 index 0f550f8..0000000 --- a/limiting/limiter.go +++ /dev/null @@ -1,75 +0,0 @@ -package limiting - -import ( - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" -) - -type ReleaseFunc func() - -type Limiter interface { - // Acquire attempts to reserve a slot without blocking. - // - // Returns a release function and true if successful. The function must be - // called to release the limiter. The function must be called exactly once. - // Calling the function more that once will cause incorrect behavior of the - // limiter. - // - // Returns nil and false if fails. - // - // If the key was not defined in the limiter, no limit is applied. - Acquire(key string) (ReleaseFunc, bool) -} - -type SemaphoreLimiter struct { - m map[string]*semaphore.Semaphore -} - -// KeyLimit defines a concurrency limit for a set of keys. -// -// All keys of one set share the same limit. -// Keys of different sets have separate limits. -// -// Sets must not overlap. -type KeyLimit struct { - Keys []string - Limit int64 -} - -func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) { - lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)} - for _, limit := range limits { - if limit.Limit < 0 { - return nil, fmt.Errorf("invalid limit %d", limit.Limit) - } - sem := semaphore.NewSemaphore(limit.Limit) - - if err := lr.addLimit(&limit, sem); err != nil { - return nil, err - } - } - return &lr, nil -} - -func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error { - for _, key := range limit.Keys { - if _, exists := lr.m[key]; exists { - return fmt.Errorf("duplicate key %q", key) - } - lr.m[key] = sem - } - return nil -} - -func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) { - sem, ok := lr.m[key] - if !ok { - return func() {}, true - } - - if ok := sem.Acquire(); ok { - return sem.Release, true - } - return nil, false -} diff --git a/limiting/limiter_test.go b/limiting/limiter_test.go deleted file mode 100644 index c6087f1..0000000 --- a/limiting/limiter_test.go +++ /dev/null @@ -1,138 +0,0 @@ -package limiting_test - -import ( - "sync" - "sync/atomic" - "testing" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting" - "github.com/stretchr/testify/require" -) - -const ( - operationDuration = 10 * time.Millisecond - operationCount = 64 -) - -type testCase struct { - keys []string - limit int64 - withoutLimit bool - failCount atomic.Int64 -} - -func TestLimiter(t *testing.T) { - testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) { - return limiting.NewSemaphoreLimiter(kl) - }) -} - -func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) { - t.Run("duplicate key", func(t *testing.T) { - _, err := getLimiter([]limiting.KeyLimit{ - {[]string{"A", "B"}, 10}, - {[]string{"B", "C"}, 10}, - }) - require.Error(t, err) - }) - - testCases := []*testCase{ - {keys: []string{"A"}, limit: operationCount / 4}, - {keys: []string{"B"}, limit: operationCount / 2}, - {keys: []string{"C", "D"}, limit: operationCount / 4}, - {keys: []string{"E"}, limit: 2 * operationCount}, - {keys: []string{"F"}, withoutLimit: true}, - } - - lr, err := getLimiter(getLimits(testCases)) - require.NoError(t, err) - - tasks := createTestTasks(testCases, lr) - - t.Run("first run", func(t *testing.T) { - executeTasks(tasks...) - verifyResults(t, testCases) - }) - - resetFailCounts(testCases) - - t.Run("repeated run", func(t *testing.T) { - executeTasks(tasks...) - verifyResults(t, testCases) - }) -} - -func getLimits(testCases []*testCase) []limiting.KeyLimit { - var limits []limiting.KeyLimit - for _, tc := range testCases { - if tc.withoutLimit { - continue - } - limits = append(limits, limiting.KeyLimit{ - Keys: tc.keys, - Limit: int64(tc.limit), - }) - } - return limits -} - -func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() { - var tasks []func() - for _, tc := range testCases { - for _, key := range tc.keys { - tasks = append(tasks, func() { - executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) }) - }) - } - } - return tasks -} - -func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) { - release, ok := lr.Acquire(key) - if !ok { - tc.failCount.Add(1) - return - } - defer release() - time.Sleep(operationDuration) -} - -func executeTasks(tasks ...func()) { - var g sync.WaitGroup - - g.Add(len(tasks)) - for _, task := range tasks { - go func() { - defer g.Done() - task() - }() - } - g.Wait() -} - -func executeTaskN(N int, task func()) { - tasks := make([]func(), N) - for i := range N { - tasks[i] = task - } - executeTasks(tasks...) -} - -func verifyResults(t *testing.T, testCases []*testCase) { - for _, tc := range testCases { - var expectedFailCount int64 - if !tc.withoutLimit { - numKeys := int64(len(tc.keys)) - expectedFailCount = max(operationCount*numKeys-tc.limit, 0) - } - require.Equal(t, expectedFailCount, tc.failCount.Load()) - } -} - -func resetFailCounts(testCases []*testCase) { - for _, tc := range testCases { - tc.failCount.Store(0) - } -} diff --git a/limiting/semaphore/semaphore.go b/limiting/semaphore/semaphore.go deleted file mode 100644 index c43dfc6..0000000 --- a/limiting/semaphore/semaphore.go +++ /dev/null @@ -1,27 +0,0 @@ -package semaphore - -import ( - "sync/atomic" -) - -type Semaphore struct { - count atomic.Int64 - limit int64 -} - -func NewSemaphore(size int64) *Semaphore { - return &Semaphore{limit: size} -} - -func (s *Semaphore) Acquire() bool { - v := s.count.Add(1) - if v > s.limit { - s.count.Add(-1) - return false - } - return true -} - -func (s *Semaphore) Release() { - s.count.Add(-1) -} diff --git a/limiting/semaphore/semaphore_bench.result b/limiting/semaphore/semaphore_bench.result deleted file mode 100644 index 5883c60..0000000 --- a/limiting/semaphore/semaphore_bench.result +++ /dev/null @@ -1,26 +0,0 @@ -goos: linux -goarch: amd64 -pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore -cpu: 12th Gen Intel(R) Core(TM) i5-1235U -BenchmarkSemaphore/semaphore_size=1/lock_duration=0s-12 6113605 1964 ns/op 383.1 acquire-ns/op 203.5 release-ns/op 0.6892 success-rate -BenchmarkSemaphore/semaphore_size=1/lock_duration=1µs-12 5826655 2067 ns/op 382.0 acquire-ns/op 307.0 release-ns/op 0.1460 success-rate -BenchmarkSemaphore/semaphore_size=1/lock_duration=10µs-12 5977272 2033 ns/op 370.4 acquire-ns/op 321.4 release-ns/op 0.05408 success-rate -BenchmarkSemaphore/semaphore_size=1/lock_duration=100µs-12 5862900 2030 ns/op 365.6 acquire-ns/op 343.1 release-ns/op 0.01242 success-rate -BenchmarkSemaphore/semaphore_size=10/lock_duration=0s-12 5637050 2173 ns/op 365.2 acquire-ns/op 261.7 release-ns/op 0.9765 success-rate -BenchmarkSemaphore/semaphore_size=10/lock_duration=1µs-12 5470316 2225 ns/op 390.4 acquire-ns/op 357.2 release-ns/op 0.9249 success-rate -BenchmarkSemaphore/semaphore_size=10/lock_duration=10µs-12 5584527 2134 ns/op 395.2 acquire-ns/op 339.0 release-ns/op 0.5409 success-rate -BenchmarkSemaphore/semaphore_size=10/lock_duration=100µs-12 5841032 2036 ns/op 369.4 acquire-ns/op 330.7 release-ns/op 0.1182 success-rate -BenchmarkSemaphore/semaphore_size=100/lock_duration=0s-12 5600013 2159 ns/op 369.9 acquire-ns/op 271.1 release-ns/op 0.9976 success-rate -BenchmarkSemaphore/semaphore_size=100/lock_duration=1µs-12 5323606 2280 ns/op 394.0 acquire-ns/op 368.9 release-ns/op 0.9697 success-rate -BenchmarkSemaphore/semaphore_size=100/lock_duration=10µs-12 5133394 2353 ns/op 405.8 acquire-ns/op 374.5 release-ns/op 0.9498 success-rate -BenchmarkSemaphore/semaphore_size=100/lock_duration=100µs-12 5238136 2303 ns/op 387.2 acquire-ns/op 362.2 release-ns/op 0.8749 success-rate -BenchmarkSemaphore/semaphore_size=1000/lock_duration=0s-12 5408720 2180 ns/op 367.6 acquire-ns/op 271.5 release-ns/op 0.9992 success-rate -BenchmarkSemaphore/semaphore_size=1000/lock_duration=1µs-12 5114854 2366 ns/op 407.9 acquire-ns/op 376.4 release-ns/op 0.9966 success-rate -BenchmarkSemaphore/semaphore_size=1000/lock_duration=10µs-12 4659454 2438 ns/op 412.2 acquire-ns/op 385.9 release-ns/op 0.9800 success-rate -BenchmarkSemaphore/semaphore_size=1000/lock_duration=100µs-12 4837894 2482 ns/op 401.7 acquire-ns/op 380.9 release-ns/op 0.9725 success-rate -BenchmarkSemaphore/semaphore_size=10000/lock_duration=0s-12 5403058 2188 ns/op 367.5 acquire-ns/op 273.1 release-ns/op 1.000 success-rate -BenchmarkSemaphore/semaphore_size=10000/lock_duration=1µs-12 5086929 2306 ns/op 390.6 acquire-ns/op 376.3 release-ns/op 1.000 success-rate -BenchmarkSemaphore/semaphore_size=10000/lock_duration=10µs-12 5059968 2378 ns/op 410.2 acquire-ns/op 384.5 release-ns/op 1.000 success-rate -BenchmarkSemaphore/semaphore_size=10000/lock_duration=100µs-12 4909206 2420 ns/op 408.4 acquire-ns/op 383.4 release-ns/op 1.000 success-rate -PASS -ok git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore 284.895s diff --git a/limiting/semaphore/semaphore_bench_test.go b/limiting/semaphore/semaphore_bench_test.go deleted file mode 100644 index f4837e8..0000000 --- a/limiting/semaphore/semaphore_bench_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package semaphore_test - -import ( - "fmt" - "sync" - "testing" - "time" - - semaphores "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" -) - -const maxWorkers = 10_000_000 - -type benchmarkSemaphoreMetrics struct { - mu sync.Mutex - - acquireDuration, - releaseDuration time.Duration - - acquireCount, - releaseCount uint64 -} - -func (c *benchmarkSemaphoreMetrics) reportAcquire(duration time.Duration) { - c.mu.Lock() - defer c.mu.Unlock() - c.acquireDuration += duration - c.acquireCount += 1 -} - -func (c *benchmarkSemaphoreMetrics) reportRelease(duration time.Duration) { - c.mu.Lock() - defer c.mu.Unlock() - c.releaseDuration += duration - c.releaseCount += 1 -} - -func (c *benchmarkSemaphoreMetrics) getResults() (timePerAcquire, timePerRelease, successRate float64) { - timePerAcquire = float64(c.acquireDuration) / float64(c.acquireCount) - timePerRelease = float64(c.releaseDuration) / float64(c.releaseCount) - successRate = float64(c.releaseCount) / float64(c.acquireCount) - return -} - -func BenchmarkSemaphore(b *testing.B) { - sizes := []int64{1, 10, 100, 1000, 10000} - lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond} - - for _, size := range sizes { - for _, lockDuration := range lockDurations { - name := fmt.Sprintf("semaphore_size=%d/lock_duration=%v", size, lockDuration) - b.Run(name, func(b *testing.B) { - benchmarkSemaphore(b, semaphores.NewSemaphore(size), lockDuration) - }) - } - } -} - -func benchmarkSemaphore(b *testing.B, sem *semaphores.Semaphore, lockDuration time.Duration) { - var m benchmarkSemaphoreMetrics - var g errgroup.Group - g.SetLimit(maxWorkers) - - for range b.N { - g.Go(func() error { - now := time.Now() - ok := sem.Acquire() - m.reportAcquire(time.Since(now)) - - if !ok { - return nil - } - - time.Sleep(lockDuration) - - now = time.Now() - sem.Release() - m.reportRelease(time.Since(now)) - - return nil - }) - } - require.NoError(b, g.Wait()) - - require.Equal(b, uint64(b.N), m.acquireCount) - require.LessOrEqual(b, m.releaseCount, m.acquireCount) - - timePerAcquire, timePerRelease, successRate := m.getResults() - - b.ReportMetric(timePerAcquire, "acquire-ns/op") - b.ReportMetric(timePerRelease, "release-ns/op") - b.ReportMetric(successRate, "success-rate") -} diff --git a/scheduling/clock.go b/scheduling/clock.go deleted file mode 100644 index 6fa3d84..0000000 --- a/scheduling/clock.go +++ /dev/null @@ -1,95 +0,0 @@ -package scheduling - -import ( - "math" - "sync" - "time" -) - -type clock interface { - now() float64 - runAt(ts float64, f func()) - close() -} - -type scheduleInfo struct { - ts float64 - f func() -} - -type systemClock struct { - since time.Time - schedule chan scheduleInfo - wg sync.WaitGroup -} - -func newSystemClock() *systemClock { - c := &systemClock{ - since: time.Now(), - schedule: make(chan scheduleInfo), - } - c.start() - return c -} - -func (c *systemClock) now() float64 { - return time.Since(c.since).Seconds() -} - -func (c *systemClock) runAt(ts float64, f func()) { - c.schedule <- scheduleInfo{ts: ts, f: f} -} - -func (c *systemClock) close() { - close(c.schedule) - c.wg.Wait() -} - -func (c *systemClock) start() { - c.wg.Add(1) - go func() { - defer c.wg.Done() - t := time.NewTimer(0) - <-t.C - currentTs := math.MaxFloat64 - var currentTask func() - for { - select { - case <-t.C: - if currentTask != nil { - c.wg.Add(1) - f := currentTask - go func() { - defer c.wg.Done() - f() - }() - currentTask = nil - } - currentTs = math.MaxFloat64 - case s, ok := <-c.schedule: - if !ok { - return - } - if s.ts >= currentTs { - // current timer will fire earlier - // so next scheduleRequest will push new schedule event - continue - } - var d time.Duration - now := c.now() - if now < s.ts { - d = time.Duration((s.ts - now) * 1e9) - } - if !t.Stop() { - select { - case <-t.C: - default: - } - } - t.Reset(d) - currentTask = s.f - currentTs = s.ts - } - } - }() -} diff --git a/scheduling/mclock.go b/scheduling/mclock.go deleted file mode 100644 index 64c62a8..0000000 --- a/scheduling/mclock.go +++ /dev/null @@ -1,406 +0,0 @@ -package scheduling - -import ( - "container/heap" - "context" - "errors" - "math" - "sync" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert" -) - -const ( - invalidIndex = -1 - undefinedReservation float64 = -1.0 -) - -var ( - ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed") - ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded") - ErrMClockSchedulerUnknownTag = errors.New("unknown tag") - ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero") - ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero") - ErrTagRequestsProhibited = errors.New("tag requests are prohibited") -) - -type request struct { - tag string - ts float64 - - reservation float64 - limit float64 - shares float64 - - reservationIdx int - limitIdx int - sharesIdx int - readyIdx int - - scheduled chan struct{} - canceled chan struct{} -} - -// ReleaseFunc is the type of function that should be called after the request is completed. -type ReleaseFunc func() - -// TagInfo contains reserved IOPS, IOPS limit and share values for a tag. -type TagInfo struct { - ReservedIOPS *float64 - LimitIOPS *float64 - Share float64 - Prohibited bool -} - -// MClock is mClock scheduling algorithm implementation. -// -// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details. -type MClock struct { - runLimit uint64 - waitLimit int - clock clock - idleTimeout float64 - tagInfo map[string]TagInfo - - mtx sync.Mutex - previous map[string]*request - inProgress uint64 - reservationQueue *queue - limitQueue *queue - sharesQueue *queue - readyQueue *queue - closed bool -} - -// NewMClock creates new MClock scheduler instance with -// runLimit maximum allowed count of running requests and -// waitLimit maximum allowed count of waiting requests -// for tags specified by tagInfo. The value of idleTimeout defines -// the difference between the current time and the time of -// the previous request, at which the tag considered idle. -// If idleTimeout is negative, it means that there is no idle tags allowed. -// If waitLimit equals zero, it means that there is no limit on the -// number of waiting requests. -func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) { - if err := validateParams(runLimit, tagInfo); err != nil { - return nil, err - } - result := &MClock{ - runLimit: runLimit, - waitLimit: int(waitLimit), - clock: newSystemClock(), - idleTimeout: idleTimeout.Seconds(), - tagInfo: tagInfo, - - reservationQueue: &queue{}, - limitQueue: &queue{}, - sharesQueue: &queue{}, - readyQueue: &queue{}, - } - - previous := make(map[string]*request) - for tag := range tagInfo { - previous[tag] = &request{ - tag: tag, - reservationIdx: invalidIndex, - limitIdx: invalidIndex, - sharesIdx: invalidIndex, - } - } - result.previous = previous - - return result, nil -} - -// RequestArrival schedules new request with tag value. -// Method call is blocked until one of the following events occurs: -// request with the tag is scheduled for execution, -// context ctx is canceled or the scheduler is closed. -// If the method call returned non-nil ReleaseFunc, -// then it must be called after the request is completed. -func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) { - req, release, err := q.pushRequest(tag) - if err != nil { - return nil, err - } - select { - case <-ctx.Done(): - q.dropRequest(req) - return nil, ctx.Err() - case <-req.scheduled: - return release, nil - case <-req.canceled: - return nil, ErrMClockSchedulerClosed - } -} - -// Close closes MClock scheduler. -// No new requests for scheduling will be accepted after the closing. -func (q *MClock) Close() { - q.mtx.Lock() - q.closed = true - for q.limitQueue.Len() > 0 { - item := heap.Pop(q.limitQueue).(*limitMQueueItem) - close(item.r.canceled) - q.removeFromQueues(item.r) - } - q.mtx.Unlock() - - q.clock.close() -} - -func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error { - if runLimit == 0 { - return ErrInvalidRunLimit - } - for _, v := range tagInfo { - if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) { - return ErrInvalidTagInfo - } - if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) { - return ErrInvalidTagInfo - } - if math.IsNaN(v.Share) || v.Share <= float64(0) { - return ErrInvalidTagInfo - } - } - return nil -} - -func (q *MClock) dropRequest(req *request) { - q.mtx.Lock() - defer q.mtx.Unlock() - - select { - case <-req.scheduled: - assert.Cond(q.inProgress > 0, "invalid requests count") - q.inProgress-- - default: - } - - q.removeFromQueues(req) -} - -func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { - q.mtx.Lock() - defer q.mtx.Unlock() - - if q.closed { - return nil, nil, ErrMClockSchedulerClosed - } - if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit { - return nil, nil, ErrMClockSchedulerRequestLimitExceeded - } - - now := q.clock.now() - tagInfo, ok := q.tagInfo[tag] - if !ok { - return nil, nil, ErrMClockSchedulerUnknownTag - } - if tagInfo.Prohibited { - return nil, nil, ErrTagRequestsProhibited - } - prev, ok := q.previous[tag] - assert.Cond(ok, "undefined previous:", tag) - - if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout - q.adjustTags(now, tag) - } - - r := &request{ - tag: tag, - ts: now, - shares: max(prev.shares+1.0/tagInfo.Share, now), - reservationIdx: invalidIndex, - limitIdx: invalidIndex, - sharesIdx: invalidIndex, - readyIdx: invalidIndex, - scheduled: make(chan struct{}), - canceled: make(chan struct{}), - } - if tagInfo.ReservedIOPS != nil { - r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now) - } else { - r.reservation = undefinedReservation - } - - if tagInfo.LimitIOPS != nil { - r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now) - } else { - r.limit = max(prev.limit, now) - } - - q.previous[tag] = r - if tagInfo.ReservedIOPS != nil { - heap.Push(q.reservationQueue, &reservationMQueueItem{r: r}) - } - heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) - heap.Push(q.limitQueue, &limitMQueueItem{r: r}) - q.scheduleRequestUnsafe() - - return r, q.requestCompleted, nil -} - -func (q *MClock) adjustTags(now float64, idleTag string) { - if q.sharesQueue.Len() == 0 { - return - } - minShare := q.sharesQueue.items[0].ts() - for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed - limitItem := item.(*limitMQueueItem) - if limitItem.r.tag == idleTag { - continue - } - limitItem.r.shares -= (minShare - now) - if limitItem.r.sharesIdx != invalidIndex { - heap.Fix(q.sharesQueue, limitItem.r.sharesIdx) - } - if limitItem.r.readyIdx != invalidIndex { - heap.Fix(q.readyQueue, limitItem.r.readyIdx) - } - } -} - -func (q *MClock) scheduleRequest() { - q.mtx.Lock() - defer q.mtx.Unlock() - - if q.closed { - return - } - - q.scheduleRequestUnsafe() -} - -func (q *MClock) scheduleRequestUnsafe() { - if q.inProgress >= q.runLimit { - return - } - now := q.clock.now() - q.scheduleByReservation(now) - if q.inProgress >= q.runLimit { - return - } - q.scheduleByLimitAndWeight(now) - if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) { - return - } - q.setNextScheduleTimer(now) -} - -func (q *MClock) setNextScheduleTimer(now float64) { - nextTs := math.MaxFloat64 - var hasNext bool - if q.reservationQueue.Len() > 0 { - nextTs = q.reservationQueue.items[0].ts() - hasNext = true - } - if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs { - nextTs = q.limitQueue.items[0].ts() - hasNext = true - } - if nextTs <= now { - // should not happen as we always compare .ts() <= now - return - } - if !hasNext { - return - } - q.clock.runAt(nextTs, q.scheduleRequest) -} - -func (q *MClock) scheduleByLimitAndWeight(now float64) { - for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 { - ready := heap.Pop(q.limitQueue).(*limitMQueueItem) - heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) - } - - for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 { - next := heap.Pop(q.readyQueue).(*readyMQueueItem) - hadReservation := false - if next.r.reservationIdx != invalidIndex { - hadReservation = true - heap.Remove(q.reservationQueue, next.r.reservationIdx) - } - q.removeFromQueues(next.r) - - tagInfo, ok := q.tagInfo[next.r.tag] - assert.Cond(ok, "unknown tag:", next.r.tag) - if tagInfo.ReservedIOPS != nil && hadReservation { - var updated bool - for _, i := range q.reservationQueue.items { - ri := i.(*reservationMQueueItem) - if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation { - ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS - updated = true - } - } - if updated { - heap.Init(q.reservationQueue) - } - } - - select { - case <-next.r.canceled: - continue - default: - } - - assertIndexInvalid(next.r) - q.inProgress++ - close(next.r.scheduled) - } -} - -func (q *MClock) scheduleByReservation(now float64) { - for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 { - next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) - q.removeFromQueues(next.r) - - select { - case <-next.r.canceled: - continue - default: - } - - assertIndexInvalid(next.r) - q.inProgress++ - close(next.r.scheduled) - } -} - -func (q *MClock) removeFromQueues(r *request) { - if r.limitIdx != invalidIndex { - heap.Remove(q.limitQueue, r.limitIdx) - } - if r.sharesIdx != invalidIndex { - heap.Remove(q.sharesQueue, r.sharesIdx) - } - if r.readyIdx != invalidIndex { - heap.Remove(q.readyQueue, r.readyIdx) - } - if r.reservationIdx != invalidIndex { - heap.Remove(q.reservationQueue, r.reservationIdx) - } -} - -func (q *MClock) requestCompleted() { - q.mtx.Lock() - defer q.mtx.Unlock() - - if q.closed { - return - } - - assert.Cond(q.inProgress > 0, "invalid requests count") - q.inProgress-- - q.scheduleRequestUnsafe() -} - -func assertIndexInvalid(r *request) { - assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1") - assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1") - assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1") - assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1") -} diff --git a/scheduling/mclock_bench.result b/scheduling/mclock_bench.result deleted file mode 100644 index bdd4834..0000000 --- a/scheduling/mclock_bench.result +++ /dev/null @@ -1,172 +0,0 @@ -Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -tags integration -bench ^BenchmarkMClock$ git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling -count=1 - -goos: linux -goarch: amd64 -pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling -cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz -BenchmarkMClock/impl=noop/parallelism=1-8 8623 136817 ns/op 0 B/op 0 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=1-8 7368 140674 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=2-8 8486 140394 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=4-8 8500 141410 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=8-8 8268 142724 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=16-8 8431 142548 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=1-8 8505 142035 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=2-8 7845 142658 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=4-8 8473 140029 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=8-8 8518 142607 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=16-8 8578 141002 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=1-8 8557 141858 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=2-8 8353 142742 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=4-8 8475 142753 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=8-8 8433 141319 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=16-8 8480 141825 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=1-8 7827 141525 ns/op 371 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=2-8 7935 140939 ns/op 370 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=4-8 8472 140988 ns/op 368 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=8-8 8373 142260 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=16-8 8383 142239 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=1-8 5727 206852 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=2-8 6516 178739 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=4-8 7300 163438 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=8-8 7807 152344 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=16-8 8443 147051 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=1-8 6062 205018 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=2-8 6526 182511 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=4-8 7341 163028 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=8-8 7930 153741 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=16-8 7804 148216 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=1-8 5485 207763 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=2-8 5774 181830 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=4-8 7262 165102 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=8-8 7231 152958 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=16-8 7849 146705 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=1-8 5275 206549 ns/op 368 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=2-8 6115 180053 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=4-8 7264 163943 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=8-8 7810 152008 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=16-8 7875 147107 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=noop/parallelism=8-8 8589 139356 ns/op 0 B/op 0 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=1-8 7916 142917 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=2-8 8392 141914 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=4-8 8444 141011 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=8-8 8419 140638 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=16-8 8473 141018 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=1-8 8487 139941 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=2-8 7938 142745 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=4-8 8522 140837 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=8-8 8431 141361 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=16-8 8390 142171 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=1-8 8449 140695 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=2-8 8467 140622 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=4-8 8460 140925 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=8-8 8487 141316 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=16-8 7876 141374 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=1-8 7887 140590 ns/op 371 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=2-8 8328 142214 ns/op 370 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=4-8 8475 141472 ns/op 368 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=8-8 8402 141861 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=16-8 8509 142173 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=1-8 5490 207911 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=2-8 6481 182955 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=4-8 6816 165103 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=8-8 6901 155528 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=16-8 7690 148762 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=1-8 5437 205208 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=2-8 6092 183311 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=4-8 6907 162595 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=8-8 7756 151761 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=16-8 7855 146382 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=1-8 5468 206883 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=2-8 6061 180350 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=4-8 6795 163866 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=8-8 7350 152345 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=16-8 7869 145708 ns/op 374 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=1-8 5283 207099 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=2-8 6799 180029 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=4-8 7324 164306 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=8-8 7770 152377 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=16-8 8342 146888 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=noop/parallelism=32-8 8604 140481 ns/op 0 B/op 0 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=1-8 8491 142215 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=2-8 8508 140537 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=4-8 8320 142631 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=8-8 8368 142430 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=16-8 8432 141733 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=1-8 7855 141754 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=2-8 7858 141304 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=4-8 8545 140996 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=8-8 8437 142022 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=16-8 8418 142653 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=1-8 8448 141117 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=2-8 8530 142164 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=4-8 7944 142449 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=8-8 8551 139223 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=16-8 8491 140160 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=1-8 8354 141835 ns/op 371 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=2-8 7880 141608 ns/op 370 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=4-8 7940 140794 ns/op 368 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=8-8 8414 140646 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=16-8 8373 140890 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=1-8 5256 209447 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=2-8 6451 183969 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=4-8 7326 163980 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=8-8 7862 152768 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=16-8 8390 147437 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=1-8 5228 206086 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=2-8 6471 181844 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=4-8 7318 163604 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=8-8 7827 151880 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=16-8 8362 146623 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=1-8 5541 210639 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=2-8 5818 183541 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=4-8 6910 163609 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=8-8 7797 152752 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=16-8 7344 146966 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=1-8 5746 206651 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=2-8 6490 182702 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=4-8 7250 164727 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=8-8 7386 152508 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=16-8 8379 146547 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=noop/parallelism=64-8 8486 138281 ns/op 0 B/op 0 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=1-8 8472 142782 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=2-8 8437 140925 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=4-8 8338 141035 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=8-8 8487 142288 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=16-8 8366 142353 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=1-8 8510 140838 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=2-8 7935 142844 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=4-8 8218 139362 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=8-8 7977 140291 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=16-8 8371 140322 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=1-8 8524 140484 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=2-8 8461 142431 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=4-8 8420 141652 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=8-8 8385 140956 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=16-8 8355 142509 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=1-8 7239 141018 ns/op 371 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=2-8 8467 141807 ns/op 370 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=4-8 8420 140763 ns/op 368 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=8-8 8474 140264 ns/op 366 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=16-8 8413 142191 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=1-8 5474 208031 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=2-8 5706 182794 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=4-8 7248 165044 ns/op 364 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=8-8 7825 153229 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=16-8 7879 148568 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=1-8 5278 211267 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=2-8 6108 183247 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=4-8 7338 163152 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=8-8 7339 154054 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=16-8 7750 146000 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=1-8 5716 208259 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=2-8 6450 185159 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=4-8 7285 168077 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=8-8 7357 151950 ns/op 372 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=16-8 8257 147548 ns/op 373 B/op 9 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=1-8 5245 207383 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=2-8 6115 179041 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=4-8 6831 164377 ns/op 367 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=8-8 7378 152743 ns/op 365 B/op 8 allocs/op -BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=16-8 7837 148694 ns/op 366 B/op 8 allocs/op -PASS -ok git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling 194.532s diff --git a/scheduling/mclock_bench_test.go b/scheduling/mclock_bench_test.go deleted file mode 100644 index 09989e6..0000000 --- a/scheduling/mclock_bench_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package scheduling - -import ( - "context" - "fmt" - "math" - "math/rand/v2" - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -type noopMClockScheduler struct{} - -var ( - releaseStub ReleaseFunc = func() {} - defaultLimit float64 = 100_000 - shortReservation float64 = 1 - medReservation float64 = 100 - largeReservation float64 = 10_000 -) - -func (s *noopMClockScheduler) RequestArrival(context.Context, string) ReleaseFunc { - return releaseStub -} - -func BenchmarkMClock(b *testing.B) { - tagsCount := []int{1, 2, 4, 8, 16} - ioDuration := time.Millisecond - parallelismValues := []int{1, 8, 32, 64} - limits := []*float64{nil, &defaultLimit} - reservations := []*float64{nil, &shortReservation, &medReservation, &largeReservation} - for _, parallelism := range parallelismValues { - b.SetParallelism(parallelism) - - noopMClock := &noopMClockScheduler{} - b.Run(fmt.Sprintf("impl=noop/parallelism=%d", parallelism), func(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - release := noopMClock.RequestArrival(context.Background(), "tag") - time.Sleep(ioDuration) - release() - } - }) - }) - - for _, limit := range limits { - for _, reservation := range reservations { - for _, tags := range tagsCount { - tagInfos := make(map[string]TagInfo) - for tag := 0; tag < tags; tag++ { - tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{Share: 50, LimitIOPS: limit, ReservedIOPS: reservation} - } - - mClockQ, _ := NewMClock(math.MaxUint64, math.MaxUint64, tagInfos, time.Hour) - - resStr := "no" - if reservation != nil { - resStr = strconv.FormatFloat(*reservation, 'f', 1, 64) - } - limitStr := "no" - if limit != nil { - limitStr = strconv.FormatFloat(*limit, 'f', 1, 64) - } - b.Run(fmt.Sprintf("impl=mclock/limit=%s/reservation=%s/parallelism=%d/tags=%d", limitStr, resStr, parallelism, tags), func(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - tag := rand.Int64N(int64(tags)) - release, err := mClockQ.RequestArrival(context.Background(), "tag"+strconv.FormatInt(int64(tag), 10)) - require.NoError(b, err) - time.Sleep(ioDuration) - release() - } - }) - }) - } - } - } - - } -} diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go deleted file mode 100644 index 6433990..0000000 --- a/scheduling/mclock_test.go +++ /dev/null @@ -1,584 +0,0 @@ -package scheduling - -import ( - "context" - "math" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" -) - -func TestMClockSharesScheduling(t *testing.T) { - t.Parallel() - reqCount := 1000 - reqCount = (reqCount / 2) * 2 - q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 2}, - "class2": {Share: 1}, - }, 100) - require.NoError(t, err) - q.clock = &noopClock{} - - var releases []ReleaseFunc - var requests []*request - tag := "class1" - for i := 0; i < reqCount/2; i++ { - req, release, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - releases = append(releases, release) - } - tag = "class2" - for i := 0; i < reqCount/2; i++ { - req, release, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - releases = append(releases, release) - } - - var result []string - var wg sync.WaitGroup - for i := 0; i < reqCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - <-requests[i].scheduled - result = append(result, requests[i].tag) - releases[i]() - }() - } - wg.Wait() - - // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., - // because the ratio is 2 to 1. - // However, there may be deviations due to rounding and sorting. - result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail - var class1Count int - var class2Count int - var class2MaxSeq int - for _, res := range result { - switch res { - case "class1": - class1Count++ - class2MaxSeq = 0 - case "class2": - class2Count++ - class2MaxSeq++ - require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row - default: - require.Fail(t, "unknown tag") - } - } - - require.True(t, (class1Count*100)/(class1Count+class2Count) == 66) -} - -var _ clock = &noopClock{} - -type noopClock struct { - v float64 - runAtValue *float64 -} - -func (n *noopClock) now() float64 { - return n.v -} - -func (n *noopClock) runAt(ts float64, f func()) { - n.runAtValue = &ts -} - -func (n *noopClock) close() {} - -func TestMClockRequestCancel(t *testing.T) { - t.Parallel() - q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 2}, - "class2": {Share: 1}, - }, 100) - require.NoError(t, err) - q.clock = &noopClock{} - - release1, err := q.RequestArrival(context.Background(), "class1") - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - release2, err := q.RequestArrival(ctx, "class1") - require.Nil(t, release2) - require.ErrorIs(t, err, context.DeadlineExceeded) - - require.Equal(t, 0, q.readyQueue.Len()) - require.Equal(t, 0, q.sharesQueue.Len()) - require.Equal(t, 0, q.limitQueue.Len()) - require.Equal(t, 0, q.reservationQueue.Len()) - - release1() -} - -func TestMClockLimitScheduling(t *testing.T) { - t.Parallel() - reqCount := 100 - reqCount = (reqCount / 2) * 2 - limit := 1.0 - cl := &noopClock{} - q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 2, LimitIOPS: &limit}, - "class2": {Share: 1, LimitIOPS: &limit}, - }, 100) - require.NoError(t, err) - q.clock = cl - - var releases []ReleaseFunc - var requests []*request - tag := "class1" - for i := 0; i < reqCount/2; i++ { - req, release, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - releases = append(releases, release) - } - tag = "class2" - for i := 0; i < reqCount/2; i++ { - req, release, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - releases = append(releases, release) - } - - q.scheduleRequest() - - for _, req := range requests { - select { - case <-req.scheduled: - require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") - default: - } - } - - cl.v = math.MaxFloat64 - - var result []string - var wg sync.WaitGroup - for i := 0; i < reqCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - <-requests[i].scheduled - result = append(result, requests[i].tag) - releases[i]() - }() - } - q.scheduleRequest() - wg.Wait() - - // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., - // because the ratio is 2 to 1. - // However, there may be deviations due to rounding and sorting. - result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail - var class1Count int - var class2Count int - var class2MaxSeq int - for _, res := range result { - switch res { - case "class1": - class1Count++ - class2MaxSeq = 0 - case "class2": - class2Count++ - class2MaxSeq++ - require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row - default: - require.Fail(t, "unknown tag") - } - } - - require.True(t, (class1Count*100)/(class1Count+class2Count) == 66) - - require.Equal(t, 0, q.readyQueue.Len()) - require.Equal(t, 0, q.sharesQueue.Len()) - require.Equal(t, 0, q.limitQueue.Len()) - require.Equal(t, 0, q.reservationQueue.Len()) -} - -func TestMClockReservationScheduling(t *testing.T) { - t.Parallel() - reqCount := 1000 - reqCount = (reqCount / 2) * 2 - limit := 0.01 // 1 request in 100 seconds - resevation := 100.0 // 100 RPS - cl := &noopClock{v: float64(1.0)} - q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 2, LimitIOPS: &limit}, - "class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation}, - }, 100) - require.NoError(t, err) - q.clock = cl - - var releases []ReleaseFunc - var requests []*request - tag := "class1" - for i := 0; i < reqCount/2; i++ { - req, release, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - releases = append(releases, release) - } - tag = "class2" - for i := 0; i < reqCount/2; i++ { - req, release, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - releases = append(releases, release) - } - - q.scheduleRequest() - - count := 0 - for _, req := range requests { - select { - case <-req.scheduled: - require.Equal(t, req.tag, "class2") - count++ - default: - } - } - require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled") - - cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy - q.scheduleRequest() - - var result []string - for i, req := range requests { - select { - case <-req.scheduled: - result = append(result, requests[i].tag) - releases[i]() - default: - } - } - - require.Equal(t, 200, len(result)) - for _, res := range result { - require.Equal(t, "class2", res) - } - - cl.v = math.MaxFloat64 - q.scheduleRequest() - - require.Equal(t, 0, q.readyQueue.Len()) - require.Equal(t, 0, q.sharesQueue.Len()) - require.Equal(t, 0, q.limitQueue.Len()) - require.Equal(t, 0, q.reservationQueue.Len()) -} - -func TestMClockIdleTag(t *testing.T) { - t.Parallel() - reqCount := 100 - idleTimeout := 2 * time.Second - cl := &noopClock{} - q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 1}, - "class2": {Share: 1}, - }, idleTimeout) - require.NoError(t, err) - q.clock = cl - - var requests []*request - tag := "class1" - for i := 0; i < reqCount/2; i++ { - cl.v += idleTimeout.Seconds() / 2 - req, _, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - } - - // class1 requests have shares [1.0; 2.0; 3.0; ... ] - - cl.v += 2 * idleTimeout.Seconds() - - tag = "class2" - req, _, err := q.pushRequest(tag) - require.NoError(t, err) - requests = append(requests, req) - - // class2 must be defined as idle, so all shares tags must be adjusted. - - for _, req := range requests { - select { - case <-req.scheduled: - default: - require.True(t, req.shares >= cl.v) - } - } -} - -func TestMClockClose(t *testing.T) { - t.Parallel() - q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 1}, - }, 1000) - require.NoError(t, err) - q.clock = &noopClock{} - - requestRunning := make(chan struct{}) - checkDone := make(chan struct{}) - eg, ctx := errgroup.WithContext(context.Background()) - tag := "class1" - eg.Go(func() error { - release, err := q.RequestArrival(ctx, tag) - if err != nil { - return err - } - defer release() - close(requestRunning) - <-checkDone - return nil - }) - <-requestRunning - - eg.Go(func() error { - release, err := q.RequestArrival(ctx, tag) - require.Nil(t, release) - require.ErrorIs(t, err, ErrMClockSchedulerClosed) - return nil - }) - - // wait until second request will be blocked on wait - for q.waitingCount() == 0 { - time.Sleep(1 * time.Second) - } - - q.Close() - - release, err := q.RequestArrival(context.Background(), tag) - require.Nil(t, release) - require.ErrorIs(t, err, ErrMClockSchedulerClosed) - - close(checkDone) - - require.NoError(t, eg.Wait()) -} - -func TestMClockWaitLimit(t *testing.T) { - t.Parallel() - q, err := NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1}, - }, 1000) - require.NoError(t, err) - q.clock = &noopClock{} - defer q.Close() - - requestRunning := make(chan struct{}) - checkDone := make(chan struct{}) - eg, ctx := errgroup.WithContext(context.Background()) - tag := "class1" - // running request - eg.Go(func() error { - release, err := q.RequestArrival(ctx, tag) - if err != nil { - return err - } - defer release() - close(requestRunning) - <-checkDone - return nil - }) - - // waiting request - eg.Go(func() error { - <-requestRunning - release, err := q.RequestArrival(ctx, tag) - require.NotNil(t, release) - require.NoError(t, err) - defer release() - <-checkDone - return nil - }) - - // wait until second request will be waiting - for q.waitingCount() == 0 { - time.Sleep(1 * time.Second) - } - - release, err := q.RequestArrival(ctx, tag) - require.Nil(t, release) - require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded) - - close(checkDone) - require.NoError(t, eg.Wait()) -} - -func TestMClockParameterValidation(t *testing.T) { - _, err := NewMClock(0, 1, map[string]TagInfo{ - "class1": {Share: 1}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidRunLimit) - _, err = NewMClock(1, 0, map[string]TagInfo{ - "class1": {Share: 1}, - }, 1000) - require.NoError(t, err) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1}, - }, -1.0) - require.NoError(t, err) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1}, - }, 0) - require.NoError(t, err) - negativeValue := -1.0 - zeroValue := float64(0) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: negativeValue}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidTagInfo) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: zeroValue}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidTagInfo) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1.0, ReservedIOPS: &zeroValue}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidTagInfo) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1.0, ReservedIOPS: &negativeValue}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidTagInfo) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1.0, LimitIOPS: &zeroValue}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidTagInfo) - _, err = NewMClock(1, 1, map[string]TagInfo{ - "class1": {Share: 1.0, LimitIOPS: &negativeValue}, - }, 1000) - require.ErrorIs(t, err, ErrInvalidTagInfo) -} - -func (q *MClock) waitingCount() int { - q.mtx.Lock() - defer q.mtx.Unlock() - - return q.sharesQueue.Len() -} - -func TestMClockTimeBasedSchedule(t *testing.T) { - t.Parallel() - limit := 1.0 // 1 request per second allowed - cl := &noopClock{v: float64(1.5)} - q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{ - "class1": {Share: 1, LimitIOPS: &limit}, - }, 100) - require.NoError(t, err) - defer q.Close() - q.clock = cl - - running := make(chan struct{}) - checked := make(chan struct{}) - eg, ctx := errgroup.WithContext(context.Background()) - eg.Go(func() error { - release, err := q.RequestArrival(ctx, "class1") - require.NoError(t, err) - defer release() - close(running) - <-checked - return nil - }) - - <-running - // request must be scheduled at 2.0 - _, _, err = q.pushRequest("class1") - require.NoError(t, err) - require.NotNil(t, cl.runAtValue) - require.Equal(t, cl.v+1.0/limit, *cl.runAtValue) - close(checked) - require.NoError(t, eg.Wait()) -} - -func TestMClockLowLimit(t *testing.T) { - t.Parallel() - limit := 2.0 - q, err := NewMClock(100, 100, map[string]TagInfo{ - "class1": {Share: 50, LimitIOPS: &limit}, - }, 5*time.Second) - require.NoError(t, err) - defer q.Close() - - eg, ctx := errgroup.WithContext(context.Background()) - eg.SetLimit(5) - eg.Go(func() error { - for range 3 { - release, err := q.RequestArrival(ctx, "class1") - require.NoError(t, err) - release() - } - return nil - }) - require.NoError(t, eg.Wait()) -} - -func TestMClockLimitTotalTime(t *testing.T) { - t.Parallel() - limit := 10.0 // 10 RPS -> 1 request per 100 ms - q, err := NewMClock(100, 100, map[string]TagInfo{ - "class1": {Share: 50, LimitIOPS: &limit}, - }, 5*time.Second) - require.NoError(t, err) - defer q.Close() - - // 10 requests, each request runs for 500 ms, - // but they should be scheduled as soon as possible, - // so total duration must be less than 1 second - eg, ctx := errgroup.WithContext(context.Background()) - startedAt := time.Now() - for range 10 { - eg.Go(func() error { - release, err := q.RequestArrival(ctx, "class1") - require.NoError(t, err) - time.Sleep(500 * time.Millisecond) - release() - return nil - }) - } - require.NoError(t, eg.Wait()) - require.True(t, time.Since(startedAt) <= 1*time.Second) - - // 11 requests, limit = 10 RPS, so 10 requests should be - // scheduled as soon as possible, but last request should be - // scheduled at now + 1.0 s - eg, ctx = errgroup.WithContext(context.Background()) - startedAt = time.Now() - for range 11 { - eg.Go(func() error { - release, err := q.RequestArrival(ctx, "class1") - require.NoError(t, err) - time.Sleep(500 * time.Millisecond) - release() - return nil - }) - } - require.NoError(t, eg.Wait()) - require.True(t, time.Since(startedAt) >= 1500*time.Millisecond) - require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests -} - -func TestMClockRestictTagRequests(t *testing.T) { - t.Parallel() - limit := 10.0 - q, err := NewMClock(100, 100, map[string]TagInfo{ - "class1": {Share: 50, LimitIOPS: &limit}, - "class2": {Share: 50, LimitIOPS: &limit, Prohibited: true}, - }, 5*time.Second) - require.NoError(t, err) - defer q.Close() - - release, err := q.RequestArrival(context.Background(), "class1") - require.NoError(t, err) - release() - - release, err = q.RequestArrival(context.Background(), "class2") - require.ErrorIs(t, err, ErrTagRequestsProhibited) - require.Nil(t, release) -} diff --git a/scheduling/queue.go b/scheduling/queue.go deleted file mode 100644 index 12dd44a..0000000 --- a/scheduling/queue.go +++ /dev/null @@ -1,100 +0,0 @@ -package scheduling - -type queueItem interface { - ts() float64 - setIndex(idx int) -} - -type queue struct { - items []queueItem -} - -// Len implements heap.Interface. -func (q *queue) Len() int { - return len(q.items) -} - -// Less implements heap.Interface. -func (q *queue) Less(i int, j int) bool { - return q.items[i].ts() < q.items[j].ts() -} - -// Pop implements heap.Interface. -func (q *queue) Pop() any { - n := len(q.items) - item := q.items[n-1] - q.items[n-1] = nil - q.items = q.items[0 : n-1] - item.setIndex(invalidIndex) - return item -} - -// Push implements heap.Interface. -func (q *queue) Push(x any) { - it := x.(queueItem) - it.setIndex(q.Len()) - q.items = append(q.items, it) -} - -// Swap implements heap.Interface. -func (q *queue) Swap(i int, j int) { - q.items[i], q.items[j] = q.items[j], q.items[i] - q.items[i].setIndex(i) - q.items[j].setIndex(j) -} - -var _ queueItem = &reservationMQueueItem{} - -type reservationMQueueItem struct { - r *request -} - -func (i *reservationMQueueItem) ts() float64 { - return i.r.reservation -} - -func (i *reservationMQueueItem) setIndex(idx int) { - i.r.reservationIdx = idx -} - -var _ queueItem = &limitMQueueItem{} - -type limitMQueueItem struct { - r *request -} - -func (i *limitMQueueItem) ts() float64 { - return i.r.limit -} - -func (i *limitMQueueItem) setIndex(idx int) { - i.r.limitIdx = idx -} - -var _ queueItem = &sharesMQueueItem{} - -type sharesMQueueItem struct { - r *request -} - -func (i *sharesMQueueItem) ts() float64 { - return i.r.shares -} - -func (i *sharesMQueueItem) setIndex(idx int) { - i.r.sharesIdx = idx -} - -var _ queueItem = &readyMQueueItem{} - -type readyMQueueItem struct { - r *request -} - -func (i *readyMQueueItem) ts() float64 { - return i.r.shares -} - -func (i *readyMQueueItem) setIndex(idx int) { - i.r.readyIdx = idx -} diff --git a/tagging/context.go b/tagging/context.go deleted file mode 100644 index 3b6c113..0000000 --- a/tagging/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package tagging - -import "context" - -type tagContextKeyType struct{} - -var currentTagKey = tagContextKeyType{} - -func ContextWithIOTag(parent context.Context, ioTag string) context.Context { - return context.WithValue(parent, currentTagKey, ioTag) -} - -func IOTagFromContext(ctx context.Context) (string, bool) { - if ctx == nil { - panic("context must be non nil") - } - if tag, ok := ctx.Value(currentTagKey).(string); ok { - return tag, true - } - return "", false -} diff --git a/tagging/context_test.go b/tagging/context_test.go deleted file mode 100644 index b13b253..0000000 --- a/tagging/context_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package tagging - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestContext(t *testing.T) { - ctx := context.Background() - tag, ok := IOTagFromContext(ctx) - require.False(t, ok) - require.Equal(t, "", tag) - ctx = ContextWithIOTag(ctx, "tag1") - tag, ok = IOTagFromContext(ctx) - require.True(t, ok) - require.Equal(t, "tag1", tag) - ctx = ContextWithIOTag(ctx, "tag2") - tag, ok = IOTagFromContext(ctx) - require.True(t, ok) - require.Equal(t, "tag2", tag) -} diff --git a/tagging/grpc.go b/tagging/grpc.go deleted file mode 100644 index 4e2fcfe..0000000 --- a/tagging/grpc.go +++ /dev/null @@ -1,96 +0,0 @@ -package tagging - -import ( - "context" - - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" -) - -const ( - ioTagHeader = "x-frostfs-io-tag" -) - -// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata. -func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...) - } -} - -// NewStreamClientInterceptor creates new gRPC stream interceptor to set an IO tag to gRPC metadata. -func NewStreamClientInterceptor() grpc.StreamClientInterceptor { - return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - return streamer(setIOTagToGRPCMetadata(ctx), desc, cc, method, opts...) - } -} - -// NewUnaryServerInterceptor creates new gRPC unary interceptor to extract an IO tag to gRPC metadata. -func NewUnaryServerInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - return handler(extractIOTagFromGRPCMetadata(ctx), req) - } -} - -// NewStreamServerInterceptor creates new gRPC stream interceptor to extract an IO tag to gRPC metadata. -func NewStreamServerInterceptor() grpc.StreamServerInterceptor { - return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - return handler(srv, &serverStream{origin: ss}) - } -} - -func setIOTagToGRPCMetadata(ctx context.Context) context.Context { - ioTag, ok := IOTagFromContext(ctx) - if !ok { - return ctx - } - md, ok := metadata.FromOutgoingContext(ctx) - if !ok { - md = metadata.MD{} - } - md.Set(ioTagHeader, ioTag) - return metadata.NewOutgoingContext(ctx, md) -} - -func extractIOTagFromGRPCMetadata(ctx context.Context) context.Context { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return ctx - } - - values := md.Get(ioTagHeader) - if len(values) > 0 { - return ContextWithIOTag(ctx, values[0]) - } - return ctx -} - -var _ grpc.ServerStream = &serverStream{} - -type serverStream struct { - origin grpc.ServerStream -} - -func (s *serverStream) Context() context.Context { - return extractIOTagFromGRPCMetadata(s.origin.Context()) -} - -func (s *serverStream) RecvMsg(m any) error { - return s.origin.RecvMsg(m) -} - -func (s *serverStream) SendHeader(md metadata.MD) error { - return s.origin.SendHeader(md) -} - -func (s *serverStream) SendMsg(m any) error { - return s.origin.SendMsg(m) -} - -func (s *serverStream) SetHeader(md metadata.MD) error { - return s.origin.SetHeader(md) -} - -func (s *serverStream) SetTrailer(md metadata.MD) { - s.origin.SetTrailer(md) -}