553 lines
18 KiB
Go
553 lines
18 KiB
Go
/*
 * MinIO Cloud Storage, (C) 2015-2019 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/minio/cli"
|
|
"github.com/minio/minio/cmd/config"
|
|
xhttp "github.com/minio/minio/cmd/http"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/auth"
|
|
"github.com/minio/minio/pkg/certs"
|
|
"github.com/minio/minio/pkg/color"
|
|
"github.com/minio/minio/pkg/env"
|
|
"github.com/minio/minio/pkg/retry"
|
|
)
|
|
|
|
// ServerFlags - server command specific flags
|
|
var ServerFlags = []cli.Flag{
|
|
cli.StringFlag{
|
|
Name: "address",
|
|
Value: ":" + GlobalMinioDefaultPort,
|
|
Usage: "bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname",
|
|
},
|
|
}
|
|
|
|
var serverCmd = cli.Command{
|
|
Name: "server",
|
|
Usage: "start object storage server",
|
|
Flags: append(ServerFlags, GlobalFlags...),
|
|
Action: serverMain,
|
|
CustomHelpTemplate: `NAME:
|
|
{{.HelpName}} - {{.Usage}}
|
|
|
|
USAGE:
|
|
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR1 [DIR2..]
|
|
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64}
|
|
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64} DIR{65...128}
|
|
|
|
DIR:
|
|
DIR points to a directory on a filesystem. When you want to combine
|
|
multiple drives into a single large system, pass one directory per
|
|
filesystem separated by space. You may also use a '...' convention
|
|
to abbreviate the directory arguments. Remote directories in a
|
|
distributed setup are encoded as HTTP(s) URIs.
|
|
{{if .VisibleFlags}}
|
|
FLAGS:
|
|
{{range .VisibleFlags}}{{.}}
|
|
{{end}}{{end}}
|
|
|
|
EXAMPLES:
|
|
1. Start minio server on "/home/shared" directory.
|
|
{{.Prompt}} {{.HelpName}} /home/shared
|
|
|
|
2. Start single node server with 64 local drives "/mnt/data1" to "/mnt/data64".
|
|
{{.Prompt}} {{.HelpName}} /mnt/data{1...64}
|
|
|
|
3. Start distributed minio server on an 32 node setup with 32 drives each, run following command on all the nodes
|
|
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}minio
|
|
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
|
|
{{.Prompt}} {{.HelpName}} http://node{1...32}.example.com/mnt/export{1...32}
|
|
|
|
4. Start distributed minio server in an expanded setup, run the following command on all the nodes
|
|
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}minio
|
|
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
|
|
{{.Prompt}} {{.HelpName}} http://node{1...16}.example.com/mnt/export{1...32} \
|
|
http://node{17...64}.example.com/mnt/export{1...64}
|
|
|
|
`,
|
|
}
|
|
|
|
// Checks if endpoints are either available through environment
|
|
// or command line, returns false if both fails.
|
|
func endpointsPresent(ctx *cli.Context) bool {
|
|
endpoints := env.Get(config.EnvEndpoints, strings.Join(ctx.Args(), config.ValueSeparator))
|
|
return len(endpoints) != 0
|
|
}
|
|
|
|
func serverHandleCmdArgs(ctx *cli.Context) {
|
|
// Handle common command args.
|
|
handleCommonCmdArgs(ctx)
|
|
|
|
logger.FatalIf(CheckLocalServerAddr(globalCLIContext.Addr), "Unable to validate passed arguments")
|
|
|
|
var setupType SetupType
|
|
var err error
|
|
|
|
globalMinioAddr = globalCLIContext.Addr
|
|
|
|
globalMinioHost, globalMinioPort = mustSplitHostPort(globalMinioAddr)
|
|
endpoints := strings.Fields(env.Get(config.EnvEndpoints, ""))
|
|
if len(endpoints) > 0 {
|
|
globalEndpoints, globalXLSetDriveCount, setupType, err = createServerEndpoints(globalCLIContext.Addr, endpoints...)
|
|
} else {
|
|
globalEndpoints, globalXLSetDriveCount, setupType, err = createServerEndpoints(globalCLIContext.Addr, ctx.Args()...)
|
|
}
|
|
logger.FatalIf(err, "Invalid command line arguments")
|
|
|
|
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
|
|
// to IPv6 address ie minio will start listening on IPv6 address whereas another
|
|
// (non-)minio process is listening on IPv4 of given port.
|
|
// To avoid this error situation we check for port availability.
|
|
logger.FatalIf(checkPortAvailability(globalMinioHost, globalMinioPort), "Unable to start the server")
|
|
|
|
globalIsXL = (setupType == XLSetupType)
|
|
globalIsDistXL = (setupType == DistXLSetupType)
|
|
if globalIsDistXL {
|
|
globalIsXL = true
|
|
}
|
|
}
|
|
|
|
func serverHandleEnvVars() {
|
|
// Handle common environment variables.
|
|
handleCommonEnvVars()
|
|
}
|
|
|
|
func newAllSubsystems() {
|
|
// Create new notification system and initialize notification targets
|
|
globalNotificationSys = NewNotificationSys(globalEndpoints)
|
|
|
|
// Create new bucket metadata system.
|
|
globalBucketMetadataSys = NewBucketMetadataSys()
|
|
|
|
// Create a new config system.
|
|
globalConfigSys = NewConfigSys()
|
|
|
|
// Create new IAM system.
|
|
globalIAMSys = NewIAMSys()
|
|
|
|
// Create new policy system.
|
|
globalPolicySys = NewPolicySys()
|
|
|
|
// Create new lifecycle system.
|
|
globalLifecycleSys = NewLifecycleSys()
|
|
|
|
// Create new bucket encryption subsystem
|
|
globalBucketSSEConfigSys = NewBucketSSEConfigSys()
|
|
|
|
// Create new bucket object lock subsystem
|
|
globalBucketObjectLockSys = NewBucketObjectLockSys()
|
|
|
|
// Create new bucket quota subsystem
|
|
globalBucketQuotaSys = NewBucketQuotaSys()
|
|
}
|
|
|
|
func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
|
|
// Create cancel context to control 'newRetryTimer' go routine.
|
|
retryCtx, cancel := context.WithCancel(ctx)
|
|
|
|
// Indicate to our routine to exit cleanly upon return.
|
|
defer cancel()
|
|
|
|
// Make sure to hold lock for entire migration to avoid
|
|
// such that only one server should migrate the entire config
|
|
// at a given time, this big transaction lock ensures this
|
|
// appropriately. This is also true for rotation of encrypted
|
|
// content.
|
|
txnLk := newObject.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
|
|
defer func(txnLk RWLocker) {
|
|
txnLk.Unlock()
|
|
|
|
if err != nil {
|
|
var cerr config.Err
|
|
// For any config error, we don't need to drop into safe-mode
|
|
// instead its a user error and should be fixed by user.
|
|
if errors.As(err, &cerr) {
|
|
return
|
|
}
|
|
|
|
// Prints the formatted startup message in safe mode operation.
|
|
// Drops-into safe mode where users need to now manually recover
|
|
// the server.
|
|
printStartupSafeModeMessage(getAPIEndpoints(), err)
|
|
|
|
// Initialization returned error reaching safe mode and
|
|
// not proceeding waiting for admin action.
|
|
handleSignals()
|
|
}
|
|
}(txnLk)
|
|
|
|
// Enable healing to heal drives if possible
|
|
if globalIsXL {
|
|
initBackgroundHealing(ctx, newObject)
|
|
initLocalDisksAutoHeal(ctx, newObject)
|
|
}
|
|
|
|
// **** WARNING ****
|
|
// Migrating to encrypted backend should happen before initialization of any
|
|
// sub-systems, make sure that we do not move the above codeblock elsewhere.
|
|
|
|
// Initializing sub-systems needs a retry mechanism for
|
|
// the following reasons:
|
|
// - Read quorum is lost just after the initialization
|
|
// of the object layer.
|
|
// - Write quorum not met when upgrading configuration
|
|
// version is needed, migration is needed etc.
|
|
rquorum := InsufficientReadQuorum{}
|
|
wquorum := InsufficientWriteQuorum{}
|
|
for range retry.NewTimer(retryCtx) {
|
|
// let one of the server acquire the lock, if not let them timeout.
|
|
// which shall be retried again by this loop.
|
|
if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 10*time.Second)); err != nil {
|
|
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
|
|
continue
|
|
}
|
|
|
|
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
|
if globalIsDistXL {
|
|
logger.Info("Waiting for all MinIO sub-systems to be initialized.. lock acquired")
|
|
}
|
|
|
|
// Migrate all backend configs to encrypted backend configs, optionally
|
|
// handles rotating keys for encryption, if there is any retriable failure
|
|
// that shall be retried if there is an error.
|
|
if err = handleEncryptedConfigBackend(newObject, true); err == nil {
|
|
// Upon success migrating the config, initialize all sub-systems
|
|
// if all sub-systems initialized successfully return right away
|
|
if err = initAllSubsystems(retryCtx, newObject); err == nil {
|
|
// All successful return.
|
|
if globalIsDistXL {
|
|
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
|
logger.Info("All MinIO sub-systems initialized successfully")
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// One of these retriable errors shall be retried.
|
|
if errors.Is(err, errDiskNotFound) ||
|
|
errors.Is(err, errConfigNotFound) ||
|
|
errors.Is(err, context.Canceled) ||
|
|
errors.Is(err, context.DeadlineExceeded) ||
|
|
errors.As(err, &rquorum) ||
|
|
errors.As(err, &wquorum) ||
|
|
isErrBucketNotFound(err) {
|
|
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
|
|
txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
|
|
continue
|
|
}
|
|
|
|
// Any other unhandled return right here.
|
|
return fmt.Errorf("Unable to initialize sub-systems: %w", err)
|
|
}
|
|
|
|
// Return an error when retry is canceled or deadlined
|
|
if err = retryCtx.Err(); err != nil {
|
|
return fmt.Errorf("Unable to initialize sub-systems: %w", err)
|
|
}
|
|
|
|
// Retry was canceled successfully.
|
|
return errors.New("Initializing sub-systems stopped gracefully")
|
|
}
|
|
|
|
func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
|
// %w is used by all error returns here to make sure
|
|
// we wrap the underlying error, make sure when you
|
|
// are modifying this code that you do so, if and when
|
|
// you want to add extra context to your error. This
|
|
// ensures top level retry works accordingly.
|
|
var buckets []BucketInfo
|
|
if globalIsDistXL || globalIsXL {
|
|
// List buckets to heal, and be re-used for loading configs.
|
|
buckets, err = newObject.ListBucketsHeal(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to list buckets to heal: %w", err)
|
|
}
|
|
// Attempt a heal if possible and re-use the bucket names
|
|
// to reload their config.
|
|
wquorum := &InsufficientWriteQuorum{}
|
|
rquorum := &InsufficientReadQuorum{}
|
|
for _, bucket := range buckets {
|
|
if err = newObject.MakeBucketWithLocation(ctx, bucket.Name, "", false); err != nil {
|
|
if errors.As(err, &wquorum) || errors.As(err, &rquorum) {
|
|
// Return the error upwards for the caller to retry.
|
|
return fmt.Errorf("Unable to heal bucket: %w", err)
|
|
}
|
|
if _, ok := err.(BucketExists); !ok {
|
|
// ignore any other error and log for investigation.
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
// Bucket already exists, nothing that needs to be done.
|
|
}
|
|
}
|
|
} else {
|
|
buckets, err = newObject.ListBuckets(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to list buckets: %w", err)
|
|
}
|
|
}
|
|
|
|
// Initialize config system.
|
|
if err = globalConfigSys.Init(newObject); err != nil {
|
|
return fmt.Errorf("Unable to initialize config system: %w", err)
|
|
}
|
|
|
|
// Populate existing buckets to the etcd backend
|
|
if globalDNSConfig != nil {
|
|
// Background this operation.
|
|
go initFederatorBackend(buckets, newObject)
|
|
}
|
|
|
|
// Initialize bucket metadata sub-system.
|
|
if err := globalBucketMetadataSys.Init(ctx, buckets, newObject); err != nil {
|
|
return fmt.Errorf("Unable to initialize bucket metadata sub-system: %w", err)
|
|
}
|
|
|
|
// Initialize notification system.
|
|
if err = globalNotificationSys.Init(buckets, newObject); err != nil {
|
|
return fmt.Errorf("Unable to initialize notification system: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
|
|
// Make sure only 1 crawler is running on the cluster.
|
|
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader")
|
|
for {
|
|
err := locker.GetLock(leaderLockTimeout)
|
|
if err != nil {
|
|
time.Sleep(leaderTick)
|
|
continue
|
|
}
|
|
break
|
|
// No unlock for "leader" lock.
|
|
}
|
|
|
|
if globalIsXL {
|
|
initGlobalHeal(ctx, objAPI)
|
|
}
|
|
|
|
initDataCrawler(ctx, objAPI)
|
|
initQuotaEnforcement(ctx, objAPI)
|
|
}
|
|
|
|
// serverMain handler called for 'minio server' command.
|
|
func serverMain(ctx *cli.Context) {
|
|
if ctx.Args().First() == "help" || !endpointsPresent(ctx) {
|
|
cli.ShowCommandHelpAndExit(ctx, "server", 1)
|
|
}
|
|
setDefaultProfilerRates()
|
|
|
|
// Initialize globalConsoleSys system
|
|
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
|
|
|
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM)
|
|
|
|
// Handle all server command args.
|
|
serverHandleCmdArgs(ctx)
|
|
|
|
// Handle all server environment vars.
|
|
serverHandleEnvVars()
|
|
|
|
// Set node name, only set for distributed setup.
|
|
globalConsoleSys.SetNodeName(globalEndpoints)
|
|
|
|
// Initialize all help
|
|
initHelp()
|
|
|
|
// Check and load TLS certificates.
|
|
var err error
|
|
globalPublicCerts, globalTLSCerts, globalIsSSL, err = getTLSConfig()
|
|
logger.FatalIf(err, "Unable to load the TLS configuration")
|
|
|
|
// Check and load Root CAs.
|
|
globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get())
|
|
logger.FatalIf(err, "Failed to read root CAs (%v)", err)
|
|
|
|
globalMinioEndpoint = func() string {
|
|
host := globalMinioHost
|
|
if host == "" {
|
|
host = sortIPs(localIP4.ToSlice())[0]
|
|
}
|
|
return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort))
|
|
}()
|
|
|
|
// Is distributed setup, error out if no certificates are found for HTTPS endpoints.
|
|
if globalIsDistXL {
|
|
if globalEndpoints.HTTPS() && !globalIsSSL {
|
|
logger.Fatal(config.ErrNoCertsAndHTTPSEndpoints(nil), "Unable to start the server")
|
|
}
|
|
if !globalEndpoints.HTTPS() && globalIsSSL {
|
|
logger.Fatal(config.ErrCertsAndHTTPEndpoints(nil), "Unable to start the server")
|
|
}
|
|
}
|
|
|
|
if !globalCLIContext.Quiet {
|
|
// Check for new updates from dl.min.io.
|
|
checkUpdate(getMinioMode())
|
|
}
|
|
|
|
if !globalActiveCred.IsValid() && globalIsDistXL {
|
|
logger.Fatal(config.ErrEnvCredentialsMissingDistributed(nil),
|
|
"Unable to initialize the server in distributed mode")
|
|
}
|
|
|
|
// Set system resources to maximum.
|
|
setMaxResources()
|
|
|
|
if globalIsXL {
|
|
// New global heal state
|
|
globalAllHealState = newHealState()
|
|
globalBackgroundHealState = newHealState()
|
|
}
|
|
|
|
// Configure server.
|
|
handler, err := configureServerHandler(globalEndpoints)
|
|
if err != nil {
|
|
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
|
|
}
|
|
|
|
var getCert certs.GetCertificateFunc
|
|
if globalTLSCerts != nil {
|
|
getCert = globalTLSCerts.GetCertificate
|
|
}
|
|
|
|
// Annonying hack to ensure that Go doesn't write its own logging,
|
|
// interleaved with our formatted logging, this allows us to
|
|
// honor --json and --quiet flag properly.
|
|
//
|
|
// Unfortunately we have to resort to this sort of hacky approach
|
|
// because, Go automatically initializes ErrorLog on its own
|
|
// and can log without application control.
|
|
//
|
|
// This is an implementation issue in Go and should be fixed, but
|
|
// until then this hack is okay and works for our needs.
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
defer pr.Close()
|
|
scanner := bufio.NewScanner(&contextReader{pr, GlobalContext})
|
|
for scanner.Scan() {
|
|
logger.LogIf(GlobalContext, errors.New(scanner.Text()))
|
|
}
|
|
}()
|
|
|
|
httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{handler}, getCert)
|
|
httpServer.ErrorLog = log.New(pw, "", 0)
|
|
httpServer.BaseContext = func(listener net.Listener) context.Context {
|
|
return GlobalContext
|
|
}
|
|
go func() {
|
|
globalHTTPServerErrorCh <- httpServer.Start()
|
|
}()
|
|
|
|
globalObjLayerMutex.Lock()
|
|
globalHTTPServer = httpServer
|
|
globalObjLayerMutex.Unlock()
|
|
|
|
if globalIsDistXL && globalEndpoints.FirstLocal() {
|
|
for {
|
|
// Additionally in distributed setup, validate the setup and configuration.
|
|
err := verifyServerSystemConfig(globalEndpoints)
|
|
if err == nil {
|
|
break
|
|
}
|
|
logger.LogIf(GlobalContext, err, "Unable to initialize distributed setup")
|
|
select {
|
|
case <-GlobalContext.Done():
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
|
|
logger.SetDeploymentID(globalDeploymentID)
|
|
if err != nil {
|
|
// Stop watching for any certificate changes.
|
|
globalTLSCerts.Stop()
|
|
|
|
globalHTTPServer.Shutdown()
|
|
logger.Fatal(err, "Unable to initialize backend")
|
|
}
|
|
|
|
// Once endpoints are finalized, initialize the new object api in safe mode.
|
|
globalObjLayerMutex.Lock()
|
|
globalSafeMode = true
|
|
globalObjectAPI = newObject
|
|
globalObjLayerMutex.Unlock()
|
|
|
|
newAllSubsystems()
|
|
|
|
go startBackgroundOps(GlobalContext, newObject)
|
|
|
|
logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode")
|
|
|
|
// Initialize users credentials and policies in background.
|
|
go startBackgroundIAMLoad(GlobalContext)
|
|
|
|
if globalCacheConfig.Enabled {
|
|
// initialize the new disk cache objects.
|
|
var cacheAPI CacheObjectLayer
|
|
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
|
|
logger.FatalIf(err, "Unable to initialize disk caching")
|
|
|
|
globalObjLayerMutex.Lock()
|
|
globalCacheObjectAPI = cacheAPI
|
|
globalObjLayerMutex.Unlock()
|
|
}
|
|
|
|
// Disable safe mode operation, after all initialization is over.
|
|
globalObjLayerMutex.Lock()
|
|
globalSafeMode = false
|
|
globalObjLayerMutex.Unlock()
|
|
|
|
// Prints the formatted startup message once object layer is initialized.
|
|
printStartupMessage(getAPIEndpoints())
|
|
|
|
if globalActiveCred.Equal(auth.DefaultCredentials) {
|
|
msg := fmt.Sprintf("Detected default credentials '%s', please change the credentials immediately using 'MINIO_ACCESS_KEY' and 'MINIO_SECRET_KEY'", globalActiveCred)
|
|
logger.StartupMessage(color.RedBold(msg))
|
|
}
|
|
|
|
handleSignals()
|
|
}
|
|
|
|
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
|
|
func newObjectLayer(ctx context.Context, endpointZones EndpointZones) (newObject ObjectLayer, err error) {
|
|
// For FS only, directly use the disk.
|
|
if endpointZones.NEndpoints() == 1 {
|
|
// Initialize new FS object layer.
|
|
return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
|
|
}
|
|
|
|
return newXLZones(ctx, endpointZones)
|
|
}
|