Compare commits
No commits in common. "master" and "fix/patch/close_panic" have entirely different histories.
master
...
fix/patch/
59 changed files with 235 additions and 1429 deletions
|
@ -40,6 +40,8 @@ morph:
|
|||
- address: wss://{{.}}/ws{{end}}
|
||||
{{if not .Relay }}
|
||||
storage:
|
||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||
|
||||
shard:
|
||||
default: # section with the default shard parameters
|
||||
metabase:
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
object "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/modules/object"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
FullInfoFlag = "full"
|
||||
FullInfoFlagUsage = "Print full ShardInfo."
|
||||
)
|
||||
|
||||
var locateObjectCmd = &cobra.Command{
|
||||
Use: "locate-object",
|
||||
Short: "List shards storing the object",
|
||||
Long: "List shards storing the object",
|
||||
Run: locateObject,
|
||||
}
|
||||
|
||||
func initControlLocateObjectCmd() {
|
||||
initControlFlags(locateObjectCmd)
|
||||
|
||||
flags := locateObjectCmd.Flags()
|
||||
|
||||
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
_ = locateObjectCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||
|
||||
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
|
||||
_ = locateObjectCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||
|
||||
flags.Bool(commonflags.JSON, false, "Print shard info as a JSON array. Requires --full flag.")
|
||||
flags.Bool(FullInfoFlag, false, FullInfoFlagUsage)
|
||||
}
|
||||
|
||||
func locateObject(cmd *cobra.Command, _ []string) {
|
||||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
_ = object.ReadObjectAddress(cmd, &cnr, &obj)
|
||||
|
||||
pk := key.Get(cmd)
|
||||
|
||||
body := new(control.ListShardsForObjectRequest_Body)
|
||||
body.SetContainerId(cnr.EncodeToString())
|
||||
body.SetObjectId(obj.EncodeToString())
|
||||
req := new(control.ListShardsForObjectRequest)
|
||||
req.SetBody(body)
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var err error
|
||||
var resp *control.ListShardsForObjectResponse
|
||||
err = cli.ExecRaw(func(client *rawclient.Client) error {
|
||||
resp, err = control.ListShardsForObject(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
shardIDs := resp.GetBody().GetShard_ID()
|
||||
|
||||
isFull, _ := cmd.Flags().GetBool(FullInfoFlag)
|
||||
if !isFull {
|
||||
for _, id := range shardIDs {
|
||||
cmd.Println(base58.Encode(id))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// get full shard info
|
||||
listShardsReq := new(control.ListShardsRequest)
|
||||
listShardsReq.SetBody(new(control.ListShardsRequest_Body))
|
||||
signRequest(cmd, pk, listShardsReq)
|
||||
var listShardsResp *control.ListShardsResponse
|
||||
err = cli.ExecRaw(func(client *rawclient.Client) error {
|
||||
listShardsResp, err = control.ListShards(client, listShardsReq)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, listShardsResp.GetSignature(), listShardsResp.GetBody())
|
||||
|
||||
shards := listShardsResp.GetBody().GetShards()
|
||||
sortShardsByID(shards)
|
||||
shards = filterShards(shards, shardIDs)
|
||||
|
||||
isJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
|
||||
if isJSON {
|
||||
prettyPrintShardsJSON(cmd, shards)
|
||||
} else {
|
||||
prettyPrintShards(cmd, shards)
|
||||
}
|
||||
}
|
||||
|
||||
func filterShards(info []control.ShardInfo, ids [][]byte) []control.ShardInfo {
|
||||
var res []control.ShardInfo
|
||||
for _, id := range ids {
|
||||
for _, inf := range info {
|
||||
if bytes.Equal(inf.Shard_ID, id) {
|
||||
res = append(res, inf)
|
||||
}
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
|
@ -39,7 +39,6 @@ func init() {
|
|||
listRulesCmd,
|
||||
getRuleCmd,
|
||||
listTargetsCmd,
|
||||
locateObjectCmd,
|
||||
)
|
||||
|
||||
initControlHealthCheckCmd()
|
||||
|
@ -53,5 +52,4 @@ func init() {
|
|||
initControlListRulesCmd()
|
||||
initControGetRuleCmd()
|
||||
initControlListTargetsCmd()
|
||||
initControlLocateObjectCmd()
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func deleteObject(cmd *cobra.Command, _ []string) {
|
|||
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("required flag \"%s\" not set", commonflags.OIDFlag))
|
||||
}
|
||||
|
||||
objAddr = ReadObjectAddress(cmd, &cnr, &obj)
|
||||
objAddr = readObjectAddress(cmd, &cnr, &obj)
|
||||
}
|
||||
|
||||
pk := key.GetOrGenerate(cmd)
|
||||
|
|
|
@ -46,7 +46,7 @@ func getObject(cmd *cobra.Command, _ []string) {
|
|||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
objAddr := ReadObjectAddress(cmd, &cnr, &obj)
|
||||
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||
|
||||
filename := cmd.Flag(fileFlag).Value.String()
|
||||
out, closer := createOutWriter(cmd, filename)
|
||||
|
|
|
@ -52,7 +52,7 @@ func getObjectHash(cmd *cobra.Command, _ []string) {
|
|||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
objAddr := ReadObjectAddress(cmd, &cnr, &obj)
|
||||
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||
|
||||
ranges, err := getRangeList(cmd)
|
||||
commonCmd.ExitOnErr(cmd, "", err)
|
||||
|
|
|
@ -47,7 +47,7 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
|
|||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
objAddr := ReadObjectAddress(cmd, &cnr, &obj)
|
||||
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||
pk := key.GetOrGenerate(cmd)
|
||||
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
|
|
@ -101,7 +101,7 @@ func initObjectNodesCmd() {
|
|||
func objectNodes(cmd *cobra.Command, _ []string) {
|
||||
var cnrID cid.ID
|
||||
var objID oid.ID
|
||||
ReadObjectAddress(cmd, &cnrID, &objID)
|
||||
readObjectAddress(cmd, &cnrID, &objID)
|
||||
|
||||
pk := key.GetOrGenerate(cmd)
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
|
|
@ -56,7 +56,7 @@ func patch(cmd *cobra.Command, _ []string) {
|
|||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
objAddr := ReadObjectAddress(cmd, &cnr, &obj)
|
||||
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||
|
||||
ranges, err := getRangeSlice(cmd)
|
||||
commonCmd.ExitOnErr(cmd, "", err)
|
||||
|
|
|
@ -47,7 +47,7 @@ func getObjectRange(cmd *cobra.Command, _ []string) {
|
|||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
objAddr := ReadObjectAddress(cmd, &cnr, &obj)
|
||||
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||
|
||||
ranges, err := getRangeList(cmd)
|
||||
commonCmd.ExitOnErr(cmd, "", err)
|
||||
|
|
|
@ -74,7 +74,7 @@ func parseXHeaders(cmd *cobra.Command) []string {
|
|||
return xs
|
||||
}
|
||||
|
||||
func ReadObjectAddress(cmd *cobra.Command, cnr *cid.ID, obj *oid.ID) oid.Address {
|
||||
func readObjectAddress(cmd *cobra.Command, cnr *cid.ID, obj *oid.ID) oid.Address {
|
||||
readCID(cmd, cnr)
|
||||
readOID(cmd, obj)
|
||||
|
||||
|
|
|
@ -39,7 +39,6 @@ func _client() (tree.TreeServiceClient, error) {
|
|||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
grpc.WithDisableServiceConfig(),
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
||||
|
|
|
@ -117,6 +117,7 @@ type applicationConfiguration struct {
|
|||
|
||||
EngineCfg struct {
|
||||
errorThreshold uint32
|
||||
shardPoolSize uint32
|
||||
shards []shardCfg
|
||||
lowMem bool
|
||||
}
|
||||
|
@ -249,6 +250,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|||
// Storage Engine
|
||||
|
||||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||
|
||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||
|
@ -891,6 +893,7 @@ func (c *cfg) engineOpts() []engine.Option {
|
|||
var opts []engine.Option
|
||||
|
||||
opts = append(opts,
|
||||
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
||||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||
engine.WithLogger(c.log),
|
||||
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||
|
@ -1045,7 +1048,6 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
|
|||
}
|
||||
if c.metricsCollector != nil {
|
||||
mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
|
||||
shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
|
||||
}
|
||||
|
||||
var sh shardOptsWithID
|
||||
|
|
|
@ -12,10 +12,13 @@ import (
|
|||
func TestConfigDir(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
cfgFileName := path.Join(dir, "cfg_01.yml")
|
||||
cfgFileName0 := path.Join(dir, "cfg_00.json")
|
||||
cfgFileName1 := path.Join(dir, "cfg_01.yml")
|
||||
|
||||
require.NoError(t, os.WriteFile(cfgFileName, []byte("logger:\n level: debug"), 0o777))
|
||||
require.NoError(t, os.WriteFile(cfgFileName0, []byte(`{"storage":{"shard_pool_size":15}}`), 0o777))
|
||||
require.NoError(t, os.WriteFile(cfgFileName1, []byte("logger:\n level: debug"), 0o777))
|
||||
|
||||
c := New("", dir, "")
|
||||
require.Equal(t, "debug", cast.ToString(c.Sub("logger").Value("level")))
|
||||
require.EqualValues(t, 15, cast.ToUint32(c.Sub("storage").Value("shard_pool_size")))
|
||||
}
|
||||
|
|
|
@ -11,6 +11,10 @@ import (
|
|||
|
||||
const (
|
||||
subsection = "storage"
|
||||
|
||||
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
||||
// process object PUT operations in a storage engine.
|
||||
ShardPoolSizeDefault = 20
|
||||
)
|
||||
|
||||
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
||||
|
@ -61,6 +65,18 @@ func IterateShards(c *config.Config, required bool, f func(*shardconfig.Config)
|
|||
return nil
|
||||
}
|
||||
|
||||
// ShardPoolSize returns the value of "shard_pool_size" config parameter from "storage" section.
|
||||
//
|
||||
// Returns ShardPoolSizeDefault if the value is not a positive number.
|
||||
func ShardPoolSize(c *config.Config) uint32 {
|
||||
v := config.Uint32Safe(c.Sub(subsection), "shard_pool_size")
|
||||
if v > 0 {
|
||||
return v
|
||||
}
|
||||
|
||||
return ShardPoolSizeDefault
|
||||
}
|
||||
|
||||
// ShardErrorThreshold returns the value of "shard_ro_error_threshold" config parameter from "storage" section.
|
||||
//
|
||||
// Returns 0 if the the value is missing.
|
||||
|
|
|
@ -54,6 +54,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.False(t, handlerCalled)
|
||||
|
||||
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
||||
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
||||
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
||||
})
|
||||
|
||||
|
@ -63,6 +64,7 @@ func TestEngineSection(t *testing.T) {
|
|||
num := 0
|
||||
|
||||
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
|
||||
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
||||
|
||||
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
|
||||
defer func() {
|
||||
|
|
|
@ -47,7 +47,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
|
|||
}
|
||||
ioTag, err := qos.FromRawString(rawTag)
|
||||
if err != nil {
|
||||
s.logger.Debug(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
|
||||
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,6 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
|
|||
return ctx
|
||||
}
|
||||
}
|
||||
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
case qos.IOTagInternal:
|
||||
for _, pk := range s.allowedInternalPubs {
|
||||
|
@ -88,10 +87,9 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
|
|||
return ctx
|
||||
}
|
||||
}
|
||||
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
default:
|
||||
s.logger.Debug(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
|
||||
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
|
||||
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,6 +97,7 @@ FROSTFS_RPC_LIMITS_1_METHODS="/neo.fs.v2.object.ObjectService/Get"
|
|||
FROSTFS_RPC_LIMITS_1_MAX_OPS=10000
|
||||
|
||||
# Storage engine section
|
||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
||||
## 0 shard
|
||||
### Flag to refill Metabase from BlobStor
|
||||
|
|
|
@ -158,6 +158,7 @@
|
|||
]
|
||||
},
|
||||
"storage": {
|
||||
"shard_pool_size": 15,
|
||||
"shard_ro_error_threshold": 100,
|
||||
"shard": {
|
||||
"0": {
|
||||
|
|
|
@ -135,6 +135,7 @@ rpc:
|
|||
|
||||
storage:
|
||||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
||||
|
||||
shard:
|
||||
|
|
|
@ -170,6 +170,7 @@ Local storage engine configuration.
|
|||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
||||
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
||||
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
||||
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
|
||||
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
||||
|
|
8
go.mod
8
go.mod
|
@ -4,15 +4,15 @@ go 1.22
|
|||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/VictoriaMetrics/easyproto v0.1.4
|
||||
|
|
16
go.sum
16
go.sum
|
@ -1,25 +1,25 @@
|
|||
code.gitea.io/sdk/gitea v0.17.1 h1:3jCPOG2ojbl8AcfaUCRYLT5MUcBMFwS0OSK2mA5Zok8=
|
||||
code.gitea.io/sdk/gitea v0.17.1/go.mod h1:aCnBqhHpoEWA180gMbaCtdX9Pl6BWBAuuP2miadoTNM=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1 h1:k1Qw8dWUQczfo0eVXlhrq9eXEbUMyDLW8jEMzY+gxMc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1/go.mod h1:5fSm/l5xSjGWqsPUffSdboiGFUHa7y/1S0fvxzQowN8=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08 h1:tl1TT+zNk1lF/J5EaD3syDrTaYbQwvJKVOVENM4oQ+k=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.21.1-0.20241205083807-762d7f9f9f08/go.mod h1:5fSm/l5xSjGWqsPUffSdboiGFUHa7y/1S0fvxzQowN8=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d h1:uJ/wvuMdepbkaV8XMS5uN9B0FQWMep0CttSuDZiDhq0=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529 h1:CBreXSxGoYJAdZ1QdJPsDs1UCXGF5psinII0lxtohsc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250310135838-3e7ca9403529/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9 h1:svCl6NDAPZ/KuQPjdVKo74RkCIANesxUPM45zQZDhSw=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250306092416-69b0711d12d9/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 h1:QnAt5b2R6+hQthMOIn5ECfLAlVD8IAE5JRm1NCCOmuE=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972/go.mod h1:2hM42MBrlhvN6XToaW6OWNk5ZLcu1FhaukGgxtfpDDI=
|
||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 h1:gPaqGsk6gSWQyNVjaStydfUz6Z/loHc9XyvGrJ5qSPY=
|
||||
git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b h1:M50kdfrf/h8c3cz0bJ2AEUcbXvAlPFVC1Wp1WkfZ/8E=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b/go.mod h1:GZTk55RI4dKzsK6BCn5h2xxE28UHNfgoq/NJxW/LQ6A=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=
|
||||
|
|
|
@ -252,7 +252,6 @@ const (
|
|||
ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage"
|
||||
ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects"
|
||||
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
|
||||
ShardCouldNotFindObject = "could not find object"
|
||||
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
|
||||
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
|
||||
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
|
||||
|
@ -513,5 +512,4 @@ const (
|
|||
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
|
||||
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
|
||||
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
|
||||
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
|
||||
)
|
||||
|
|
|
@ -23,7 +23,6 @@ const (
|
|||
policerSubsystem = "policer"
|
||||
commonCacheSubsystem = "common_cache"
|
||||
multinetSubsystem = "multinet"
|
||||
qosSubsystem = "qos"
|
||||
|
||||
successLabel = "success"
|
||||
shardIDLabel = "shard_id"
|
||||
|
@ -44,7 +43,6 @@ const (
|
|||
hitLabel = "hit"
|
||||
cacheLabel = "cache"
|
||||
sourceIPLabel = "source_ip"
|
||||
ioTagLabel = "io_tag"
|
||||
|
||||
readWriteMode = "READ_WRITE"
|
||||
readOnlyMode = "READ_ONLY"
|
||||
|
|
|
@ -26,7 +26,6 @@ type NodeMetrics struct {
|
|||
morphCache *morphCacheMetrics
|
||||
log logger.LogMetrics
|
||||
multinet *multinetMetrics
|
||||
qos *QoSMetrics
|
||||
// nolint: unused
|
||||
appInfo *ApplicationInfo
|
||||
}
|
||||
|
@ -56,7 +55,6 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
log: logger.NewLogMetrics(namespace),
|
||||
appInfo: NewApplicationInfo(misc.Version),
|
||||
multinet: newMultinetMetrics(namespace),
|
||||
qos: newQoSMetrics(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,7 +126,3 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
|||
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
|
||||
return m.multinet
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
|
||||
return m.qos
|
||||
}
|
||||
|
|
|
@ -9,14 +9,13 @@ import (
|
|||
)
|
||||
|
||||
type ObjectServiceMetrics interface {
|
||||
AddRequestDuration(method string, d time.Duration, success bool, ioTag string)
|
||||
AddRequestDuration(method string, d time.Duration, success bool)
|
||||
AddPayloadSize(method string, size int)
|
||||
}
|
||||
|
||||
type objectServiceMetrics struct {
|
||||
methodDuration *prometheus.HistogramVec
|
||||
payloadCounter *prometheus.CounterVec
|
||||
ioTagOpsCounter *prometheus.CounterVec
|
||||
methodDuration *prometheus.HistogramVec
|
||||
payloadCounter *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func newObjectServiceMetrics() *objectServiceMetrics {
|
||||
|
@ -33,24 +32,14 @@ func newObjectServiceMetrics() *objectServiceMetrics {
|
|||
Name: "request_payload_bytes",
|
||||
Help: "Object Service request payload",
|
||||
}, []string{methodLabel}),
|
||||
ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: objectSubsystem,
|
||||
Name: "requests_total",
|
||||
Help: "Count of requests for each IO tag",
|
||||
}, []string{methodLabel, ioTagLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool, ioTag string) {
|
||||
func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
|
||||
m.methodDuration.With(prometheus.Labels{
|
||||
methodLabel: method,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
}).Observe(d.Seconds())
|
||||
m.ioTagOpsCounter.With(prometheus.Labels{
|
||||
ioTagLabel: ioTag,
|
||||
methodLabel: method,
|
||||
}).Inc()
|
||||
}
|
||||
|
||||
func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type QoSMetrics struct {
|
||||
opsCounter *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
func newQoSMetrics() *QoSMetrics {
|
||||
return &QoSMetrics{
|
||||
opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: qosSubsystem,
|
||||
Name: "operations_total",
|
||||
Help: "Count of pending, in progree, completed and failed due of resource exhausted error operations for each shard",
|
||||
}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
|
||||
m.opsCounter.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
operationLabel: operation,
|
||||
ioTagLabel: tag,
|
||||
typeLabel: "pending",
|
||||
}).Set(float64(pending))
|
||||
m.opsCounter.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
operationLabel: operation,
|
||||
ioTagLabel: tag,
|
||||
typeLabel: "in_progress",
|
||||
}).Set(float64(inProgress))
|
||||
m.opsCounter.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
operationLabel: operation,
|
||||
ioTagLabel: tag,
|
||||
typeLabel: "completed",
|
||||
}).Set(float64(completed))
|
||||
m.opsCounter.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
operationLabel: operation,
|
||||
ioTagLabel: tag,
|
||||
typeLabel: "resource_exhausted",
|
||||
}).Set(float64(resourceExhausted))
|
||||
}
|
||||
|
||||
func (m *QoSMetrics) Close(shardID string) {
|
||||
m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
}
|
|
@ -12,14 +12,12 @@ type TreeMetricsRegister interface {
|
|||
AddReplicateTaskDuration(time.Duration, bool)
|
||||
AddReplicateWaitDuration(time.Duration, bool)
|
||||
AddSyncDuration(time.Duration, bool)
|
||||
AddOperation(string, string)
|
||||
}
|
||||
|
||||
type treeServiceMetrics struct {
|
||||
replicateTaskDuration *prometheus.HistogramVec
|
||||
replicateWaitDuration *prometheus.HistogramVec
|
||||
syncOpDuration *prometheus.HistogramVec
|
||||
ioTagOpsCounter *prometheus.CounterVec
|
||||
}
|
||||
|
||||
var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)
|
||||
|
@ -44,12 +42,6 @@ func newTreeServiceMetrics() *treeServiceMetrics {
|
|||
Name: "sync_duration_seconds",
|
||||
Help: "Duration of synchronization operations",
|
||||
}, []string{successLabel}),
|
||||
ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: treeServiceSubsystem,
|
||||
Name: "requests_total",
|
||||
Help: "Count of requests for each IO tag",
|
||||
}, []string{methodLabel, ioTagLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -70,10 +62,3 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
|
|||
successLabel: strconv.FormatBool(success),
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
|
||||
m.ioTagOpsCounter.With(prometheus.Labels{
|
||||
ioTagLabel: ioTag,
|
||||
methodLabel: op,
|
||||
}).Inc()
|
||||
}
|
||||
|
|
|
@ -4,8 +4,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
|
@ -17,9 +15,6 @@ import (
|
|||
const (
|
||||
defaultIdleTimeout time.Duration = 0
|
||||
defaultShare float64 = 1.0
|
||||
minusOne = ^uint64(0)
|
||||
|
||||
defaultMetricsCollectTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
type ReleaseFunc scheduling.ReleaseFunc
|
||||
|
@ -27,8 +22,6 @@ type ReleaseFunc scheduling.ReleaseFunc
|
|||
type Limiter interface {
|
||||
ReadRequest(context.Context) (ReleaseFunc, error)
|
||||
WriteRequest(context.Context) (ReleaseFunc, error)
|
||||
SetParentID(string)
|
||||
SetMetrics(Metrics)
|
||||
Close()
|
||||
}
|
||||
|
||||
|
@ -41,6 +34,10 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
|
|||
if err := validateConfig(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
read, write := c.Read(), c.Write()
|
||||
if isNoop(read, write) {
|
||||
return noopLimiterInstance, nil
|
||||
}
|
||||
readScheduler, err := createScheduler(c.Read())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create read scheduler: %w", err)
|
||||
|
@ -49,18 +46,10 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("create write scheduler: %w", err)
|
||||
}
|
||||
l := &mClockLimiter{
|
||||
return &mClockLimiter{
|
||||
readScheduler: readScheduler,
|
||||
writeScheduler: writeScheduler,
|
||||
closeCh: make(chan struct{}),
|
||||
wg: &sync.WaitGroup{},
|
||||
readStats: createStats(),
|
||||
writeStats: createStats(),
|
||||
}
|
||||
l.shardID.Store(&shardID{})
|
||||
l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
|
||||
l.startMetricsCollect()
|
||||
return l, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createScheduler(config limits.OpConfig) (scheduler, error) {
|
||||
|
@ -102,7 +91,7 @@ var (
|
|||
)
|
||||
|
||||
func NewNoopLimiter() Limiter {
|
||||
return noopLimiterInstance
|
||||
return &noopLimiter{}
|
||||
}
|
||||
|
||||
type noopLimiter struct{}
|
||||
|
@ -115,109 +104,43 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
|
|||
return releaseStub, nil
|
||||
}
|
||||
|
||||
func (n *noopLimiter) SetParentID(string) {}
|
||||
|
||||
func (n *noopLimiter) Close() {}
|
||||
|
||||
func (n *noopLimiter) SetMetrics(Metrics) {}
|
||||
|
||||
var _ Limiter = (*mClockLimiter)(nil)
|
||||
|
||||
type shardID struct {
|
||||
id string
|
||||
}
|
||||
|
||||
type mClockLimiter struct {
|
||||
readScheduler scheduler
|
||||
writeScheduler scheduler
|
||||
|
||||
readStats map[string]*stat
|
||||
writeStats map[string]*stat
|
||||
|
||||
shardID atomic.Pointer[shardID]
|
||||
metrics atomic.Pointer[metricsHolder]
|
||||
closeCh chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
|
||||
return requestArrival(ctx, n.readScheduler, n.readStats)
|
||||
return requestArrival(ctx, n.readScheduler)
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
|
||||
return requestArrival(ctx, n.writeScheduler, n.writeStats)
|
||||
return requestArrival(ctx, n.writeScheduler)
|
||||
}
|
||||
|
||||
func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
|
||||
func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
|
||||
tag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
tag = IOTagClient.String()
|
||||
}
|
||||
stat := getStat(tag, stats)
|
||||
stat.pending.Add(1)
|
||||
if tag == IOTagCritical.String() {
|
||||
stat.inProgress.Add(1)
|
||||
return func() {
|
||||
stat.completed.Add(1)
|
||||
}, nil
|
||||
return releaseStub, nil
|
||||
}
|
||||
rel, err := s.RequestArrival(ctx, tag)
|
||||
stat.inProgress.Add(1)
|
||||
if err != nil {
|
||||
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
|
||||
errors.Is(err, errSemaphoreLimitExceeded) {
|
||||
stat.resourceExhausted.Add(1)
|
||||
return nil, &apistatus.ResourceExhausted{}
|
||||
}
|
||||
stat.completed.Add(1)
|
||||
return nil, err
|
||||
}
|
||||
return func() {
|
||||
rel()
|
||||
stat.completed.Add(1)
|
||||
}, nil
|
||||
return ReleaseFunc(rel), nil
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) Close() {
|
||||
n.readScheduler.Close()
|
||||
n.writeScheduler.Close()
|
||||
close(n.closeCh)
|
||||
n.wg.Wait()
|
||||
n.metrics.Load().metrics.Close(n.shardID.Load().id)
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) SetParentID(parentID string) {
|
||||
n.shardID.Store(&shardID{id: parentID})
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) SetMetrics(m Metrics) {
|
||||
n.metrics.Store(&metricsHolder{metrics: m})
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) startMetricsCollect() {
|
||||
n.wg.Add(1)
|
||||
go func() {
|
||||
defer n.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(defaultMetricsCollectTimeout)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-n.closeCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
shardID := n.shardID.Load().id
|
||||
if shardID == "" {
|
||||
continue
|
||||
}
|
||||
metrics := n.metrics.Load().metrics
|
||||
for tag, s := range n.readStats {
|
||||
metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
|
||||
}
|
||||
for tag, s := range n.writeStats {
|
||||
metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
package qos
|
||||
|
||||
import "sync/atomic"
|
||||
|
||||
type Metrics interface {
|
||||
SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
|
||||
Close(shardID string)
|
||||
}
|
||||
|
||||
var _ Metrics = (*noopMetrics)(nil)
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
|
||||
}
|
||||
|
||||
func (n *noopMetrics) Close(string) {}
|
||||
|
||||
// stat presents limiter statistics cumulative counters.
|
||||
//
|
||||
// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
|
||||
type stat struct {
|
||||
completed atomic.Uint64
|
||||
pending atomic.Uint64
|
||||
resourceExhausted atomic.Uint64
|
||||
inProgress atomic.Uint64
|
||||
}
|
||||
|
||||
type metricsHolder struct {
|
||||
metrics Metrics
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package qos
|
||||
|
||||
const unknownStatsTag = "unknown"
|
||||
|
||||
var statTags = map[string]struct{}{
|
||||
IOTagClient.String(): {},
|
||||
IOTagBackground.String(): {},
|
||||
IOTagInternal.String(): {},
|
||||
IOTagPolicer.String(): {},
|
||||
IOTagWritecache.String(): {},
|
||||
IOTagCritical.String(): {},
|
||||
unknownStatsTag: {},
|
||||
}
|
||||
|
||||
func createStats() map[string]*stat {
|
||||
result := make(map[string]*stat)
|
||||
for tag := range statTags {
|
||||
result[tag] = &stat{}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func getStat(tag string, stats map[string]*stat) *stat {
|
||||
if v, ok := stats[tag]; ok {
|
||||
return v
|
||||
}
|
||||
return stats[unknownStatsTag]
|
||||
}
|
|
@ -1,11 +1,6 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
)
|
||||
import "fmt"
|
||||
|
||||
type IOTag string
|
||||
|
||||
|
@ -42,11 +37,3 @@ func FromRawString(s string) (IOTag, error) {
|
|||
func (t IOTag) String() string {
|
||||
return string(t)
|
||||
}
|
||||
|
||||
func IOTagFromContext(ctx context.Context) string {
|
||||
tag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
tag = "undefined"
|
||||
}
|
||||
return tag
|
||||
}
|
||||
|
|
|
@ -90,3 +90,12 @@ func float64Value(f *float64) float64 {
|
|||
}
|
||||
return *f
|
||||
}
|
||||
|
||||
func isNoop(read, write limits.OpConfig) bool {
|
||||
return read.MaxRunningOps == limits.NoLimit &&
|
||||
read.MaxWaitingOps == limits.NoLimit &&
|
||||
write.MaxRunningOps == limits.NoLimit &&
|
||||
write.MaxWaitingOps == limits.NoLimit &&
|
||||
len(read.Tags) == 0 &&
|
||||
len(write.Tags) == 0
|
||||
}
|
||||
|
|
|
@ -153,10 +153,16 @@ func (e *StorageEngine) Close(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// closes all shards. Never returns an error, shard errors are logged.
|
||||
func (e *StorageEngine) close(ctx context.Context) error {
|
||||
func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
if releasePools {
|
||||
for _, p := range e.shardPools {
|
||||
p.Release()
|
||||
}
|
||||
}
|
||||
|
||||
for id, sh := range e.shards {
|
||||
if err := sh.Close(ctx); err != nil {
|
||||
e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
|
||||
|
@ -207,7 +213,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
|||
return e.open(ctx)
|
||||
}
|
||||
} else if prevErr == nil { // ok -> block
|
||||
return e.close(ctx)
|
||||
return e.close(ctx, errors.Is(err, errClosed))
|
||||
}
|
||||
|
||||
// otherwise do nothing
|
||||
|
|
|
@ -245,6 +245,7 @@ func TestReload(t *testing.T) {
|
|||
|
||||
// no new paths => no new shards
|
||||
require.Equal(t, shardNum, len(e.shards))
|
||||
require.Equal(t, shardNum, len(e.shardPools))
|
||||
|
||||
newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))
|
||||
|
||||
|
@ -256,6 +257,7 @@ func TestReload(t *testing.T) {
|
|||
require.NoError(t, e.Reload(context.Background(), rcfg))
|
||||
|
||||
require.Equal(t, shardNum+1, len(e.shards))
|
||||
require.Equal(t, shardNum+1, len(e.shardPools))
|
||||
|
||||
require.NoError(t, e.Close(context.Background()))
|
||||
})
|
||||
|
@ -275,6 +277,7 @@ func TestReload(t *testing.T) {
|
|||
|
||||
// removed one
|
||||
require.Equal(t, shardNum-1, len(e.shards))
|
||||
require.Equal(t, shardNum-1, len(e.shardPools))
|
||||
|
||||
require.NoError(t, e.Close(context.Background()))
|
||||
})
|
||||
|
@ -308,6 +311,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
|
|||
}
|
||||
|
||||
require.Equal(t, num, len(e.shards))
|
||||
require.Equal(t, num, len(e.shardPools))
|
||||
|
||||
return e, currShards
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -28,6 +29,8 @@ type StorageEngine struct {
|
|||
|
||||
shards map[string]hashedShard
|
||||
|
||||
shardPools map[string]util.WorkerPool
|
||||
|
||||
closeCh chan struct{}
|
||||
setModeCh chan setModeRequest
|
||||
wg sync.WaitGroup
|
||||
|
@ -190,6 +193,8 @@ type cfg struct {
|
|||
|
||||
metrics MetricRegister
|
||||
|
||||
shardPoolSize uint32
|
||||
|
||||
lowMem bool
|
||||
|
||||
containerSource atomic.Pointer[containerSource]
|
||||
|
@ -197,8 +202,9 @@ type cfg struct {
|
|||
|
||||
func defaultCfg() *cfg {
|
||||
res := &cfg{
|
||||
log: logger.NewLoggerWrapper(zap.L()),
|
||||
metrics: noopMetrics{},
|
||||
log: logger.NewLoggerWrapper(zap.L()),
|
||||
shardPoolSize: 20,
|
||||
metrics: noopMetrics{},
|
||||
}
|
||||
res.containerSource.Store(&containerSource{})
|
||||
return res
|
||||
|
@ -215,6 +221,7 @@ func New(opts ...Option) *StorageEngine {
|
|||
return &StorageEngine{
|
||||
cfg: c,
|
||||
shards: make(map[string]hashedShard),
|
||||
shardPools: make(map[string]util.WorkerPool),
|
||||
closeCh: make(chan struct{}),
|
||||
setModeCh: make(chan setModeRequest),
|
||||
evacuateLimiter: &evacuationLimiter{},
|
||||
|
@ -234,6 +241,13 @@ func WithMetrics(v MetricRegister) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithShardPoolSize returns option to specify size of worker pool for each shard.
|
||||
func WithShardPoolSize(sz uint32) Option {
|
||||
return func(c *cfg) {
|
||||
c.shardPoolSize = sz
|
||||
}
|
||||
}
|
||||
|
||||
// WithErrorThreshold returns an option to specify size amount of errors after which
|
||||
// shard is moved to read-only mode.
|
||||
func WithErrorThreshold(sz uint32) Option {
|
||||
|
|
|
@ -57,6 +57,7 @@ func (te *testEngineWrapper) setShardsNumOpts(
|
|||
te.shardIDs[i] = shard.ID()
|
||||
}
|
||||
require.Len(t, te.engine.shards, num)
|
||||
require.Len(t, te.engine.shardPools, num)
|
||||
return te
|
||||
}
|
||||
|
||||
|
@ -162,8 +163,6 @@ type testQoSLimiter struct {
|
|||
write atomic.Int64
|
||||
}
|
||||
|
||||
func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
|
||||
|
||||
func (t *testQoSLimiter) Close() {
|
||||
require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
|
||||
require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
|
||||
|
@ -178,5 +177,3 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
|
|||
t.write.Add(1)
|
||||
return func() { t.write.Add(-1) }, nil
|
||||
}
|
||||
|
||||
func (t *testQoSLimiter) SetParentID(string) {}
|
||||
|
|
|
@ -46,6 +46,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
var testShards [2]*testShard
|
||||
|
||||
te := testNewEngine(t,
|
||||
WithShardPoolSize(1),
|
||||
WithErrorThreshold(errThreshold),
|
||||
).
|
||||
setShardsNumOpts(t, 2, func(id int) []shard.Option {
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
|
@ -200,6 +201,11 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
|||
return res
|
||||
}
|
||||
|
||||
type pooledShard struct {
|
||||
hashedShard
|
||||
pool util.WorkerPool
|
||||
}
|
||||
|
||||
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
|
||||
|
||||
// Evacuate moves data from one shard to the others.
|
||||
|
@ -246,7 +252,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
|
|||
}
|
||||
|
||||
var mtx sync.RWMutex
|
||||
copyShards := func() []hashedShard {
|
||||
copyShards := func() []pooledShard {
|
||||
mtx.RLock()
|
||||
defer mtx.RUnlock()
|
||||
t := slices.Clone(shards)
|
||||
|
@ -260,7 +266,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) erro
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
var err error
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
||||
|
@ -382,7 +388,7 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
||||
|
@ -406,7 +412,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.Cancel
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||
) error {
|
||||
sh := shardsToEvacuate[shardID]
|
||||
|
@ -479,7 +485,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
getShards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
sh := shardsToEvacuate[shardID]
|
||||
shards := getShards()
|
||||
|
@ -509,7 +515,7 @@ func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string,
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
|
||||
prm EvacuateShardPrm, res *EvacuateShardRes, shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
|
||||
trace.WithAttributes(
|
||||
|
@ -577,7 +583,7 @@ func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.S
|
|||
}
|
||||
|
||||
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
|
||||
prm EvacuateShardPrm, shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
prm EvacuateShardPrm, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) (bool, string, error) {
|
||||
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, shardsToEvacuate)
|
||||
if err != nil {
|
||||
|
@ -647,15 +653,15 @@ func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shar
|
|||
|
||||
// findShardToEvacuateTree returns first shard according HRW or first shard with tree exists.
|
||||
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
|
||||
shards []hashedShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) (hashedShard, bool, error) {
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) (pooledShard, bool, error) {
|
||||
hrw.SortHasherSliceByValue(shards, hrw.StringHash(tree.CID.EncodeToString()))
|
||||
var result hashedShard
|
||||
var result pooledShard
|
||||
var found bool
|
||||
for _, target := range shards {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return hashedShard{}, false, ctx.Err()
|
||||
return pooledShard{}, false, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
|
@ -683,7 +689,7 @@ func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilora
|
|||
return result, found, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]hashedShard, error) {
|
||||
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, error) {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
|
@ -713,15 +719,18 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
|||
// We must have all shards, to have correct information about their
|
||||
// indexes in a sorted slice and set appropriate marks in the metabase.
|
||||
// Evacuated shard is skipped during put.
|
||||
shards := make([]hashedShard, 0, len(e.shards))
|
||||
shards := make([]pooledShard, 0, len(e.shards))
|
||||
for id := range e.shards {
|
||||
shards = append(shards, e.shards[id])
|
||||
shards = append(shards, pooledShard{
|
||||
hashedShard: e.shards[id],
|
||||
pool: e.shardPools[id],
|
||||
})
|
||||
}
|
||||
return shards, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
getShards func() []hashedShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
|
||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
|
||||
defer span.End()
|
||||
|
@ -791,7 +800,7 @@ func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
|
|||
}
|
||||
|
||||
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
|
||||
shards []hashedShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
|
||||
) (bool, error) {
|
||||
hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))
|
||||
for j := range shards {
|
||||
|
@ -804,7 +813,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
|||
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
|
||||
continue
|
||||
}
|
||||
switch e.putToShard(ctx, shards[j], addr, object, container.IsIndexedContainer(cnr)).status {
|
||||
switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object, container.IsIndexedContainer(cnr)).status {
|
||||
case putToShardSuccess:
|
||||
res.objEvacuated.Add(1)
|
||||
e.log.Debug(ctx, logs.EngineObjectIsMovedToAnotherShard,
|
||||
|
|
|
@ -196,6 +196,7 @@ func TestEvacuateShardObjects(t *testing.T) {
|
|||
|
||||
e.mtx.Lock()
|
||||
delete(e.shards, evacuateShardID)
|
||||
delete(e.shardPools, evacuateShardID)
|
||||
e.mtx.Unlock()
|
||||
|
||||
checkHasObjects(t)
|
||||
|
|
|
@ -205,7 +205,7 @@ func BenchmarkInhumeMultipart(b *testing.B) {
|
|||
func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
|
||||
b.StopTimer()
|
||||
|
||||
engine := testNewEngine(b).
|
||||
engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
|
||||
setShardsNum(b, numShards).prepare(b).engine
|
||||
defer func() { require.NoError(b, engine.Close(context.Background())) }()
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -98,13 +99,13 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
var shRes putToShardRes
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
e.mtx.RLock()
|
||||
_, ok := e.shards[sh.ID().String()]
|
||||
pool, ok := e.shardPools[sh.ID().String()]
|
||||
e.mtx.RUnlock()
|
||||
if !ok {
|
||||
// Shard was concurrently removed, skip.
|
||||
return false
|
||||
}
|
||||
shRes = e.putToShard(ctx, sh, addr, prm.Object, prm.IsIndexedContainer)
|
||||
shRes = e.putToShard(ctx, sh, pool, addr, prm.Object, prm.IsIndexedContainer)
|
||||
return shRes.status != putToShardUnknown
|
||||
})
|
||||
switch shRes.status {
|
||||
|
@ -121,59 +122,70 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
|
||||
// putToShard puts object to sh.
|
||||
// Return putToShardStatus and error if it is necessary to propagate an error upper.
|
||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard,
|
||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
|
||||
addr oid.Address, obj *objectSDK.Object, isIndexedContainer bool,
|
||||
) (res putToShardRes) {
|
||||
var existPrm shard.ExistsPrm
|
||||
existPrm.Address = addr
|
||||
exitCh := make(chan struct{})
|
||||
|
||||
exists, err := sh.Exists(ctx, existPrm)
|
||||
if err != nil {
|
||||
if shard.IsErrObjectExpired(err) {
|
||||
// object is already found but
|
||||
// expired => do nothing with it
|
||||
if err := pool.Submit(func() {
|
||||
defer close(exitCh)
|
||||
|
||||
var existPrm shard.ExistsPrm
|
||||
existPrm.Address = addr
|
||||
|
||||
exists, err := sh.Exists(ctx, existPrm)
|
||||
if err != nil {
|
||||
if shard.IsErrObjectExpired(err) {
|
||||
// object is already found but
|
||||
// expired => do nothing with it
|
||||
res.status = putToShardExists
|
||||
} else {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||
}
|
||||
|
||||
if exists.Exists() {
|
||||
res.status = putToShardExists
|
||||
} else {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||
}
|
||||
|
||||
if exists.Exists() {
|
||||
res.status = putToShardExists
|
||||
return
|
||||
}
|
||||
|
||||
var putPrm shard.PutPrm
|
||||
putPrm.SetObject(obj)
|
||||
putPrm.SetIndexAttributes(isIndexedContainer)
|
||||
|
||||
_, err = sh.Put(ctx, putPrm)
|
||||
if err != nil {
|
||||
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
|
||||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
if client.IsErrObjectAlreadyRemoved(err) {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Error(err))
|
||||
res.status = putToShardRemoved
|
||||
res.err = err
|
||||
return
|
||||
}
|
||||
|
||||
e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
|
||||
return
|
||||
var putPrm shard.PutPrm
|
||||
putPrm.SetObject(obj)
|
||||
putPrm.SetIndexAttributes(isIndexedContainer)
|
||||
|
||||
_, err = sh.Put(ctx, putPrm)
|
||||
if err != nil {
|
||||
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
|
||||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
if client.IsErrObjectAlreadyRemoved(err) {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Error(err))
|
||||
res.status = putToShardRemoved
|
||||
res.err = err
|
||||
return
|
||||
}
|
||||
|
||||
e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
|
||||
return
|
||||
}
|
||||
|
||||
res.status = putToShardSuccess
|
||||
}); err != nil {
|
||||
e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard, zap.Error(err))
|
||||
close(exitCh)
|
||||
}
|
||||
|
||||
res.status = putToShardSuccess
|
||||
<-exitCh
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -11,12 +11,10 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
"github.com/google/uuid"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
@ -180,6 +178,11 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
|||
e.mtx.Lock()
|
||||
defer e.mtx.Unlock()
|
||||
|
||||
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create pool: %w", err)
|
||||
}
|
||||
|
||||
strID := sh.ID().String()
|
||||
if _, ok := e.shards[strID]; ok {
|
||||
return fmt.Errorf("shard with id %s was already added", strID)
|
||||
|
@ -193,6 +196,8 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
|||
hash: hrw.StringHash(strID),
|
||||
}
|
||||
|
||||
e.shardPools[strID] = pool
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -217,6 +222,12 @@ func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
|
|||
ss = append(ss, sh)
|
||||
delete(e.shards, id)
|
||||
|
||||
pool, ok := e.shardPools[id]
|
||||
if ok {
|
||||
pool.Release()
|
||||
delete(e.shardPools, id)
|
||||
}
|
||||
|
||||
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
|
||||
zap.String("id", id))
|
||||
}
|
||||
|
@ -415,6 +426,12 @@ func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]ha
|
|||
|
||||
delete(e.shards, idStr)
|
||||
|
||||
pool, ok := e.shardPools[idStr]
|
||||
if ok {
|
||||
pool.Release()
|
||||
delete(e.shardPools, idStr)
|
||||
}
|
||||
|
||||
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
|
||||
zap.String("id", idStr))
|
||||
}
|
||||
|
@ -425,46 +442,3 @@ func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]ha
|
|||
func (s hashedShard) Hash() uint64 {
|
||||
return s.hash
|
||||
}
|
||||
|
||||
func (e *StorageEngine) ListShardsForObject(ctx context.Context, obj oid.Address) ([]shard.Info, error) {
|
||||
var err error
|
||||
var info []shard.Info
|
||||
prm := shard.ExistsPrm{
|
||||
Address: obj,
|
||||
}
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
e.iterateOverUnsortedShards(func(hs hashedShard) (stop bool) {
|
||||
res, exErr := hs.Exists(ctx, prm)
|
||||
if exErr != nil {
|
||||
if client.IsErrObjectAlreadyRemoved(exErr) {
|
||||
err = new(apistatus.ObjectAlreadyRemoved)
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if error is either SplitInfoError or ECInfoError.
|
||||
// True means the object is virtual.
|
||||
if errors.As(exErr, &siErr) || errors.As(exErr, &ecErr) {
|
||||
info = append(info, hs.DumpInfo())
|
||||
return false
|
||||
}
|
||||
|
||||
if shard.IsErrObjectExpired(exErr) {
|
||||
err = exErr
|
||||
return true
|
||||
}
|
||||
|
||||
if !client.IsErrObjectNotFound(exErr) {
|
||||
e.reportShardError(ctx, hs, "could not check existence of object in shard", exErr, zap.Stringer("address", prm.Address))
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
if res.Exists() {
|
||||
info = append(info, hs.DumpInfo())
|
||||
}
|
||||
return false
|
||||
})
|
||||
return info, err
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ func TestRemoveShard(t *testing.T) {
|
|||
e, ids := te.engine, te.shardIDs
|
||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
require.Equal(t, numOfShards, len(e.shardPools))
|
||||
require.Equal(t, numOfShards, len(e.shards))
|
||||
|
||||
removedNum := numOfShards / 2
|
||||
|
@ -36,6 +37,7 @@ func TestRemoveShard(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
require.Equal(t, numOfShards-removedNum, len(e.shardPools))
|
||||
require.Equal(t, numOfShards-removedNum, len(e.shards))
|
||||
|
||||
for id, removed := range mSh {
|
||||
|
|
|
@ -61,7 +61,6 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
|||
if s.pilorama != nil {
|
||||
s.pilorama.SetParentID(s.info.ID.String())
|
||||
}
|
||||
s.opsLimiter.SetParentID(s.info.ID.String())
|
||||
|
||||
if len(idFromMetabase) == 0 && !modeDegraded {
|
||||
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
|
||||
|
|
|
@ -2,7 +2,6 @@ package writecache
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -60,15 +59,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
|
||||
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||
func (c *cache) putBig(ctx context.Context, prm common.PutPrm) error {
|
||||
if prm.RawData == nil { // foolproof: RawData should be marshalled by shard.
|
||||
data, err := prm.Object.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot marshal object: %w", err)
|
||||
}
|
||||
prm.RawData = data
|
||||
}
|
||||
size := uint64(len(prm.RawData))
|
||||
if !c.hasEnoughSpace(size) {
|
||||
if !c.hasEnoughSpaceFS() {
|
||||
return ErrOutOfSpace
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,10 @@ func (c *cache) estimateCacheSize() (uint64, uint64) {
|
|||
return count, size
|
||||
}
|
||||
|
||||
func (c *cache) hasEnoughSpaceFS() bool {
|
||||
return c.hasEnoughSpace(c.maxObjectSize)
|
||||
}
|
||||
|
||||
func (c *cache) hasEnoughSpace(objectSize uint64) bool {
|
||||
count, size := c.estimateCacheSize()
|
||||
if c.maxCacheCount > 0 && count+1 > c.maxCacheCount {
|
||||
|
|
|
@ -32,7 +32,6 @@ const (
|
|||
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
|
||||
rpcDetachShards = "DetachShards"
|
||||
rpcStartShardRebuild = "StartShardRebuild"
|
||||
rpcListShardsForObject = "ListShardsForObject"
|
||||
)
|
||||
|
||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||
|
@ -365,22 +364,3 @@ func StartShardRebuild(cli *client.Client, req *StartShardRebuildRequest, opts .
|
|||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// ListShardsForObject executes ControlService.ListShardsForObject RPC.
|
||||
func ListShardsForObject(
|
||||
cli *client.Client,
|
||||
req *ListShardsForObjectRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*ListShardsForObjectResponse, error) {
|
||||
wResp := newResponseWrapper[ListShardsForObjectResponse]()
|
||||
|
||||
wReq := &requestWrapper{
|
||||
m: req,
|
||||
}
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcListShardsForObject), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
|
|
@ -1,66 +0,0 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) ListShardsForObject(ctx context.Context, req *control.ListShardsForObjectRequest) (*control.ListShardsForObjectResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
var obj oid.ID
|
||||
err = obj.DecodeString(req.GetBody().GetObjectId())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
var cnr cid.ID
|
||||
err = cnr.DecodeString(req.GetBody().GetContainerId())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
resp := new(control.ListShardsForObjectResponse)
|
||||
body := new(control.ListShardsForObjectResponse_Body)
|
||||
resp.SetBody(body)
|
||||
|
||||
var objAddr oid.Address
|
||||
objAddr.SetContainer(cnr)
|
||||
objAddr.SetObject(obj)
|
||||
info, err := s.s.ListShardsForObject(ctx, objAddr)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if len(info) == 0 {
|
||||
return nil, status.Error(codes.NotFound, logs.ShardCouldNotFindObject)
|
||||
}
|
||||
|
||||
body.SetShard_ID(shardInfoToProto(info))
|
||||
|
||||
// Sign the response
|
||||
if err := ctrlmessage.Sign(s.key, resp); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func shardInfoToProto(infos []shard.Info) [][]byte {
|
||||
shardInfos := make([][]byte, 0, len(infos))
|
||||
for _, info := range infos {
|
||||
shardInfos = append(shardInfos, *info.ID)
|
||||
}
|
||||
|
||||
return shardInfos
|
||||
}
|
|
@ -89,9 +89,6 @@ service ControlService {
|
|||
|
||||
// StartShardRebuild starts shard rebuild process.
|
||||
rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse);
|
||||
|
||||
// ListShardsForObject returns shard info where object is stored.
|
||||
rpc ListShardsForObject(ListShardsForObjectRequest) returns (ListShardsForObjectResponse);
|
||||
}
|
||||
|
||||
// Health check request.
|
||||
|
@ -732,23 +729,3 @@ message StartShardRebuildResponse {
|
|||
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message ListShardsForObjectRequest {
|
||||
message Body {
|
||||
string object_id = 1;
|
||||
string container_id = 2;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message ListShardsForObjectResponse {
|
||||
message Body {
|
||||
// List of the node's shards storing object.
|
||||
repeated bytes shard_ID = 1;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
|
724
pkg/services/control/service_frostfs.pb.go
generated
724
pkg/services/control/service_frostfs.pb.go
generated
|
@ -17303,727 +17303,3 @@ func (x *StartShardRebuildResponse) UnmarshalEasyJSON(in *jlexer.Lexer) {
|
|||
in.Consumed()
|
||||
}
|
||||
}
|
||||
|
||||
type ListShardsForObjectRequest_Body struct {
|
||||
ObjectId string `json:"objectId"`
|
||||
ContainerId string `json:"containerId"`
|
||||
}
|
||||
|
||||
var (
|
||||
_ encoding.ProtoMarshaler = (*ListShardsForObjectRequest_Body)(nil)
|
||||
_ encoding.ProtoUnmarshaler = (*ListShardsForObjectRequest_Body)(nil)
|
||||
_ json.Marshaler = (*ListShardsForObjectRequest_Body)(nil)
|
||||
_ json.Unmarshaler = (*ListShardsForObjectRequest_Body)(nil)
|
||||
)
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *ListShardsForObjectRequest_Body) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.StringSize(1, x.ObjectId)
|
||||
size += proto.StringSize(2, x.ContainerId)
|
||||
return size
|
||||
}
|
||||
|
||||
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
|
||||
func (x *ListShardsForObjectRequest_Body) MarshalProtobuf(dst []byte) []byte {
|
||||
m := pool.MarshalerPool.Get()
|
||||
defer pool.MarshalerPool.Put(m)
|
||||
x.EmitProtobuf(m.MessageMarshaler())
|
||||
dst = m.Marshal(dst)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (x *ListShardsForObjectRequest_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
if len(x.ObjectId) != 0 {
|
||||
mm.AppendString(1, x.ObjectId)
|
||||
}
|
||||
if len(x.ContainerId) != 0 {
|
||||
mm.AppendString(2, x.ContainerId)
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
|
||||
func (x *ListShardsForObjectRequest_Body) UnmarshalProtobuf(src []byte) (err error) {
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read next field in %s", "ListShardsForObjectRequest_Body")
|
||||
}
|
||||
switch fc.FieldNum {
|
||||
case 1: // ObjectId
|
||||
data, ok := fc.String()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "ObjectId")
|
||||
}
|
||||
x.ObjectId = data
|
||||
case 2: // ContainerId
|
||||
data, ok := fc.String()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "ContainerId")
|
||||
}
|
||||
x.ContainerId = data
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectRequest_Body) GetObjectId() string {
|
||||
if x != nil {
|
||||
return x.ObjectId
|
||||
}
|
||||
return ""
|
||||
}
|
||||
func (x *ListShardsForObjectRequest_Body) SetObjectId(v string) {
|
||||
x.ObjectId = v
|
||||
}
|
||||
func (x *ListShardsForObjectRequest_Body) GetContainerId() string {
|
||||
if x != nil {
|
||||
return x.ContainerId
|
||||
}
|
||||
return ""
|
||||
}
|
||||
func (x *ListShardsForObjectRequest_Body) SetContainerId(v string) {
|
||||
x.ContainerId = v
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (x *ListShardsForObjectRequest_Body) MarshalJSON() ([]byte, error) {
|
||||
w := jwriter.Writer{}
|
||||
x.MarshalEasyJSON(&w)
|
||||
return w.Buffer.BuildBytes(), w.Error
|
||||
}
|
||||
func (x *ListShardsForObjectRequest_Body) MarshalEasyJSON(out *jwriter.Writer) {
|
||||
if x == nil {
|
||||
out.RawString("null")
|
||||
return
|
||||
}
|
||||
first := true
|
||||
out.RawByte('{')
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"objectId\":"
|
||||
out.RawString(prefix)
|
||||
out.String(x.ObjectId)
|
||||
}
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"containerId\":"
|
||||
out.RawString(prefix)
|
||||
out.String(x.ContainerId)
|
||||
}
|
||||
out.RawByte('}')
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (x *ListShardsForObjectRequest_Body) UnmarshalJSON(data []byte) error {
|
||||
r := jlexer.Lexer{Data: data}
|
||||
x.UnmarshalEasyJSON(&r)
|
||||
return r.Error()
|
||||
}
|
||||
func (x *ListShardsForObjectRequest_Body) UnmarshalEasyJSON(in *jlexer.Lexer) {
|
||||
isTopLevel := in.IsStart()
|
||||
if in.IsNull() {
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
in.Skip()
|
||||
return
|
||||
}
|
||||
in.Delim('{')
|
||||
for !in.IsDelim('}') {
|
||||
key := in.UnsafeFieldName(false)
|
||||
in.WantColon()
|
||||
if in.IsNull() {
|
||||
in.Skip()
|
||||
in.WantComma()
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "objectId":
|
||||
{
|
||||
var f string
|
||||
f = in.String()
|
||||
x.ObjectId = f
|
||||
}
|
||||
case "containerId":
|
||||
{
|
||||
var f string
|
||||
f = in.String()
|
||||
x.ContainerId = f
|
||||
}
|
||||
}
|
||||
in.WantComma()
|
||||
}
|
||||
in.Delim('}')
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
}
|
||||
|
||||
type ListShardsForObjectRequest struct {
|
||||
Body *ListShardsForObjectRequest_Body `json:"body"`
|
||||
Signature *Signature `json:"signature"`
|
||||
}
|
||||
|
||||
var (
|
||||
_ encoding.ProtoMarshaler = (*ListShardsForObjectRequest)(nil)
|
||||
_ encoding.ProtoUnmarshaler = (*ListShardsForObjectRequest)(nil)
|
||||
_ json.Marshaler = (*ListShardsForObjectRequest)(nil)
|
||||
_ json.Unmarshaler = (*ListShardsForObjectRequest)(nil)
|
||||
)
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *ListShardsForObjectRequest) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *ListShardsForObjectRequest) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *ListShardsForObjectRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().MarshalProtobuf(buf), nil
|
||||
}
|
||||
|
||||
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
|
||||
func (x *ListShardsForObjectRequest) MarshalProtobuf(dst []byte) []byte {
|
||||
m := pool.MarshalerPool.Get()
|
||||
defer pool.MarshalerPool.Put(m)
|
||||
x.EmitProtobuf(m.MessageMarshaler())
|
||||
dst = m.Marshal(dst)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (x *ListShardsForObjectRequest) EmitProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
if x.Body != nil {
|
||||
x.Body.EmitProtobuf(mm.AppendMessage(1))
|
||||
}
|
||||
if x.Signature != nil {
|
||||
x.Signature.EmitProtobuf(mm.AppendMessage(2))
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
|
||||
func (x *ListShardsForObjectRequest) UnmarshalProtobuf(src []byte) (err error) {
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read next field in %s", "ListShardsForObjectRequest")
|
||||
}
|
||||
switch fc.FieldNum {
|
||||
case 1: // Body
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "Body")
|
||||
}
|
||||
x.Body = new(ListShardsForObjectRequest_Body)
|
||||
if err := x.Body.UnmarshalProtobuf(data); err != nil {
|
||||
return fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
case 2: // Signature
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "Signature")
|
||||
}
|
||||
x.Signature = new(Signature)
|
||||
if err := x.Signature.UnmarshalProtobuf(data); err != nil {
|
||||
return fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectRequest) GetBody() *ListShardsForObjectRequest_Body {
|
||||
if x != nil {
|
||||
return x.Body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectRequest) SetBody(v *ListShardsForObjectRequest_Body) {
|
||||
x.Body = v
|
||||
}
|
||||
func (x *ListShardsForObjectRequest) GetSignature() *Signature {
|
||||
if x != nil {
|
||||
return x.Signature
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectRequest) SetSignature(v *Signature) {
|
||||
x.Signature = v
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (x *ListShardsForObjectRequest) MarshalJSON() ([]byte, error) {
|
||||
w := jwriter.Writer{}
|
||||
x.MarshalEasyJSON(&w)
|
||||
return w.Buffer.BuildBytes(), w.Error
|
||||
}
|
||||
func (x *ListShardsForObjectRequest) MarshalEasyJSON(out *jwriter.Writer) {
|
||||
if x == nil {
|
||||
out.RawString("null")
|
||||
return
|
||||
}
|
||||
first := true
|
||||
out.RawByte('{')
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"body\":"
|
||||
out.RawString(prefix)
|
||||
x.Body.MarshalEasyJSON(out)
|
||||
}
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"signature\":"
|
||||
out.RawString(prefix)
|
||||
x.Signature.MarshalEasyJSON(out)
|
||||
}
|
||||
out.RawByte('}')
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (x *ListShardsForObjectRequest) UnmarshalJSON(data []byte) error {
|
||||
r := jlexer.Lexer{Data: data}
|
||||
x.UnmarshalEasyJSON(&r)
|
||||
return r.Error()
|
||||
}
|
||||
func (x *ListShardsForObjectRequest) UnmarshalEasyJSON(in *jlexer.Lexer) {
|
||||
isTopLevel := in.IsStart()
|
||||
if in.IsNull() {
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
in.Skip()
|
||||
return
|
||||
}
|
||||
in.Delim('{')
|
||||
for !in.IsDelim('}') {
|
||||
key := in.UnsafeFieldName(false)
|
||||
in.WantColon()
|
||||
if in.IsNull() {
|
||||
in.Skip()
|
||||
in.WantComma()
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "body":
|
||||
{
|
||||
var f *ListShardsForObjectRequest_Body
|
||||
f = new(ListShardsForObjectRequest_Body)
|
||||
f.UnmarshalEasyJSON(in)
|
||||
x.Body = f
|
||||
}
|
||||
case "signature":
|
||||
{
|
||||
var f *Signature
|
||||
f = new(Signature)
|
||||
f.UnmarshalEasyJSON(in)
|
||||
x.Signature = f
|
||||
}
|
||||
}
|
||||
in.WantComma()
|
||||
}
|
||||
in.Delim('}')
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
}
|
||||
|
||||
type ListShardsForObjectResponse_Body struct {
|
||||
Shard_ID [][]byte `json:"shardID"`
|
||||
}
|
||||
|
||||
var (
|
||||
_ encoding.ProtoMarshaler = (*ListShardsForObjectResponse_Body)(nil)
|
||||
_ encoding.ProtoUnmarshaler = (*ListShardsForObjectResponse_Body)(nil)
|
||||
_ json.Marshaler = (*ListShardsForObjectResponse_Body)(nil)
|
||||
_ json.Unmarshaler = (*ListShardsForObjectResponse_Body)(nil)
|
||||
)
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *ListShardsForObjectResponse_Body) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||
return size
|
||||
}
|
||||
|
||||
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
|
||||
func (x *ListShardsForObjectResponse_Body) MarshalProtobuf(dst []byte) []byte {
|
||||
m := pool.MarshalerPool.Get()
|
||||
defer pool.MarshalerPool.Put(m)
|
||||
x.EmitProtobuf(m.MessageMarshaler())
|
||||
dst = m.Marshal(dst)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (x *ListShardsForObjectResponse_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
for j := range x.Shard_ID {
|
||||
mm.AppendBytes(1, x.Shard_ID[j])
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
|
||||
func (x *ListShardsForObjectResponse_Body) UnmarshalProtobuf(src []byte) (err error) {
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read next field in %s", "ListShardsForObjectResponse_Body")
|
||||
}
|
||||
switch fc.FieldNum {
|
||||
case 1: // Shard_ID
|
||||
data, ok := fc.Bytes()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "Shard_ID")
|
||||
}
|
||||
x.Shard_ID = append(x.Shard_ID, data)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectResponse_Body) GetShard_ID() [][]byte {
|
||||
if x != nil {
|
||||
return x.Shard_ID
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectResponse_Body) SetShard_ID(v [][]byte) {
|
||||
x.Shard_ID = v
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (x *ListShardsForObjectResponse_Body) MarshalJSON() ([]byte, error) {
|
||||
w := jwriter.Writer{}
|
||||
x.MarshalEasyJSON(&w)
|
||||
return w.Buffer.BuildBytes(), w.Error
|
||||
}
|
||||
func (x *ListShardsForObjectResponse_Body) MarshalEasyJSON(out *jwriter.Writer) {
|
||||
if x == nil {
|
||||
out.RawString("null")
|
||||
return
|
||||
}
|
||||
first := true
|
||||
out.RawByte('{')
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"shardID\":"
|
||||
out.RawString(prefix)
|
||||
out.RawByte('[')
|
||||
for i := range x.Shard_ID {
|
||||
if i != 0 {
|
||||
out.RawByte(',')
|
||||
}
|
||||
if x.Shard_ID[i] != nil {
|
||||
out.Base64Bytes(x.Shard_ID[i])
|
||||
} else {
|
||||
out.String("")
|
||||
}
|
||||
}
|
||||
out.RawByte(']')
|
||||
}
|
||||
out.RawByte('}')
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (x *ListShardsForObjectResponse_Body) UnmarshalJSON(data []byte) error {
|
||||
r := jlexer.Lexer{Data: data}
|
||||
x.UnmarshalEasyJSON(&r)
|
||||
return r.Error()
|
||||
}
|
||||
func (x *ListShardsForObjectResponse_Body) UnmarshalEasyJSON(in *jlexer.Lexer) {
|
||||
isTopLevel := in.IsStart()
|
||||
if in.IsNull() {
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
in.Skip()
|
||||
return
|
||||
}
|
||||
in.Delim('{')
|
||||
for !in.IsDelim('}') {
|
||||
key := in.UnsafeFieldName(false)
|
||||
in.WantColon()
|
||||
if in.IsNull() {
|
||||
in.Skip()
|
||||
in.WantComma()
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "shardID":
|
||||
{
|
||||
var f []byte
|
||||
var list [][]byte
|
||||
in.Delim('[')
|
||||
for !in.IsDelim(']') {
|
||||
{
|
||||
tmp := in.Bytes()
|
||||
if len(tmp) == 0 {
|
||||
tmp = nil
|
||||
}
|
||||
f = tmp
|
||||
}
|
||||
list = append(list, f)
|
||||
in.WantComma()
|
||||
}
|
||||
x.Shard_ID = list
|
||||
in.Delim(']')
|
||||
}
|
||||
}
|
||||
in.WantComma()
|
||||
}
|
||||
in.Delim('}')
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
}
|
||||
|
||||
type ListShardsForObjectResponse struct {
|
||||
Body *ListShardsForObjectResponse_Body `json:"body"`
|
||||
Signature *Signature `json:"signature"`
|
||||
}
|
||||
|
||||
var (
|
||||
_ encoding.ProtoMarshaler = (*ListShardsForObjectResponse)(nil)
|
||||
_ encoding.ProtoUnmarshaler = (*ListShardsForObjectResponse)(nil)
|
||||
_ json.Marshaler = (*ListShardsForObjectResponse)(nil)
|
||||
_ json.Unmarshaler = (*ListShardsForObjectResponse)(nil)
|
||||
)
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *ListShardsForObjectResponse) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *ListShardsForObjectResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *ListShardsForObjectResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().MarshalProtobuf(buf), nil
|
||||
}
|
||||
|
||||
// MarshalProtobuf implements the encoding.ProtoMarshaler interface.
|
||||
func (x *ListShardsForObjectResponse) MarshalProtobuf(dst []byte) []byte {
|
||||
m := pool.MarshalerPool.Get()
|
||||
defer pool.MarshalerPool.Put(m)
|
||||
x.EmitProtobuf(m.MessageMarshaler())
|
||||
dst = m.Marshal(dst)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (x *ListShardsForObjectResponse) EmitProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
if x.Body != nil {
|
||||
x.Body.EmitProtobuf(mm.AppendMessage(1))
|
||||
}
|
||||
if x.Signature != nil {
|
||||
x.Signature.EmitProtobuf(mm.AppendMessage(2))
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
|
||||
func (x *ListShardsForObjectResponse) UnmarshalProtobuf(src []byte) (err error) {
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read next field in %s", "ListShardsForObjectResponse")
|
||||
}
|
||||
switch fc.FieldNum {
|
||||
case 1: // Body
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "Body")
|
||||
}
|
||||
x.Body = new(ListShardsForObjectResponse_Body)
|
||||
if err := x.Body.UnmarshalProtobuf(data); err != nil {
|
||||
return fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
case 2: // Signature
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot unmarshal field %s", "Signature")
|
||||
}
|
||||
x.Signature = new(Signature)
|
||||
if err := x.Signature.UnmarshalProtobuf(data); err != nil {
|
||||
return fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectResponse) GetBody() *ListShardsForObjectResponse_Body {
|
||||
if x != nil {
|
||||
return x.Body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectResponse) SetBody(v *ListShardsForObjectResponse_Body) {
|
||||
x.Body = v
|
||||
}
|
||||
func (x *ListShardsForObjectResponse) GetSignature() *Signature {
|
||||
if x != nil {
|
||||
return x.Signature
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (x *ListShardsForObjectResponse) SetSignature(v *Signature) {
|
||||
x.Signature = v
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (x *ListShardsForObjectResponse) MarshalJSON() ([]byte, error) {
|
||||
w := jwriter.Writer{}
|
||||
x.MarshalEasyJSON(&w)
|
||||
return w.Buffer.BuildBytes(), w.Error
|
||||
}
|
||||
func (x *ListShardsForObjectResponse) MarshalEasyJSON(out *jwriter.Writer) {
|
||||
if x == nil {
|
||||
out.RawString("null")
|
||||
return
|
||||
}
|
||||
first := true
|
||||
out.RawByte('{')
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"body\":"
|
||||
out.RawString(prefix)
|
||||
x.Body.MarshalEasyJSON(out)
|
||||
}
|
||||
{
|
||||
if !first {
|
||||
out.RawByte(',')
|
||||
} else {
|
||||
first = false
|
||||
}
|
||||
const prefix string = "\"signature\":"
|
||||
out.RawString(prefix)
|
||||
x.Signature.MarshalEasyJSON(out)
|
||||
}
|
||||
out.RawByte('}')
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (x *ListShardsForObjectResponse) UnmarshalJSON(data []byte) error {
|
||||
r := jlexer.Lexer{Data: data}
|
||||
x.UnmarshalEasyJSON(&r)
|
||||
return r.Error()
|
||||
}
|
||||
func (x *ListShardsForObjectResponse) UnmarshalEasyJSON(in *jlexer.Lexer) {
|
||||
isTopLevel := in.IsStart()
|
||||
if in.IsNull() {
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
in.Skip()
|
||||
return
|
||||
}
|
||||
in.Delim('{')
|
||||
for !in.IsDelim('}') {
|
||||
key := in.UnsafeFieldName(false)
|
||||
in.WantColon()
|
||||
if in.IsNull() {
|
||||
in.Skip()
|
||||
in.WantComma()
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "body":
|
||||
{
|
||||
var f *ListShardsForObjectResponse_Body
|
||||
f = new(ListShardsForObjectResponse_Body)
|
||||
f.UnmarshalEasyJSON(in)
|
||||
x.Body = f
|
||||
}
|
||||
case "signature":
|
||||
{
|
||||
var f *Signature
|
||||
f = new(Signature)
|
||||
f.UnmarshalEasyJSON(in)
|
||||
x.Signature = f
|
||||
}
|
||||
}
|
||||
in.WantComma()
|
||||
}
|
||||
in.Delim('}')
|
||||
if isTopLevel {
|
||||
in.Consumed()
|
||||
}
|
||||
}
|
||||
|
|
39
pkg/services/control/service_grpc.pb.go
generated
39
pkg/services/control/service_grpc.pb.go
generated
|
@ -41,7 +41,6 @@ const (
|
|||
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
|
||||
ControlService_DetachShards_FullMethodName = "/control.ControlService/DetachShards"
|
||||
ControlService_StartShardRebuild_FullMethodName = "/control.ControlService/StartShardRebuild"
|
||||
ControlService_ListShardsForObject_FullMethodName = "/control.ControlService/ListShardsForObject"
|
||||
)
|
||||
|
||||
// ControlServiceClient is the client API for ControlService service.
|
||||
|
@ -96,8 +95,6 @@ type ControlServiceClient interface {
|
|||
DetachShards(ctx context.Context, in *DetachShardsRequest, opts ...grpc.CallOption) (*DetachShardsResponse, error)
|
||||
// StartShardRebuild starts shard rebuild process.
|
||||
StartShardRebuild(ctx context.Context, in *StartShardRebuildRequest, opts ...grpc.CallOption) (*StartShardRebuildResponse, error)
|
||||
// ListShardsForObject returns shard info where object is stored.
|
||||
ListShardsForObject(ctx context.Context, in *ListShardsForObjectRequest, opts ...grpc.CallOption) (*ListShardsForObjectResponse, error)
|
||||
}
|
||||
|
||||
type controlServiceClient struct {
|
||||
|
@ -306,15 +303,6 @@ func (c *controlServiceClient) StartShardRebuild(ctx context.Context, in *StartS
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *controlServiceClient) ListShardsForObject(ctx context.Context, in *ListShardsForObjectRequest, opts ...grpc.CallOption) (*ListShardsForObjectResponse, error) {
|
||||
out := new(ListShardsForObjectResponse)
|
||||
err := c.cc.Invoke(ctx, ControlService_ListShardsForObject_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ControlServiceServer is the server API for ControlService service.
|
||||
// All implementations should embed UnimplementedControlServiceServer
|
||||
// for forward compatibility
|
||||
|
@ -367,8 +355,6 @@ type ControlServiceServer interface {
|
|||
DetachShards(context.Context, *DetachShardsRequest) (*DetachShardsResponse, error)
|
||||
// StartShardRebuild starts shard rebuild process.
|
||||
StartShardRebuild(context.Context, *StartShardRebuildRequest) (*StartShardRebuildResponse, error)
|
||||
// ListShardsForObject returns shard info where object is stored.
|
||||
ListShardsForObject(context.Context, *ListShardsForObjectRequest) (*ListShardsForObjectResponse, error)
|
||||
}
|
||||
|
||||
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
||||
|
@ -441,9 +427,6 @@ func (UnimplementedControlServiceServer) DetachShards(context.Context, *DetachSh
|
|||
func (UnimplementedControlServiceServer) StartShardRebuild(context.Context, *StartShardRebuildRequest) (*StartShardRebuildResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method StartShardRebuild not implemented")
|
||||
}
|
||||
func (UnimplementedControlServiceServer) ListShardsForObject(context.Context, *ListShardsForObjectRequest) (*ListShardsForObjectResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ListShardsForObject not implemented")
|
||||
}
|
||||
|
||||
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
||||
|
@ -852,24 +835,6 @@ func _ControlService_StartShardRebuild_Handler(srv interface{}, ctx context.Cont
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ControlService_ListShardsForObject_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListShardsForObjectRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ControlServiceServer).ListShardsForObject(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ControlService_ListShardsForObject_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).ListShardsForObject(ctx, req.(*ListShardsForObjectRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
|
@ -965,10 +930,6 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
|
|||
MethodName: "StartShardRebuild",
|
||||
Handler: _ControlService_StartShardRebuild_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ListShardsForObject",
|
||||
Handler: _ControlService_ListShardsForObject_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "pkg/services/control/service.proto",
|
||||
|
|
|
@ -163,7 +163,7 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
|
|||
if err != nil {
|
||||
a.failed = true
|
||||
}
|
||||
if err != nil && !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||
audit.LogRequestWithKey(ctx, a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key,
|
||||
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||
!a.failed)
|
||||
|
@ -224,7 +224,7 @@ func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) e
|
|||
if err != nil {
|
||||
a.failed = true
|
||||
}
|
||||
if err != nil && !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||
audit.LogRequestWithKey(ctx, a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||
!a.failed)
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
)
|
||||
|
@ -35,7 +34,7 @@ type (
|
|||
}
|
||||
|
||||
MetricRegister interface {
|
||||
AddRequestDuration(string, time.Duration, bool, string)
|
||||
AddRequestDuration(string, time.Duration, bool)
|
||||
AddPayloadSize(string, int)
|
||||
}
|
||||
)
|
||||
|
@ -52,7 +51,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
|
|||
if m.enabled {
|
||||
t := time.Now()
|
||||
defer func() {
|
||||
m.metrics.AddRequestDuration("Get", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
|
||||
m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
|
||||
}()
|
||||
err = m.next.Get(req, &getStreamMetric{
|
||||
ServerStream: stream,
|
||||
|
@ -107,7 +106,7 @@ func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingl
|
|||
|
||||
res, err := m.next.PutSingle(ctx, request)
|
||||
|
||||
m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
|
||||
m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
|
||||
if err == nil {
|
||||
m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
|
||||
}
|
||||
|
@ -123,7 +122,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
|
|||
|
||||
res, err := m.next.Head(ctx, request)
|
||||
|
||||
m.metrics.AddRequestDuration("Head", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
|
||||
m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
@ -136,7 +135,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
|
|||
|
||||
err := m.next.Search(req, stream)
|
||||
|
||||
m.metrics.AddRequestDuration("Search", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
|
||||
m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -149,7 +148,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
|
|||
|
||||
res, err := m.next.Delete(ctx, request)
|
||||
|
||||
m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
|
||||
m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
|
||||
return res, err
|
||||
}
|
||||
return m.next.Delete(ctx, request)
|
||||
|
@ -161,7 +160,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
|
|||
|
||||
err := m.next.GetRange(req, stream)
|
||||
|
||||
m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
|
||||
m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -174,7 +173,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
|
|||
|
||||
res, err := m.next.GetRangeHash(ctx, request)
|
||||
|
||||
m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
|
||||
m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
@ -210,7 +209,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
|
|||
func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
||||
res, err := s.stream.CloseAndRecv(ctx)
|
||||
|
||||
s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
|
||||
s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
@ -224,7 +223,7 @@ func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) e
|
|||
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||
res, err := s.stream.CloseAndRecv(ctx)
|
||||
|
||||
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
|
||||
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -110,7 +110,6 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
grpc.WithDisableServiceConfig(),
|
||||
}
|
||||
|
||||
if !netAddr.IsTLSEnabled() {
|
||||
|
|
|
@ -6,7 +6,6 @@ type MetricsRegister interface {
|
|||
AddReplicateTaskDuration(time.Duration, bool)
|
||||
AddReplicateWaitDuration(time.Duration, bool)
|
||||
AddSyncDuration(time.Duration, bool)
|
||||
AddOperation(string, string)
|
||||
}
|
||||
|
||||
type defaultMetricsRegister struct{}
|
||||
|
@ -14,4 +13,3 @@ type defaultMetricsRegister struct{}
|
|||
func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
|
||||
func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {}
|
||||
func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool) {}
|
||||
func (defaultMetricsRegister) AddOperation(string, string) {}
|
||||
|
|
|
@ -105,7 +105,6 @@ func (s *Service) Shutdown() {
|
|||
}
|
||||
|
||||
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||
defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
@ -149,7 +148,6 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
|||
}
|
||||
|
||||
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||
defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
@ -205,7 +203,6 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
|||
}
|
||||
|
||||
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||
defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
@ -250,7 +247,6 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
|||
// Move applies client operation to the specified tree and pushes in queue
|
||||
// for replication on other nodes.
|
||||
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||
defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
@ -294,7 +290,6 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
|||
}
|
||||
|
||||
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||
defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
@ -368,7 +363,6 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
|||
}
|
||||
|
||||
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||
defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return ErrAlreadySyncing
|
||||
}
|
||||
|
@ -596,7 +590,6 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
|
|||
|
||||
// Apply locally applies operation from the remote node to the tree.
|
||||
func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
||||
defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
|
||||
err := verifyMessage(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -640,7 +633,6 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
|
|||
}
|
||||
|
||||
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||
defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return ErrAlreadySyncing
|
||||
}
|
||||
|
@ -705,7 +697,6 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
|||
}
|
||||
|
||||
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||
defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
|
|
@ -355,7 +355,6 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
|||
),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
grpc.WithDisableServiceConfig(),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue