serve s3: implement --auth-proxy
This implements --auth-proxy for serve s3. In addition it: * add listbuckets tests with and without authProxy * use auth proxy test framework * servetest: implement workaround for #7454 * update github.com/rclone/gofakes3 to fix race condition
This commit is contained in:
parent
d4b29fef92
commit
9de485f949
10 changed files with 404 additions and 131 deletions
|
@ -25,22 +25,26 @@ var (
|
|||
// backend for gofakes3
|
||||
type s3Backend struct {
|
||||
opt *Options
|
||||
vfs *vfs.VFS
|
||||
s *Server
|
||||
meta *sync.Map
|
||||
}
|
||||
|
||||
// newBackend creates a new SimpleBucketBackend.
|
||||
func newBackend(vfs *vfs.VFS, opt *Options) gofakes3.Backend {
|
||||
func newBackend(s *Server, opt *Options) gofakes3.Backend {
|
||||
return &s3Backend{
|
||||
vfs: vfs,
|
||||
opt: opt,
|
||||
s: s,
|
||||
meta: new(sync.Map),
|
||||
}
|
||||
}
|
||||
|
||||
// ListBuckets always returns the default bucket.
|
||||
func (b *s3Backend) ListBuckets(ctx context.Context) ([]gofakes3.BucketInfo, error) {
|
||||
dirEntries, err := getDirEntries("/", b.vfs)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dirEntries, err := getDirEntries("/", _vfs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -60,7 +64,11 @@ func (b *s3Backend) ListBuckets(ctx context.Context) ([]gofakes3.BucketInfo, err
|
|||
|
||||
// ListBucket lists the objects in the given bucket.
|
||||
func (b *s3Backend) ListBucket(ctx context.Context, bucket string, prefix *gofakes3.Prefix, page gofakes3.ListBucketPage) (*gofakes3.ObjectList, error) {
|
||||
_, err := b.vfs.Stat(bucket)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = _vfs.Stat(bucket)
|
||||
if err != nil {
|
||||
return nil, gofakes3.BucketNotFound(bucket)
|
||||
}
|
||||
|
@ -79,7 +87,7 @@ func (b *s3Backend) ListBucket(ctx context.Context, bucket string, prefix *gofak
|
|||
response := gofakes3.NewObjectList()
|
||||
path, remaining := prefixParser(prefix)
|
||||
|
||||
err = b.entryListR(bucket, path, remaining, prefix.HasDelimiter, response)
|
||||
err = b.entryListR(_vfs, bucket, path, remaining, prefix.HasDelimiter, response)
|
||||
if err == gofakes3.ErrNoSuchKey {
|
||||
// AWS just returns an empty list
|
||||
response = gofakes3.NewObjectList()
|
||||
|
@ -94,13 +102,17 @@ func (b *s3Backend) ListBucket(ctx context.Context, bucket string, prefix *gofak
|
|||
//
|
||||
// Note that the metadata is not supported yet.
|
||||
func (b *s3Backend) HeadObject(ctx context.Context, bucketName, objectName string) (*gofakes3.Object, error) {
|
||||
_, err := b.vfs.Stat(bucketName)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = _vfs.Stat(bucketName)
|
||||
if err != nil {
|
||||
return nil, gofakes3.BucketNotFound(bucketName)
|
||||
}
|
||||
|
||||
fp := path.Join(bucketName, objectName)
|
||||
node, err := b.vfs.Stat(fp)
|
||||
node, err := _vfs.Stat(fp)
|
||||
if err != nil {
|
||||
return nil, gofakes3.KeyNotFound(objectName)
|
||||
}
|
||||
|
@ -141,13 +153,17 @@ func (b *s3Backend) HeadObject(ctx context.Context, bucketName, objectName strin
|
|||
|
||||
// GetObject fetchs the object from the filesystem.
|
||||
func (b *s3Backend) GetObject(ctx context.Context, bucketName, objectName string, rangeRequest *gofakes3.ObjectRangeRequest) (obj *gofakes3.Object, err error) {
|
||||
_, err = b.vfs.Stat(bucketName)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = _vfs.Stat(bucketName)
|
||||
if err != nil {
|
||||
return nil, gofakes3.BucketNotFound(bucketName)
|
||||
}
|
||||
|
||||
fp := path.Join(bucketName, objectName)
|
||||
node, err := b.vfs.Stat(fp)
|
||||
node, err := _vfs.Stat(fp)
|
||||
if err != nil {
|
||||
return nil, gofakes3.KeyNotFound(objectName)
|
||||
}
|
||||
|
@ -223,9 +239,13 @@ func (b *s3Backend) storeModtime(fp string, meta map[string]string, val string)
|
|||
|
||||
// TouchObject creates or updates meta on specified object.
|
||||
func (b *s3Backend) TouchObject(ctx context.Context, fp string, meta map[string]string) (result gofakes3.PutObjectResult, err error) {
|
||||
_, err = b.vfs.Stat(fp)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
_, err = _vfs.Stat(fp)
|
||||
if err == vfs.ENOENT {
|
||||
f, err := b.vfs.Create(fp)
|
||||
f, err := _vfs.Create(fp)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
@ -235,7 +255,7 @@ func (b *s3Backend) TouchObject(ctx context.Context, fp string, meta map[string]
|
|||
return result, err
|
||||
}
|
||||
|
||||
_, err = b.vfs.Stat(fp)
|
||||
_, err = _vfs.Stat(fp)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
@ -246,7 +266,7 @@ func (b *s3Backend) TouchObject(ctx context.Context, fp string, meta map[string]
|
|||
ti, err := swift.FloatStringToTime(val)
|
||||
if err == nil {
|
||||
b.storeModtime(fp, meta, val)
|
||||
return result, b.vfs.Chtimes(fp, ti, ti)
|
||||
return result, _vfs.Chtimes(fp, ti, ti)
|
||||
}
|
||||
// ignore error since the file is successfully created
|
||||
}
|
||||
|
@ -255,7 +275,7 @@ func (b *s3Backend) TouchObject(ctx context.Context, fp string, meta map[string]
|
|||
ti, err := swift.FloatStringToTime(val)
|
||||
if err == nil {
|
||||
b.storeModtime(fp, meta, val)
|
||||
return result, b.vfs.Chtimes(fp, ti, ti)
|
||||
return result, _vfs.Chtimes(fp, ti, ti)
|
||||
}
|
||||
// ignore error since the file is successfully created
|
||||
}
|
||||
|
@ -270,7 +290,11 @@ func (b *s3Backend) PutObject(
|
|||
meta map[string]string,
|
||||
input io.Reader, size int64,
|
||||
) (result gofakes3.PutObjectResult, err error) {
|
||||
_, err = b.vfs.Stat(bucketName)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
_, err = _vfs.Stat(bucketName)
|
||||
if err != nil {
|
||||
return result, gofakes3.BucketNotFound(bucketName)
|
||||
}
|
||||
|
@ -284,12 +308,12 @@ func (b *s3Backend) PutObject(
|
|||
// }
|
||||
|
||||
if objectDir != "." {
|
||||
if err := mkdirRecursive(objectDir, b.vfs); err != nil {
|
||||
if err := mkdirRecursive(objectDir, _vfs); err != nil {
|
||||
return result, err
|
||||
}
|
||||
}
|
||||
|
||||
f, err := b.vfs.Create(fp)
|
||||
f, err := _vfs.Create(fp)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
@ -297,17 +321,17 @@ func (b *s3Backend) PutObject(
|
|||
if _, err := io.Copy(f, input); err != nil {
|
||||
// remove file when i/o error occurred (FsPutErr)
|
||||
_ = f.Close()
|
||||
_ = b.vfs.Remove(fp)
|
||||
_ = _vfs.Remove(fp)
|
||||
return result, err
|
||||
}
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
// remove file when close error occurred (FsPutErr)
|
||||
_ = b.vfs.Remove(fp)
|
||||
_ = _vfs.Remove(fp)
|
||||
return result, err
|
||||
}
|
||||
|
||||
_, err = b.vfs.Stat(fp)
|
||||
_, err = _vfs.Stat(fp)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
@ -318,16 +342,13 @@ func (b *s3Backend) PutObject(
|
|||
ti, err := swift.FloatStringToTime(val)
|
||||
if err == nil {
|
||||
b.storeModtime(fp, meta, val)
|
||||
return result, b.vfs.Chtimes(fp, ti, ti)
|
||||
return result, _vfs.Chtimes(fp, ti, ti)
|
||||
}
|
||||
// ignore error since the file is successfully created
|
||||
}
|
||||
|
||||
if val, ok := meta["mtime"]; ok {
|
||||
ti, err := swift.FloatStringToTime(val)
|
||||
if err == nil {
|
||||
if val, ok := meta["mtime"]; ok {
|
||||
b.storeModtime(fp, meta, val)
|
||||
return result, b.vfs.Chtimes(fp, ti, ti)
|
||||
return result, _vfs.Chtimes(fp, ti, ti)
|
||||
}
|
||||
// ignore error since the file is successfully created
|
||||
}
|
||||
|
@ -338,7 +359,7 @@ func (b *s3Backend) PutObject(
|
|||
// DeleteMulti deletes multiple objects in a single request.
|
||||
func (b *s3Backend) DeleteMulti(ctx context.Context, bucketName string, objects ...string) (result gofakes3.MultiDeleteResult, rerr error) {
|
||||
for _, object := range objects {
|
||||
if err := b.deleteObject(bucketName, object); err != nil {
|
||||
if err := b.deleteObject(ctx, bucketName, object); err != nil {
|
||||
fs.Errorf("serve s3", "delete object failed: %v", err)
|
||||
result.Error = append(result.Error, gofakes3.ErrorResult{
|
||||
Code: gofakes3.ErrInternal,
|
||||
|
@ -357,12 +378,16 @@ func (b *s3Backend) DeleteMulti(ctx context.Context, bucketName string, objects
|
|||
|
||||
// DeleteObject deletes the object with the given name.
|
||||
func (b *s3Backend) DeleteObject(ctx context.Context, bucketName, objectName string) (result gofakes3.ObjectDeleteResult, rerr error) {
|
||||
return result, b.deleteObject(bucketName, objectName)
|
||||
return result, b.deleteObject(ctx, bucketName, objectName)
|
||||
}
|
||||
|
||||
// deleteObject deletes the object from the filesystem.
|
||||
func (b *s3Backend) deleteObject(bucketName, objectName string) error {
|
||||
_, err := b.vfs.Stat(bucketName)
|
||||
func (b *s3Backend) deleteObject(ctx context.Context, bucketName, objectName string) error {
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = _vfs.Stat(bucketName)
|
||||
if err != nil {
|
||||
return gofakes3.BucketNotFound(bucketName)
|
||||
}
|
||||
|
@ -370,18 +395,22 @@ func (b *s3Backend) deleteObject(bucketName, objectName string) error {
|
|||
fp := path.Join(bucketName, objectName)
|
||||
// S3 does not report an error when attemping to delete a key that does not exist, so
|
||||
// we need to skip IsNotExist errors.
|
||||
if err := b.vfs.Remove(fp); err != nil && !os.IsNotExist(err) {
|
||||
if err := _vfs.Remove(fp); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// FIXME: unsafe operation
|
||||
rmdirRecursive(fp, b.vfs)
|
||||
rmdirRecursive(fp, _vfs)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket.
|
||||
func (b *s3Backend) CreateBucket(ctx context.Context, name string) error {
|
||||
_, err := b.vfs.Stat(name)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = _vfs.Stat(name)
|
||||
if err != nil && err != vfs.ENOENT {
|
||||
return gofakes3.ErrInternal
|
||||
}
|
||||
|
@ -390,7 +419,7 @@ func (b *s3Backend) CreateBucket(ctx context.Context, name string) error {
|
|||
return gofakes3.ErrBucketAlreadyExists
|
||||
}
|
||||
|
||||
if err := b.vfs.Mkdir(name, 0755); err != nil {
|
||||
if err := _vfs.Mkdir(name, 0755); err != nil {
|
||||
return gofakes3.ErrInternal
|
||||
}
|
||||
return nil
|
||||
|
@ -398,12 +427,16 @@ func (b *s3Backend) CreateBucket(ctx context.Context, name string) error {
|
|||
|
||||
// DeleteBucket deletes the bucket with the given name.
|
||||
func (b *s3Backend) DeleteBucket(ctx context.Context, name string) error {
|
||||
_, err := b.vfs.Stat(name)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = _vfs.Stat(name)
|
||||
if err != nil {
|
||||
return gofakes3.BucketNotFound(name)
|
||||
}
|
||||
|
||||
if err := b.vfs.Remove(name); err != nil {
|
||||
if err := _vfs.Remove(name); err != nil {
|
||||
return gofakes3.ErrBucketNotEmpty
|
||||
}
|
||||
|
||||
|
@ -412,7 +445,11 @@ func (b *s3Backend) DeleteBucket(ctx context.Context, name string) error {
|
|||
|
||||
// BucketExists checks if the bucket exists.
|
||||
func (b *s3Backend) BucketExists(ctx context.Context, name string) (exists bool, err error) {
|
||||
_, err = b.vfs.Stat(name)
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
_, err = _vfs.Stat(name)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -422,6 +459,10 @@ func (b *s3Backend) BucketExists(ctx context.Context, name string) (exists bool,
|
|||
|
||||
// CopyObject copy specified object from srcKey to dstKey.
|
||||
func (b *s3Backend) CopyObject(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string, meta map[string]string) (result gofakes3.CopyObjectResult, err error) {
|
||||
_vfs, err := b.s.getVFS(ctx)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
fp := path.Join(srcBucket, srcKey)
|
||||
if srcBucket == dstBucket && srcKey == dstKey {
|
||||
b.meta.Store(fp, meta)
|
||||
|
@ -439,10 +480,10 @@ func (b *s3Backend) CopyObject(ctx context.Context, srcBucket, srcKey, dstBucket
|
|||
}
|
||||
b.storeModtime(fp, meta, val)
|
||||
|
||||
return result, b.vfs.Chtimes(fp, ti, ti)
|
||||
return result, _vfs.Chtimes(fp, ti, ti)
|
||||
}
|
||||
|
||||
cStat, err := b.vfs.Stat(fp)
|
||||
cStat, err := _vfs.Stat(fp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/rclone/gofakes3"
|
||||
"github.com/rclone/rclone/vfs"
|
||||
)
|
||||
|
||||
func (b *s3Backend) entryListR(bucket, fdPath, name string, addPrefix bool, response *gofakes3.ObjectList) error {
|
||||
func (b *s3Backend) entryListR(_vfs *vfs.VFS, bucket, fdPath, name string, addPrefix bool, response *gofakes3.ObjectList) error {
|
||||
fp := path.Join(bucket, fdPath)
|
||||
|
||||
dirEntries, err := getDirEntries(fp, b.vfs)
|
||||
dirEntries, err := getDirEntries(fp, _vfs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -30,7 +31,7 @@ func (b *s3Backend) entryListR(bucket, fdPath, name string, addPrefix bool, resp
|
|||
response.AddPrefix(gofakes3.URLEncode(objectPath))
|
||||
continue
|
||||
}
|
||||
err := b.entryListR(bucket, path.Join(fdPath, object), "", false, response)
|
||||
err := b.entryListR(_vfs, bucket, path.Join(fdPath, object), "", false, response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/rclone/rclone/cmd"
|
||||
"github.com/rclone/rclone/cmd/serve/proxy/proxyflags"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config/flags"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
httplib "github.com/rclone/rclone/lib/http"
|
||||
|
@ -20,6 +22,7 @@ var DefaultOpt = Options{
|
|||
hashName: "MD5",
|
||||
hashType: hash.MD5,
|
||||
noCleanup: false,
|
||||
Auth: httplib.DefaultAuthCfg(),
|
||||
HTTP: httplib.DefaultCfg(),
|
||||
}
|
||||
|
||||
|
@ -30,8 +33,10 @@ const flagPrefix = ""
|
|||
|
||||
func init() {
|
||||
flagSet := Command.Flags()
|
||||
httplib.AddAuthFlagsPrefix(flagSet, flagPrefix, &Opt.Auth)
|
||||
httplib.AddHTTPFlagsPrefix(flagSet, flagPrefix, &Opt.HTTP)
|
||||
vfsflags.AddFlags(flagSet)
|
||||
proxyflags.AddFlags(flagSet)
|
||||
flags.BoolVarP(flagSet, &Opt.pathBucketMode, "force-path-style", "", Opt.pathBucketMode, "If true use path style access if false use virtual hosted style (default true)", "")
|
||||
flags.StringVarP(flagSet, &Opt.hashName, "etag-hash", "", Opt.hashName, "Which hash to use for the ETag, or auto or blank for off", "")
|
||||
flags.StringArrayVarP(flagSet, &Opt.authPair, "auth-key", "", Opt.authPair, "Set key pair for v4 authorization: access_key_id,secret_access_key", "")
|
||||
|
@ -55,10 +60,15 @@ var Command = &cobra.Command{
|
|||
},
|
||||
Use: "s3 remote:path",
|
||||
Short: `Serve remote:path over s3.`,
|
||||
Long: help() + httplib.Help(flagPrefix) + vfs.Help(),
|
||||
Long: help() + httplib.AuthHelp(flagPrefix) + httplib.Help(flagPrefix) + vfs.Help(),
|
||||
RunE: func(command *cobra.Command, args []string) error {
|
||||
cmd.CheckArgs(1, 1, command, args)
|
||||
f := cmd.NewFsSrc(args)
|
||||
var f fs.Fs
|
||||
if proxyflags.Opt.AuthProxy == "" {
|
||||
cmd.CheckArgs(1, 1, command, args)
|
||||
f = cmd.NewFsSrc(args)
|
||||
} else {
|
||||
cmd.CheckArgs(0, 0, command, args)
|
||||
}
|
||||
|
||||
if Opt.hashName == "auto" {
|
||||
Opt.hashType = f.Hashes().GetOne()
|
||||
|
@ -73,13 +83,13 @@ var Command = &cobra.Command{
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
router := s.Router()
|
||||
router := s.server.Router()
|
||||
s.Bind(router)
|
||||
err = s.serve()
|
||||
err = s.Serve()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.Wait()
|
||||
s.server.Wait()
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
|
|
|
@ -9,10 +9,8 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -21,6 +19,7 @@ import (
|
|||
"github.com/rclone/rclone/fs/object"
|
||||
|
||||
_ "github.com/rclone/rclone/backend/local"
|
||||
"github.com/rclone/rclone/cmd/serve/proxy/proxyflags"
|
||||
"github.com/rclone/rclone/cmd/serve/servetest"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
|
@ -37,7 +36,7 @@ const (
|
|||
)
|
||||
|
||||
// Configure and serve the server
|
||||
func serveS3(f fs.Fs) (testURL string, keyid string, keysec string) {
|
||||
func serveS3(f fs.Fs) (testURL string, keyid string, keysec string, w *Server) {
|
||||
keyid = random.String(16)
|
||||
keysec = random.String(16)
|
||||
serveropt := &Options{
|
||||
|
@ -49,12 +48,12 @@ func serveS3(f fs.Fs) (testURL string, keyid string, keysec string) {
|
|||
}
|
||||
|
||||
serveropt.HTTP.ListenAddr = []string{endpoint}
|
||||
w, _ := newServer(context.Background(), f, serveropt)
|
||||
router := w.Router()
|
||||
w, _ = newServer(context.Background(), f, serveropt)
|
||||
router := w.server.Router()
|
||||
|
||||
w.Bind(router)
|
||||
w.Serve()
|
||||
testURL = w.Server.URLs()[0]
|
||||
_ = w.Serve()
|
||||
testURL = w.server.URLs()[0]
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -63,7 +62,7 @@ func serveS3(f fs.Fs) (testURL string, keyid string, keysec string) {
|
|||
// s3 remote against it.
|
||||
func TestS3(t *testing.T) {
|
||||
start := func(f fs.Fs) (configmap.Simple, func()) {
|
||||
testURL, keyid, keysec := serveS3(f)
|
||||
testURL, keyid, keysec, _ := serveS3(f)
|
||||
// Config for the backend we'll use to connect to the server
|
||||
config := configmap.Simple{
|
||||
"type": "s3",
|
||||
|
@ -76,62 +75,7 @@ func TestS3(t *testing.T) {
|
|||
return config, func() {}
|
||||
}
|
||||
|
||||
RunS3UnitTests(t, "s3", start)
|
||||
}
|
||||
|
||||
func RunS3UnitTests(t *testing.T, name string, start servetest.StartFn) {
|
||||
fstest.Initialise()
|
||||
ci := fs.GetConfig(context.Background())
|
||||
ci.DisableFeatures = append(ci.DisableFeatures, "Metadata")
|
||||
|
||||
fremote, _, clean, err := fstest.RandomRemote()
|
||||
assert.NoError(t, err)
|
||||
defer clean()
|
||||
|
||||
err = fremote.Mkdir(context.Background(), "")
|
||||
assert.NoError(t, err)
|
||||
|
||||
f := fremote
|
||||
config, cleanup := start(f)
|
||||
defer cleanup()
|
||||
|
||||
// Change directory to run the tests
|
||||
cwd, err := os.Getwd()
|
||||
require.NoError(t, err)
|
||||
err = os.Chdir("../../../backend/" + name)
|
||||
require.NoError(t, err, "failed to cd to "+name+" backend")
|
||||
defer func() {
|
||||
// Change back to the old directory
|
||||
require.NoError(t, os.Chdir(cwd))
|
||||
}()
|
||||
|
||||
// RunS3UnitTests the backend tests with an on the fly remote
|
||||
args := []string{"test"}
|
||||
if testing.Verbose() {
|
||||
args = append(args, "-v")
|
||||
}
|
||||
if *fstest.Verbose {
|
||||
args = append(args, "-verbose")
|
||||
}
|
||||
remoteName := "serve" + name + ":"
|
||||
args = append(args, "-remote", remoteName)
|
||||
args = append(args, "-run", "^TestIntegration$")
|
||||
args = append(args, "-list-retries", fmt.Sprint(*fstest.ListRetries))
|
||||
cmd := exec.Command("go", args...)
|
||||
|
||||
// Configure the backend with environment variables
|
||||
cmd.Env = os.Environ()
|
||||
prefix := "RCLONE_CONFIG_" + strings.ToUpper(remoteName[:len(remoteName)-1]) + "_"
|
||||
for k, v := range config {
|
||||
cmd.Env = append(cmd.Env, prefix+strings.ToUpper(k)+"="+v)
|
||||
}
|
||||
|
||||
// RunS3UnitTests the test
|
||||
out, err := cmd.CombinedOutput()
|
||||
if len(out) != 0 {
|
||||
t.Logf("\n----------\n%s----------\n", string(out))
|
||||
}
|
||||
assert.NoError(t, err, "Running "+name+" integration tests")
|
||||
servetest.Run(t, "s3", start)
|
||||
}
|
||||
|
||||
// tests using the minio client
|
||||
|
@ -181,7 +125,7 @@ func TestEncodingWithMinioClient(t *testing.T) {
|
|||
_, err = f.Put(context.Background(), in, obji)
|
||||
assert.NoError(t, err)
|
||||
|
||||
endpoint, keyid, keysec := serveS3(f)
|
||||
endpoint, keyid, keysec, _ := serveS3(f)
|
||||
testURL, _ := url.Parse(endpoint)
|
||||
minioClient, err := minio.New(testURL.Host, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(keyid, keysec, ""),
|
||||
|
@ -200,5 +144,161 @@ func TestEncodingWithMinioClient(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type FileStuct struct {
|
||||
path string
|
||||
filename string
|
||||
}
|
||||
|
||||
type TestCase struct {
|
||||
description string
|
||||
bucket string
|
||||
files []FileStuct
|
||||
keyID string
|
||||
keySec string
|
||||
shouldFail bool
|
||||
}
|
||||
|
||||
func testListBuckets(t *testing.T, cases []TestCase, useProxy bool) {
|
||||
fstest.Initialise()
|
||||
|
||||
var f fs.Fs
|
||||
if useProxy {
|
||||
// the backend config will be made by the proxy
|
||||
prog, err := filepath.Abs("../servetest/proxy_code.go")
|
||||
require.NoError(t, err)
|
||||
files, err := filepath.Abs("testdata")
|
||||
require.NoError(t, err)
|
||||
cmd := "go run " + prog + " " + files
|
||||
|
||||
// FIXME: this is untidy setting a global variable!
|
||||
proxyflags.Opt.AuthProxy = cmd
|
||||
defer func() {
|
||||
proxyflags.Opt.AuthProxy = ""
|
||||
}()
|
||||
|
||||
f = nil
|
||||
} else {
|
||||
// create a test Fs
|
||||
var err error
|
||||
f, err = fs.NewFs(context.Background(), "testdata")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, tt := range cases {
|
||||
t.Run(tt.description, func(t *testing.T) {
|
||||
endpoint, keyid, keysec, s := serveS3(f)
|
||||
defer func() {
|
||||
assert.NoError(t, s.server.Shutdown())
|
||||
}()
|
||||
|
||||
if tt.keyID != "" {
|
||||
keyid = tt.keyID
|
||||
}
|
||||
if tt.keySec != "" {
|
||||
keysec = tt.keySec
|
||||
}
|
||||
|
||||
testURL, _ := url.Parse(endpoint)
|
||||
minioClient, err := minio.New(testURL.Host, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(keyid, keysec, ""),
|
||||
Secure: false,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
buckets, err := minioClient.ListBuckets(context.Background())
|
||||
if tt.shouldFail {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, buckets)
|
||||
assert.Equal(t, buckets[0].Name, tt.bucket)
|
||||
|
||||
o := minioClient.ListObjects(context.Background(), tt.bucket, minio.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
})
|
||||
// save files after reading from channel
|
||||
objects := []string{}
|
||||
for object := range o {
|
||||
objects = append(objects, object.Key)
|
||||
}
|
||||
|
||||
for _, tt := range tt.files {
|
||||
file := path.Join(tt.path, tt.filename)
|
||||
found := false
|
||||
for _, fname := range objects {
|
||||
if file == fname {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.Equal(t, true, found, "Object not found: "+file)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestListBuckets(t *testing.T) {
|
||||
var cases = []TestCase{
|
||||
{
|
||||
description: "list buckets",
|
||||
bucket: "mybucket",
|
||||
files: []FileStuct{
|
||||
{
|
||||
path: "",
|
||||
filename: "lorem.txt",
|
||||
},
|
||||
{
|
||||
path: "foo",
|
||||
filename: "bar.txt",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "list buckets: wrong s3 key",
|
||||
bucket: "mybucket",
|
||||
keyID: "invalid",
|
||||
shouldFail: true,
|
||||
},
|
||||
{
|
||||
description: "list buckets: wrong s3 secret",
|
||||
bucket: "mybucket",
|
||||
keySec: "invalid",
|
||||
shouldFail: true,
|
||||
},
|
||||
}
|
||||
|
||||
testListBuckets(t, cases, false)
|
||||
}
|
||||
|
||||
func TestListBucketsAuthProxy(t *testing.T) {
|
||||
var cases = []TestCase{
|
||||
{
|
||||
description: "list buckets",
|
||||
bucket: "mybucket",
|
||||
// request with random keyid
|
||||
// instead of what was set in 'authPair'
|
||||
keyID: random.String(16),
|
||||
files: []FileStuct{
|
||||
{
|
||||
path: "",
|
||||
filename: "lorem.txt",
|
||||
},
|
||||
{
|
||||
path: "foo",
|
||||
filename: "bar.txt",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "list buckets: wrong s3 secret",
|
||||
bucket: "mybucket",
|
||||
keySec: "invalid",
|
||||
shouldFail: true,
|
||||
},
|
||||
}
|
||||
|
||||
testListBuckets(t, cases, true)
|
||||
}
|
||||
|
|
|
@ -3,12 +3,19 @@ package s3
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/rclone/gofakes3"
|
||||
"github.com/rclone/gofakes3/signature"
|
||||
"github.com/rclone/rclone/cmd/serve/proxy"
|
||||
"github.com/rclone/rclone/cmd/serve/proxy/proxyflags"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
httplib "github.com/rclone/rclone/lib/http"
|
||||
|
@ -16,6 +23,12 @@ import (
|
|||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
)
|
||||
|
||||
type ctxKey int
|
||||
|
||||
const (
|
||||
ctxKeyID ctxKey = iota
|
||||
)
|
||||
|
||||
// Options contains options for the http Server
|
||||
type Options struct {
|
||||
//TODO add more options
|
||||
|
@ -24,17 +37,20 @@ type Options struct {
|
|||
hashType hash.Type
|
||||
authPair []string
|
||||
noCleanup bool
|
||||
Auth httplib.AuthConfig
|
||||
HTTP httplib.Config
|
||||
}
|
||||
|
||||
// Server is a s3.FileSystem interface
|
||||
type Server struct {
|
||||
*httplib.Server
|
||||
f fs.Fs
|
||||
vfs *vfs.VFS
|
||||
faker *gofakes3.GoFakeS3
|
||||
handler http.Handler
|
||||
ctx context.Context // for global config
|
||||
server *httplib.Server
|
||||
f fs.Fs
|
||||
_vfs *vfs.VFS // don't use directly, use getVFS
|
||||
faker *gofakes3.GoFakeS3
|
||||
handler http.Handler
|
||||
proxy *proxy.Proxy
|
||||
ctx context.Context // for global config
|
||||
s3Secret string
|
||||
}
|
||||
|
||||
// Make a new S3 Server to serve the remote
|
||||
|
@ -42,16 +58,17 @@ func newServer(ctx context.Context, f fs.Fs, opt *Options) (s *Server, err error
|
|||
w := &Server{
|
||||
f: f,
|
||||
ctx: ctx,
|
||||
vfs: vfs.New(f, &vfscommon.Opt),
|
||||
}
|
||||
|
||||
if len(opt.authPair) == 0 {
|
||||
fs.Logf("serve s3", "No auth provided so allowing anonymous access")
|
||||
} else {
|
||||
w.s3Secret = getAuthSecret(opt.authPair)
|
||||
}
|
||||
|
||||
var newLogger logger
|
||||
w.faker = gofakes3.New(
|
||||
newBackend(w.vfs, opt),
|
||||
newBackend(w, opt),
|
||||
gofakes3.WithHostBucket(!opt.pathBucketMode),
|
||||
gofakes3.WithLogger(newLogger),
|
||||
gofakes3.WithRequestID(rand.Uint64()),
|
||||
|
@ -60,24 +77,124 @@ func newServer(ctx context.Context, f fs.Fs, opt *Options) (s *Server, err error
|
|||
gofakes3.WithIntegrityCheck(true), // Check Content-MD5 if supplied
|
||||
)
|
||||
|
||||
w.Server, err = httplib.NewServer(ctx,
|
||||
w.handler = http.NewServeMux()
|
||||
w.handler = w.faker.Server()
|
||||
|
||||
if proxyflags.Opt.AuthProxy != "" {
|
||||
w.proxy = proxy.New(ctx, &proxyflags.Opt)
|
||||
// proxy auth middleware
|
||||
w.handler = proxyAuthMiddleware(w.handler, w)
|
||||
w.handler = authPairMiddleware(w.handler, w)
|
||||
} else {
|
||||
w._vfs = vfs.New(f, &vfscommon.Opt)
|
||||
|
||||
if len(opt.authPair) > 0 {
|
||||
w.faker.AddAuthKeys(authlistResolver(opt.authPair))
|
||||
}
|
||||
}
|
||||
|
||||
w.server, err = httplib.NewServer(ctx,
|
||||
httplib.WithConfig(opt.HTTP),
|
||||
httplib.WithAuth(opt.Auth),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to init server: %w", err)
|
||||
}
|
||||
|
||||
w.handler = w.faker.Server()
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (w *Server) getVFS(ctx context.Context) (VFS *vfs.VFS, err error) {
|
||||
if w._vfs != nil {
|
||||
return w._vfs, nil
|
||||
}
|
||||
|
||||
value := ctx.Value(ctxKeyID)
|
||||
if value == nil {
|
||||
return nil, errors.New("no VFS found in context")
|
||||
}
|
||||
|
||||
VFS, ok := value.(*vfs.VFS)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("context value is not VFS: %#v", value)
|
||||
}
|
||||
return VFS, nil
|
||||
}
|
||||
|
||||
// auth does proxy authorization
|
||||
func (w *Server) auth(accessKeyID string) (value interface{}, err error) {
|
||||
VFS, _, err := w.proxy.Call(stringToMd5Hash(accessKeyID), accessKeyID, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return VFS, err
|
||||
}
|
||||
|
||||
// Bind register the handler to http.Router
|
||||
func (w *Server) Bind(router chi.Router) {
|
||||
router.Handle("/*", w.handler)
|
||||
}
|
||||
|
||||
func (w *Server) serve() error {
|
||||
w.Serve()
|
||||
fs.Logf(w.f, "Starting s3 server on %s", w.URLs())
|
||||
// Serve serves the s3 server
|
||||
func (w *Server) Serve() error {
|
||||
w.server.Serve()
|
||||
fs.Logf(w.f, "Starting s3 server on %s", w.server.URLs())
|
||||
return nil
|
||||
}
|
||||
|
||||
func authPairMiddleware(next http.Handler, ws *Server) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
accessKey, _ := parseAccessKeyID(r)
|
||||
// set the auth pair
|
||||
authPair := map[string]string{
|
||||
accessKey: ws.s3Secret,
|
||||
}
|
||||
ws.faker.AddAuthKeys(authPair)
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func proxyAuthMiddleware(next http.Handler, ws *Server) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
accessKey, _ := parseAccessKeyID(r)
|
||||
value, err := ws.auth(accessKey)
|
||||
if err != nil {
|
||||
fs.Infof(r.URL.Path, "%s: Auth failed: %v", r.RemoteAddr, err)
|
||||
}
|
||||
if value != nil {
|
||||
r = r.WithContext(context.WithValue(r.Context(), ctxKeyID, value))
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func parseAccessKeyID(r *http.Request) (accessKey string, error signature.ErrorCode) {
|
||||
v4Auth := r.Header.Get("Authorization")
|
||||
req, err := signature.ParseSignV4(v4Auth)
|
||||
if err != signature.ErrNone {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return req.Credential.GetAccessKey(), signature.ErrNone
|
||||
}
|
||||
|
||||
func stringToMd5Hash(s string) string {
|
||||
hasher := md5.New()
|
||||
hasher.Write([]byte(s))
|
||||
return hex.EncodeToString(hasher.Sum(nil))
|
||||
}
|
||||
|
||||
func getAuthSecret(authPair []string) string {
|
||||
if len(authPair) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
splited := strings.Split(authPair[0], ",")
|
||||
if len(splited) != 2 {
|
||||
return ""
|
||||
}
|
||||
|
||||
secret := strings.TrimSpace(splited[1])
|
||||
return secret
|
||||
}
|
||||
|
|
1
cmd/serve/s3/testdata/mybucket/foo/bar.txt
vendored
Normal file
1
cmd/serve/s3/testdata/mybucket/foo/bar.txt
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
I am inside a folder
|
1
cmd/serve/s3/testdata/mybucket/lorem.txt
vendored
Normal file
1
cmd/serve/s3/testdata/mybucket/lorem.txt
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
lorem epsum gipsum
|
|
@ -76,7 +76,7 @@ func run(t *testing.T, name string, start StartFn, useProxy bool) {
|
|||
if *fstest.Verbose {
|
||||
args = append(args, "-verbose")
|
||||
}
|
||||
remoteName := name + "test:"
|
||||
remoteName := "serve" + name + "test:"
|
||||
if *subRun != "" {
|
||||
args = append(args, "-run", *subRun)
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -52,7 +52,7 @@ require (
|
|||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
|
||||
github.com/prometheus/client_golang v1.19.1
|
||||
github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8
|
||||
github.com/rclone/gofakes3 v0.0.3-0.20240715104526-0c656d1755f9
|
||||
github.com/rclone/gofakes3 v0.0.3-0.20240716093803-d6abc178be56
|
||||
github.com/rfjakob/eme v1.1.2
|
||||
github.com/rivo/uniseg v0.4.7
|
||||
github.com/rogpeppe/go-internal v1.12.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -460,6 +460,8 @@ github.com/rclone/gofakes3 v0.0.3-0.20240710114216-d61b9c9b56e3 h1:VV56i89SMfX/s
|
|||
github.com/rclone/gofakes3 v0.0.3-0.20240710114216-d61b9c9b56e3/go.mod h1:L0VIBE0mT6ArN/5dfHsJm3UjqCpi5B/cdN+qWDNh7ko=
|
||||
github.com/rclone/gofakes3 v0.0.3-0.20240715104526-0c656d1755f9 h1:2R9eePKGGwhB6eiXr4r+U1UDWLeN+Yz0xeyyaodi2h0=
|
||||
github.com/rclone/gofakes3 v0.0.3-0.20240715104526-0c656d1755f9/go.mod h1:L0VIBE0mT6ArN/5dfHsJm3UjqCpi5B/cdN+qWDNh7ko=
|
||||
github.com/rclone/gofakes3 v0.0.3-0.20240716093803-d6abc178be56 h1:JmCt3EsTnlZrg/PHIyZqvKDRvBCde/rmThAQFliE9bU=
|
||||
github.com/rclone/gofakes3 v0.0.3-0.20240716093803-d6abc178be56/go.mod h1:L0VIBE0mT6ArN/5dfHsJm3UjqCpi5B/cdN+qWDNh7ko=
|
||||
github.com/relvacode/iso8601 v1.3.0 h1:HguUjsGpIMh/zsTczGN3DVJFxTU/GX+MMmzcKoMO7ko=
|
||||
github.com/relvacode/iso8601 v1.3.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I=
|
||||
github.com/rfjakob/eme v1.1.2 h1:SxziR8msSOElPayZNFfQw4Tjx/Sbaeeh3eRvrHVMUs4=
|
||||
|
|
Loading…
Reference in a new issue