WIP: billing service #1032

Closed
dstepanov-yadro wants to merge 4 commits from dstepanov-yadro/frostfs-node:feat/billing_api into master
30 changed files with 1258 additions and 13 deletions

View file

@ -0,0 +1,61 @@
package main
import (
"context"
"net"
billingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/billing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing"
billingSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing/server"
"go.uber.org/zap"
"google.golang.org/grpc"
)
const serviceNameBilling = "billing"
func initBillingService(c *cfg) {
endpoint := billingconfig.GRPC(c.appCfg).Endpoint()
if endpoint == billingconfig.GRPCEndpointDefault {
c.log.Info(logs.FrostFSNodeBillingServiceDisabled)
return
}
pubs := billingconfig.AuthorizedKeys(c.appCfg)
rawPubs := make([][]byte, 0, len(pubs)+1)
rawPubs = append(rawPubs, c.key.PublicKey().Bytes())
for i := range pubs {
rawPubs = append(rawPubs, pubs[i].Bytes())
}
billingSvc := billingSvc.New(
&c.key.PrivateKey,
rawPubs,
c.cfgObject.cnrSource,
c.cfgObject.cfgLocalStorage.localStorage,
)
lis, err := net.Listen("tcp", endpoint)
if err != nil {
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpointBilling, zap.Error(err))
return
}
c.cfgBillingService.server = grpc.NewServer()
c.onShutdown(func() {
stopGRPC("FrostFS Billing API", c.cfgBillingService.server, c.log)
})
billing.RegisterBillingServiceServer(c.cfgBillingService.server, billingSvc)
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
runAndLog(ctx, c, serviceNameBilling, false, func(context.Context, *cfg) {
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", serviceNameBilling),
zap.String("endpoint", endpoint))
fatalOnErr(c.cfgBillingService.server.Serve(lis))
})
}))
}

View file

@ -457,6 +457,7 @@ type cfg struct {
cfgControlService cfgControlService
cfgObject cfgObject
cfgNotifications cfgNotifications
cfgBillingService cfgBillingService
}
// ReadCurrentNetMap reads network map which has been cached at the
@ -643,6 +644,10 @@ type cfgControlService struct {
server *grpc.Server
}
type cfgBillingService struct {
server *grpc.Server
}
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
func initCfg(appCfg *config.Config) *cfg {

View file

@ -0,0 +1,58 @@
package billing
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
type GRPCConfig struct {
cfg *config.Config
}
const (
subsection = "billing"
grpcSubsection = "grpc"
// GRPCEndpointDefault is a default endpoint of gRPC Billing service.
GRPCEndpointDefault = ""
)
// AuthorizedKeys parses and returns an array of "authorized_keys" config
// parameter from "control" section.
//
// Returns an empty list if not set.
func AuthorizedKeys(c *config.Config) keys.PublicKeys {
strKeys := config.StringSliceSafe(c.Sub(subsection), "authorized_keys")
pubs := make(keys.PublicKeys, 0, len(strKeys))
for i := range strKeys {
pub, err := keys.NewPublicKeyFromString(strKeys[i])
if err != nil {
panic(fmt.Errorf("invalid permitted key for Billing service %s: %w", strKeys[i], err))
}
pubs = append(pubs, pub)
}
return pubs
}
func GRPC(c *config.Config) GRPCConfig {
return GRPCConfig{
c.Sub(subsection).Sub(grpcSubsection),
}
}
// Endpoint returns the value of "endpoint" config parameter.
//
// Returns GRPCEndpointDefault if the value is not a non-empty string.
func (g GRPCConfig) Endpoint() string {
v := config.String(g.cfg, "endpoint")
if v != "" {
return v
}
return GRPCEndpointDefault
}

View file

@ -0,0 +1,37 @@
package billing_test
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
billingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/billing"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
func TestBillingSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
empty := configtest.EmptyConfig()
require.Empty(t, billingconfig.AuthorizedKeys(empty))
require.Equal(t, billingconfig.GRPCEndpointDefault, billingconfig.GRPC(empty).Endpoint())
})
const path = "../../../../config/example/node"
pubs := make(keys.PublicKeys, 2)
pubs[0], _ = keys.NewPublicKeyFromString("035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11")
pubs[1], _ = keys.NewPublicKeyFromString("028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6")
fileConfigTest := func(c *config.Config) {
require.Equal(t, pubs, billingconfig.AuthorizedKeys(c))
require.Equal(t, "localhost:8092", billingconfig.GRPC(c).Endpoint())
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}

View file

@ -115,6 +115,7 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)
initAndLog(c, "billing", initBillingService)
initAndLog(c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) })
}

View file

@ -56,6 +56,10 @@ FROSTFS_GRPC_1_TLS_ENABLED=false
FROSTFS_CONTROL_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
FROSTFS_CONTROL_GRPC_ENDPOINT=localhost:8090
# Billing service section
FROSTFS_BILLING_AUTHORIZED_KEYS="035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
FROSTFS_BILLING_GRPC_ENDPOINT=localhost:8092
# Contracts section
FROSTFS_CONTRACTS_BALANCE=5263abba1abedbf79bb57f3e40b50b4425d2d6cd
FROSTFS_CONTRACTS_CONTAINER=5d084790d7aa36cea7b53fe897380dab11d2cd3c

View file

@ -91,6 +91,15 @@
"endpoint": "localhost:8090"
}
},
"billing": {
"authorized_keys": [
"035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11",
"028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6"
],
"grpc": {
"endpoint": "localhost:8092"
}
},
"contracts": {
"balance": "5263abba1abedbf79bb57f3e40b50b4425d2d6cd",
"container": "5d084790d7aa36cea7b53fe897380dab11d2cd3c",

View file

@ -77,6 +77,13 @@ control:
grpc:
endpoint: localhost:8090 # endpoint that is listened by the Control Service
billing:
authorized_keys: # list of hex-encoded public keys that have rights to use the Billing Service
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
grpc:
endpoint: localhost:8092 # endpoint that is listened by the Billing Service
contracts: # side chain NEOFS contract script hashes; optional, override values retrieved from NNS contract
balance: 5263abba1abedbf79bb57f3e40b50b4425d2d6cd
container: 5d084790d7aa36cea7b53fe897380dab11d2cd3c

View file

@ -48,6 +48,8 @@
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8081",
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_BILLING_GRPC_ENDPOINT":"127.0.0.1:8082",
"FROSTFS_BILLING_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",

View file

@ -18,6 +18,7 @@ There are some custom types used for brevity:
| `pprof` | [PProf configuration](#pprof-section) |
| `prometheus` | [Prometheus metrics configuration](#prometheus-section) |
| `control` | [Control service configuration](#control-section) |
| `billing` | [Billing service configuration](#billing-section) |
| `contracts` | [Override FrostFS contracts hashes](#contracts-section) |
| `morph` | [N3 blockchain client configuration](#morph-section) |
| `apiclient` | [FrostFS API client configuration](#apiclient-section) |
@ -41,6 +42,20 @@ control:
| `authorized_keys` | `[]public key` | empty | List of public keys which are used to authorize requests to the control service. |
| `grpc.endpoint` | `string` | empty | Address that control service listener binds to. |
# `billing` section
```yaml
billing:
authorized_keys:
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
grpc:
endpoint: 127.0.0.1:8092
```
| Parameter | Type | Default value | Description |
|-------------------|----------------|---------------|----------------------------------------------------------------------------------|
| `authorized_keys` | `[]public key` | empty | List of public keys which are used to authorize requests to the billing service. |
| `grpc.endpoint` | `string` | empty | Address that billing service listener binds to. |
# `grpc` section
```yaml
grpc:

View file

@ -487,6 +487,8 @@ const (
FrostFSNodeRemovingAllTreesForContainer = "removing all trees for container"
FrostFSNodeContainerRemovalEventReceivedButTreesWerentRemoved = "container removal event received, but trees weren't removed"
FrostFSNodeCantListenGRPCEndpointControl = "can't listen gRPC endpoint (control)"
FrostFSNodeCantListenGRPCEndpointBilling = "can't listen gRPC endpoint (billing)"
FrostFSNodeBillingServiceDisabled = "billing service is disabled"
FrostFSNodePolicerIsDisabled = "policer is disabled"
CommonApplicationStarted = "application started"
ShardGCCollectingExpiredObjectsStarted = "collecting expired objects started"

View file

@ -0,0 +1,167 @@
package engine
import (
"bytes"
"context"
"errors"
"sort"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
type ContainerStatPrm struct {
ContainerID []cid.ID
Limit uint32
StartFromContainerID *cid.ID
}
type ContainerStatRes struct {
ContainerStats []ContainerStat
Partial bool
}
type ContainerStat struct {
ContainerID cid.ID
SizeLogic uint64
CountPhy, CountLogic, CountUser uint64
}
var errInvalidLimit = errors.New("limit must be greater than zero")
func (e *StorageEngine) ContainerStat(ctx context.Context, prm ContainerStatPrm) (*ContainerStatRes, error) {
if e.metrics != nil {
defer elapsed("ContainerStat", e.metrics.AddMethodDuration)()
}
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.ContainerStat",
trace.WithAttributes(
attribute.Int("container_ids", len(prm.ContainerID)),
attribute.Int64("limit", int64(prm.Limit)),
attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil),
))
defer span.End()
if len(prm.ContainerID) == 0 && prm.Limit == 0 {
return nil, errInvalidLimit
}
var result *ContainerStatRes
err := e.execIfNotBlocked(func() error {
var sErr error
result, sErr = e.containerStat(ctx, prm)
return sErr
})
return result, err
}
func (e *StorageEngine) containerStat(ctx context.Context, prm ContainerStatPrm) (*ContainerStatRes, error) {
e.mtx.RLock()
defer e.mtx.RUnlock()
if len(prm.ContainerID) > 0 {
sort.Slice(prm.ContainerID, func(i, j int) bool {
return bytes.Compare(prm.ContainerID[i][:], prm.ContainerID[j][:]) < 0
})
}
shardResults, partial, err := e.collectShardContainerStats(ctx, prm)
if err != nil {
return nil, err
}
return &ContainerStatRes{
ContainerStats: e.mergeShardContainerStats(shardResults, int(prm.Limit)),
Partial: partial,
}, nil
}
func (e *StorageEngine) collectShardContainerStats(ctx context.Context, prm ContainerStatPrm) ([][]shard.ContainerStat, bool, error) {
if len(prm.ContainerID) > 0 {
sort.Slice(prm.ContainerID, func(i, j int) bool {
return bytes.Compare(prm.ContainerID[i][:], prm.ContainerID[j][:]) < 0
})
}
var shardResults [][]shard.ContainerStat
var shardErrors []error
var resultsGuard sync.Mutex
shPrm := shard.ContainerStatPrm{
ContainerID: prm.ContainerID,
Limit: prm.Limit,
StartFromContainerID: prm.StartFromContainerID,
}
eg, egCtx := errgroup.WithContext(ctx)
var shardsCount int
e.iterateOverUnsortedShards(func(hs hashedShard) (stop bool) {
shardsCount++
eg.Go(func() error {
s, err := hs.ContainerStat(egCtx, shPrm)
resultsGuard.Lock()
defer resultsGuard.Unlock()
if err != nil {
shardErrors = append(shardErrors, err)
return nil
}
if len(s) > 0 {
shardResults = append(shardResults, s)
}
return nil
})
return false
})
if err := eg.Wait(); err != nil {
return nil, false, err
}
if shardsCount == len(shardErrors) {
return nil, false, errors.Join(shardErrors...)
}
return shardResults, len(shardErrors) > 0, nil
}
func (e *StorageEngine) mergeShardContainerStats(shardResults [][]shard.ContainerStat, limit int) []ContainerStat {
var stats []ContainerStat
for len(stats) <= limit && len(shardResults) > 0 {
// shard results are sorted by container ID
sort.Slice(shardResults, func(i, j int) bool {
return bytes.Compare(shardResults[i][0].ContainerID[:], shardResults[j][0].ContainerID[:]) < 0
})
if len(stats) > 0 && stats[len(stats)-1].ContainerID == shardResults[0][0].ContainerID {
stats[len(stats)-1].SizeLogic += shardResults[0][0].SizeLogic
stats[len(stats)-1].CountPhy += shardResults[0][0].CountPhy
stats[len(stats)-1].CountLogic += shardResults[0][0].CountLogic
stats[len(stats)-1].CountUser += shardResults[0][0].CountUser
} else {
stats = append(stats, ContainerStat{
ContainerID: shardResults[0][0].ContainerID,
SizeLogic: shardResults[0][0].SizeLogic,
CountPhy: shardResults[0][0].CountPhy,
CountLogic: shardResults[0][0].CountLogic,
CountUser: shardResults[0][0].CountUser,
})
}
if len(shardResults[0]) == 1 { // last item for shard
shardResults = shardResults[1:]
} else {
shardResults[0] = shardResults[0][1:]
}
}
if len(stats) > limit {
stats = stats[:limit]
}
return stats
}

View file

@ -0,0 +1,221 @@
package engine
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestContainerList(t *testing.T) {
t.Parallel()
s1 := testNewShard(t)
s2 := testNewShard(t)
s3 := testNewShard(t)
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
e.log = test.NewLogger(t)
defer e.Close(context.Background())
const containerCount = 10
expStat := testPutComplexObject(t, []*shard.Shard{s1, s2, s3}, containerCount, nil)
expStat = testPutSimpleObject(t, []*shard.Shard{s1, s2, s3}, containerCount, expStat)
expStat = testPutSimpleObject(t, []*shard.Shard{s1, s2}, containerCount, expStat)
expStat = testPutSimpleObject(t, []*shard.Shard{s2, s3}, containerCount, expStat)
expStat = testPutSimpleObject(t, []*shard.Shard{s1}, containerCount, expStat)
expStat = testPutSimpleObject(t, []*shard.Shard{s2}, containerCount, expStat)
expStat = testPutSimpleObject(t, []*shard.Shard{s3}, containerCount, expStat)
t.Run("with default limit", func(t *testing.T) {
var prm ContainerStatPrm
prm.Limit = 10_000
res, err := e.ContainerStat(context.Background(), prm)
require.NoError(t, err)
require.NotNil(t, res)
require.ElementsMatch(t, expStat, res.ContainerStats)
require.False(t, res.Partial)
})
t.Run("with limit, batched", func(t *testing.T) {
var prm ContainerStatPrm
prm.Limit = 1
var stats []ContainerStat
for {
res, err := e.ContainerStat(context.Background(), prm)
require.NoError(t, err)
require.NotNil(t, res)
require.False(t, res.Partial)
if len(res.ContainerStats) == 0 {
break
}
stats = append(stats, res.ContainerStats...)
last := res.ContainerStats[len(res.ContainerStats)-1].ContainerID
prm.StartFromContainerID = &last
prm.Limit += 1
}
require.ElementsMatch(t, expStat, stats)
})
t.Run("by container id", func(t *testing.T) {
for _, cc := range []int{1, 2, 3, 4, 5} {
var prm ContainerStatPrm
for idx := 0; idx+cc < len(expStat); idx += cc {
prm.ContainerID = nil
for i := 0; i < cc; i++ {
prm.ContainerID = append(prm.ContainerID, expStat[idx+i].ContainerID)
}
prm.Limit = uint32(len(prm.ContainerID))
res, err := e.ContainerStat(context.Background(), prm)
require.NoError(t, err)
require.NotNil(t, res)
require.False(t, res.Partial)
require.ElementsMatch(t, expStat[idx:idx+cc], res.ContainerStats)
}
}
})
t.Run("unknown container id", func(t *testing.T) {
var prm ContainerStatPrm
prm.ContainerID = append(prm.ContainerID, cidtest.ID())
prm.Limit = uint32(len(prm.ContainerID))
res, err := e.ContainerStat(context.Background(), prm)
require.NoError(t, err)
require.NotNil(t, res)
require.False(t, res.Partial)
require.ElementsMatch(t, []ContainerStat{{ContainerID: prm.ContainerID[0]}}, res.ContainerStats)
})
t.Run("degraded shard", func(t *testing.T) {
s1.SetMode(mode.Degraded)
var prm ContainerStatPrm
prm.Limit = 10_000
res, err := e.ContainerStat(context.Background(), prm)
require.NoError(t, err)
require.NotNil(t, res)
require.True(t, res.Partial)
})
}
func testPutComplexObject(t *testing.T, shards []*shard.Shard, count int, stats []ContainerStat) []ContainerStat {
const payloadSize = 10 * 1024
for count > 0 {
var cnr cid.ID
var stat *ContainerStat
var newStat bool
if len(stats) == 0 || count%2 == 0 {
cnr = cidtest.ID()
stat = &ContainerStat{ContainerID: cnr}
newStat = true
} else {
cnr = stats[count%len(stats)].ContainerID
stat = &stats[count%len(stats)]
}
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload(make([]byte, payloadSize))
children[i].SetPayloadSize(payloadSize)
childIDs[i], _ = children[i].ID()
stat.SizeLogic += payloadSize
stat.CountLogic += 1
stat.CountPhy += 1
}
stat.CountUser += 1
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
stat.CountLogic += 1
stat.CountPhy += 1
stat.SizeLogic += link.PayloadSize()
for i := range children {
sh := shards[i%len(shards)]
var putPrm shard.PutPrm
putPrm.SetObject(children[i])
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
}
sh := shards[count%len(shards)]
var putPrm shard.PutPrm
putPrm.SetObject(link)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
if newStat {
stats = append(stats, *stat)
}
count--
}
return stats
}
func testPutSimpleObject(t *testing.T, shards []*shard.Shard, count int, stats []ContainerStat) []ContainerStat {
const payloadSize = 7 * 1024
for count > 0 {
var cnr cid.ID
var stat *ContainerStat
var newStat bool
if len(stats) == 0 || count%2 == 0 {
cnr = cidtest.ID()
stat = &ContainerStat{ContainerID: cnr}
newStat = true
} else {
cnr = stats[count%len(stats)].ContainerID
stat = &stats[count%len(stats)]
}
obj := testutil.GenerateObjectWithCID(cnr)
obj.SetPayload(make([]byte, payloadSize))
obj.SetPayloadSize(payloadSize)
stat.SizeLogic += payloadSize
stat.CountLogic += 1
stat.CountPhy += 1
stat.CountUser += 1
sh := shards[count%len(shards)]
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
if newStat {
stats = append(stats, *stat)
}
count--
}
return stats
}

View file

@ -0,0 +1,237 @@
package meta
import (
"bytes"
"context"
"crypto/sha256"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type ContainerStatPrm struct {
ContainerID []cid.ID
Limit uint32
StartFromContainerID *cid.ID
}
type ContainerStat struct {
ContainerID cid.ID
SizeLogic uint64
CountPhy, CountLogic, CountUser uint64
}
// ContainerStat returns object count and size for containers.
// If len(prm.ContainerID) > 0, then result slice contains records in the same order as prm.ContainerID.
// Otherwise result slice sorted by ContainerID.
func (db *DB) ContainerStat(ctx context.Context, prm ContainerStatPrm) ([]ContainerStat, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ContainerStat", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerStat",
trace.WithAttributes(
attribute.Int("container_ids", len(prm.ContainerID)),
attribute.Int64("limit", int64(prm.Limit)),
attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
if len(prm.ContainerID) > 0 {
return db.containerStatByContainerID(prm.ContainerID)
}
return db.containerStatByLimit(prm.StartFromContainerID, prm.Limit)
}
func (db *DB) containerStatByContainerID(containerID []cid.ID) ([]ContainerStat, error) {
var result []ContainerStat
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for _, contID := range containerID {
var stat ContainerStat
stat.ContainerID = contID
stat.SizeLogic = db.containerSize(tx, contID)
counters, err := db.containerCounters(tx, contID)
if err != nil {
return err
}
stat.CountPhy = counters.Phy
stat.CountLogic = counters.Logic
stat.CountUser = counters.User
result = append(result, stat)
}
return nil
})
if err != nil {
return nil, metaerr.Wrap(err)
}
return result, nil
}
func (db *DB) containerStatByLimit(startFrom *cid.ID, limit uint32) ([]ContainerStat, error) {
var result []ContainerStat
var lastKey []byte
if startFrom != nil {
lastKey = make([]byte, sha256.Size)
startFrom.Encode(lastKey)
}
var counts []containerIDObjectCounters
var sizes []containerIDSize
err := db.boltDB.View(func(tx *bbolt.Tx) error {
var e error
counts, e = getContainerCountersBatch(tx, lastKey, limit)
if e != nil {
return e
}
sizes, e = getContainerSizesBatch(tx, lastKey, limit)
return e
})
if err != nil {
return nil, metaerr.Wrap(err)
}
result = mergeSizeAndCounts(counts, sizes)
if len(result) > int(limit) {
result = result[:limit]
}
return result, nil
}
type containerIDObjectCounters struct {
ContainerID cid.ID
ObjectCounters
}
func getContainerCountersBatch(tx *bbolt.Tx, lastKey []byte, limit uint32) ([]containerIDObjectCounters, error) {
var result []containerIDObjectCounters
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return result, nil
}
c := b.Cursor()
var key, value []byte
for key, value = c.Seek(lastKey); key != nil && uint32(len(result)) < limit; key, value = c.Next() {
if bytes.Equal(lastKey, key) {
continue
}
cnrID, err := parseContainerCounterKey(key)
if err != nil {
return nil, err
}
ent, err := parseContainerCounterValue(value)
if err != nil {
return nil, err
}
result = append(result, containerIDObjectCounters{
ContainerID: cnrID,
ObjectCounters: ent,
})
}
return result, nil
}
type containerIDSize struct {
ContainerID cid.ID
Size uint64
}
func getContainerSizesBatch(tx *bbolt.Tx, lastKey []byte, limit uint32) ([]containerIDSize, error) {
var result []containerIDSize
b := tx.Bucket(containerVolumeBucketName)
c := b.Cursor()
var key, value []byte
for key, value = c.Seek(lastKey); key != nil && uint32(len(result)) < limit; key, value = c.Next() {
if bytes.Equal(lastKey, key) {
continue
}
var r containerIDSize
r.Size = parseContainerSize(value)
if err := r.ContainerID.Decode(key); err != nil {
return nil, err
}
result = append(result, r)
}
return result, nil
}
// mergeSizeAndCounts merges sizes and counts.
// As records are deleted in background, it can happen that metabase contains size record for container,
// but doesn't contain record for count.
func mergeSizeAndCounts(counts []containerIDObjectCounters, sizes []containerIDSize) []ContainerStat {
var result []ContainerStat
for len(counts) > 0 || len(sizes) > 0 {
if len(counts) == 0 {
result = append(result, ContainerStat{
ContainerID: sizes[0].ContainerID,
SizeLogic: sizes[0].Size,
})
sizes = sizes[1:]
continue
}
if len(sizes) == 0 {
result = append(result, ContainerStat{
ContainerID: counts[0].ContainerID,
CountPhy: counts[0].Phy,
CountLogic: counts[0].Logic,
CountUser: counts[0].User,
})
counts = counts[1:]
continue
}
v := bytes.Compare(sizes[0].ContainerID[:], counts[0].ContainerID[:])
if v == 0 { // equal
result = append(result, ContainerStat{
ContainerID: counts[0].ContainerID,
CountPhy: counts[0].Phy,
CountLogic: counts[0].Logic,
CountUser: counts[0].User,
SizeLogic: sizes[0].Size,
})
counts = counts[1:]
sizes = sizes[1:]
} else if v < 0 { // from sizes
result = append(result, ContainerStat{
ContainerID: sizes[0].ContainerID,
SizeLogic: sizes[0].Size,
})
sizes = sizes[1:]
} else { // from counts
result = append(result, ContainerStat{
ContainerID: counts[0].ContainerID,
CountPhy: counts[0].Phy,
CountLogic: counts[0].Logic,
CountUser: counts[0].User,
})
counts = counts[1:]
}
}
return result
}

View file

@ -65,20 +65,19 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
}
err = db.boltDB.View(func(tx *bbolt.Tx) error {
size, err = db.containerSize(tx, id)
return err
size = db.containerSize(tx, id)
return nil
})
return size, metaerr.Wrap(err)
}
func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) {
func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 {
containerVolume := tx.Bucket(containerVolumeBucketName)
key := make([]byte, cidSize)
id.Encode(key)
return parseContainerSize(containerVolume.Get(key)), nil
return parseContainerSize(containerVolume.Get(key))
}
func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {

View file

@ -211,21 +211,28 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
var result ObjectCounters
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize)
id.Encode(key)
v := b.Get(key)
if v == nil {
return nil
}
var err error
result, err = parseContainerCounterValue(v)
result, err = db.containerCounters(tx, id)
return err
})
return result, metaerr.Wrap(err)
}
func (*DB) containerCounters(tx *bbolt.Tx, id cid.ID) (ObjectCounters, error) {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return ObjectCounters{}, nil
}
key := make([]byte, cidSize)
id.Encode(key)
v := b.Get(key)
if v == nil {
return ObjectCounters{}, nil
}
return parseContainerCounterValue(v)
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err)

View file

@ -0,0 +1,63 @@
package shard
import (
"context"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type ContainerStatPrm struct {
ContainerID []cid.ID
Limit uint32
StartFromContainerID *cid.ID
}
type ContainerStat struct {
ContainerID cid.ID
SizeLogic uint64
CountPhy, CountLogic, CountUser uint64
}
// ContainerStat returns object count and size for containers from metabase.
func (s *Shard) ContainerStat(ctx context.Context, prm ContainerStatPrm) ([]ContainerStat, error) {
_, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerStat",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Int("container_ids", len(prm.ContainerID)),
attribute.Int64("limit", int64(prm.Limit)),
attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.GetMode().NoMetabase() {
return nil, ErrDegradedMode
}
res, err := s.metaBase.ContainerStat(ctx, meta.ContainerStatPrm{
ContainerID: prm.ContainerID,
Limit: prm.Limit,
StartFromContainerID: prm.StartFromContainerID,
})
if err != nil {
return nil, err
}
result := make([]ContainerStat, 0, len(res))
for _, r := range res {
result = append(result, ContainerStat{
ContainerID: r.ContainerID,
SizeLogic: r.SizeLogic,
CountPhy: r.CountPhy,
CountLogic: r.CountLogic,
CountUser: r.CountUser,
})
}
return result, nil
}

View file

@ -0,0 +1,137 @@
package server
import (
"context"
"errors"
"fmt"
containerApi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
defaultLimit = 1000
maxLimit = 10000
)
var (
errInvalidContainerIDLenght = errors.New("count of container ID array must be lower or equal 10 000 items")
errInvalidLimit = errors.New("limit value must be lower or equal 10 000")
)
func (s *Server) ListContainers(ctx context.Context, req *billing.ListContainersRequest) (*billing.ListContainersResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
enginePrm, err := convertToEngineContainerStatPrm(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
res, err := s.se.ContainerStat(ctx, enginePrm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &billing.ListContainersResponse{
Body: &billing.ListContainersResponse_Body{
Result: s.addContainerInfo(res.ContainerStats),
NextPageToken: containerListNextPageToken(res.ContainerStats, enginePrm.Limit),
Partial: res.Partial,
},
}
if err = SignMessage(s.key, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) addContainerInfo(engineStats []engine.ContainerStat) []*billing.ListContainersResponse_Body_ContainerInfo {
var result []*billing.ListContainersResponse_Body_ContainerInfo
for _, engineStat := range engineStats {
containerInfo := &billing.ListContainersResponse_Body_ContainerInfo{
ContainerId: engineStat.ContainerID[:],
Count: &billing.ListContainersResponse_Body_ContainerInfo_Count{
Phy: engineStat.CountPhy,
Logic: engineStat.CountLogic,
User: engineStat.CountUser,
},
Size: &billing.ListContainersResponse_Body_ContainerInfo_Size{
Logic: engineStat.SizeLogic,
},
ContainerStatus: billing.ListContainersResponse_Body_ContainerInfo_UNDEFINED,
}
cnr, err := s.cnrSrc.Get(engineStat.ContainerID)
if err != nil {
if client.IsErrContainerNotFound(err) {
existed, errWasRemoved := containercore.WasRemoved(s.cnrSrc, engineStat.ContainerID)
if errWasRemoved == nil && existed {
containerInfo.ContainerStatus = billing.ListContainersResponse_Body_ContainerInfo_DELETED
}
}
} else {
containerInfo.ContainerStatus = billing.ListContainersResponse_Body_ContainerInfo_AVAILABLE
containerInfo.Attributes = &billing.ListContainersResponse_Body_ContainerInfo_Attributes{
OwnerWallet: cnr.Value.Owner().WalletBytes(),
Zone: cnr.Value.Attribute(containerApi.SysAttributeZone),
}
}
result = append(result, containerInfo)
}
return result
}
func containerListNextPageToken(engineStats []engine.ContainerStat, limit uint32) []byte {
if uint32(len(engineStats)) <= limit {
return nil
}
return engineStats[len(engineStats)-1].ContainerID[:]
}
func convertToEngineContainerStatPrm(req *billing.ListContainersRequest) (engine.ContainerStatPrm, error) {
var result engine.ContainerStatPrm
if len(req.GetBody().GetContainerId()) > 10000 {
return result, errInvalidContainerIDLenght
}
if len(req.GetBody().GetContainerId()) > 0 {
for idx, contIDBytes := range req.GetBody().GetContainerId() {
var contID cid.ID
if err := contID.Decode(contIDBytes); err != nil {
return result, fmt.Errorf("failed to decode container ID at index %d: %w", idx, err)
}
result.ContainerID = append(result.ContainerID, contID)
}
return result, nil
}
result.Limit = defaultLimit
if req.GetBody().GetLimit() > maxLimit {
return result, errInvalidLimit
}
if req.GetBody().GetLimit() > 0 {
result.Limit = req.GetBody().GetLimit()
}
if len(req.GetBody().GetNextPageToken()) > 0 {
var contID cid.ID
if err := contID.Decode(req.GetBody().GetNextPageToken()); err != nil {
return result, fmt.Errorf("invalid next page token: %w", err)
}
result.StartFromContainerID = &contID
}
return result, nil
}

View file

@ -0,0 +1,34 @@
package server
import (
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)
type cfg struct {
key *ecdsa.PrivateKey
allowedKeys [][]byte
cnrSrc container.Source
se *engine.StorageEngine
}
type Server struct {
*cfg
}
func New(key *ecdsa.PrivateKey,
allowedKeys [][]byte,
cnrSrc container.Source,
se *engine.StorageEngine,
) *Server {
return &Server{
cfg: &cfg{
key: key,
allowedKeys: allowedKeys,
cnrSrc: cnrSrc,
se: se,
},
}
}

View file

@ -0,0 +1,92 @@
package server
import (
"bytes"
"crypto/ecdsa"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
)
// SignedMessage is an interface of Control service message.
type SignedMessage interface {
ReadSignedData([]byte) ([]byte, error)
GetSignature() *billing.Signature
SetSignature(*billing.Signature)
}
var (
errDisallowedKey = errors.New("key is not in the allowed list")
errMissingSignature = errors.New("missing signature")
errInvalidSignature = errors.New("invalid signature")
)
func (s *Server) isValidRequest(req SignedMessage) error {
sign := req.GetSignature()
if sign == nil {
return errMissingSignature
}
var (
key = sign.GetKey()
allowed = false
)
for i := range s.allowedKeys {
if allowed = bytes.Equal(s.allowedKeys[i], key); allowed {
break
}
}
if !allowed {
return errDisallowedKey
}
binBody, err := req.ReadSignedData(nil)
if err != nil {
return fmt.Errorf("marshal request body: %w", err)
}
var sigV2 refs.Signature
sigV2.SetKey(sign.GetKey())
sigV2.SetSign(sign.GetSignature())
sigV2.SetScheme(refs.ECDSA_SHA512)
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(sigV2); err != nil {
return fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binBody) {
return errInvalidSignature
}
return nil
}
func SignMessage(key *ecdsa.PrivateKey, msg SignedMessage) error {
binBody, err := msg.ReadSignedData(nil)
if err != nil {
return fmt.Errorf("marshal request body: %w", err)
}
var sig frostfscrypto.Signature
err = sig.Calculate(frostfsecdsa.Signer(*key), binBody)
if err != nil {
return fmt.Errorf("calculate signature: %w", err)
}
var sigV2 refs.Signature
sig.WriteToV2(&sigV2)
var sigBilling billing.Signature
sigBilling.Key = sigV2.GetKey()
sigBilling.Signature = sigV2.GetSign()
msg.SetSignature(&sigBilling)
return nil
}

BIN
pkg/services/billing/service.pb.go generated Normal file

Binary file not shown.

View file

@ -0,0 +1,87 @@
syntax = "proto3";
package billing;
option go_package = "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing";
// `BillingService` provides an interface to get usage statistics.
service BillingService {
// Returns list that contains container usage statistic.
rpc ListContainers(ListContainersRequest) returns (ListContainersResponse);
}
// Signature of some message.
message Signature {
// Public key used for signing.
bytes key = 1;
// Binary signature.
bytes signature = 2;
}
message ListContainersRequest {
message Body {
// Container IDs to get container metrics. Not required. Maximum count is
// 10000 items.
repeated bytes container_id = 1;
// Max items count in the response. Not required. Default value is 1000. Not
// applicable if container_id is specified. Maximum value is 10000.
uint32 limit = 2;
// Next page token used to continue listing. Not required. If null or empty,
// listing starts from the beginning.
bytes next_page_token = 3;
}
Body body = 1;
Signature signature = 2;
}
message ListContainersResponse {
message Body {
message ContainerInfo {
message Count {
uint64 phy = 1;
uint64 logic = 2;
uint64 user = 3;
}
message Size { uint64 logic = 1; }
message Attributes {
// Container owner's wallet bytes.
bytes owner_wallet = 1;
// Container zone.
string zone = 2;
}
enum Status {
// Undefined status, default value.
UNDEFINED = 0;
// Container is available.
AVAILABLE = 1;
// Container is deleted.
DELETED = 2;
}
// Container ID.
bytes container_id = 1;
// Container status.
Status container_status = 2;
// Container attributes. May be null if container attributes are
// unavailable for current moment or container already deleted.
Attributes attributes = 3;
// Count of the objects in container.
Count count = 4;
// Size of the objects in container.
Size size = 5;
}
repeated ContainerInfo result = 1;
// Next page token used to get next batch. If returned value is null or
// empty, then listing completed.
bytes next_page_token = 2;
// Returned result is partial. This could happend if some shards are in
// degraded mode or returned an error.
bool partial = 3;
}
Body body = 1;
Signature signature = 2;
}

BIN
pkg/services/billing/service_frostfs.pb.go generated Normal file

Binary file not shown.

BIN
pkg/services/billing/service_grpc.pb.go generated Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.