diff --git a/.forgejo/ISSUE_TEMPLATE/bug_report.md b/.forgejo/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..fb16999 --- /dev/null +++ b/.forgejo/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,46 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: community, triage, bug +assignees: '' + +--- + + + +## Expected Behavior + + +## Current Behavior + + +## Possible Solution + + +## Steps to Reproduce (for bugs) + + +1. + +## Context + + +## Regression + + +## Your Environment + +* Version used: +* Server setup and configuration: +* Operating System and version (`uname -a`): diff --git a/.forgejo/ISSUE_TEMPLATE/config.yml b/.forgejo/ISSUE_TEMPLATE/config.yml new file mode 100644 index 0000000..3ba13e0 --- /dev/null +++ b/.forgejo/ISSUE_TEMPLATE/config.yml @@ -0,0 +1 @@ +blank_issues_enabled: false diff --git a/.forgejo/ISSUE_TEMPLATE/feature_request.md b/.forgejo/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..5beeb06 --- /dev/null +++ b/.forgejo/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,28 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: community, triage +assignees: '' + +--- + +## Is your feature request related to a problem? Please describe. + + +## Describe the solution you'd like + + +## Describe alternatives you've considered + + +## Additional context + + +## Don't forget to add labels! +- component label (`neofs-adm`, `neofs-storage`, ...) +- issue type (`enhancement`, `refactor`, ...) +- `goodfirstissue`, `helpwanted` if needed +- does this issue belong to an epic? +- priority (`P0`-`P4`) if already triaged +- quarter label (`202XQY`) if possible diff --git a/.forgejo/logo.svg b/.forgejo/logo.svg new file mode 100644 index 0000000..148c359 --- /dev/null +++ b/.forgejo/logo.svg @@ -0,0 +1,70 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.forgejo/workflows/dco.yml b/.forgejo/workflows/dco.yml new file mode 100644 index 0000000..7c5af84 --- /dev/null +++ b/.forgejo/workflows/dco.yml @@ -0,0 +1,21 @@ +name: DCO action +on: [pull_request] + +jobs: + dco: + name: DCO + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: '1.22' + + - name: Run commit format checker + uses: https://git.frostfs.info/TrueCloudLab/dco-go@v3 + with: + from: 'origin/${{ github.event.pull_request.base.ref }}' diff --git a/.forgejo/workflows/pre-commit.yml b/.forgejo/workflows/pre-commit.yml new file mode 100644 index 0000000..b27e7a3 --- /dev/null +++ b/.forgejo/workflows/pre-commit.yml @@ -0,0 +1,30 @@ +name: Pre-commit hooks + +on: + pull_request: + push: + branches: + - master + +jobs: + precommit: + name: Pre-commit + env: + # Skip pre-commit hooks which are executed by other actions. + SKIP: make-lint,go-staticcheck-repo-mod,go-unit-tests,gofumpt + runs-on: ubuntu-22.04 + # If we use actions/setup-python from either Github or Gitea, + # the line above fails with a cryptic error about not being able to find python. + # So install everything manually. 
+ steps: + - uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.23 + - name: Set up Python + run: | + apt update + apt install -y pre-commit + - name: Run pre-commit + run: pre-commit run --color=always --hook-stage manual --all-files diff --git a/.forgejo/workflows/tests.yml b/.forgejo/workflows/tests.yml new file mode 100644 index 0000000..4f1bebe --- /dev/null +++ b/.forgejo/workflows/tests.yml @@ -0,0 +1,116 @@ +name: Tests and linters + +on: + pull_request: + push: + branches: + - master + +jobs: + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.23' + cache: true + + - name: Install linters + run: make lint-install + + - name: Run linters + run: make lint + + tests: + name: Tests + runs-on: ubuntu-latest + strategy: + matrix: + go_versions: [ '1.22', '1.23' ] + fail-fast: false + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '${{ matrix.go_versions }}' + cache: true + + - name: Run tests + run: make test + + tests-race: + name: Tests with -race + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.22' + cache: true + + - name: Run tests + run: go test ./... -count=1 -race + + staticcheck: + name: Staticcheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.23' + cache: true + + - name: Install staticcheck + run: make staticcheck-install + + - name: Run staticcheck + run: make staticcheck-run + + gopls: + name: gopls check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.22' + cache: true + + - name: Install gopls + run: make gopls-install + + - name: Run gopls + run: make gopls-run + + fumpt: + name: Run gofumpt + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.23' + cache: true + + - name: Install gofumpt + run: make fumpt-install + + - name: Run gofumpt + run: | + make fumpt + git diff --exit-code --quiet diff --git a/.forgejo/workflows/vulncheck.yml b/.forgejo/workflows/vulncheck.yml new file mode 100644 index 0000000..140434d --- /dev/null +++ b/.forgejo/workflows/vulncheck.yml @@ -0,0 +1,28 @@ +name: Vulncheck + +on: + pull_request: + push: + branches: + - master + +jobs: + vulncheck: + name: Vulncheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: '1.23' + check-latest: true + + - name: Install govulncheck + run: go install golang.org/x/vuln/cmd/govulncheck@latest + + - name: Run govulncheck + run: govulncheck ./... 
diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..c7a3f7a --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +/**/*.pb.go -diff -merge +/**/*.pb.go linguist-generated=true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bfdd2f7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# IDE +.idea +.vscode + +# Vendoring +vendor + +# tempfiles +.DS_Store +*~ +.cache + +temp +tmp + +# binary +bin/ +release/ + +# coverage +coverage.txt +coverage.html diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..b0499c7 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,75 @@ +# This file contains all available configuration options +# with their default values. + +# options for analysis running +run: + # timeout for analysis, e.g. 30s, 5m, default is 1m + timeout: 5m + + # include test files or not, default is true + tests: false + +# output configuration options +output: + # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" + formats: + - format: tab + +# all available settings of specific linters +linters-settings: + exhaustive: + # indicates that switch statements are to be considered exhaustive if a + # 'default' case is present, even if all enum members aren't listed in the + # switch + default-signifies-exhaustive: true + funlen: + lines: 60 # default 60 + statements: 40 # default 40 + gocognit: + min-complexity: 30 # default 30 + importas: + no-unaliased: true + no-extra-aliases: false + unused: + field-writes-are-uses: false + exported-fields-are-used: false + local-variables-are-used: false + +linters: + enable: + # mandatory linters + - govet + - revive + + # some default golangci-lint linters + - errcheck + - gosimple + - godot + - ineffassign + - staticcheck + - typecheck + - unused + + # extra linters + - bidichk + - durationcheck + - exhaustive + - copyloopvar + - gofmt + - goimports + - misspell + - predeclared + - reassign + - whitespace + - containedctx + - funlen + - gocognit + - contextcheck + - importas + - perfsprint + - testifylint + - protogetter + - intrange + - tenv + disable-all: true + fast: false diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..6bd9629 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,45 @@ +ci: + autofix_prs: false + +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-added-large-files + - id: check-case-conflict + - id: check-executables-have-shebangs + - id: check-shebang-scripts-are-executable + - id: check-merge-conflict + - id: check-json + - id: check-xml + - id: check-yaml + - id: trailing-whitespace + args: [--markdown-linebreak-ext=md] + - id: end-of-file-fixer + exclude: ".key$" + + - repo: https://github.com/shellcheck-py/shellcheck-py + rev: v0.9.0.2 + hooks: + - id: shellcheck + + - repo: local + hooks: + - id: go-unit-tests + name: go unit tests + entry: make test GOFLAGS='' + pass_filenames: false + types: [go] + language: system + - id: golangci-lint + name: golangci-lint check + entry: make lint + pass_filenames: false + types: [go] + language: system + - id: gofumpt + name: gofumpt check + entry: make fumpt + pass_filenames: false + types: [go] + language: system diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 0000000..d19c96a --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1,3 @@ +.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers +.forgejo/.* @potyarkin +Makefile @potyarkin diff 
--git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100755 index 0000000..4fe0061 --- /dev/null +++ b/Makefile @@ -0,0 +1,107 @@ +#!/usr/bin/make -f +SHELL = bash + +BIN = bin +TMP_DIR := .cache + +SOURCES = $(shell find . -type f -name "*.go" -print) + +LINT_VERSION ?= 1.63.4 +LINT_DIR ?= $(abspath $(BIN))/golangci-lint +LINT_VERSION_DIR = $(LINT_DIR)/$(LINT_VERSION) + +STATICCHECK_VERSION ?= 2024.1.1 +STATICCHECK_DIR ?= $(abspath $(BIN))/staticcheck +STATICCHECK_VERSION_DIR ?= $(STATICCHECK_DIR)/$(STATICCHECK_VERSION) + +GOPLS_VERSION ?= v0.17.1 +GOPLS_DIR ?= $(abspath $(BIN))/gopls +GOPLS_VERSION_DIR ?= $(GOPLS_DIR)/$(GOPLS_VERSION) +GOPLS_TEMP_FILE := $(shell mktemp) + +GOFUMPT_VERSION ?= v0.7.0 +GOFUMPT_DIR ?= $(abspath $(BIN))/gofumpt +GOFUMPT_VERSION_DIR ?= $(GOFUMPT_DIR)/$(GOFUMPT_VERSION) + +# Run all code formatters +fmts: fumpt imports + +# Reformat imports +imports: + @echo "⇒ Processing goimports check" + @goimports -w . + +# Run Unit Test with go test +test: GOFLAGS ?= "-count=1" +test: + @echo "⇒ Running go test" + @GOFLAGS="$(GOFLAGS)" go test ./... + +# Activate pre-commit hooks +pre-commit: + pre-commit install -t pre-commit -t commit-msg + +# Deactivate pre-commit hooks +unpre-commit: + pre-commit uninstall -t pre-commit -t commit-msg + +pre-commit-run: + @pre-commit run -a --hook-stage manual + +# Install linters +lint-install: + @rm -rf $(LINT_DIR) + @mkdir -p $(LINT_DIR) + @CGO_ENABLED=1 GOBIN=$(LINT_VERSION_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION) + +# Run linters +lint: + @if [ ! -d "$(LINT_VERSION_DIR)" ]; then \ + make lint-install; \ + fi + $(LINT_VERSION_DIR)/golangci-lint run + +# Install staticcheck +staticcheck-install: + @rm -rf $(STATICCHECK_DIR) + @mkdir -p $(STATICCHECK_DIR) + @GOBIN=$(STATICCHECK_VERSION_DIR) go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION) + +# Run staticcheck +staticcheck-run: + @if [ ! -d "$(STATICCHECK_VERSION_DIR)" ]; then \ + make staticcheck-install; \ + fi + @$(STATICCHECK_VERSION_DIR)/staticcheck ./... + +# Install gopls +gopls-install: + @rm -rf $(GOPLS_DIR) + @mkdir -p $(GOPLS_DIR) + @GOBIN=$(GOPLS_VERSION_DIR) go install golang.org/x/tools/gopls@$(GOPLS_VERSION) + +# Run gopls +gopls-run: + @if [ ! -d "$(GOPLS_VERSION_DIR)" ]; then \ + make gopls-install; \ + fi + $(GOPLS_VERSION_DIR)/gopls check $(SOURCES) 2>&1 >$(GOPLS_TEMP_FILE) + @if [[ $$(wc -l < $(GOPLS_TEMP_FILE)) -ne 0 ]]; then \ + cat $(GOPLS_TEMP_FILE); \ + exit 1; \ + fi + rm $(GOPLS_TEMP_FILE) + +# Install gofumpt +fumpt-install: + @rm -rf $(GOFUMPT_DIR) + @mkdir -p $(GOFUMPT_DIR) + @GOBIN=$(GOFUMPT_VERSION_DIR) go install mvdan.cc/gofumpt@$(GOFUMPT_VERSION) + +# Run gofumpt +fumpt: + @if [ ! -d "$(GOFUMPT_VERSION_DIR)" ]; then \ + make fumpt-install; \ + fi + @echo "⇒ Processing gofumpt check" + $(GOFUMPT_VERSION_DIR)/gofumpt -l -w . diff --git a/README.md b/README.md index 7463f9e..36d5fb8 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,22 @@ -# WIP area: this repo is just a fork! 
+# Quality of Service (QoS) Go libraries for FrostFS object storage -Useful things may be published only in [other branches](../../../branches) +See package documentation +at [pkg.go.dev](https://pkg.go.dev/git.frostfs.info/TrueCloudLab/frostfs-qos) + +## License and copyright + +Copyright 2023-2025 FrostFS contributors + +``` + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..35f0342 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module git.frostfs.info/TrueCloudLab/frostfs-qos + +go 1.22 + +require ( + github.com/stretchr/testify v1.9.0 + google.golang.org/grpc v1.69.2 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect + google.golang.org/protobuf v1.35.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +require golang.org/x/sync v0.10.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7f2985d --- /dev/null +++ b/go.sum @@ -0,0 +1,54 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk 
v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/assert/cond.go b/internal/assert/cond.go new file mode 100644 index 0000000..4a1b201 --- /dev/null +++ b/internal/assert/cond.go @@ -0,0 +1,9 @@ +package assert + +import "strings" + +func Cond(cond bool, details ...string) { + if !cond { + panic(strings.Join(details, " ")) + } +} diff --git a/limiting/limiter.go b/limiting/limiter.go new file mode 100644 index 0000000..0f550f8 --- /dev/null +++ b/limiting/limiter.go @@ -0,0 +1,75 @@ +package limiting + +import ( + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" +) + +type ReleaseFunc func() + +type Limiter interface { + // Acquire 
attempts to reserve a slot without blocking.
+    //
+    // Returns a release function and true if successful. The function must be
+    // called to release the limiter. The function must be called exactly once.
+    // Calling the function more than once will cause incorrect behavior of the
+    // limiter.
+    //
+    // Returns nil and false if it fails.
+    //
+    // If the key was not defined in the limiter, no limit is applied.
+    Acquire(key string) (ReleaseFunc, bool)
+}
+
+type SemaphoreLimiter struct {
+    m map[string]*semaphore.Semaphore
+}
+
+// KeyLimit defines a concurrency limit for a set of keys.
+//
+// All keys of one set share the same limit.
+// Keys of different sets have separate limits.
+//
+// Sets must not overlap.
+type KeyLimit struct {
+    Keys  []string
+    Limit int64
+}
+
+func NewSemaphoreLimiter(limits []KeyLimit) (*SemaphoreLimiter, error) {
+    lr := SemaphoreLimiter{make(map[string]*semaphore.Semaphore)}
+    for _, limit := range limits {
+        if limit.Limit < 0 {
+            return nil, fmt.Errorf("invalid limit %d", limit.Limit)
+        }
+        sem := semaphore.NewSemaphore(limit.Limit)
+
+        if err := lr.addLimit(&limit, sem); err != nil {
+            return nil, err
+        }
+    }
+    return &lr, nil
+}
+
+func (lr *SemaphoreLimiter) addLimit(limit *KeyLimit, sem *semaphore.Semaphore) error {
+    for _, key := range limit.Keys {
+        if _, exists := lr.m[key]; exists {
+            return fmt.Errorf("duplicate key %q", key)
+        }
+        lr.m[key] = sem
+    }
+    return nil
+}
+
+func (lr *SemaphoreLimiter) Acquire(key string) (ReleaseFunc, bool) {
+    sem, ok := lr.m[key]
+    if !ok {
+        return func() {}, true
+    }
+
+    if ok := sem.Acquire(); ok {
+        return sem.Release, true
+    }
+    return nil, false
+}
diff --git a/limiting/limiter_test.go b/limiting/limiter_test.go
new file mode 100644
index 0000000..c6087f1
--- /dev/null
+++ b/limiting/limiter_test.go
@@ -0,0 +1,138 @@
+package limiting_test
+
+import (
+    "sync"
+    "sync/atomic"
+    "testing"
+    "time"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
+    "github.com/stretchr/testify/require"
+)
+
+const (
+    operationDuration = 10 * time.Millisecond
+    operationCount    = 64
+)
+
+type testCase struct {
+    keys         []string
+    limit        int64
+    withoutLimit bool
+    failCount    atomic.Int64
+}
+
+func TestLimiter(t *testing.T) {
+    testLimiter(t, func(kl []limiting.KeyLimit) (limiting.Limiter, error) {
+        return limiting.NewSemaphoreLimiter(kl)
+    })
+}
+
+func testLimiter(t *testing.T, getLimiter func([]limiting.KeyLimit) (limiting.Limiter, error)) {
+    t.Run("duplicate key", func(t *testing.T) {
+        _, err := getLimiter([]limiting.KeyLimit{
+            {[]string{"A", "B"}, 10},
+            {[]string{"B", "C"}, 10},
+        })
+        require.Error(t, err)
+    })
+
+    testCases := []*testCase{
+        {keys: []string{"A"}, limit: operationCount / 4},
+        {keys: []string{"B"}, limit: operationCount / 2},
+        {keys: []string{"C", "D"}, limit: operationCount / 4},
+        {keys: []string{"E"}, limit: 2 * operationCount},
+        {keys: []string{"F"}, withoutLimit: true},
+    }
+
+    lr, err := getLimiter(getLimits(testCases))
+    require.NoError(t, err)
+
+    tasks := createTestTasks(testCases, lr)
+
+    t.Run("first run", func(t *testing.T) {
+        executeTasks(tasks...)
+        verifyResults(t, testCases)
+    })
+
+    resetFailCounts(testCases)
+
+    t.Run("repeated run", func(t *testing.T) {
+        executeTasks(tasks...)
+ verifyResults(t, testCases) + }) +} + +func getLimits(testCases []*testCase) []limiting.KeyLimit { + var limits []limiting.KeyLimit + for _, tc := range testCases { + if tc.withoutLimit { + continue + } + limits = append(limits, limiting.KeyLimit{ + Keys: tc.keys, + Limit: int64(tc.limit), + }) + } + return limits +} + +func createTestTasks(testCases []*testCase, lr limiting.Limiter) []func() { + var tasks []func() + for _, tc := range testCases { + for _, key := range tc.keys { + tasks = append(tasks, func() { + executeTaskN(operationCount, func() { acquireAndExecute(tc, lr, key) }) + }) + } + } + return tasks +} + +func acquireAndExecute(tc *testCase, lr limiting.Limiter, key string) { + release, ok := lr.Acquire(key) + if !ok { + tc.failCount.Add(1) + return + } + defer release() + time.Sleep(operationDuration) +} + +func executeTasks(tasks ...func()) { + var g sync.WaitGroup + + g.Add(len(tasks)) + for _, task := range tasks { + go func() { + defer g.Done() + task() + }() + } + g.Wait() +} + +func executeTaskN(N int, task func()) { + tasks := make([]func(), N) + for i := range N { + tasks[i] = task + } + executeTasks(tasks...) +} + +func verifyResults(t *testing.T, testCases []*testCase) { + for _, tc := range testCases { + var expectedFailCount int64 + if !tc.withoutLimit { + numKeys := int64(len(tc.keys)) + expectedFailCount = max(operationCount*numKeys-tc.limit, 0) + } + require.Equal(t, expectedFailCount, tc.failCount.Load()) + } +} + +func resetFailCounts(testCases []*testCase) { + for _, tc := range testCases { + tc.failCount.Store(0) + } +} diff --git a/limiting/semaphore/semaphore.go b/limiting/semaphore/semaphore.go new file mode 100644 index 0000000..c43dfc6 --- /dev/null +++ b/limiting/semaphore/semaphore.go @@ -0,0 +1,27 @@ +package semaphore + +import ( + "sync/atomic" +) + +type Semaphore struct { + count atomic.Int64 + limit int64 +} + +func NewSemaphore(size int64) *Semaphore { + return &Semaphore{limit: size} +} + +func (s *Semaphore) Acquire() bool { + v := s.count.Add(1) + if v > s.limit { + s.count.Add(-1) + return false + } + return true +} + +func (s *Semaphore) Release() { + s.count.Add(-1) +} diff --git a/limiting/semaphore/semaphore_bench.result b/limiting/semaphore/semaphore_bench.result new file mode 100644 index 0000000..5883c60 --- /dev/null +++ b/limiting/semaphore/semaphore_bench.result @@ -0,0 +1,26 @@ +goos: linux +goarch: amd64 +pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore +cpu: 12th Gen Intel(R) Core(TM) i5-1235U +BenchmarkSemaphore/semaphore_size=1/lock_duration=0s-12 6113605 1964 ns/op 383.1 acquire-ns/op 203.5 release-ns/op 0.6892 success-rate +BenchmarkSemaphore/semaphore_size=1/lock_duration=1µs-12 5826655 2067 ns/op 382.0 acquire-ns/op 307.0 release-ns/op 0.1460 success-rate +BenchmarkSemaphore/semaphore_size=1/lock_duration=10µs-12 5977272 2033 ns/op 370.4 acquire-ns/op 321.4 release-ns/op 0.05408 success-rate +BenchmarkSemaphore/semaphore_size=1/lock_duration=100µs-12 5862900 2030 ns/op 365.6 acquire-ns/op 343.1 release-ns/op 0.01242 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=0s-12 5637050 2173 ns/op 365.2 acquire-ns/op 261.7 release-ns/op 0.9765 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=1µs-12 5470316 2225 ns/op 390.4 acquire-ns/op 357.2 release-ns/op 0.9249 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=10µs-12 5584527 2134 ns/op 395.2 acquire-ns/op 339.0 release-ns/op 0.5409 success-rate +BenchmarkSemaphore/semaphore_size=10/lock_duration=100µs-12 5841032 
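The two files above cover the whole per-key concurrency story: SemaphoreLimiter maps every configured key onto a shared non-blocking semaphore, keys within one KeyLimit set share a single slot pool, and keys that were never configured pass through without any limit. A minimal usage sketch, assuming hypothetical operation names and limit values and using only the limiting API shown above:

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
)

func main() {
	// Hypothetical configuration: GET-like operations share one pool of 100
	// slots, PUT gets its own pool of 50; any other key is unlimited.
	lr, err := limiting.NewSemaphoreLimiter([]limiting.KeyLimit{
		{Keys: []string{"get_object", "head_object"}, Limit: 100},
		{Keys: []string{"put_object"}, Limit: 50},
	})
	if err != nil {
		panic(err) // duplicate key or negative limit
	}

	release, ok := lr.Acquire("put_object")
	if !ok {
		fmt.Println("too many concurrent put_object requests, rejecting")
		return
	}
	// The release function must be called exactly once.
	defer release()

	// ... do the limited work here ...
}
```

Acquire never blocks: when the underlying semaphore is full it returns nil and false, so the caller decides whether to reject, retry, or queue the work.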
2036 ns/op 369.4 acquire-ns/op 330.7 release-ns/op 0.1182 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=0s-12 5600013 2159 ns/op 369.9 acquire-ns/op 271.1 release-ns/op 0.9976 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=1µs-12 5323606 2280 ns/op 394.0 acquire-ns/op 368.9 release-ns/op 0.9697 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=10µs-12 5133394 2353 ns/op 405.8 acquire-ns/op 374.5 release-ns/op 0.9498 success-rate +BenchmarkSemaphore/semaphore_size=100/lock_duration=100µs-12 5238136 2303 ns/op 387.2 acquire-ns/op 362.2 release-ns/op 0.8749 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=0s-12 5408720 2180 ns/op 367.6 acquire-ns/op 271.5 release-ns/op 0.9992 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=1µs-12 5114854 2366 ns/op 407.9 acquire-ns/op 376.4 release-ns/op 0.9966 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=10µs-12 4659454 2438 ns/op 412.2 acquire-ns/op 385.9 release-ns/op 0.9800 success-rate +BenchmarkSemaphore/semaphore_size=1000/lock_duration=100µs-12 4837894 2482 ns/op 401.7 acquire-ns/op 380.9 release-ns/op 0.9725 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=0s-12 5403058 2188 ns/op 367.5 acquire-ns/op 273.1 release-ns/op 1.000 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=1µs-12 5086929 2306 ns/op 390.6 acquire-ns/op 376.3 release-ns/op 1.000 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=10µs-12 5059968 2378 ns/op 410.2 acquire-ns/op 384.5 release-ns/op 1.000 success-rate +BenchmarkSemaphore/semaphore_size=10000/lock_duration=100µs-12 4909206 2420 ns/op 408.4 acquire-ns/op 383.4 release-ns/op 1.000 success-rate +PASS +ok git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore 284.895s diff --git a/limiting/semaphore/semaphore_bench_test.go b/limiting/semaphore/semaphore_bench_test.go new file mode 100644 index 0000000..f4837e8 --- /dev/null +++ b/limiting/semaphore/semaphore_bench_test.go @@ -0,0 +1,95 @@ +package semaphore_test + +import ( + "fmt" + "sync" + "testing" + "time" + + semaphores "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +const maxWorkers = 10_000_000 + +type benchmarkSemaphoreMetrics struct { + mu sync.Mutex + + acquireDuration, + releaseDuration time.Duration + + acquireCount, + releaseCount uint64 +} + +func (c *benchmarkSemaphoreMetrics) reportAcquire(duration time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.acquireDuration += duration + c.acquireCount += 1 +} + +func (c *benchmarkSemaphoreMetrics) reportRelease(duration time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.releaseDuration += duration + c.releaseCount += 1 +} + +func (c *benchmarkSemaphoreMetrics) getResults() (timePerAcquire, timePerRelease, successRate float64) { + timePerAcquire = float64(c.acquireDuration) / float64(c.acquireCount) + timePerRelease = float64(c.releaseDuration) / float64(c.releaseCount) + successRate = float64(c.releaseCount) / float64(c.acquireCount) + return +} + +func BenchmarkSemaphore(b *testing.B) { + sizes := []int64{1, 10, 100, 1000, 10000} + lockDurations := []time.Duration{0, time.Microsecond, 10 * time.Microsecond, 100 * time.Microsecond} + + for _, size := range sizes { + for _, lockDuration := range lockDurations { + name := fmt.Sprintf("semaphore_size=%d/lock_duration=%v", size, lockDuration) + b.Run(name, func(b *testing.B) { + benchmarkSemaphore(b, 
semaphores.NewSemaphore(size), lockDuration) + }) + } + } +} + +func benchmarkSemaphore(b *testing.B, sem *semaphores.Semaphore, lockDuration time.Duration) { + var m benchmarkSemaphoreMetrics + var g errgroup.Group + g.SetLimit(maxWorkers) + + for range b.N { + g.Go(func() error { + now := time.Now() + ok := sem.Acquire() + m.reportAcquire(time.Since(now)) + + if !ok { + return nil + } + + time.Sleep(lockDuration) + + now = time.Now() + sem.Release() + m.reportRelease(time.Since(now)) + + return nil + }) + } + require.NoError(b, g.Wait()) + + require.Equal(b, uint64(b.N), m.acquireCount) + require.LessOrEqual(b, m.releaseCount, m.acquireCount) + + timePerAcquire, timePerRelease, successRate := m.getResults() + + b.ReportMetric(timePerAcquire, "acquire-ns/op") + b.ReportMetric(timePerRelease, "release-ns/op") + b.ReportMetric(successRate, "success-rate") +} diff --git a/scheduling/clock.go b/scheduling/clock.go new file mode 100644 index 0000000..6fa3d84 --- /dev/null +++ b/scheduling/clock.go @@ -0,0 +1,95 @@ +package scheduling + +import ( + "math" + "sync" + "time" +) + +type clock interface { + now() float64 + runAt(ts float64, f func()) + close() +} + +type scheduleInfo struct { + ts float64 + f func() +} + +type systemClock struct { + since time.Time + schedule chan scheduleInfo + wg sync.WaitGroup +} + +func newSystemClock() *systemClock { + c := &systemClock{ + since: time.Now(), + schedule: make(chan scheduleInfo), + } + c.start() + return c +} + +func (c *systemClock) now() float64 { + return time.Since(c.since).Seconds() +} + +func (c *systemClock) runAt(ts float64, f func()) { + c.schedule <- scheduleInfo{ts: ts, f: f} +} + +func (c *systemClock) close() { + close(c.schedule) + c.wg.Wait() +} + +func (c *systemClock) start() { + c.wg.Add(1) + go func() { + defer c.wg.Done() + t := time.NewTimer(0) + <-t.C + currentTs := math.MaxFloat64 + var currentTask func() + for { + select { + case <-t.C: + if currentTask != nil { + c.wg.Add(1) + f := currentTask + go func() { + defer c.wg.Done() + f() + }() + currentTask = nil + } + currentTs = math.MaxFloat64 + case s, ok := <-c.schedule: + if !ok { + return + } + if s.ts >= currentTs { + // current timer will fire earlier + // so next scheduleRequest will push new schedule event + continue + } + var d time.Duration + now := c.now() + if now < s.ts { + d = time.Duration((s.ts - now) * 1e9) + } + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(d) + currentTask = s.f + currentTs = s.ts + } + } + }() +} diff --git a/scheduling/mclock.go b/scheduling/mclock.go new file mode 100644 index 0000000..64c62a8 --- /dev/null +++ b/scheduling/mclock.go @@ -0,0 +1,406 @@ +package scheduling + +import ( + "container/heap" + "context" + "errors" + "math" + "sync" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert" +) + +const ( + invalidIndex = -1 + undefinedReservation float64 = -1.0 +) + +var ( + ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed") + ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded") + ErrMClockSchedulerUnknownTag = errors.New("unknown tag") + ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero") + ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero") + ErrTagRequestsProhibited = errors.New("tag requests are prohibited") +) + +type request struct { + tag string + ts float64 + + reservation float64 + limit float64 + shares float64 + + reservationIdx int + 
limitIdx       int
+    sharesIdx      int
+    readyIdx       int
+
+    scheduled chan struct{}
+    canceled  chan struct{}
+}
+
+// ReleaseFunc is the type of function that should be called after the request is completed.
+type ReleaseFunc func()
+
+// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
+type TagInfo struct {
+    ReservedIOPS *float64
+    LimitIOPS    *float64
+    Share        float64
+    Prohibited   bool
+}
+
+// MClock is an implementation of the mClock scheduling algorithm.
+//
+// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
+type MClock struct {
+    runLimit    uint64
+    waitLimit   int
+    clock       clock
+    idleTimeout float64
+    tagInfo     map[string]TagInfo
+
+    mtx              sync.Mutex
+    previous         map[string]*request
+    inProgress       uint64
+    reservationQueue *queue
+    limitQueue       *queue
+    sharesQueue      *queue
+    readyQueue       *queue
+    closed           bool
+}
+
+// NewMClock creates a new MClock scheduler instance with
+// runLimit, the maximum allowed count of running requests, and
+// waitLimit, the maximum allowed count of waiting requests,
+// for tags specified by tagInfo. The value of idleTimeout defines
+// the difference between the current time and the time of
+// the previous request at which the tag is considered idle.
+// If idleTimeout is negative, idle tags are not allowed.
+// If waitLimit equals zero, there is no limit on the
+// number of waiting requests.
+func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
+    if err := validateParams(runLimit, tagInfo); err != nil {
+        return nil, err
+    }
+    result := &MClock{
+        runLimit:    runLimit,
+        waitLimit:   int(waitLimit),
+        clock:       newSystemClock(),
+        idleTimeout: idleTimeout.Seconds(),
+        tagInfo:     tagInfo,
+
+        reservationQueue: &queue{},
+        limitQueue:       &queue{},
+        sharesQueue:      &queue{},
+        readyQueue:       &queue{},
+    }
+
+    previous := make(map[string]*request)
+    for tag := range tagInfo {
+        previous[tag] = &request{
+            tag:            tag,
+            reservationIdx: invalidIndex,
+            limitIdx:       invalidIndex,
+            sharesIdx:      invalidIndex,
+        }
+    }
+    result.previous = previous
+
+    return result, nil
+}
+
+// RequestArrival schedules a new request with the given tag value.
+// The method call blocks until one of the following events occurs:
+// the request with the tag is scheduled for execution,
+// the context ctx is canceled, or the scheduler is closed.
+// If the method call returns a non-nil ReleaseFunc,
+// then it must be called after the request is completed.
+func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
+    req, release, err := q.pushRequest(tag)
+    if err != nil {
+        return nil, err
+    }
+    select {
+    case <-ctx.Done():
+        q.dropRequest(req)
+        return nil, ctx.Err()
+    case <-req.scheduled:
+        return release, nil
+    case <-req.canceled:
+        return nil, ErrMClockSchedulerClosed
+    }
+}
+
+// Close closes the MClock scheduler.
+// No new requests will be accepted for scheduling after the closing.
+func (q *MClock) Close() { + q.mtx.Lock() + q.closed = true + for q.limitQueue.Len() > 0 { + item := heap.Pop(q.limitQueue).(*limitMQueueItem) + close(item.r.canceled) + q.removeFromQueues(item.r) + } + q.mtx.Unlock() + + q.clock.close() +} + +func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error { + if runLimit == 0 { + return ErrInvalidRunLimit + } + for _, v := range tagInfo { + if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) { + return ErrInvalidTagInfo + } + if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) { + return ErrInvalidTagInfo + } + if math.IsNaN(v.Share) || v.Share <= float64(0) { + return ErrInvalidTagInfo + } + } + return nil +} + +func (q *MClock) dropRequest(req *request) { + q.mtx.Lock() + defer q.mtx.Unlock() + + select { + case <-req.scheduled: + assert.Cond(q.inProgress > 0, "invalid requests count") + q.inProgress-- + default: + } + + q.removeFromQueues(req) +} + +func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { + q.mtx.Lock() + defer q.mtx.Unlock() + + if q.closed { + return nil, nil, ErrMClockSchedulerClosed + } + if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit { + return nil, nil, ErrMClockSchedulerRequestLimitExceeded + } + + now := q.clock.now() + tagInfo, ok := q.tagInfo[tag] + if !ok { + return nil, nil, ErrMClockSchedulerUnknownTag + } + if tagInfo.Prohibited { + return nil, nil, ErrTagRequestsProhibited + } + prev, ok := q.previous[tag] + assert.Cond(ok, "undefined previous:", tag) + + if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout + q.adjustTags(now, tag) + } + + r := &request{ + tag: tag, + ts: now, + shares: max(prev.shares+1.0/tagInfo.Share, now), + reservationIdx: invalidIndex, + limitIdx: invalidIndex, + sharesIdx: invalidIndex, + readyIdx: invalidIndex, + scheduled: make(chan struct{}), + canceled: make(chan struct{}), + } + if tagInfo.ReservedIOPS != nil { + r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now) + } else { + r.reservation = undefinedReservation + } + + if tagInfo.LimitIOPS != nil { + r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now) + } else { + r.limit = max(prev.limit, now) + } + + q.previous[tag] = r + if tagInfo.ReservedIOPS != nil { + heap.Push(q.reservationQueue, &reservationMQueueItem{r: r}) + } + heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) + heap.Push(q.limitQueue, &limitMQueueItem{r: r}) + q.scheduleRequestUnsafe() + + return r, q.requestCompleted, nil +} + +func (q *MClock) adjustTags(now float64, idleTag string) { + if q.sharesQueue.Len() == 0 { + return + } + minShare := q.sharesQueue.items[0].ts() + for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed + limitItem := item.(*limitMQueueItem) + if limitItem.r.tag == idleTag { + continue + } + limitItem.r.shares -= (minShare - now) + if limitItem.r.sharesIdx != invalidIndex { + heap.Fix(q.sharesQueue, limitItem.r.sharesIdx) + } + if limitItem.r.readyIdx != invalidIndex { + heap.Fix(q.readyQueue, limitItem.r.readyIdx) + } + } +} + +func (q *MClock) scheduleRequest() { + q.mtx.Lock() + defer q.mtx.Unlock() + + if q.closed { + return + } + + q.scheduleRequestUnsafe() +} + +func (q *MClock) scheduleRequestUnsafe() { + if q.inProgress >= q.runLimit { + return + } + now := q.clock.now() + q.scheduleByReservation(now) + if q.inProgress >= q.runLimit { + return + } + q.scheduleByLimitAndWeight(now) + if q.inProgress >= q.runLimit || 
(q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) { + return + } + q.setNextScheduleTimer(now) +} + +func (q *MClock) setNextScheduleTimer(now float64) { + nextTs := math.MaxFloat64 + var hasNext bool + if q.reservationQueue.Len() > 0 { + nextTs = q.reservationQueue.items[0].ts() + hasNext = true + } + if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs { + nextTs = q.limitQueue.items[0].ts() + hasNext = true + } + if nextTs <= now { + // should not happen as we always compare .ts() <= now + return + } + if !hasNext { + return + } + q.clock.runAt(nextTs, q.scheduleRequest) +} + +func (q *MClock) scheduleByLimitAndWeight(now float64) { + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 { + ready := heap.Pop(q.limitQueue).(*limitMQueueItem) + heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) + } + + for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 { + next := heap.Pop(q.readyQueue).(*readyMQueueItem) + hadReservation := false + if next.r.reservationIdx != invalidIndex { + hadReservation = true + heap.Remove(q.reservationQueue, next.r.reservationIdx) + } + q.removeFromQueues(next.r) + + tagInfo, ok := q.tagInfo[next.r.tag] + assert.Cond(ok, "unknown tag:", next.r.tag) + if tagInfo.ReservedIOPS != nil && hadReservation { + var updated bool + for _, i := range q.reservationQueue.items { + ri := i.(*reservationMQueueItem) + if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation { + ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS + updated = true + } + } + if updated { + heap.Init(q.reservationQueue) + } + } + + select { + case <-next.r.canceled: + continue + default: + } + + assertIndexInvalid(next.r) + q.inProgress++ + close(next.r.scheduled) + } +} + +func (q *MClock) scheduleByReservation(now float64) { + for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 { + next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) + q.removeFromQueues(next.r) + + select { + case <-next.r.canceled: + continue + default: + } + + assertIndexInvalid(next.r) + q.inProgress++ + close(next.r.scheduled) + } +} + +func (q *MClock) removeFromQueues(r *request) { + if r.limitIdx != invalidIndex { + heap.Remove(q.limitQueue, r.limitIdx) + } + if r.sharesIdx != invalidIndex { + heap.Remove(q.sharesQueue, r.sharesIdx) + } + if r.readyIdx != invalidIndex { + heap.Remove(q.readyQueue, r.readyIdx) + } + if r.reservationIdx != invalidIndex { + heap.Remove(q.reservationQueue, r.reservationIdx) + } +} + +func (q *MClock) requestCompleted() { + q.mtx.Lock() + defer q.mtx.Unlock() + + if q.closed { + return + } + + assert.Cond(q.inProgress > 0, "invalid requests count") + q.inProgress-- + q.scheduleRequestUnsafe() +} + +func assertIndexInvalid(r *request) { + assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1") + assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1") + assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1") + assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1") +} diff --git a/scheduling/mclock_bench.result b/scheduling/mclock_bench.result new file mode 100644 index 0000000..bdd4834 --- /dev/null +++ b/scheduling/mclock_bench.result @@ -0,0 +1,172 @@ +Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -tags integration -bench ^BenchmarkMClock$ git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling -count=1 + +goos: linux +goarch: amd64 +pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling +cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 
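To make the scheduler's contract above concrete: per pushRequest, each new request advances its tag's reservation, limit, and share clocks by 1/ReservedIOPS, 1/LimitIOPS, and 1/Share past the previous request's values (or jumps forward to the current time), and RequestArrival blocks until the request is dispatched, the context is canceled, or the scheduler is closed. A minimal usage sketch, assuming hypothetical tag names and IOPS/weight values and using only the scheduling API shown above:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
)

func main() {
	// Hypothetical tags: "client" is guaranteed 100 IOPS, capped at 1000 IOPS
	// and has weight 2; "background" is best-effort with weight 1.
	reserved, ioLimit := 100.0, 1000.0
	tags := map[string]scheduling.TagInfo{
		"client":     {ReservedIOPS: &reserved, LimitIOPS: &ioLimit, Share: 2},
		"background": {Share: 1},
	}

	// At most 8 requests run concurrently, at most 1000 wait;
	// a tag is treated as idle after 5 minutes without requests.
	q, err := scheduling.NewMClock(8, 1000, tags, 5*time.Minute)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	release, err := q.RequestArrival(context.Background(), "client")
	if err != nil {
		fmt.Println("request was not scheduled:", err)
		return
	}
	// The release function must be called once the request is finished.
	defer release()

	// ... perform the actual I/O here ...
}
```

A tag with Prohibited set is rejected immediately with ErrTagRequestsProhibited, and an unknown tag yields ErrMClockSchedulerUnknownTag.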
2.40GHz +BenchmarkMClock/impl=noop/parallelism=1-8 8623 136817 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=1-8 7368 140674 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=2-8 8486 140394 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=4-8 8500 141410 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=8-8 8268 142724 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=16-8 8431 142548 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=1-8 8505 142035 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=2-8 7845 142658 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=4-8 8473 140029 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=8-8 8518 142607 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=16-8 8578 141002 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=1-8 8557 141858 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=2-8 8353 142742 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=4-8 8475 142753 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=8-8 8433 141319 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=16-8 8480 141825 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=1-8 7827 141525 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=2-8 7935 140939 ns/op 370 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=4-8 8472 140988 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=8-8 8373 142260 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=16-8 8383 142239 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=1-8 5727 206852 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=2-8 6516 178739 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=4-8 7300 163438 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=8-8 7807 152344 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=16-8 8443 147051 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=1-8 6062 205018 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=2-8 6526 182511 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=4-8 7341 163028 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=8-8 7930 153741 ns/op 372 B/op 9 allocs/op 
+BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=16-8 7804 148216 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=1-8 5485 207763 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=2-8 5774 181830 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=4-8 7262 165102 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=8-8 7231 152958 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=16-8 7849 146705 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=1-8 5275 206549 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=2-8 6115 180053 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=4-8 7264 163943 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=8-8 7810 152008 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=16-8 7875 147107 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=noop/parallelism=8-8 8589 139356 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=1-8 7916 142917 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=2-8 8392 141914 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=4-8 8444 141011 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=8-8 8419 140638 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=16-8 8473 141018 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=1-8 8487 139941 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=2-8 7938 142745 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=4-8 8522 140837 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=8-8 8431 141361 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=16-8 8390 142171 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=1-8 8449 140695 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=2-8 8467 140622 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=4-8 8460 140925 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=8-8 8487 141316 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=16-8 7876 141374 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=1-8 7887 140590 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=2-8 8328 142214 ns/op 370 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=4-8 8475 141472 ns/op 368 B/op 8 
allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=8-8 8402 141861 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=16-8 8509 142173 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=1-8 5490 207911 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=2-8 6481 182955 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=4-8 6816 165103 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=8-8 6901 155528 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=16-8 7690 148762 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=1-8 5437 205208 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=2-8 6092 183311 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=4-8 6907 162595 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=8-8 7756 151761 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=16-8 7855 146382 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=1-8 5468 206883 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=2-8 6061 180350 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=4-8 6795 163866 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=8-8 7350 152345 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=16-8 7869 145708 ns/op 374 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=1-8 5283 207099 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=2-8 6799 180029 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=4-8 7324 164306 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=8-8 7770 152377 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=16-8 8342 146888 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=noop/parallelism=32-8 8604 140481 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=1-8 8491 142215 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=2-8 8508 140537 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=4-8 8320 142631 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=8-8 8368 142430 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=16-8 8432 141733 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=1-8 7855 141754 ns/op 372 B/op 9 allocs/op 
+BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=2-8 7858 141304 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=4-8 8545 140996 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=8-8 8437 142022 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=16-8 8418 142653 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=1-8 8448 141117 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=2-8 8530 142164 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=4-8 7944 142449 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=8-8 8551 139223 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=16-8 8491 140160 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=1-8 8354 141835 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=2-8 7880 141608 ns/op 370 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=4-8 7940 140794 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=8-8 8414 140646 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=16-8 8373 140890 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=1-8 5256 209447 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=2-8 6451 183969 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=4-8 7326 163980 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=8-8 7862 152768 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=16-8 8390 147437 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=1-8 5228 206086 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=2-8 6471 181844 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=4-8 7318 163604 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=8-8 7827 151880 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=16-8 8362 146623 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=1-8 5541 210639 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=2-8 5818 183541 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=4-8 6910 163609 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=8-8 7797 152752 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=16-8 7344 146966 ns/op 373 B/op 9 allocs/op 
+BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=1-8 5746 206651 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=2-8 6490 182702 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=4-8 7250 164727 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=8-8 7386 152508 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=16-8 8379 146547 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=noop/parallelism=64-8 8486 138281 ns/op 0 B/op 0 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=1-8 8472 142782 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=2-8 8437 140925 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=4-8 8338 141035 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=8-8 8487 142288 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=16-8 8366 142353 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=1-8 8510 140838 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=2-8 7935 142844 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=4-8 8218 139362 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=8-8 7977 140291 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=16-8 8371 140322 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=1-8 8524 140484 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=2-8 8461 142431 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=4-8 8420 141652 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=8-8 8385 140956 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=16-8 8355 142509 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=1-8 7239 141018 ns/op 371 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=2-8 8467 141807 ns/op 370 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=4-8 8420 140763 ns/op 368 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=8-8 8474 140264 ns/op 366 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=16-8 8413 142191 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=1-8 5474 208031 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=2-8 5706 182794 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=4-8 7248 165044 ns/op 364 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=8-8 7825 153229 ns/op 
365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=16-8 7879 148568 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=1-8 5278 211267 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=2-8 6108 183247 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=4-8 7338 163152 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=8-8 7339 154054 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=16-8 7750 146000 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=1-8 5716 208259 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=2-8 6450 185159 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=4-8 7285 168077 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=8-8 7357 151950 ns/op 372 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=16-8 8257 147548 ns/op 373 B/op 9 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=1-8 5245 207383 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=2-8 6115 179041 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=4-8 6831 164377 ns/op 367 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=8-8 7378 152743 ns/op 365 B/op 8 allocs/op +BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=16-8 7837 148694 ns/op 366 B/op 8 allocs/op +PASS +ok git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling 194.532s diff --git a/scheduling/mclock_bench_test.go b/scheduling/mclock_bench_test.go new file mode 100644 index 0000000..09989e6 --- /dev/null +++ b/scheduling/mclock_bench_test.go @@ -0,0 +1,87 @@ +package scheduling + +import ( + "context" + "fmt" + "math" + "math/rand/v2" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type noopMClockScheduler struct{} + +var ( + releaseStub ReleaseFunc = func() {} + defaultLimit float64 = 100_000 + shortReservation float64 = 1 + medReservation float64 = 100 + largeReservation float64 = 10_000 +) + +func (s *noopMClockScheduler) RequestArrival(context.Context, string) ReleaseFunc { + return releaseStub +} + +func BenchmarkMClock(b *testing.B) { + tagsCount := []int{1, 2, 4, 8, 16} + ioDuration := time.Millisecond + parallelismValues := []int{1, 8, 32, 64} + limits := []*float64{nil, &defaultLimit} + reservations := []*float64{nil, &shortReservation, &medReservation, &largeReservation} + for _, parallelism := range parallelismValues { + b.SetParallelism(parallelism) + + noopMClock := &noopMClockScheduler{} + b.Run(fmt.Sprintf("impl=noop/parallelism=%d", parallelism), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + release := noopMClock.RequestArrival(context.Background(), "tag") + time.Sleep(ioDuration) + release() + } + }) + }) + + for _, limit := range limits { + for _, reservation := range reservations { 
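+				// Each limit/reservation pair is benchmarked with every tag count, covering the full matrix of scheduler configurations.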
+ for _, tags := range tagsCount { + tagInfos := make(map[string]TagInfo) + for tag := 0; tag < tags; tag++ { + tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{Share: 50, LimitIOPS: limit, ReservedIOPS: reservation} + } + + mClockQ, _ := NewMClock(math.MaxUint64, math.MaxUint64, tagInfos, time.Hour) + + resStr := "no" + if reservation != nil { + resStr = strconv.FormatFloat(*reservation, 'f', 1, 64) + } + limitStr := "no" + if limit != nil { + limitStr = strconv.FormatFloat(*limit, 'f', 1, 64) + } + b.Run(fmt.Sprintf("impl=mclock/limit=%s/reservation=%s/parallelism=%d/tags=%d", limitStr, resStr, parallelism, tags), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + tag := rand.Int64N(int64(tags)) + release, err := mClockQ.RequestArrival(context.Background(), "tag"+strconv.FormatInt(int64(tag), 10)) + require.NoError(b, err) + time.Sleep(ioDuration) + release() + } + }) + }) + } + } + } + + } +} diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go new file mode 100644 index 0000000..6433990 --- /dev/null +++ b/scheduling/mclock_test.go @@ -0,0 +1,584 @@ +package scheduling + +import ( + "context" + "math" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestMClockSharesScheduling(t *testing.T) { + t.Parallel() + reqCount := 1000 + reqCount = (reqCount / 2) * 2 + q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 2}, + "class2": {Share: 1}, + }, 100) + require.NoError(t, err) + q.clock = &noopClock{} + + var releases []ReleaseFunc + var requests []*request + tag := "class1" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + tag = "class2" + for i := 0; i < reqCount/2; i++ { + req, release, err := q.pushRequest(tag) + require.NoError(t, err) + requests = append(requests, req) + releases = append(releases, release) + } + + var result []string + var wg sync.WaitGroup + for i := 0; i < reqCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-requests[i].scheduled + result = append(result, requests[i].tag) + releases[i]() + }() + } + wg.Wait() + + // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., + // because the ratio is 2 to 1. + // However, there may be deviations due to rounding and sorting. 
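+	// Once class1 is exhausted only class2 requests remain, so the proportion check below looks only at the mixed prefix of the schedule.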
+	result = result[:reqCount/2+(reqCount/2)/2] // the last reqCount/4 requests are the class2 tail
+	var class1Count int
+	var class2Count int
+	var class2MaxSeq int
+	for _, res := range result {
+		switch res {
+		case "class1":
+			class1Count++
+			class2MaxSeq = 0
+		case "class2":
+			class2Count++
+			class2MaxSeq++
+			require.Less(t, class2MaxSeq, 3) // no more than 2 class2 requests are expected to be scheduled in a row
+		default:
+			require.Fail(t, "unknown tag")
+		}
+	}
+
+	require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
+}
+
+var _ clock = &noopClock{}
+
+type noopClock struct {
+	v          float64
+	runAtValue *float64
+}
+
+func (n *noopClock) now() float64 {
+	return n.v
+}
+
+func (n *noopClock) runAt(ts float64, f func()) {
+	n.runAtValue = &ts
+}
+
+func (n *noopClock) close() {}
+
+func TestMClockRequestCancel(t *testing.T) {
+	t.Parallel()
+	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
+		"class1": {Share: 2},
+		"class2": {Share: 1},
+	}, 100)
+	require.NoError(t, err)
+	q.clock = &noopClock{}
+
+	release1, err := q.RequestArrival(context.Background(), "class1")
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
+	defer cancel()
+	release2, err := q.RequestArrival(ctx, "class1")
+	require.Nil(t, release2)
+	require.ErrorIs(t, err, context.DeadlineExceeded)
+
+	require.Equal(t, 0, q.readyQueue.Len())
+	require.Equal(t, 0, q.sharesQueue.Len())
+	require.Equal(t, 0, q.limitQueue.Len())
+	require.Equal(t, 0, q.reservationQueue.Len())
+
+	release1()
+}
+
+func TestMClockLimitScheduling(t *testing.T) {
+	t.Parallel()
+	reqCount := 100
+	reqCount = (reqCount / 2) * 2
+	limit := 1.0
+	cl := &noopClock{}
+	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
+		"class1": {Share: 2, LimitIOPS: &limit},
+		"class2": {Share: 1, LimitIOPS: &limit},
+	}, 100)
+	require.NoError(t, err)
+	q.clock = cl
+
+	var releases []ReleaseFunc
+	var requests []*request
+	tag := "class1"
+	for i := 0; i < reqCount/2; i++ {
+		req, release, err := q.pushRequest(tag)
+		require.NoError(t, err)
+		requests = append(requests, req)
+		releases = append(releases, release)
+	}
+	tag = "class2"
+	for i := 0; i < reqCount/2; i++ {
+		req, release, err := q.pushRequest(tag)
+		require.NoError(t, err)
+		requests = append(requests, req)
+		releases = append(releases, release)
+	}
+
+	q.scheduleRequest()
+
+	for _, req := range requests {
+		select {
+		case <-req.scheduled:
+			require.Fail(t, "no request must be scheduled because time is 0.0 but limit values are greater than 0.0")
+		default:
+		}
+	}
+
+	cl.v = math.MaxFloat64
+
+	var result []string
+	var wg sync.WaitGroup
+	for i := 0; i < reqCount; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			<-requests[i].scheduled
+			result = append(result, requests[i].tag)
+			releases[i]()
+		}()
+	}
+	q.scheduleRequest()
+	wg.Wait()
+
+	// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
+	// because the ratio is 2 to 1.
+	// However, there may be deviations due to rounding and sorting.
+	result = result[:reqCount/2+(reqCount/2)/2] // the last reqCount/4 requests are the class2 tail
+	var class1Count int
+	var class2Count int
+	var class2MaxSeq int
+	for _, res := range result {
+		switch res {
+		case "class1":
+			class1Count++
+			class2MaxSeq = 0
+		case "class2":
+			class2Count++
+			class2MaxSeq++
+			require.Less(t, class2MaxSeq, 3) // no more than 2 class2 requests are expected to be scheduled in a row
+		default:
+			require.Fail(t, "unknown tag")
+		}
+	}
+
+	require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
+
+	require.Equal(t, 0, q.readyQueue.Len())
+	require.Equal(t, 0, q.sharesQueue.Len())
+	require.Equal(t, 0, q.limitQueue.Len())
+	require.Equal(t, 0, q.reservationQueue.Len())
+}
+
+func TestMClockReservationScheduling(t *testing.T) {
+	t.Parallel()
+	reqCount := 1000
+	reqCount = (reqCount / 2) * 2
+	limit := 0.01        // 1 request in 100 seconds
+	reservation := 100.0 // 100 RPS
+	cl := &noopClock{v: float64(1.0)}
+	q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
+		"class1": {Share: 2, LimitIOPS: &limit},
+		"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &reservation},
+	}, 100)
+	require.NoError(t, err)
+	q.clock = cl
+
+	var releases []ReleaseFunc
+	var requests []*request
+	tag := "class1"
+	for i := 0; i < reqCount/2; i++ {
+		req, release, err := q.pushRequest(tag)
+		require.NoError(t, err)
+		requests = append(requests, req)
+		releases = append(releases, release)
+	}
+	tag = "class2"
+	for i := 0; i < reqCount/2; i++ {
+		req, release, err := q.pushRequest(tag)
+		require.NoError(t, err)
+		requests = append(requests, req)
+		releases = append(releases, release)
+	}
+
+	q.scheduleRequest()
+
+	count := 0
+	for _, req := range requests {
+		select {
+		case <-req.scheduled:
+			require.Equal(t, req.tag, "class2")
+			count++
+		default:
+		}
+	}
+	require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled")
+
+	cl.v = 1.9999 // almost 1s elapsed (0.9999) to take float64 accuracy into account
+	q.scheduleRequest()
+
+	var result []string
+	for i, req := range requests {
+		select {
+		case <-req.scheduled:
+			result = append(result, requests[i].tag)
+			releases[i]()
+		default:
+		}
+	}
+
+	require.Equal(t, 200, len(result))
+	for _, res := range result {
+		require.Equal(t, "class2", res)
+	}
+
+	cl.v = math.MaxFloat64
+	q.scheduleRequest()
+
+	require.Equal(t, 0, q.readyQueue.Len())
+	require.Equal(t, 0, q.sharesQueue.Len())
+	require.Equal(t, 0, q.limitQueue.Len())
+	require.Equal(t, 0, q.reservationQueue.Len())
+}
+
+func TestMClockIdleTag(t *testing.T) {
+	t.Parallel()
+	reqCount := 100
+	idleTimeout := 2 * time.Second
+	cl := &noopClock{}
+	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
+		"class1": {Share: 1},
+		"class2": {Share: 1},
+	}, idleTimeout)
+	require.NoError(t, err)
+	q.clock = cl
+
+	var requests []*request
+	tag := "class1"
+	for i := 0; i < reqCount/2; i++ {
+		cl.v += idleTimeout.Seconds() / 2
+		req, _, err := q.pushRequest(tag)
+		require.NoError(t, err)
+		requests = append(requests, req)
+	}
+
+	// class1 requests have shares [1.0; 2.0; 3.0; ... ]
+
+	cl.v += 2 * idleTimeout.Seconds()
+
+	tag = "class2"
+	req, _, err := q.pushRequest(tag)
+	require.NoError(t, err)
+	requests = append(requests, req)
+
+	// class2 must be considered idle, so all shares values must be adjusted.
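+	// Every class1 request that is still pending should have had its shares value bumped to at least the current clock value; the loop below verifies this.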
+ + for _, req := range requests { + select { + case <-req.scheduled: + default: + require.True(t, req.shares >= cl.v) + } + } +} + +func TestMClockClose(t *testing.T) { + t.Parallel() + q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.NoError(t, err) + q.clock = &noopClock{} + + requestRunning := make(chan struct{}) + checkDone := make(chan struct{}) + eg, ctx := errgroup.WithContext(context.Background()) + tag := "class1" + eg.Go(func() error { + release, err := q.RequestArrival(ctx, tag) + if err != nil { + return err + } + defer release() + close(requestRunning) + <-checkDone + return nil + }) + <-requestRunning + + eg.Go(func() error { + release, err := q.RequestArrival(ctx, tag) + require.Nil(t, release) + require.ErrorIs(t, err, ErrMClockSchedulerClosed) + return nil + }) + + // wait until second request will be blocked on wait + for q.waitingCount() == 0 { + time.Sleep(1 * time.Second) + } + + q.Close() + + release, err := q.RequestArrival(context.Background(), tag) + require.Nil(t, release) + require.ErrorIs(t, err, ErrMClockSchedulerClosed) + + close(checkDone) + + require.NoError(t, eg.Wait()) +} + +func TestMClockWaitLimit(t *testing.T) { + t.Parallel() + q, err := NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.NoError(t, err) + q.clock = &noopClock{} + defer q.Close() + + requestRunning := make(chan struct{}) + checkDone := make(chan struct{}) + eg, ctx := errgroup.WithContext(context.Background()) + tag := "class1" + // running request + eg.Go(func() error { + release, err := q.RequestArrival(ctx, tag) + if err != nil { + return err + } + defer release() + close(requestRunning) + <-checkDone + return nil + }) + + // waiting request + eg.Go(func() error { + <-requestRunning + release, err := q.RequestArrival(ctx, tag) + require.NotNil(t, release) + require.NoError(t, err) + defer release() + <-checkDone + return nil + }) + + // wait until second request will be waiting + for q.waitingCount() == 0 { + time.Sleep(1 * time.Second) + } + + release, err := q.RequestArrival(ctx, tag) + require.Nil(t, release) + require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded) + + close(checkDone) + require.NoError(t, eg.Wait()) +} + +func TestMClockParameterValidation(t *testing.T) { + _, err := NewMClock(0, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidRunLimit) + _, err = NewMClock(1, 0, map[string]TagInfo{ + "class1": {Share: 1}, + }, 1000) + require.NoError(t, err) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, -1.0) + require.NoError(t, err) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1}, + }, 0) + require.NoError(t, err) + negativeValue := -1.0 + zeroValue := float64(0) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: negativeValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: zeroValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, ReservedIOPS: &zeroValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, ReservedIOPS: &negativeValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, LimitIOPS: &zeroValue}, + }, 1000) + require.ErrorIs(t, err, 
ErrInvalidTagInfo) + _, err = NewMClock(1, 1, map[string]TagInfo{ + "class1": {Share: 1.0, LimitIOPS: &negativeValue}, + }, 1000) + require.ErrorIs(t, err, ErrInvalidTagInfo) +} + +func (q *MClock) waitingCount() int { + q.mtx.Lock() + defer q.mtx.Unlock() + + return q.sharesQueue.Len() +} + +func TestMClockTimeBasedSchedule(t *testing.T) { + t.Parallel() + limit := 1.0 // 1 request per second allowed + cl := &noopClock{v: float64(1.5)} + q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{ + "class1": {Share: 1, LimitIOPS: &limit}, + }, 100) + require.NoError(t, err) + defer q.Close() + q.clock = cl + + running := make(chan struct{}) + checked := make(chan struct{}) + eg, ctx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + defer release() + close(running) + <-checked + return nil + }) + + <-running + // request must be scheduled at 2.0 + _, _, err = q.pushRequest("class1") + require.NoError(t, err) + require.NotNil(t, cl.runAtValue) + require.Equal(t, cl.v+1.0/limit, *cl.runAtValue) + close(checked) + require.NoError(t, eg.Wait()) +} + +func TestMClockLowLimit(t *testing.T) { + t.Parallel() + limit := 2.0 + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(5) + eg.Go(func() error { + for range 3 { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + release() + } + return nil + }) + require.NoError(t, eg.Wait()) +} + +func TestMClockLimitTotalTime(t *testing.T) { + t.Parallel() + limit := 10.0 // 10 RPS -> 1 request per 100 ms + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + // 10 requests, each request runs for 500 ms, + // but they should be scheduled as soon as possible, + // so total duration must be less than 1 second + eg, ctx := errgroup.WithContext(context.Background()) + startedAt := time.Now() + for range 10 { + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + release() + return nil + }) + } + require.NoError(t, eg.Wait()) + require.True(t, time.Since(startedAt) <= 1*time.Second) + + // 11 requests, limit = 10 RPS, so 10 requests should be + // scheduled as soon as possible, but last request should be + // scheduled at now + 1.0 s + eg, ctx = errgroup.WithContext(context.Background()) + startedAt = time.Now() + for range 11 { + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + release() + return nil + }) + } + require.NoError(t, eg.Wait()) + require.True(t, time.Since(startedAt) >= 1500*time.Millisecond) + require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests +} + +func TestMClockRestictTagRequests(t *testing.T) { + t.Parallel() + limit := 10.0 + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + "class2": {Share: 50, LimitIOPS: &limit, Prohibited: true}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + release, err := q.RequestArrival(context.Background(), "class1") + require.NoError(t, err) + release() + + release, err = q.RequestArrival(context.Background(), "class2") + 
require.ErrorIs(t, err, ErrTagRequestsProhibited) + require.Nil(t, release) +} diff --git a/scheduling/queue.go b/scheduling/queue.go new file mode 100644 index 0000000..12dd44a --- /dev/null +++ b/scheduling/queue.go @@ -0,0 +1,100 @@ +package scheduling + +type queueItem interface { + ts() float64 + setIndex(idx int) +} + +type queue struct { + items []queueItem +} + +// Len implements heap.Interface. +func (q *queue) Len() int { + return len(q.items) +} + +// Less implements heap.Interface. +func (q *queue) Less(i int, j int) bool { + return q.items[i].ts() < q.items[j].ts() +} + +// Pop implements heap.Interface. +func (q *queue) Pop() any { + n := len(q.items) + item := q.items[n-1] + q.items[n-1] = nil + q.items = q.items[0 : n-1] + item.setIndex(invalidIndex) + return item +} + +// Push implements heap.Interface. +func (q *queue) Push(x any) { + it := x.(queueItem) + it.setIndex(q.Len()) + q.items = append(q.items, it) +} + +// Swap implements heap.Interface. +func (q *queue) Swap(i int, j int) { + q.items[i], q.items[j] = q.items[j], q.items[i] + q.items[i].setIndex(i) + q.items[j].setIndex(j) +} + +var _ queueItem = &reservationMQueueItem{} + +type reservationMQueueItem struct { + r *request +} + +func (i *reservationMQueueItem) ts() float64 { + return i.r.reservation +} + +func (i *reservationMQueueItem) setIndex(idx int) { + i.r.reservationIdx = idx +} + +var _ queueItem = &limitMQueueItem{} + +type limitMQueueItem struct { + r *request +} + +func (i *limitMQueueItem) ts() float64 { + return i.r.limit +} + +func (i *limitMQueueItem) setIndex(idx int) { + i.r.limitIdx = idx +} + +var _ queueItem = &sharesMQueueItem{} + +type sharesMQueueItem struct { + r *request +} + +func (i *sharesMQueueItem) ts() float64 { + return i.r.shares +} + +func (i *sharesMQueueItem) setIndex(idx int) { + i.r.sharesIdx = idx +} + +var _ queueItem = &readyMQueueItem{} + +type readyMQueueItem struct { + r *request +} + +func (i *readyMQueueItem) ts() float64 { + return i.r.shares +} + +func (i *readyMQueueItem) setIndex(idx int) { + i.r.readyIdx = idx +} diff --git a/tagging/context.go b/tagging/context.go new file mode 100644 index 0000000..3b6c113 --- /dev/null +++ b/tagging/context.go @@ -0,0 +1,21 @@ +package tagging + +import "context" + +type tagContextKeyType struct{} + +var currentTagKey = tagContextKeyType{} + +func ContextWithIOTag(parent context.Context, ioTag string) context.Context { + return context.WithValue(parent, currentTagKey, ioTag) +} + +func IOTagFromContext(ctx context.Context) (string, bool) { + if ctx == nil { + panic("context must be non nil") + } + if tag, ok := ctx.Value(currentTagKey).(string); ok { + return tag, true + } + return "", false +} diff --git a/tagging/context_test.go b/tagging/context_test.go new file mode 100644 index 0000000..b13b253 --- /dev/null +++ b/tagging/context_test.go @@ -0,0 +1,23 @@ +package tagging + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestContext(t *testing.T) { + ctx := context.Background() + tag, ok := IOTagFromContext(ctx) + require.False(t, ok) + require.Equal(t, "", tag) + ctx = ContextWithIOTag(ctx, "tag1") + tag, ok = IOTagFromContext(ctx) + require.True(t, ok) + require.Equal(t, "tag1", tag) + ctx = ContextWithIOTag(ctx, "tag2") + tag, ok = IOTagFromContext(ctx) + require.True(t, ok) + require.Equal(t, "tag2", tag) +} diff --git a/tagging/grpc.go b/tagging/grpc.go new file mode 100644 index 0000000..4e2fcfe --- /dev/null +++ b/tagging/grpc.go @@ -0,0 +1,96 @@ +package tagging + +import ( + 
"context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + ioTagHeader = "x-frostfs-io-tag" +) + +// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata. +func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...) + } +} + +// NewStreamClientInterceptor creates new gRPC stream interceptor to set an IO tag to gRPC metadata. +func NewStreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(setIOTagToGRPCMetadata(ctx), desc, cc, method, opts...) + } +} + +// NewUnaryServerInterceptor creates new gRPC unary interceptor to extract an IO tag to gRPC metadata. +func NewUnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return handler(extractIOTagFromGRPCMetadata(ctx), req) + } +} + +// NewStreamServerInterceptor creates new gRPC stream interceptor to extract an IO tag to gRPC metadata. +func NewStreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, &serverStream{origin: ss}) + } +} + +func setIOTagToGRPCMetadata(ctx context.Context) context.Context { + ioTag, ok := IOTagFromContext(ctx) + if !ok { + return ctx + } + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.MD{} + } + md.Set(ioTagHeader, ioTag) + return metadata.NewOutgoingContext(ctx, md) +} + +func extractIOTagFromGRPCMetadata(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ctx + } + + values := md.Get(ioTagHeader) + if len(values) > 0 { + return ContextWithIOTag(ctx, values[0]) + } + return ctx +} + +var _ grpc.ServerStream = &serverStream{} + +type serverStream struct { + origin grpc.ServerStream +} + +func (s *serverStream) Context() context.Context { + return extractIOTagFromGRPCMetadata(s.origin.Context()) +} + +func (s *serverStream) RecvMsg(m any) error { + return s.origin.RecvMsg(m) +} + +func (s *serverStream) SendHeader(md metadata.MD) error { + return s.origin.SendHeader(md) +} + +func (s *serverStream) SendMsg(m any) error { + return s.origin.SendMsg(m) +} + +func (s *serverStream) SetHeader(md metadata.MD) error { + return s.origin.SetHeader(md) +} + +func (s *serverStream) SetTrailer(md metadata.MD) { + s.origin.SetTrailer(md) +}