Replace custom Redis config struct with go-redis UniversalOptions

Huge help from @milosgajdos who figured out how to do the entire
marshalling/unmarshalling for the configs

Signed-off-by: Anders Ingemann <aim@orbit.online>
This commit is contained in:
Anders Ingemann 2024-03-19 11:02:38 +01:00
parent bc6e81e1b9
commit b63cbb3318
No known key found for this signature in database
7 changed files with 179 additions and 155 deletions

View file

@ -20,11 +20,10 @@ http:
headers: headers:
X-Content-Type-Options: [nosniff] X-Content-Type-Options: [nosniff]
redis: redis:
addr: localhost:6379 addrs: [localhost:6379]
pool: maxidleconns: 16
maxidle: 16 poolsize: 64
maxactive: 64 connmaxidletime: 300s
idletimeout: 300s
dialtimeout: 10ms dialtimeout: 10ms
readtimeout: 10ms readtimeout: 10ms
writetimeout: 10ms writetimeout: 10ms

View file

@ -8,6 +8,8 @@ import (
"reflect" "reflect"
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
) )
// Configuration is a versioned registry configuration, intended to be provided by a yaml file, and // Configuration is a versioned registry configuration, intended to be provided by a yaml file, and
@ -277,44 +279,6 @@ type FileChecker struct {
Threshold int `yaml:"threshold,omitempty"` Threshold int `yaml:"threshold,omitempty"`
} }
// Redis configures the redis pool available to the registry webapp.
type Redis struct {
// Addr specifies the redis instance available to the application.
Addr string `yaml:"addr,omitempty"`
// Usernames can be used as a finer-grained permission control since the introduction of the redis 6.0.
Username string `yaml:"username,omitempty"`
// Password string to use when making a connection.
Password string `yaml:"password,omitempty"`
// DB specifies the database to connect to on the redis instance.
DB int `yaml:"db,omitempty"`
// TLS configures settings for redis in-transit encryption
TLS struct {
Enabled bool `yaml:"enabled,omitempty"`
} `yaml:"tls,omitempty"`
DialTimeout time.Duration `yaml:"dialtimeout,omitempty"` // timeout for connect
ReadTimeout time.Duration `yaml:"readtimeout,omitempty"` // timeout for reads of data
WriteTimeout time.Duration `yaml:"writetimeout,omitempty"` // timeout for writes of data
// Pool configures the behavior of the redis connection pool.
Pool struct {
// MaxIdle sets the maximum number of idle connections.
MaxIdle int `yaml:"maxidle,omitempty"`
// MaxActive sets the maximum number of connections that should be
// opened before blocking a connection request.
MaxActive int `yaml:"maxactive,omitempty"`
// IdleTimeout sets the amount time to wait before closing
// inactive connections.
IdleTimeout time.Duration `yaml:"idletimeout,omitempty"`
} `yaml:"pool,omitempty"`
}
// HTTPChecker is a type of entry in the health section for checking HTTP URIs. // HTTPChecker is a type of entry in the health section for checking HTTP URIs.
type HTTPChecker struct { type HTTPChecker struct {
// Timeout is the duration to wait before timing out the HTTP request // Timeout is the duration to wait before timing out the HTTP request
@ -688,3 +652,124 @@ func Parse(rd io.Reader) (*Configuration, error) {
return config, nil return config, nil
} }
type Redis struct {
redis.UniversalOptions
}
func (c Redis) MarshalYAML() (interface{}, error) {
fields := make(map[string]interface{})
val := reflect.ValueOf(c.UniversalOptions)
typ := val.Type()
for i := 0; i < val.NumField(); i++ {
field := typ.Field(i)
fieldValue := val.Field(i)
// ignore imports and funcs
if field.PkgPath != "" || fieldValue.Kind() == reflect.Func {
continue
}
fields[strings.ToLower(field.Name)] = fieldValue.Interface()
}
return fields, nil
}
func (c *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error {
var fields map[string]interface{}
err := unmarshal(&fields)
if err != nil {
return err
}
val := reflect.ValueOf(&c.UniversalOptions).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
fieldName := strings.ToLower(field.Name)
if value, ok := fields[fieldName]; ok {
fieldValue := val.Field(i)
if fieldValue.CanSet() {
switch field.Type {
case reflect.TypeOf(time.Duration(0)):
durationStr, ok := value.(string)
if !ok {
return fmt.Errorf("invalid duration value for field: %s", fieldName)
}
duration, err := time.ParseDuration(durationStr)
if err != nil {
return fmt.Errorf("failed to parse duration for field: %s, error: %v", fieldName, err)
}
fieldValue.Set(reflect.ValueOf(duration))
default:
if err := setFieldValue(fieldValue, value); err != nil {
return fmt.Errorf("failed to set value for field: %s, error: %v", fieldName, err)
}
}
}
}
}
return nil
}
func setFieldValue(field reflect.Value, value interface{}) error {
if value == nil {
return nil
}
switch field.Kind() {
case reflect.String:
stringValue, ok := value.(string)
if !ok {
return fmt.Errorf("failed to convert value to string")
}
field.SetString(stringValue)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
intValue, ok := value.(int)
if !ok {
return fmt.Errorf("failed to convert value to integer")
}
field.SetInt(int64(intValue))
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
uintValue, ok := value.(uint)
if !ok {
return fmt.Errorf("failed to convert value to unsigned integer")
}
field.SetUint(uint64(uintValue))
case reflect.Float32, reflect.Float64:
floatValue, ok := value.(float64)
if !ok {
return fmt.Errorf("failed to convert value to float")
}
field.SetFloat(floatValue)
case reflect.Bool:
boolValue, ok := value.(bool)
if !ok {
return fmt.Errorf("failed to convert value to boolean")
}
field.SetBool(boolValue)
case reflect.Slice:
slice := reflect.MakeSlice(field.Type(), 0, 0)
valueSlice, ok := value.([]interface{})
if !ok {
return fmt.Errorf("failed to convert value to slice")
}
for _, item := range valueSlice {
sliceValue := reflect.New(field.Type().Elem()).Elem()
if err := setFieldValue(sliceValue, item); err != nil {
return err
}
slice = reflect.Append(slice, sliceValue)
}
field.Set(slice)
default:
return fmt.Errorf("unsupported field type: %v", field.Type())
}
return nil
}

View file

@ -8,6 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
@ -131,23 +132,19 @@ var configStruct = Configuration{
}, },
}, },
Redis: Redis{ Redis: Redis{
Addr: "localhost:6379", redis.UniversalOptions{
Addrs: []string{"localhost:6379"},
Username: "alice", Username: "alice",
Password: "123456", Password: "123456",
DB: 1, DB: 1,
Pool: struct { MaxIdleConns: 16,
MaxIdle int `yaml:"maxidle,omitempty"` PoolSize: 64,
MaxActive int `yaml:"maxactive,omitempty"` ConnMaxIdleTime: time.Second * 300,
IdleTimeout time.Duration `yaml:"idletimeout,omitempty"`
}{
MaxIdle: 16,
MaxActive: 64,
IdleTimeout: time.Second * 300,
},
DialTimeout: time.Millisecond * 10, DialTimeout: time.Millisecond * 10,
ReadTimeout: time.Millisecond * 10, ReadTimeout: time.Millisecond * 10,
WriteTimeout: time.Millisecond * 10, WriteTimeout: time.Millisecond * 10,
}, },
},
} }
// configYamlV0_1 is a Version 0.1 yaml document representing configStruct // configYamlV0_1 is a Version 0.1 yaml document representing configStruct
@ -190,14 +187,13 @@ http:
headers: headers:
X-Content-Type-Options: [nosniff] X-Content-Type-Options: [nosniff]
redis: redis:
addr: localhost:6379 addrs: [localhost:6379]
username: alice username: alice
password: 123456 password: "123456"
db: 1 db: 1
pool: maxidleconns: 16
maxidle: 16 poolsize: 64
maxactive: 64 connmaxidletime: 300s
idletimeout: 300s
dialtimeout: 10ms dialtimeout: 10ms
readtimeout: 10ms readtimeout: 10ms
writetimeout: 10ms writetimeout: 10ms

View file

@ -241,16 +241,15 @@ notifications:
actions: actions:
- pull - pull
redis: redis:
addr: localhost:6379 addrs: [localhost:6379]
password: asecret password: asecret
db: 0 db: 0
dialtimeout: 10ms dialtimeout: 10ms
readtimeout: 10ms readtimeout: 10ms
writetimeout: 10ms writetimeout: 10ms
pool: maxidleconns: 16
maxidle: 16 poolsize: 64
maxactive: 64 connmaxidletime: 300s
idletimeout: 300s
tls: tls:
enabled: false enabled: false
health: health:
@ -952,72 +951,31 @@ The `events` structure configures the information provided in event notification
## `redis` ## `redis`
Declare parameters for constructing the `redis` connections. Registry instances
may use the Redis instance for several applications. Currently, it caches
information about immutable blobs. Most of the `redis` options control
how the registry connects to the `redis` instance.
You should configure Redis with the **allkeys-lru** eviction policy, because the
registry does not set an expiration value on keys.
Under the hood distribution uses [`go-redis`](https://redis.uptrace.dev/) for
redis connectivity and its [`UniversalOptions`](https://pkg.go.dev/github.com/redis/go-redis/v9#UniversalOptions)
struct.
```yaml ```yaml
redis: redis:
addr: localhost:6379 addrs: [localhost:6379]
password: asecret password: asecret
db: 0 db: 0
dialtimeout: 10ms dialtimeout: 10ms
readtimeout: 10ms readtimeout: 10ms
writetimeout: 10ms writetimeout: 10ms
pool: maxidleconns: 16
maxidle: 16 poolsize: 64
maxactive: 64 connmaxidletime: 300s
idletimeout: 300s
tls:
enabled: false
``` ```
Declare parameters for constructing the `redis` connections. Registry instances
may use the Redis instance for several applications. Currently, it caches
information about immutable blobs. Most of the `redis` options control
how the registry connects to the `redis` instance. You can control the pool's
behavior with the [pool](#pool) subsection. Additionally, you can control
TLS connection settings with the [tls](#tls) subsection (in-transit encryption).
You should configure Redis with the **allkeys-lru** eviction policy, because the
registry does not set an expiration value on keys.
| Parameter | Required | Description |
|-----------|----------|-------------------------------------------------------|
| `addr` | yes | The address (host and port) of the Redis instance. |
| `password`| no | A password used to authenticate to the Redis instance.|
| `db` | no | The name of the database to use for each connection. |
| `dialtimeout` | no | The timeout for connecting to the Redis instance. |
| `readtimeout` | no | The timeout for reading from the Redis instance. |
| `writetimeout` | no | The timeout for writing to the Redis instance. |
### `pool`
```yaml
pool:
maxidle: 16
maxactive: 64
idletimeout: 300s
```
Use these settings to configure the behavior of the Redis connection pool.
| Parameter | Required | Description |
|-----------|----------|-------------------------------------------------------|
| `maxidle` | no | The maximum number of idle connections in the pool. |
| `maxactive`| no | The maximum number of connections which can be open before blocking a connection request. |
| `idletimeout`| no | How long to wait before closing inactive connections. |
### `tls`
```yaml
tls:
enabled: false
```
Use these settings to configure Redis TLS.
| Parameter | Required | Description |
|-----------|----------|-------------------------------------- |
| `enabled` | no | Whether or not to use TLS in-transit. |
## `health` ## `health`
```yaml ```yaml

View file

@ -77,7 +77,7 @@ type App struct {
source notifications.SourceRecord source notifications.SourceRecord
} }
redis *redis.Client redis redis.UniversalClient
// isCache is true if this registry is configured as a pull through cache // isCache is true if this registry is configured as a pull through cache
isCache bool isCache bool
@ -487,12 +487,12 @@ func (app *App) configureEvents(configuration *configuration.Configuration) {
} }
func (app *App) configureRedis(cfg *configuration.Configuration) { func (app *App) configureRedis(cfg *configuration.Configuration) {
if cfg.Redis.Addr == "" { if len(cfg.Redis.Addrs) == 0 {
dcontext.GetLogger(app).Infof("redis not configured") dcontext.GetLogger(app).Infof("redis not configured")
return return
} }
app.redis = app.createPool(cfg.Redis) app.redis = app.createPool(cfg.Redis.UniversalOptions)
// Enable metrics instrumentation. // Enable metrics instrumentation.
if err := redisotel.InstrumentMetrics(app.redis); err != nil { if err := redisotel.InstrumentMetrics(app.redis); err != nil {
@ -514,25 +514,12 @@ func (app *App) configureRedis(cfg *configuration.Configuration) {
})) }))
} }
func (app *App) createPool(cfg configuration.Redis) *redis.Client { func (app *App) createPool(cfg redis.UniversalOptions) redis.UniversalClient {
return redis.NewClient(&redis.Options{ cfg.OnConnect = func(ctx context.Context, cn *redis.Conn) error {
Addr: cfg.Addr,
OnConnect: func(ctx context.Context, cn *redis.Conn) error {
res := cn.Ping(ctx) res := cn.Ping(ctx)
return res.Err() return res.Err()
}, }
Username: cfg.Username, return redis.NewUniversalClient(&cfg)
Password: cfg.Password,
DB: cfg.DB,
MaxRetries: 3,
DialTimeout: cfg.DialTimeout,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
PoolFIFO: false,
MaxIdleConns: cfg.Pool.MaxIdle,
PoolSize: cfg.Pool.MaxActive,
ConnMaxIdleTime: cfg.Pool.IdleTimeout,
})
} }
// configureLogHook prepares logging hook parameters. // configureLogHook prepares logging hook parameters.

View file

@ -25,7 +25,7 @@ import (
// Note that there is no implied relationship between these two caches. The // Note that there is no implied relationship between these two caches. The
// layer may exist in one, both or none and the code must be written this way. // layer may exist in one, both or none and the code must be written this way.
type redisBlobDescriptorService struct { type redisBlobDescriptorService struct {
pool *redis.Client pool redis.UniversalClient
// TODO(stevvooe): We use a pool because we don't have great control over // TODO(stevvooe): We use a pool because we don't have great control over
// the cache lifecycle to manage connections. A new connection if fetched // the cache lifecycle to manage connections. A new connection if fetched
@ -37,7 +37,7 @@ var _ distribution.BlobDescriptorService = &redisBlobDescriptorService{}
// NewRedisBlobDescriptorCacheProvider returns a new redis-based // NewRedisBlobDescriptorCacheProvider returns a new redis-based
// BlobDescriptorCacheProvider using the provided redis connection pool. // BlobDescriptorCacheProvider using the provided redis connection pool.
func NewRedisBlobDescriptorCacheProvider(pool *redis.Client) cache.BlobDescriptorCacheProvider { func NewRedisBlobDescriptorCacheProvider(pool redis.UniversalClient) cache.BlobDescriptorCacheProvider {
return metrics.NewPrometheusCacheProvider( return metrics.NewPrometheusCacheProvider(
&redisBlobDescriptorService{ &redisBlobDescriptorService{
pool: pool, pool: pool,

View file

@ -17,15 +17,14 @@ log:
formatter: text formatter: text
level: debug level: debug
redis: redis:
addr: redis:6379 addrs: [redis:6379]
db: 0 db: 0
dialtimeout: 5s dialtimeout: 5s
readtimeout: 10ms readtimeout: 10ms
writetimeout: 10ms writetimeout: 10ms
pool: maxidleconns: 16
idletimeout: 60s poolsize: 64
maxactive: 64 connmaxidletime: 300s
maxidle: 16
storage: storage:
redirect: redirect:
disable: true disable: true