Remove minio/gateways

This commit is contained in:
Evgeniy Kulikov 2020-07-06 12:22:09 +03:00
parent f36a911914
commit f117e2207d
16 changed files with 1 additions and 6762 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,340 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2017 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package azure
import (
"encoding/base64"
"fmt"
"net/http"
"os"
"reflect"
"strconv"
"testing"
"github.com/dustin/go-humanize"
"github.com/Azure/azure-storage-blob-go/azblob"
minio "github.com/minio/minio/cmd"
)
func TestParseStorageEndpoint(t *testing.T) {
testCases := []struct {
host string
accountName string
expectedURL string
expectedErr error
}{
{
"", "myaccount", "https://myaccount.blob.core.windows.net", nil,
},
{
"myaccount.blob.core.usgovcloudapi.net", "myaccount", "https://myaccount.blob.core.usgovcloudapi.net", nil,
},
{
"http://localhost:10000", "myaccount", "http://localhost:10000/myaccount", nil,
},
}
for i, testCase := range testCases {
endpointURL, err := parseStorageEndpoint(testCase.host, testCase.accountName)
if err != testCase.expectedErr {
t.Errorf("Test %d: Expected error %s, got %s", i+1, testCase.expectedErr, err)
}
if endpointURL.String() != testCase.expectedURL {
t.Errorf("Test %d: Expected URL %s, got %s", i+1, testCase.expectedURL, endpointURL.String())
}
}
}
// Test canonical metadata.
func TestS3MetaToAzureProperties(t *testing.T) {
headers := map[string]string{
"accept-encoding": "gzip",
"content-encoding": "gzip",
"cache-control": "age: 3600",
"content-disposition": "dummy",
"content-length": "10",
"content-type": "application/javascript",
"X-Amz-Meta-Hdr": "value",
"X-Amz-Meta-X_test_key": "value",
"X-Amz-Meta-X__test__key": "value",
"X-Amz-Meta-X-Test__key": "value",
"X-Amz-Meta-X-Amz-Key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"X-Amz-Meta-X-Amz-Matdesc": "{}",
"X-Amz-Meta-X-Amz-Iv": "eWmyryl8kq+EVnnsE7jpOg==",
}
// Only X-Amz-Meta- prefixed entries will be returned in
// Metadata (without the prefix!)
expectedHeaders := map[string]string{
"Hdr": "value",
"X__test__key": "value",
"X____test____key": "value",
"X_Test____key": "value",
"X_Amz_Key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"X_Amz_Matdesc": "{}",
"X_Amz_Iv": "eWmyryl8kq+EVnnsE7jpOg==",
}
meta, _, err := s3MetaToAzureProperties(minio.GlobalContext, headers)
if err != nil {
t.Fatalf("Test failed, with %s", err)
}
if !reflect.DeepEqual(map[string]string(meta), expectedHeaders) {
t.Fatalf("Test failed, expected %#v, got %#v", expectedHeaders, meta)
}
headers = map[string]string{
"invalid--meta": "value",
}
_, _, err = s3MetaToAzureProperties(minio.GlobalContext, headers)
if err != nil {
if _, ok := err.(minio.UnsupportedMetadata); !ok {
t.Fatalf("Test failed with unexpected error %s, expected UnsupportedMetadata", err)
}
}
headers = map[string]string{
"content-md5": "Dce7bmCX61zvxzP5QmfelQ==",
}
_, props, err := s3MetaToAzureProperties(minio.GlobalContext, headers)
if err != nil {
t.Fatalf("Test failed, with %s", err)
}
if base64.StdEncoding.EncodeToString(props.ContentMD5) != headers["content-md5"] {
t.Fatalf("Test failed, expected %s, got %s", headers["content-md5"], props.ContentMD5)
}
}
func TestAzurePropertiesToS3Meta(t *testing.T) {
// Just one testcase. Adding more test cases does not add value to the testcase
// as azureToS3Metadata() just adds a prefix.
metadata := map[string]string{
"first_name": "myname",
"x_test_key": "value",
"x_test__key": "value",
"x__test__key": "value",
"x____test____key": "value",
"x_amz_key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"x_amz_matdesc": "{}",
"x_amz_iv": "eWmyryl8kq+EVnnsE7jpOg==",
}
expectedMeta := map[string]string{
"X-Amz-Meta-First-Name": "myname",
"X-Amz-Meta-X-Test-Key": "value",
"X-Amz-Meta-X-Test_key": "value",
"X-Amz-Meta-X_test_key": "value",
"X-Amz-Meta-X__test__key": "value",
"X-Amz-Meta-X-Amz-Key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"X-Amz-Meta-X-Amz-Matdesc": "{}",
"X-Amz-Meta-X-Amz-Iv": "eWmyryl8kq+EVnnsE7jpOg==",
"Cache-Control": "max-age: 3600",
"Content-Disposition": "dummy",
"Content-Encoding": "gzip",
"Content-Length": "10",
"Content-MD5": base64.StdEncoding.EncodeToString([]byte("base64-md5")),
"Content-Type": "application/javascript",
}
actualMeta := azurePropertiesToS3Meta(metadata, azblob.BlobHTTPHeaders{
CacheControl: "max-age: 3600",
ContentDisposition: "dummy",
ContentEncoding: "gzip",
ContentMD5: []byte("base64-md5"),
ContentType: "application/javascript",
}, 10)
if !reflect.DeepEqual(actualMeta, expectedMeta) {
t.Fatalf("Test failed, expected %#v, got %#v", expectedMeta, actualMeta)
}
}
// Add tests for azure to object error (top level).
func TestAzureToObjectError(t *testing.T) {
testCases := []struct {
actualErr error
expectedErr error
bucket, object string
}{
{
nil, nil, "", "",
},
{
fmt.Errorf("Non azure error"),
fmt.Errorf("Non azure error"), "", "",
},
}
for i, testCase := range testCases {
if err := azureToObjectError(testCase.actualErr, testCase.bucket, testCase.object); err != nil {
if err.Error() != testCase.expectedErr.Error() {
t.Errorf("Test %d: Expected error %s, got %s", i+1, testCase.expectedErr, err)
}
} else {
if testCase.expectedErr != nil {
t.Errorf("Test %d expected an error but one was not produced", i+1)
}
}
}
}
// Add tests for azure to object error (internal).
func TestAzureCodesToObjectError(t *testing.T) {
testCases := []struct {
originalErr error
actualServiceCode string
actualStatusCode int
expectedErr error
bucket, object string
}{
{
nil, "ContainerAlreadyExists", 0,
minio.BucketExists{Bucket: "bucket"}, "bucket", "",
},
{
nil, "InvalidResourceName", 0,
minio.BucketNameInvalid{Bucket: "bucket."}, "bucket.", "",
},
{
nil, "RequestBodyTooLarge", 0,
minio.PartTooBig{}, "", "",
},
{
nil, "InvalidMetadata", 0,
minio.UnsupportedMetadata{}, "", "",
},
{
nil, "", http.StatusNotFound,
minio.ObjectNotFound{
Bucket: "bucket",
Object: "object",
}, "bucket", "object",
},
{
nil, "", http.StatusNotFound,
minio.BucketNotFound{Bucket: "bucket"}, "bucket", "",
},
{
nil, "", http.StatusBadRequest,
minio.BucketNameInvalid{Bucket: "bucket."}, "bucket.", "",
},
{
fmt.Errorf("unhandled azure error"), "", http.StatusForbidden,
fmt.Errorf("unhandled azure error"), "", "",
},
}
for i, testCase := range testCases {
if err := azureCodesToObjectError(testCase.originalErr, testCase.actualServiceCode, testCase.actualStatusCode, testCase.bucket, testCase.object); err != nil {
if err.Error() != testCase.expectedErr.Error() {
t.Errorf("Test %d: Expected error %s, got %s", i+1, testCase.expectedErr, err)
}
} else {
if testCase.expectedErr != nil {
t.Errorf("Test %d expected an error but one was not produced", i+1)
}
}
}
}
func TestAnonErrToObjectErr(t *testing.T) {
testCases := []struct {
name string
statusCode int
params []string
wantErr error
}{
{"ObjectNotFound",
http.StatusNotFound,
[]string{"testBucket", "testObject"},
minio.ObjectNotFound{Bucket: "testBucket", Object: "testObject"},
},
{"BucketNotFound",
http.StatusNotFound,
[]string{"testBucket", ""},
minio.BucketNotFound{Bucket: "testBucket"},
},
{"ObjectNameInvalid",
http.StatusBadRequest,
[]string{"testBucket", "testObject"},
minio.ObjectNameInvalid{Bucket: "testBucket", Object: "testObject"},
},
{"BucketNameInvalid",
http.StatusBadRequest,
[]string{"testBucket", ""},
minio.BucketNameInvalid{Bucket: "testBucket"},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if err := minio.AnonErrToObjectErr(test.statusCode, test.params...); !reflect.DeepEqual(err, test.wantErr) {
t.Errorf("anonErrToObjectErr() error = %v, wantErr %v", err, test.wantErr)
}
})
}
}
func TestCheckAzureUploadID(t *testing.T) {
invalidUploadIDs := []string{
"123456789abcdefg",
"hello world",
"0x1234567890",
"1234567890abcdef1234567890abcdef",
}
for _, uploadID := range invalidUploadIDs {
if err := checkAzureUploadID(minio.GlobalContext, uploadID); err == nil {
t.Fatalf("%s: expected: <error>, got: <nil>", uploadID)
}
}
validUploadIDs := []string{
"1234567890abcdef",
"1122334455667788",
}
for _, uploadID := range validUploadIDs {
if err := checkAzureUploadID(minio.GlobalContext, uploadID); err != nil {
t.Fatalf("%s: expected: <nil>, got: %s", uploadID, err)
}
}
}
func TestParsingUploadChunkSize(t *testing.T) {
key := "MINIO_AZURE_CHUNK_SIZE_MB"
invalidValues := []string{
"",
"0,3",
"100.1",
"-1",
}
for i, chunkValue := range invalidValues {
os.Setenv(key, chunkValue)
result := getUploadChunkSizeFromEnv(key, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte))
if result != azureDefaultUploadChunkSize {
t.Errorf("Test %d: expected: %d, got: %d", i+1, azureDefaultUploadChunkSize, result)
}
}
validValues := []string{
"1",
"1.25",
"50",
"99",
}
for i, chunkValue := range validValues {
os.Setenv(key, chunkValue)
result := getUploadChunkSizeFromEnv(key, strconv.Itoa(azureDefaultUploadChunkSize/humanize.MiByte))
if result == azureDefaultUploadChunkSize {
t.Errorf("Test %d: expected: %d, got: %d", i+1, azureDefaultUploadChunkSize, result)
}
}
}

View file

@ -1,39 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2017-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gateway
import (
// Import all gateways please keep the order
// NAS
_ "github.com/minio/minio/cmd/gateway/nas"
// Azure
_ "github.com/minio/minio/cmd/gateway/azure"
// S3
_ "github.com/minio/minio/cmd/gateway/s3"
// HDFS
_ "github.com/minio/minio/cmd/gateway/hdfs"
// GCS (use only if you must, GCS already supports S3 API)
_ "github.com/minio/minio/cmd/gateway/gcs"
// gateway functionality is frozen, no new gateways are being implemented
// or considered for upstream inclusion at this point in time. if needed
// please keep a fork of the project.
)

File diff suppressed because it is too large Load diff

View file

@ -1,490 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2017 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gcs
import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"testing"
"time"
"cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
miniogo "github.com/minio/minio-go/v6"
minio "github.com/minio/minio/cmd"
)
func TestToGCSPageToken(t *testing.T) {
testCases := []struct {
Name string
Token string
}{
{
Name: "A",
Token: "CgFB",
},
{
Name: "AAAAAAAAAA",
Token: "CgpBQUFBQUFBQUFB",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CmRBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpEDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE=",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpIDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpMDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQ==",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CvQDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE=",
},
}
for i, testCase := range testCases {
if toGCSPageToken(testCase.Name) != testCase.Token {
t.Errorf("Test %d: Expected %s, got %s", i+1, toGCSPageToken(testCase.Name), testCase.Token)
}
}
}
// TestIsValidGCSProjectIDFormat tests isValidGCSProjectIDFormat
func TestValidGCSProjectIDFormat(t *testing.T) {
testCases := []struct {
ProjectID string
Valid bool
}{
{"", false},
{"a", false},
{"Abc", false},
{"1bcd", false},
// 5 chars
{"abcdb", false},
// 6 chars
{"abcdbz", true},
// 30 chars
{"project-id-1-project-id-more-1", true},
// 31 chars
{"project-id-1-project-id-more-11", false},
{"storage.googleapis.com", false},
{"http://storage.googleapis.com", false},
{"http://localhost:9000", false},
{"project-id-1", true},
{"project-id-1988832", true},
{"projectid1414", true},
}
for i, testCase := range testCases {
valid := isValidGCSProjectIDFormat(testCase.ProjectID)
if valid != testCase.Valid {
t.Errorf("Test %d: Expected %v, got %v", i+1, valid, testCase.Valid)
}
}
}
// Test for isGCSMarker.
func TestIsGCSMarker(t *testing.T) {
testCases := []struct {
marker string
expected bool
}{
{
marker: "{minio}gcs123",
expected: true,
},
{
marker: "{mini_no}tgcs123",
expected: false,
},
{
marker: "{minioagainnotgcs123",
expected: false,
},
{
marker: "obj1",
expected: false,
},
}
for i, tc := range testCases {
if actual := isGCSMarker(tc.marker); actual != tc.expected {
t.Errorf("Test %d: marker is %s, expected %v but got %v",
i+1, tc.marker, tc.expected, actual)
}
}
}
// Test for gcsMultipartMetaName.
func TestGCSMultipartMetaName(t *testing.T) {
uploadID := "a"
expected := path.Join(gcsMinioMultipartPathV1, uploadID, gcsMinioMultipartMeta)
got := gcsMultipartMetaName(uploadID)
if expected != got {
t.Errorf("expected: %s, got: %s", expected, got)
}
}
// Test for gcsMultipartDataName.
func TestGCSMultipartDataName(t *testing.T) {
var (
uploadID = "a"
etag = "b"
partNumber = 1
)
expected := path.Join(gcsMinioMultipartPathV1, uploadID, fmt.Sprintf("%05d.%s", partNumber, etag))
got := gcsMultipartDataName(uploadID, partNumber, etag)
if expected != got {
t.Errorf("expected: %s, got: %s", expected, got)
}
}
func TestFromMinioClientListBucketResultToV2Info(t *testing.T) {
listBucketResult := miniogo.ListBucketResult{
IsTruncated: false,
Marker: "testMarker",
NextMarker: "testMarker2",
CommonPrefixes: []miniogo.CommonPrefix{{Prefix: "one"}, {Prefix: "two"}},
Contents: []miniogo.ObjectInfo{{Key: "testobj", ContentType: ""}},
}
listBucketV2Info := minio.ListObjectsV2Info{
Prefixes: []string{"one", "two"},
Objects: []minio.ObjectInfo{{Name: "testobj", Bucket: "testbucket", UserDefined: map[string]string{"Content-Type": ""}}},
IsTruncated: false,
ContinuationToken: "testMarker",
NextContinuationToken: "testMarker2",
}
if got := minio.FromMinioClientListBucketResultToV2Info("testbucket", listBucketResult); !reflect.DeepEqual(got, listBucketV2Info) {
t.Errorf("fromMinioClientListBucketResultToV2Info() = %v, want %v", got, listBucketV2Info)
}
}
// Test for gcsParseProjectID
func TestGCSParseProjectID(t *testing.T) {
f, err := ioutil.TempFile("", "")
if err != nil {
t.Error(err)
return
}
defer os.Remove(f.Name())
defer f.Close()
contents := `
{
"type": "service_account",
"project_id": "miniotesting"
}
`
f.WriteString(contents)
projectID, err := gcsParseProjectID(f.Name())
if err != nil {
t.Fatal(err)
}
if projectID != "miniotesting" {
t.Errorf(`Expected projectID value to be "miniotesting"`)
}
if _, err = gcsParseProjectID("non-existent"); err == nil {
t.Errorf(`Expected to fail but succeeded reading "non-existent"`)
}
f.WriteString(`,}`)
if _, err := gcsParseProjectID(f.Name()); err == nil {
t.Errorf(`Expected to fail reading corrupted credentials file`)
}
}
func TestGCSToObjectError(t *testing.T) {
testCases := []struct {
params []string
gcsErr error
expectedErr error
}{
{
[]string{}, nil, nil,
},
{
[]string{}, fmt.Errorf("Not *Error"), fmt.Errorf("Not *Error"),
},
{
[]string{"bucket"},
fmt.Errorf("storage: bucket doesn't exist"),
minio.BucketNotFound{
Bucket: "bucket",
},
},
{
[]string{"bucket", "object"},
fmt.Errorf("storage: object doesn't exist"),
minio.ObjectNotFound{
Bucket: "bucket",
Object: "object",
},
},
{
[]string{"bucket", "object", "uploadID"},
fmt.Errorf("storage: object doesn't exist"),
minio.InvalidUploadID{
UploadID: "uploadID",
},
},
{
[]string{},
fmt.Errorf("Unknown error"),
fmt.Errorf("Unknown error"),
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Message: "No list of errors",
},
&googleapi.Error{
Message: "No list of errors",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "conflict",
Message: "You already own this bucket. Please select another name.",
}},
},
minio.BucketAlreadyOwnedByYou{
Bucket: "bucket",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "conflict",
Message: "Sorry, that name is not available. Please try a different one.",
}},
},
minio.BucketAlreadyExists{
Bucket: "bucket",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "conflict",
}},
},
minio.BucketNotEmpty{Bucket: "bucket"},
},
{
[]string{"bucket"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "notFound",
}},
},
minio.BucketNotFound{
Bucket: "bucket",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "notFound",
}},
},
minio.ObjectNotFound{
Bucket: "bucket",
Object: "object",
},
},
{
[]string{"bucket"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "invalid",
}},
},
minio.BucketNameInvalid{
Bucket: "bucket",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "forbidden",
}},
},
minio.PrefixAccessDenied{
Bucket: "bucket",
Object: "object",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "keyInvalid",
}},
},
minio.PrefixAccessDenied{
Bucket: "bucket",
Object: "object",
},
},
{
[]string{"bucket", "object"},
&googleapi.Error{
Errors: []googleapi.ErrorItem{{
Reason: "required",
}},
},
minio.PrefixAccessDenied{
Bucket: "bucket",
Object: "object",
},
},
}
for i, testCase := range testCases {
actualErr := gcsToObjectError(testCase.gcsErr, testCase.params...)
if actualErr != nil {
if actualErr.Error() != testCase.expectedErr.Error() {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.expectedErr, actualErr)
}
}
}
}
func TestS3MetaToGCSAttributes(t *testing.T) {
headers := map[string]string{
"accept-encoding": "gzip",
"content-encoding": "gzip",
"cache-control": "age: 3600",
"content-disposition": "dummy",
"content-type": "application/javascript",
"Content-Language": "en",
"X-Amz-Meta-Hdr": "value",
"X-Amz-Meta-X-Amz-Key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"X-Amz-Meta-X-Amz-Matdesc": "{}",
"X-Amz-Meta-X-Amz-Iv": "eWmyryl8kq+EVnnsE7jpOg==",
}
// Only X-Amz-Meta- prefixed entries will be returned in
// Metadata (without the prefix!)
expectedHeaders := map[string]string{
"x-goog-meta-Hdr": "value",
"x-goog-meta-X-Amz-Key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"x-goog-meta-X-Amz-Matdesc": "{}",
"x-goog-meta-X-Amz-Iv": "eWmyryl8kq+EVnnsE7jpOg==",
}
attrs := storage.ObjectAttrs{}
applyMetadataToGCSAttrs(headers, &attrs)
if !reflect.DeepEqual(attrs.Metadata, expectedHeaders) {
t.Fatalf("Test failed, expected %#v, got %#v", expectedHeaders, attrs.Metadata)
}
if attrs.CacheControl != headers["cache-control"] {
t.Fatalf("Test failed with Cache-Control mistmatch, expected %s, got %s", headers["cache-control"], attrs.CacheControl)
}
if attrs.ContentDisposition != headers["content-disposition"] {
t.Fatalf("Test failed with Content-Disposition mistmatch, expected %s, got %s", headers["content-disposition"], attrs.ContentDisposition)
}
if attrs.ContentEncoding != headers["content-encoding"] {
t.Fatalf("Test failed with Content-Encoding mistmatch, expected %s, got %s", headers["content-encoding"], attrs.ContentEncoding)
}
if attrs.ContentLanguage != headers["Content-Language"] {
t.Fatalf("Test failed with Content-Language mistmatch, expected %s, got %s", headers["Content-Language"], attrs.ContentLanguage)
}
if attrs.ContentType != headers["content-type"] {
t.Fatalf("Test failed with Content-Type mistmatch, expected %s, got %s", headers["content-type"], attrs.ContentType)
}
}
func TestGCSAttrsToObjectInfo(t *testing.T) {
metadata := map[string]string{
"x-goog-meta-Hdr": "value",
"x-goog-meta-x_amz_key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"x-goog-meta-x-amz-matdesc": "{}",
"x-goog-meta-X-Amz-Iv": "eWmyryl8kq+EVnnsE7jpOg==",
}
expectedMeta := map[string]string{
"X-Amz-Meta-Hdr": "value",
"X-Amz-Meta-X_amz_key": "hu3ZSqtqwn+aL4V2VhAeov4i+bG3KyCtRMSXQFRHXOk=",
"X-Amz-Meta-X-Amz-Matdesc": "{}",
"X-Amz-Meta-X-Amz-Iv": "eWmyryl8kq+EVnnsE7jpOg==",
"Cache-Control": "max-age: 3600",
"Content-Disposition": "dummy",
"Content-Encoding": "gzip",
"Content-Language": "en",
"Content-Type": "application/javascript",
}
attrs := storage.ObjectAttrs{
Name: "test-obj",
Bucket: "test-bucket",
Updated: time.Now(),
Size: 123,
CRC32C: 45312398,
CacheControl: "max-age: 3600",
ContentDisposition: "dummy",
ContentEncoding: "gzip",
ContentLanguage: "en",
ContentType: "application/javascript",
Metadata: metadata,
}
expectedETag := minio.ToS3ETag(fmt.Sprintf("%d", attrs.CRC32C))
objInfo := fromGCSAttrsToObjectInfo(&attrs)
if !reflect.DeepEqual(objInfo.UserDefined, expectedMeta) {
t.Fatalf("Test failed, expected %#v, got %#v", expectedMeta, objInfo.UserDefined)
}
if objInfo.Name != attrs.Name {
t.Fatalf("Test failed with Name mistmatch, expected %s, got %s", attrs.Name, objInfo.Name)
}
if objInfo.Bucket != attrs.Bucket {
t.Fatalf("Test failed with Bucket mistmatch, expected %s, got %s", attrs.Bucket, objInfo.Bucket)
}
if objInfo.ModTime != attrs.Updated {
t.Fatalf("Test failed with ModTime mistmatch, expected %s, got %s", attrs.Updated, objInfo.ModTime)
}
if objInfo.Size != attrs.Size {
t.Fatalf("Test failed with Size mistmatch, expected %d, got %d", attrs.Size, objInfo.Size)
}
if objInfo.ETag != expectedETag {
t.Fatalf("Test failed with ETag mistmatch, expected %s, got %s", expectedETag, objInfo.ETag)
}
}

View file

@ -1,67 +0,0 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hdfs
import (
"strings"
"github.com/minio/minio-go/v6/pkg/s3utils"
minio "github.com/minio/minio/cmd"
)
const (
// Minio meta bucket.
minioMetaBucket = ".minio.sys"
// Minio Tmp meta prefix.
minioMetaTmpBucket = minioMetaBucket + "/tmp"
// Minio reserved bucket name.
minioReservedBucket = "minio"
)
// Ignores all reserved bucket names or invalid bucket names.
func isReservedOrInvalidBucket(bucketEntry string, strict bool) bool {
bucketEntry = strings.TrimSuffix(bucketEntry, minio.SlashSeparator)
if strict {
if err := s3utils.CheckValidBucketNameStrict(bucketEntry); err != nil {
return true
}
} else {
if err := s3utils.CheckValidBucketName(bucketEntry); err != nil {
return true
}
}
return isMinioMetaBucket(bucketEntry) || isMinioReservedBucket(bucketEntry)
}
// Returns true if input bucket is a reserved minio meta bucket '.minio.sys'.
func isMinioMetaBucket(bucketName string) bool {
return bucketName == minioMetaBucket
}
// Returns true if input bucket is a reserved minio bucket 'minio'.
func isMinioReservedBucket(bucketName string) bool {
return bucketName == minioReservedBucket
}
// byBucketName is a collection satisfying sort.Interface.
type byBucketName []minio.BucketInfo
func (d byBucketName) Len() int { return len(d) }
func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name }

View file

@ -1,766 +0,0 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hdfs
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"os/user"
"path"
"sort"
"strings"
"syscall"
"time"
"github.com/colinmarc/hdfs/v2"
"github.com/colinmarc/hdfs/v2/hadoopconf"
"github.com/minio/cli"
"github.com/minio/minio-go/v6/pkg/s3utils"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/env"
xnet "github.com/minio/minio/pkg/net"
krb "gopkg.in/jcmturner/gokrb5.v7/client"
"gopkg.in/jcmturner/gokrb5.v7/config"
"gopkg.in/jcmturner/gokrb5.v7/credentials"
)
const (
hdfsBackend = "hdfs"
hdfsSeparator = minio.SlashSeparator
)
func init() {
const hdfsGatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} HDFS-NAMENODE [HDFS-NAMENODE...]
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
HDFS-NAMENODE:
HDFS namenode URI
EXAMPLES:
1. Start minio gateway server for HDFS backend
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accesskey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}secretkey
{{.Prompt}} {{.HelpName}} hdfs://namenode:8200
2. Start minio gateway server for HDFS with edge caching enabled
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accesskey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}secretkey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*,*.png"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85
{{.Prompt}} {{.HelpName}} hdfs://namenode:8200
`
minio.RegisterGatewayCommand(cli.Command{
Name: hdfsBackend,
Usage: "Hadoop Distributed File System (HDFS)",
Action: hdfsGatewayMain,
CustomHelpTemplate: hdfsGatewayTemplate,
HideHelpCommand: true,
})
}
// Handler for 'minio gateway hdfs' command line.
func hdfsGatewayMain(ctx *cli.Context) {
// Validate gateway arguments.
if ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, hdfsBackend, 1)
}
minio.StartGateway(ctx, &HDFS{args: ctx.Args()})
}
// HDFS implements Gateway.
type HDFS struct {
args []string
}
// Name implements Gateway interface.
func (g *HDFS) Name() string {
return hdfsBackend
}
func getKerberosClient() (*krb.Client, error) {
cfg, err := config.Load(env.Get("KRB5_CONFIG", "/etc/krb5.conf"))
if err != nil {
return nil, err
}
u, err := user.Current()
if err != nil {
return nil, err
}
// Determine the ccache location from the environment, falling back to the default location.
ccachePath := env.Get("KRB5CCNAME", fmt.Sprintf("/tmp/krb5cc_%s", u.Uid))
if strings.Contains(ccachePath, ":") {
if strings.HasPrefix(ccachePath, "FILE:") {
ccachePath = strings.TrimPrefix(ccachePath, "FILE:")
} else {
return nil, fmt.Errorf("unable to use kerberos ccache: %s", ccachePath)
}
}
ccache, err := credentials.LoadCCache(ccachePath)
if err != nil {
return nil, err
}
return krb.NewClientFromCCache(ccache, cfg)
}
// NewGatewayLayer returns hdfs gatewaylayer.
func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
dialFunc := (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext
hconfig, err := hadoopconf.LoadFromEnvironment()
if err != nil {
return nil, err
}
opts := hdfs.ClientOptionsFromConf(hconfig)
opts.NamenodeDialFunc = dialFunc
opts.DatanodeDialFunc = dialFunc
// Not addresses found, load it from command line.
if len(opts.Addresses) == 0 {
var addresses []string
for _, s := range g.args {
u, err := xnet.ParseURL(s)
if err != nil {
return nil, err
}
addresses = append(addresses, u.Host)
}
opts.Addresses = addresses
}
u, err := user.Current()
if err != nil {
return nil, fmt.Errorf("Unable to lookup local user: %s", err)
}
if opts.KerberosClient != nil {
opts.KerberosClient, err = getKerberosClient()
if err != nil {
return nil, fmt.Errorf("Unable to initialize kerberos client: %s", err)
}
} else {
opts.User = env.Get("HADOOP_USER_NAME", u.Username)
}
clnt, err := hdfs.NewClient(opts)
if err != nil {
return nil, err
}
if err = clnt.MkdirAll(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket), os.FileMode(0755)); err != nil {
return nil, err
}
return &hdfsObjects{clnt: clnt, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil
}
// Production - hdfs gateway is production ready.
func (g *HDFS) Production() bool {
return true
}
func (n *hdfsObjects) Shutdown(ctx context.Context) error {
return n.clnt.Close()
}
func (n *hdfsObjects) StorageInfo(ctx context.Context, _ bool) (si minio.StorageInfo, errs []error) {
fsInfo, err := n.clnt.StatFs()
if err != nil {
return minio.StorageInfo{}, []error{err}
}
si.Used = []uint64{fsInfo.Used}
si.Backend.Type = minio.BackendGateway
si.Backend.GatewayOnline = true
return si, nil
}
// hdfsObjects implements gateway for Minio and S3 compatible object storage servers.
type hdfsObjects struct {
minio.GatewayUnsupported
clnt *hdfs.Client
listPool *minio.TreeWalkPool
}
func hdfsToObjectErr(ctx context.Context, err error, params ...string) error {
if err == nil {
return nil
}
bucket := ""
object := ""
uploadID := ""
switch len(params) {
case 3:
uploadID = params[2]
fallthrough
case 2:
object = params[1]
fallthrough
case 1:
bucket = params[0]
}
switch {
case os.IsNotExist(err):
if uploadID != "" {
return minio.InvalidUploadID{
UploadID: uploadID,
}
}
if object != "" {
return minio.ObjectNotFound{Bucket: bucket, Object: object}
}
return minio.BucketNotFound{Bucket: bucket}
case os.IsExist(err):
if object != "" {
return minio.PrefixAccessDenied{Bucket: bucket, Object: object}
}
return minio.BucketAlreadyOwnedByYou{Bucket: bucket}
case errors.Is(err, syscall.ENOTEMPTY):
if object != "" {
return minio.PrefixAccessDenied{Bucket: bucket, Object: object}
}
return minio.BucketNotEmpty{Bucket: bucket}
default:
logger.LogIf(ctx, err)
return err
}
}
// hdfsIsValidBucketName verifies whether a bucket name is valid.
func hdfsIsValidBucketName(bucket string) bool {
return s3utils.CheckValidBucketNameStrict(bucket) == nil
}
func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if !hdfsIsValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if forceDelete {
return hdfsToObjectErr(ctx, n.clnt.RemoveAll(minio.PathJoin(hdfsSeparator, bucket)), bucket)
}
return hdfsToObjectErr(ctx, n.clnt.Remove(minio.PathJoin(hdfsSeparator, bucket)), bucket)
}
func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string, lockEnabled bool) error {
if lockEnabled {
return minio.NotImplemented{}
}
if !hdfsIsValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
return hdfsToObjectErr(ctx, n.clnt.Mkdir(minio.PathJoin(hdfsSeparator, bucket), os.FileMode(0755)), bucket)
}
func (n *hdfsObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return bi, hdfsToObjectErr(ctx, err, bucket)
}
// As hdfs.Stat() doesn't carry anything other than ModTime(), use ModTime() as CreatedTime.
return minio.BucketInfo{
Name: bucket,
Created: fi.ModTime(),
}, nil
}
func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
entries, err := n.clnt.ReadDir(hdfsSeparator)
if err != nil {
logger.LogIf(ctx, err)
return nil, hdfsToObjectErr(ctx, err)
}
for _, entry := range entries {
// Ignore all reserved bucket names and invalid bucket names.
if isReservedOrInvalidBucket(entry.Name(), false) {
continue
}
buckets = append(buckets, minio.BucketInfo{
Name: entry.Name(),
// As hdfs.Stat() doesnt carry CreatedTime, use ModTime() as CreatedTime.
Created: entry.ModTime(),
})
}
// Sort bucket infos by bucket name.
sort.Sort(byBucketName(buckets))
return buckets, nil
}
func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) {
f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, prefixDir))
if err != nil {
if os.IsNotExist(err) {
err = nil
}
logger.LogIf(minio.GlobalContext, err)
return
}
defer f.Close()
fis, err := f.Readdir(0)
if err != nil {
logger.LogIf(minio.GlobalContext, err)
return
}
if len(fis) == 0 {
return true, nil
}
for _, fi := range fis {
if fi.IsDir() {
entries = append(entries, fi.Name()+hdfsSeparator)
} else {
entries = append(entries, fi.Name())
}
}
return false, minio.FilterMatchingPrefix(entries, prefixEntry)
}
// Return list factory instance.
return listDir
}
// ListObjects lists all blobs in HDFS bucket filtered by prefix.
func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
if _, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)); err != nil {
return loi, hdfsToObjectErr(ctx, err, bucket)
}
getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) {
fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket, entry))
if err != nil {
return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry)
}
return minio.ObjectInfo{
Bucket: bucket,
Name: entry,
ModTime: fi.ModTime(),
Size: fi.Size(),
IsDir: fi.IsDir(),
AccTime: fi.(*hdfs.FileInfo).AccessTime(),
}, nil
}
return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), getObjectInfo, getObjectInfo)
}
// deleteObject deletes a file path if its empty. If it's successfully deleted,
// it will recursively move up the tree, deleting empty parent directories
// until it finds one with files in it. Returns nil for a non-empty directory.
func (n *hdfsObjects) deleteObject(basePath, deletePath string) error {
if basePath == deletePath {
return nil
}
// Attempt to remove path.
if err := n.clnt.Remove(deletePath); err != nil {
if errors.Is(err, syscall.ENOTEMPTY) {
// Ignore errors if the directory is not empty. The server relies on
// this functionality, and sometimes uses recursion that should not
// error on parent directories.
return nil
}
return err
}
// Trailing slash is removed when found to ensure
// slashpath.Dir() to work as intended.
deletePath = strings.TrimSuffix(deletePath, hdfsSeparator)
deletePath = path.Dir(deletePath)
// Delete parent directory. Errors for parent directories shouldn't trickle down.
n.deleteObject(basePath, deletePath)
return nil
}
// ListObjectsV2 lists all blobs in HDFS bucket filtered by prefix
func (n *hdfsObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
// fetchOwner is not supported and unused.
marker := continuationToken
if marker == "" {
marker = startAfter
}
resultV1, err := n.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return loi, err
}
return minio.ListObjectsV2Info{
Objects: resultV1.Objects,
Prefixes: resultV1.Prefixes,
ContinuationToken: continuationToken,
NextContinuationToken: resultV1.NextMarker,
IsTruncated: resultV1.IsTruncated,
}, nil
}
func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string) error {
return hdfsToObjectErr(ctx, n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), minio.PathJoin(hdfsSeparator, bucket, object)), bucket, object)
}
func (n *hdfsObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
errs := make([]error, len(objects))
for idx, object := range objects {
errs[idx] = n.DeleteObject(ctx, bucket, object)
}
return errs, nil
}
func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
objInfo, err := n.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
return nil, err
}
var startOffset, length int64
startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
go func() {
nerr := n.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
pw.CloseWithError(nerr)
}()
// Setup cleanup function to cause the above go-routine to
// exit in case of partial read
pipeCloser := func() { pr.Close() }
return minio.NewGetObjectReaderFromReader(pr, objInfo, opts, pipeCloser)
}
func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) {
cpSrcDstSame := minio.IsStringEqual(minio.PathJoin(hdfsSeparator, srcBucket, srcObject), minio.PathJoin(hdfsSeparator, dstBucket, dstObject))
if cpSrcDstSame {
return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
}
return n.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, minio.ObjectOptions{
ServerSideEncryption: dstOpts.ServerSideEncryption,
UserDefined: srcInfo.UserDefined,
})
}
func (n *hdfsObjects) GetObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
if _, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)); err != nil {
return hdfsToObjectErr(ctx, err, bucket)
}
rd, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, key))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket, key)
}
defer rd.Close()
_, err = io.Copy(writer, io.NewSectionReader(rd, startOffset, length))
if err == io.ErrClosedPipe {
// hdfs library doesn't send EOF correctly, so io.Copy attempts
// to write which returns io.ErrClosedPipe - just ignore
// this for now.
err = nil
}
return hdfsToObjectErr(ctx, err, bucket, key)
}
func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bool {
f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, object))
if err != nil {
if os.IsNotExist(err) {
return false
}
logger.LogIf(ctx, err)
return false
}
defer f.Close()
fis, err := f.Readdir(1)
if err != nil && err != io.EOF {
logger.LogIf(ctx, err)
return false
}
// Readdir returns an io.EOF when len(fis) == 0.
return len(fis) == 0
}
// GetObjectInfo reads object info and replies back ObjectInfo.
func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket)
}
if strings.HasSuffix(object, hdfsSeparator) && !n.isObjectDir(ctx, bucket, object) {
return objInfo, hdfsToObjectErr(ctx, os.ErrNotExist, bucket, object)
}
fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket, object))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: fi.ModTime(),
Size: fi.Size(),
IsDir: fi.IsDir(),
AccTime: fi.(*hdfs.FileInfo).AccessTime(),
}, nil
}
func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket)
}
name := minio.PathJoin(hdfsSeparator, bucket, object)
// If its a directory create a prefix {
if strings.HasSuffix(object, hdfsSeparator) && r.Size() == 0 {
if err = n.clnt.MkdirAll(name, os.FileMode(0755)); err != nil {
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), name)
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
} else {
tmpname := minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, minio.MustGetUUID())
var w *hdfs.FileWriter
w, err = n.clnt.Create(tmpname)
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
defer n.deleteObject(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket), tmpname)
if _, err = io.Copy(w, r); err != nil {
w.Close()
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
dir := path.Dir(name)
if dir != "" {
if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil {
w.Close()
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir)
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
}
w.Close()
if err = n.clnt.Rename(tmpname, name); err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
}
fi, err := n.clnt.Stat(name)
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
ETag: r.MD5CurrentHexString(),
ModTime: fi.ModTime(),
Size: fi.Size(),
IsDir: fi.IsDir(),
AccTime: fi.(*hdfs.FileInfo).AccessTime(),
}, nil
}
func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return uploadID, hdfsToObjectErr(ctx, err, bucket)
}
uploadID = minio.MustGetUUID()
if err = n.clnt.CreateEmptyFile(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)); err != nil {
return uploadID, hdfsToObjectErr(ctx, err, bucket)
}
return uploadID, nil
}
func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return lmi, hdfsToObjectErr(ctx, err, bucket)
}
// It's decided not to support List Multipart Uploads, hence returning empty result.
return lmi, nil
}
func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket, object, uploadID)
}
return nil
}
// GetMultipartInfo returns multipart info of the uploadId of the object
func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return result, hdfsToObjectErr(ctx, err, bucket)
}
if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return result, err
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
return result, nil
}
func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return result, hdfsToObjectErr(ctx, err, bucket)
}
if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return result, err
}
// It's decided not to support List parts, hence returning empty result.
return result, nil
}
func (n *hdfsObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int,
startOffset int64, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.PartInfo, error) {
return n.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
}
func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return info, hdfsToObjectErr(ctx, err, bucket)
}
var w *hdfs.FileWriter
w, err = n.clnt.Append(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID))
if err != nil {
return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
}
defer w.Close()
_, err = io.Copy(w, r.Reader)
if err != nil {
return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
}
info.PartNumber = partID
info.ETag = r.MD5CurrentHexString()
info.LastModified = minio.UTCNow()
info.Size = r.Reader.Size()
return info, nil
}
func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket)
}
if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return objInfo, err
}
name := minio.PathJoin(hdfsSeparator, bucket, object)
dir := path.Dir(name)
if dir != "" {
if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
}
err = n.clnt.Rename(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID), name)
// Object already exists is an error on HDFS
// remove it and then create it again.
if os.IsExist(err) {
if err = n.clnt.Remove(name); err != nil {
if dir != "" {
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir)
}
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
if err = n.clnt.Rename(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID), name); err != nil {
if dir != "" {
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir)
}
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
}
fi, err := n.clnt.Stat(name)
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := minio.ComputeCompleteMultipartMD5(parts)
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
ETag: s3MD5,
ModTime: fi.ModTime(),
Size: fi.Size(),
IsDir: fi.IsDir(),
AccTime: fi.(*hdfs.FileInfo).AccessTime(),
}, nil
}
func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket)
}
return hdfsToObjectErr(ctx, n.clnt.Remove(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)), bucket, object, uploadID)
}
// IsReady returns whether the layer is ready to take requests.
func (n *hdfsObjects) IsReady(ctx context.Context) bool {
si, _ := n.StorageInfo(ctx, false)
return si.Backend.GatewayOnline
}

View file

@ -1,132 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nas
import (
"context"
"github.com/minio/cli"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/auth"
)
const (
nasBackend = "nas"
)
func init() {
const nasGatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} PATH
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
PATH:
path to NAS mount point
EXAMPLES:
1. Start minio gateway server for NAS backend
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accesskey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}secretkey
{{.Prompt}} {{.HelpName}} /shared/nasvol
2. Start minio gateway server for NAS with edge caching enabled
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accesskey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}secretkey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*,*.png"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85
{{.Prompt}} {{.HelpName}} /shared/nasvol
`
minio.RegisterGatewayCommand(cli.Command{
Name: nasBackend,
Usage: "Network-attached storage (NAS)",
Action: nasGatewayMain,
CustomHelpTemplate: nasGatewayTemplate,
HideHelpCommand: true,
})
}
// Handler for 'minio gateway nas' command line.
func nasGatewayMain(ctx *cli.Context) {
// Validate gateway arguments.
if !ctx.Args().Present() || ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, nasBackend, 1)
}
minio.StartGateway(ctx, &NAS{ctx.Args().First()})
}
// NAS implements Gateway.
type NAS struct {
path string
}
// Name implements Gateway interface.
func (g *NAS) Name() string {
return nasBackend
}
// NewGatewayLayer returns nas gatewaylayer.
func (g *NAS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
var err error
newObject, err := minio.NewFSObjectLayer(g.path)
if err != nil {
return nil, err
}
return &nasObjects{newObject}, nil
}
// Production - nas gateway is production ready.
func (g *NAS) Production() bool {
return true
}
// IsListenBucketSupported returns whether listen bucket notification is applicable for this gateway.
func (n *nasObjects) IsListenBucketSupported() bool {
return false
}
func (n *nasObjects) StorageInfo(ctx context.Context, _ bool) (si minio.StorageInfo, _ []error) {
si, errs := n.ObjectLayer.StorageInfo(ctx, false)
si.Backend.GatewayOnline = si.Backend.Type == minio.BackendFS
si.Backend.Type = minio.BackendGateway
return si, errs
}
// nasObjects implements gateway for MinIO and S3 compatible object storage servers.
type nasObjects struct {
minio.ObjectLayer
}
// IsReady returns whether the layer is ready to take requests.
func (n *nasObjects) IsReady(ctx context.Context) bool {
si, _ := n.StorageInfo(ctx, false)
return si.Backend.GatewayOnline
}
func (n *nasObjects) IsTaggingSupported() bool {
return true
}

View file

@ -1,176 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"time"
jsoniter "github.com/json-iterator/go"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/hash"
)
var (
errGWMetaNotFound = errors.New("dare.meta file not found")
errGWMetaInvalidFormat = errors.New("dare.meta format is invalid")
)
// A gwMetaV1 represents `gw.json` metadata header.
type gwMetaV1 struct {
Version string `json:"version"` // Version of the current `gw.json`.
Format string `json:"format"` // Format of the current `gw.json`.
Stat minio.StatInfo `json:"stat"` // Stat of the current object `gw.json`.
ETag string `json:"etag"` // ETag of the current object
// Metadata map for current object `gw.json`.
Meta map[string]string `json:"meta,omitempty"`
// Captures all the individual object `gw.json`.
Parts []minio.ObjectPartInfo `json:"parts,omitempty"`
}
// Gateway metadata constants.
const (
// Gateway meta version.
gwMetaVersion = "1.0.0"
// Gateway meta version.
gwMetaVersion100 = "1.0.0"
// Gateway meta format string.
gwMetaFormat = "gw"
// Add new constants here.
)
// newGWMetaV1 - initializes new gwMetaV1, adds version.
func newGWMetaV1() (gwMeta gwMetaV1) {
gwMeta = gwMetaV1{}
gwMeta.Version = gwMetaVersion
gwMeta.Format = gwMetaFormat
return gwMeta
}
// IsValid - tells if the format is sane by validating the version
// string, format fields.
func (m gwMetaV1) IsValid() bool {
return ((m.Version == gwMetaVersion || m.Version == gwMetaVersion100) &&
m.Format == gwMetaFormat)
}
// Converts metadata to object info.
func (m gwMetaV1) ToObjectInfo(bucket, object string) minio.ObjectInfo {
filterKeys := append([]string{
"ETag",
"Content-Length",
"Last-Modified",
"Content-Type",
"Expires",
}, defaultFilterKeys...)
objInfo := minio.ObjectInfo{
IsDir: false,
Bucket: bucket,
Name: object,
Size: m.Stat.Size,
ModTime: m.Stat.ModTime,
ContentType: m.Meta["content-type"],
ContentEncoding: m.Meta["content-encoding"],
ETag: minio.CanonicalizeETag(m.ETag),
UserDefined: minio.CleanMinioInternalMetadataKeys(minio.CleanMetadataKeys(m.Meta, filterKeys...)),
Parts: m.Parts,
}
if sc, ok := m.Meta["x-amz-storage-class"]; ok {
objInfo.StorageClass = sc
}
var (
t time.Time
e error
)
if exp, ok := m.Meta["expires"]; ok {
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
objInfo.Expires = t.UTC()
}
}
// Success.
return objInfo
}
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
func (m gwMetaV1) ObjectToPartOffset(ctx context.Context, offset int64) (partIndex int, partOffset int64, err error) {
if offset == 0 {
// Special case - if offset is 0, then partIndex and partOffset are always 0.
return 0, 0, nil
}
partOffset = offset
// Seek until object offset maps to a particular part offset.
for i, part := range m.Parts {
partIndex = i
// Offset is smaller than size we have reached the proper part offset.
if partOffset < part.Size {
return partIndex, partOffset, nil
}
// Continue to towards the next part.
partOffset -= part.Size
}
logger.LogIf(ctx, minio.InvalidRange{})
// Offset beyond the size of the object return InvalidRange.
return 0, 0, minio.InvalidRange{}
}
// Constructs GWMetaV1 using `jsoniter` lib to retrieve each field.
func gwMetaUnmarshalJSON(ctx context.Context, gwMetaBuf []byte) (gwMeta gwMetaV1, err error) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
err = json.Unmarshal(gwMetaBuf, &gwMeta)
return gwMeta, err
}
// readGWMeta reads `dare.meta` and returns back GW metadata structure.
func readGWMetadata(ctx context.Context, buf bytes.Buffer) (gwMeta gwMetaV1, err error) {
if buf.Len() == 0 {
return gwMetaV1{}, errGWMetaNotFound
}
gwMeta, err = gwMetaUnmarshalJSON(ctx, buf.Bytes())
if err != nil {
return gwMetaV1{}, err
}
if !gwMeta.IsValid() {
return gwMetaV1{}, errGWMetaInvalidFormat
}
// Return structured `dare.meta`.
return gwMeta, nil
}
// getGWMetadata - unmarshals dare.meta into a *minio.PutObjReader
func getGWMetadata(ctx context.Context, bucket, prefix string, gwMeta gwMetaV1) (*minio.PutObjReader, error) {
// Marshal json.
metadataBytes, err := json.Marshal(&gwMeta)
if err != nil {
logger.LogIf(ctx, err)
return nil, err
}
hashReader, err := hash.NewReader(bytes.NewReader(metadataBytes), int64(len(metadataBytes)), "", "", int64(len(metadataBytes)), false)
if err != nil {
return nil, err
}
return minio.NewPutObjReader(hashReader, nil, nil), nil
}

View file

@ -1,79 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"bytes"
"testing"
minio "github.com/minio/minio/cmd"
)
// Tests for GW metadata format validity.
func TestGWMetaFormatValid(t *testing.T) {
tests := []struct {
name int
version string
format string
want bool
}{
{1, "123", "fs", false},
{2, "123", gwMetaFormat, false},
{3, gwMetaVersion, "test", false},
{4, gwMetaVersion100, "hello", false},
{5, gwMetaVersion, gwMetaFormat, true},
{6, gwMetaVersion100, gwMetaFormat, true},
}
for _, tt := range tests {
m := newGWMetaV1()
m.Version = tt.version
m.Format = tt.format
if got := m.IsValid(); got != tt.want {
t.Errorf("Test %d: Expected %v but received %v", tt.name, got, tt.want)
}
}
}
// Tests for reading GW metadata info.
func TestReadGWMetadata(t *testing.T) {
tests := []struct {
metaStr string
pass bool
}{
{`{"version": "` + gwMetaVersion + `", "format":"` + gwMetaFormat + `", "stat": {"size": 132, "modTime": "2018-08-31T22:25:39.23626461Z" }}`, true},
{`{"version": "` + gwMetaVersion + `", "format":"` + gwMetaFormat + `", "stat": {"size": 132, "modTime": "0000-00-00T00:00:00.00000000Z" }}`, false},
{`{"version": "` + gwMetaVersion + `", "format":"` + gwMetaFormat + `", "stat": {"size": 5242880, "modTime": "2018-08-31T22:25:39.23626461Z" },"meta":{"content-type":"application/octet-stream","etag":"57c743902b2fc8eea6ba3bb4fc58c8e8"},"parts":[{"number":1,"name":"part.1","etag":"","size":5242880}]}`, true},
{`{"version": "` + gwMetaVersion + `", "format":"` + gwMetaFormat + `", "stat": {"size": 68190720, "modTime": "2018-08-31T22:25:39.23626461Z" },"meta":{"X-Minio-Internal-Encrypted-Multipart":"","X-Minio-Internal-Server-Side-Encryption-Iv":"kdbOcKdXD3Sew8tOiHe5eI9xkX1oQ2W9JURz0oslCZA=","X-Minio-Internal-Server-Side-Encryption-Seal-Algorithm":"DAREv2-HMAC-SHA256","X-Minio-Internal-Server-Side-Encryption-Sealed-Key":"IAAfAMfqKrxMXC9LuiI7ENP+p0xArepzAiIeB/MftFp7Xmq2OzDkKlmNbj5RKI89RrjiAbOVLSSEMvqQsrIrTQ==","content-type":"text/plain; charset=utf-8","etag":"2b137fa4ab80126af54623b010c98de6-2"},"parts":[{"number":1,"name":"part.1","etag":"c5cac075eefdab801a5198812f51b36e","size":67141632},{"number":2,"name":"part.2","etag":"ccdf4b774bc3be8eef9a8987309e8171","size":1049088}]}`, true},
{`{"version": "` + gwMetaVersion + `", "format":"` + gwMetaFormat + `", "stat": {"size": "68190720", "modTime": "2018-08-31T22:25:39.23626461Z" },"meta":{"X-Minio-Internal-Encrypted-Multipart":"","X-Minio-Internal-Server-Side-Encryption-Iv":"kdbOcKdXD3Sew8tOiHe5eI9xkX1oQ2W9JURz0oslCZA=","X-Minio-Internal-Server-Side-Encryption-Seal-Algorithm":"DAREv2-HMAC-SHA256","X-Minio-Internal-Server-Side-Encryption-Sealed-Key":"IAAfAMfqKrxMXC9LuiI7ENP+p0xArepzAiIeB/MftFp7Xmq2OzDkKlmNbj5RKI89RrjiAbOVLSSEMvqQsrIrTQ==","content-type":"text/plain; charset=utf-8","etag":"2b137fa4ab80126af54623b010c98de6-2"},"parts":"123"}`, false},
}
for i, tt := range tests {
buf := bytes.NewBufferString(tt.metaStr)
m, err := readGWMetadata(minio.GlobalContext, *buf)
if err != nil && tt.pass {
t.Errorf("Test %d: Expected parse gw metadata to succeed, but failed, %s", i+1, err)
}
if err == nil && !tt.pass {
t.Errorf("Test %d: Expected parse gw metadata to succeed, but failed", i+1)
}
if err == nil {
if m.Version != gwMetaVersion {
t.Errorf("Test %d: Expected version %s, but failed with %s", i+1, gwMetaVersion, m.Version)
}
}
}
}

View file

@ -1,793 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"bytes"
"context"
"io"
"net/http"
"path"
"strconv"
"strings"
"time"
"github.com/minio/minio-go/v6/pkg/encrypt"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger"
)
const (
// name of custom multipart metadata file for s3 backend.
gwdareMetaJSON string = "dare.meta"
// name of temporary per part metadata file
gwpartMetaJSON string = "part.meta"
// custom multipart files are stored under the defaultMinioGWPrefix
defaultMinioGWPrefix = ".minio"
defaultGWContentFileName = "data"
)
// s3EncObjects is a wrapper around s3Objects and implements gateway calls for
// custom large objects encrypted at the gateway
type s3EncObjects struct {
s3Objects
}
/*
NOTE:
Custom gateway encrypted objects are stored on backend as follows:
obj/.minio/data <= encrypted content
obj/.minio/dare.meta <= metadata
When a multipart upload operation is in progress, the metadata set during
NewMultipartUpload is stored in obj/.minio/uploadID/dare.meta and each
UploadPart operation saves additional state of the part's encrypted ETag and
encrypted size in obj/.minio/uploadID/part1/part.meta
All the part metadata and temp dare.meta are cleaned up when upload completes
*/
// ListObjects lists all blobs in S3 bucket filtered by prefix
func (l *s3EncObjects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, e error) {
var continuationToken, startAfter string
res, err := l.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, false, startAfter)
if err != nil {
return loi, err
}
loi.IsTruncated = res.IsTruncated
loi.NextMarker = res.NextContinuationToken
loi.Objects = res.Objects
loi.Prefixes = res.Prefixes
return loi, nil
}
// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
func (l *s3EncObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {
var objects []minio.ObjectInfo
var prefixes []string
var isTruncated bool
// filter out objects that contain a .minio prefix, but is not a dare.meta metadata file.
for {
loi, e = l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
if e != nil {
return loi, minio.ErrorRespToObjectError(e, bucket)
}
for _, obj := range loi.Objects {
startAfter = obj.Name
continuationToken = loi.NextContinuationToken
isTruncated = loi.IsTruncated
if !isGWObject(obj.Name) {
continue
}
// get objectname and ObjectInfo from the custom metadata file
if strings.HasSuffix(obj.Name, gwdareMetaJSON) {
objSlice := strings.Split(obj.Name, minio.SlashSeparator+defaultMinioGWPrefix)
gwMeta, e := l.getGWMetadata(ctx, bucket, getDareMetaPath(objSlice[0]))
if e != nil {
continue
}
oInfo := gwMeta.ToObjectInfo(bucket, objSlice[0])
objects = append(objects, oInfo)
} else {
objects = append(objects, obj)
}
if len(objects) > maxKeys {
break
}
}
for _, p := range loi.Prefixes {
objName := strings.TrimSuffix(p, minio.SlashSeparator)
gm, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(objName))
// if prefix is actually a custom multi-part object, append it to objects
if err == nil {
objects = append(objects, gm.ToObjectInfo(bucket, objName))
continue
}
isPrefix := l.isPrefix(ctx, bucket, p, fetchOwner, startAfter)
if isPrefix {
prefixes = append(prefixes, p)
}
}
if (len(objects) > maxKeys) || !loi.IsTruncated {
break
}
}
loi.IsTruncated = isTruncated
loi.ContinuationToken = continuationToken
loi.Objects = make([]minio.ObjectInfo, 0)
loi.Prefixes = make([]string, 0)
loi.Objects = append(loi.Objects, objects...)
for _, pfx := range prefixes {
if pfx != prefix {
loi.Prefixes = append(loi.Prefixes, pfx)
}
}
// Set continuation token if s3 returned truncated list
if isTruncated {
if len(objects) > 0 {
loi.NextContinuationToken = objects[len(objects)-1].Name
}
}
return loi, nil
}
// isGWObject returns true if it is a custom object
func isGWObject(objName string) bool {
isEncrypted := strings.Contains(objName, defaultMinioGWPrefix)
if !isEncrypted {
return true
}
// ignore temp part.meta files
if strings.Contains(objName, gwpartMetaJSON) {
return false
}
pfxSlice := strings.Split(objName, minio.SlashSeparator)
var i1, i2 int
for i := len(pfxSlice) - 1; i >= 0; i-- {
p := pfxSlice[i]
if p == defaultMinioGWPrefix {
i1 = i
}
if p == gwdareMetaJSON {
i2 = i
}
if i1 > 0 && i2 > 0 {
break
}
}
// incomplete uploads would have a uploadID between defaultMinioGWPrefix and gwdareMetaJSON
return i2 > 0 && i1 > 0 && i2-i1 == 1
}
// isPrefix returns true if prefix exists and is not an incomplete multipart upload entry
func (l *s3EncObjects) isPrefix(ctx context.Context, bucket, prefix string, fetchOwner bool, startAfter string) bool {
var continuationToken, delimiter string
for {
loi, e := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
if e != nil {
return false
}
for _, obj := range loi.Objects {
if isGWObject(obj.Name) {
return true
}
}
continuationToken = loi.NextContinuationToken
if !loi.IsTruncated {
break
}
}
return false
}
// GetObject reads an object from S3. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
func (l *s3EncObjects) GetObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
return l.getObject(ctx, bucket, key, startOffset, length, writer, etag, opts)
}
func (l *s3EncObjects) isGWEncrypted(ctx context.Context, bucket, object string) bool {
_, err := l.s3Objects.GetObjectInfo(ctx, bucket, getDareMetaPath(object), minio.ObjectOptions{})
return err == nil
}
// getDaremetadata fetches dare.meta from s3 backend and marshals into a structured format.
func (l *s3EncObjects) getGWMetadata(ctx context.Context, bucket, metaFileName string) (m gwMetaV1, err error) {
oi, err1 := l.s3Objects.GetObjectInfo(ctx, bucket, metaFileName, minio.ObjectOptions{})
if err1 != nil {
return m, err1
}
var buffer bytes.Buffer
err = l.s3Objects.GetObject(ctx, bucket, metaFileName, 0, oi.Size, &buffer, oi.ETag, minio.ObjectOptions{})
if err != nil {
return m, err
}
return readGWMetadata(ctx, buffer)
}
// writes dare metadata to the s3 backend
func (l *s3EncObjects) writeGWMetadata(ctx context.Context, bucket, metaFileName string, m gwMetaV1, o minio.ObjectOptions) error {
reader, err := getGWMetadata(ctx, bucket, metaFileName, m)
if err != nil {
logger.LogIf(ctx, err)
return err
}
_, err = l.s3Objects.PutObject(ctx, bucket, metaFileName, reader, o)
return err
}
// returns path of temporary metadata json file for the upload
func getTmpDareMetaPath(object, uploadID string) string {
return path.Join(getGWMetaPath(object), uploadID, gwdareMetaJSON)
}
// returns path of metadata json file for encrypted objects
func getDareMetaPath(object string) string {
return path.Join(getGWMetaPath(object), gwdareMetaJSON)
}
// returns path of temporary part metadata file for multipart uploads
func getPartMetaPath(object, uploadID string, partID int) string {
return path.Join(object, defaultMinioGWPrefix, uploadID, strconv.Itoa(partID), gwpartMetaJSON)
}
// deletes the custom dare metadata file saved at the backend
func (l *s3EncObjects) deleteGWMetadata(ctx context.Context, bucket, metaFileName string) error {
return l.s3Objects.DeleteObject(ctx, bucket, metaFileName)
}
func (l *s3EncObjects) getObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
var o minio.ObjectOptions
if minio.GlobalGatewaySSE.SSEC() {
o = opts
}
dmeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(key))
if err != nil {
// unencrypted content
return l.s3Objects.GetObject(ctx, bucket, key, startOffset, length, writer, etag, o)
}
if startOffset < 0 {
logger.LogIf(ctx, minio.InvalidRange{})
}
// For negative length read everything.
if length < 0 {
length = dmeta.Stat.Size - startOffset
}
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > dmeta.Stat.Size || startOffset+length > dmeta.Stat.Size {
logger.LogIf(ctx, minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size})
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
}
// Get start part index and offset.
_, partOffset, err := dmeta.ObjectToPartOffset(ctx, startOffset)
if err != nil {
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
}
// Calculate endOffset according to length
endOffset := startOffset
if length > 0 {
endOffset += length - 1
}
// Get last part index to read given length.
if _, _, err := dmeta.ObjectToPartOffset(ctx, endOffset); err != nil {
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
}
return l.s3Objects.GetObject(ctx, bucket, key, partOffset, endOffset, writer, dmeta.ETag, o)
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *s3EncObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, o minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
var opts minio.ObjectOptions
if minio.GlobalGatewaySSE.SSEC() {
opts = o
}
objInfo, err := l.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
return l.s3Objects.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}
fn, off, length, err := minio.NewGetObjectReader(rs, objInfo, o)
if err != nil {
return nil, minio.ErrorRespToObjectError(err)
}
if l.isGWEncrypted(ctx, bucket, object) {
object = getGWContentPath(object)
}
pr, pw := io.Pipe()
go func() {
err := l.getObject(ctx, bucket, object, off, length, pw, objInfo.ETag, opts)
pw.CloseWithError(err)
}()
// Setup cleanup function to cause the above go-routine to
// exit in case of partial read
pipeCloser := func() { pr.Close() }
return fn(pr, h, o.CheckCopyPrecondFn, pipeCloser)
}
// GetObjectInfo reads object info and replies back ObjectInfo
// For custom gateway encrypted large objects, the ObjectInfo is retrieved from the dare.meta file.
func (l *s3EncObjects) GetObjectInfo(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
var opts minio.ObjectOptions
if minio.GlobalGatewaySSE.SSEC() {
opts = o
}
gwMeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object))
if err != nil {
return l.s3Objects.GetObjectInfo(ctx, bucket, object, opts)
}
return gwMeta.ToObjectInfo(bucket, object), nil
}
// CopyObject copies an object from source bucket to a destination bucket.
func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, s, d minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
cpSrcDstSame := strings.EqualFold(path.Join(srcBucket, srcObject), path.Join(dstBucket, dstObject))
if cpSrcDstSame {
var gwMeta gwMetaV1
if s.ServerSideEncryption != nil && d.ServerSideEncryption != nil &&
((s.ServerSideEncryption.Type() == encrypt.SSEC && d.ServerSideEncryption.Type() == encrypt.SSEC) ||
(s.ServerSideEncryption.Type() == encrypt.S3 && d.ServerSideEncryption.Type() == encrypt.S3)) {
gwMeta, err = l.getGWMetadata(ctx, srcBucket, getDareMetaPath(srcObject))
if err != nil {
return
}
header := make(http.Header)
if d.ServerSideEncryption != nil {
d.ServerSideEncryption.Marshal(header)
}
for k, v := range header {
srcInfo.UserDefined[k] = v[0]
}
gwMeta.Meta = srcInfo.UserDefined
if err = l.writeGWMetadata(ctx, dstBucket, getDareMetaPath(dstObject), gwMeta, minio.ObjectOptions{}); err != nil {
return objInfo, minio.ErrorRespToObjectError(err)
}
return gwMeta.ToObjectInfo(dstBucket, dstObject), nil
}
}
dstOpts := minio.ObjectOptions{ServerSideEncryption: d.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
return l.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, dstOpts)
}
// DeleteObject deletes a blob in bucket
// For custom gateway encrypted large objects, cleans up encrypted content and metadata files
// from the backend.
func (l *s3EncObjects) DeleteObject(ctx context.Context, bucket string, object string) error {
// Get dare meta json
if _, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object)); err != nil {
return l.s3Objects.DeleteObject(ctx, bucket, object)
}
// delete encrypted object
l.s3Objects.DeleteObject(ctx, bucket, getGWContentPath(object))
return l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
}
// ListMultipartUploads lists all multipart uploads.
func (l *s3EncObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) {
lmi, e = l.s3Objects.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
if e != nil {
return
}
lmi.KeyMarker = strings.TrimSuffix(lmi.KeyMarker, getGWContentPath(minio.SlashSeparator))
lmi.NextKeyMarker = strings.TrimSuffix(lmi.NextKeyMarker, getGWContentPath(minio.SlashSeparator))
for i := range lmi.Uploads {
lmi.Uploads[i].Object = strings.TrimSuffix(lmi.Uploads[i].Object, getGWContentPath(minio.SlashSeparator))
}
return
}
// NewMultipartUpload uploads object in multiple parts
func (l *s3EncObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (uploadID string, err error) {
var sseOpts encrypt.ServerSide
if o.ServerSideEncryption == nil {
return l.s3Objects.NewMultipartUpload(ctx, bucket, object, minio.ObjectOptions{UserDefined: o.UserDefined})
}
// Decide if sse options needed to be passed to backend
if (minio.GlobalGatewaySSE.SSEC() && o.ServerSideEncryption.Type() == encrypt.SSEC) ||
(minio.GlobalGatewaySSE.SSES3() && o.ServerSideEncryption.Type() == encrypt.S3) {
sseOpts = o.ServerSideEncryption
}
uploadID, err = l.s3Objects.NewMultipartUpload(ctx, bucket, getGWContentPath(object), minio.ObjectOptions{ServerSideEncryption: sseOpts})
if err != nil {
return
}
// Create uploadID and write a temporary dare.meta object under object/uploadID prefix
gwmeta := newGWMetaV1()
gwmeta.Meta = o.UserDefined
gwmeta.Stat.ModTime = time.Now().UTC()
err = l.writeGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID), gwmeta, minio.ObjectOptions{})
if err != nil {
return uploadID, minio.ErrorRespToObjectError(err)
}
return uploadID, nil
}
// PutObject creates a new object with the incoming data,
func (l *s3EncObjects) PutObject(ctx context.Context, bucket string, object string, data *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
var sseOpts encrypt.ServerSide
// Decide if sse options needed to be passed to backend
if opts.ServerSideEncryption != nil &&
((minio.GlobalGatewaySSE.SSEC() && opts.ServerSideEncryption.Type() == encrypt.SSEC) ||
(minio.GlobalGatewaySSE.SSES3() && opts.ServerSideEncryption.Type() == encrypt.S3) ||
opts.ServerSideEncryption.Type() == encrypt.KMS) {
sseOpts = opts.ServerSideEncryption
}
if opts.ServerSideEncryption == nil {
defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
defer l.DeleteObject(ctx, bucket, getGWContentPath(object))
return l.s3Objects.PutObject(ctx, bucket, object, data, minio.ObjectOptions{UserDefined: opts.UserDefined})
}
oi, err := l.s3Objects.PutObject(ctx, bucket, getGWContentPath(object), data, minio.ObjectOptions{ServerSideEncryption: sseOpts})
if err != nil {
return objInfo, minio.ErrorRespToObjectError(err)
}
gwMeta := newGWMetaV1()
gwMeta.Meta = make(map[string]string)
for k, v := range opts.UserDefined {
gwMeta.Meta[k] = v
}
encMD5 := data.MD5CurrentHexString()
gwMeta.ETag = encMD5
gwMeta.Stat.Size = oi.Size
gwMeta.Stat.ModTime = time.Now().UTC()
if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
return objInfo, minio.ErrorRespToObjectError(err)
}
objInfo = gwMeta.ToObjectInfo(bucket, object)
// delete any unencrypted content of the same name created previously
l.s3Objects.DeleteObject(ctx, bucket, object)
return objInfo, nil
}
// PutObjectPart puts a part of object in bucket
func (l *s3EncObjects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, data *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, e error) {
if opts.ServerSideEncryption == nil {
return l.s3Objects.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
}
var s3Opts minio.ObjectOptions
// for sse-s3 encryption options should not be passed to backend
if opts.ServerSideEncryption != nil && opts.ServerSideEncryption.Type() == encrypt.SSEC && minio.GlobalGatewaySSE.SSEC() {
s3Opts = opts
}
uploadPath := getTmpGWMetaPath(object, uploadID)
tmpDareMeta := path.Join(uploadPath, gwdareMetaJSON)
_, err := l.s3Objects.GetObjectInfo(ctx, bucket, tmpDareMeta, minio.ObjectOptions{})
if err != nil {
return pi, minio.InvalidUploadID{UploadID: uploadID}
}
pi, e = l.s3Objects.PutObjectPart(ctx, bucket, getGWContentPath(object), uploadID, partID, data, s3Opts)
if e != nil {
return
}
gwMeta := newGWMetaV1()
gwMeta.Parts = make([]minio.ObjectPartInfo, 1)
// Add incoming part.
gwMeta.Parts[0] = minio.ObjectPartInfo{
Number: partID,
ETag: pi.ETag,
Size: pi.Size,
}
gwMeta.ETag = data.MD5CurrentHexString() // encrypted ETag
gwMeta.Stat.Size = pi.Size
gwMeta.Stat.ModTime = pi.LastModified
if err = l.writeGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, partID), gwMeta, minio.ObjectOptions{}); err != nil {
return pi, minio.ErrorRespToObjectError(err)
}
return minio.PartInfo{
Size: gwMeta.Stat.Size,
ETag: minio.CanonicalizeETag(gwMeta.ETag),
LastModified: gwMeta.Stat.ModTime,
PartNumber: partID,
}, nil
}
// CopyObjectPart creates a part in a multipart upload by copying
// existing object or a part of it.
func (l *s3EncObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (p minio.PartInfo, err error) {
return l.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
}
// GetMultipartInfo returns multipart info of the uploadId of the object
func (l *s3EncObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
// there could be situations of dare.meta getting corrupted by competing upload parts.
dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
if err != nil {
return l.s3Objects.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
}
result.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
return result, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
// there could be situations of dare.meta getting corrupted by competing upload parts.
dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
if err != nil {
return l.s3Objects.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
}
lpi, err = l.s3Objects.ListObjectParts(ctx, bucket, getGWContentPath(object), uploadID, partNumberMarker, maxParts, opts)
if err != nil {
return lpi, err
}
for i, part := range lpi.Parts {
partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
if err != nil || len(partMeta.Parts) == 0 {
return lpi, minio.InvalidPart{}
}
lpi.Parts[i].ETag = partMeta.ETag
}
lpi.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
lpi.Object = object
return lpi, nil
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
if _, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)); err != nil {
return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID)
}
if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID); err != nil {
return err
}
uploadPrefix := getTmpGWMetaPath(object, uploadID)
var continuationToken, startAfter, delimiter string
for {
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
if err != nil {
return minio.InvalidUploadID{UploadID: uploadID}
}
for _, obj := range loi.Objects {
if err := l.s3Objects.DeleteObject(ctx, bucket, obj.Name); err != nil {
return minio.ErrorRespToObjectError(err)
}
startAfter = obj.Name
}
continuationToken = loi.NextContinuationToken
if !loi.IsTruncated {
break
}
}
return nil
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
func (l *s3EncObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
tmpMeta, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
if err != nil {
oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
if e == nil {
// delete any encrypted version of object that might exist
defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
defer l.DeleteObject(ctx, bucket, getGWContentPath(object))
}
return oi, e
}
gwMeta := newGWMetaV1()
gwMeta.Meta = make(map[string]string)
for k, v := range tmpMeta.Meta {
gwMeta.Meta[k] = v
}
// Allocate parts similar to incoming slice.
gwMeta.Parts = make([]minio.ObjectPartInfo, len(uploadedParts))
bkUploadedParts := make([]minio.CompletePart, len(uploadedParts))
// Calculate full object size.
var objectSize int64
// Validate each part and then commit to disk.
for i, part := range uploadedParts {
partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
if err != nil || len(partMeta.Parts) == 0 {
return oi, minio.InvalidPart{}
}
bkUploadedParts[i] = minio.CompletePart{PartNumber: part.PartNumber, ETag: partMeta.Parts[0].ETag}
gwMeta.Parts[i] = partMeta.Parts[0]
objectSize += partMeta.Parts[0].Size
}
oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, bkUploadedParts, opts)
if e != nil {
return oi, e
}
//delete any unencrypted version of object that might be on the backend
defer l.s3Objects.DeleteObject(ctx, bucket, object)
// Save the final object size and modtime.
gwMeta.Stat.Size = objectSize
gwMeta.Stat.ModTime = time.Now().UTC()
gwMeta.ETag = oi.ETag
if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
return oi, minio.ErrorRespToObjectError(err)
}
// Clean up any uploaded parts that are not being committed by this CompleteMultipart operation
var continuationToken, startAfter, delimiter string
uploadPrefix := getTmpGWMetaPath(object, uploadID)
done := false
for {
loi, lerr := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
if lerr != nil {
break
}
for _, obj := range loi.Objects {
if !strings.HasPrefix(obj.Name, uploadPrefix) {
done = true
break
}
startAfter = obj.Name
l.s3Objects.DeleteObject(ctx, bucket, obj.Name)
}
continuationToken = loi.NextContinuationToken
if !loi.IsTruncated || done {
break
}
}
return gwMeta.ToObjectInfo(bucket, object), nil
}
// getTmpGWMetaPath returns the prefix under which uploads in progress are stored on backend
func getTmpGWMetaPath(object, uploadID string) string {
return path.Join(object, defaultMinioGWPrefix, uploadID)
}
// getGWMetaPath returns the prefix under which custom object metadata and object are stored on backend after upload completes
func getGWMetaPath(object string) string {
return path.Join(object, defaultMinioGWPrefix)
}
// getGWContentPath returns the prefix under which custom object is stored on backend after upload completes
func getGWContentPath(object string) string {
return path.Join(object, defaultMinioGWPrefix, defaultGWContentFileName)
}
// Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
l.cleanupStaleEncMultipartUploadsOnGW(ctx, expiry)
}
}
}
// cleanupStaleMultipartUploads removes old custom encryption multipart uploads on backend
func (l *s3EncObjects) cleanupStaleEncMultipartUploadsOnGW(ctx context.Context, expiry time.Duration) {
for {
buckets, err := l.s3Objects.ListBuckets(ctx)
if err != nil {
break
}
for _, b := range buckets {
expParts := l.getStalePartsForBucket(ctx, b.Name, expiry)
for k := range expParts {
l.s3Objects.DeleteObject(ctx, b.Name, k)
}
}
}
}
func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string, expiry time.Duration) (expParts map[string]string) {
var prefix, continuationToken, delimiter, startAfter string
expParts = make(map[string]string)
now := time.Now()
for {
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
if err != nil {
break
}
for _, obj := range loi.Objects {
startAfter = obj.Name
if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
continue
}
if isGWObject(obj.Name) {
continue
}
// delete temporary part.meta or dare.meta files for incomplete uploads that are past expiry
if (strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON)) &&
now.Sub(obj.ModTime) > expiry {
expParts[obj.Name] = ""
}
}
continuationToken = loi.NextContinuationToken
if !loi.IsTruncated {
break
}
}
return
}
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
var prefix, continuationToken, delimiter, startAfter string
expParts := make(map[string]string)
for {
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
if err != nil {
break
}
for _, obj := range loi.Objects {
startAfter = obj.Name
if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
return minio.BucketNotEmpty{}
}
if isGWObject(obj.Name) {
return minio.BucketNotEmpty{}
}
// delete temporary part.meta or dare.meta files for incomplete uploads
if strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON) {
expParts[obj.Name] = ""
}
}
continuationToken = loi.NextContinuationToken
if !loi.IsTruncated {
break
}
}
for k := range expParts {
l.s3Objects.DeleteObject(ctx, bucket, k)
}
err := l.Client.RemoveBucket(bucket)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket)
}
return nil
}

View file

@ -1,49 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
minio "github.com/minio/minio/cmd"
)
// List of header keys to be filtered, usually
// from all S3 API http responses.
var defaultFilterKeys = []string{
"Connection",
"Transfer-Encoding",
"Accept-Ranges",
"Date",
"Server",
"Vary",
"x-amz-bucket-region",
"x-amz-request-id",
"x-amz-id-2",
"Content-Security-Policy",
"X-Xss-Protection",
// Add new headers to be ignored.
}
// FromGatewayObjectPart converts ObjectInfo for custom part stored as object to PartInfo
func FromGatewayObjectPart(partID int, oi minio.ObjectInfo) (pi minio.PartInfo) {
return minio.PartInfo{
Size: oi.Size,
ETag: minio.CanonicalizeETag(oi.ETag),
LastModified: oi.ModTime,
PartNumber: partID,
}
}

View file

@ -1,760 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2017-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"context"
"encoding/json"
"io"
"math/rand"
"net/http"
"net/url"
"strings"
"time"
"github.com/minio/cli"
miniogo "github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v6/pkg/credentials"
"github.com/minio/minio-go/v6/pkg/tags"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio-go/v6/pkg/encrypt"
"github.com/minio/minio-go/v6/pkg/s3utils"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy"
)
const (
s3Backend = "s3"
)
func init() {
const s3GatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} [ENDPOINT]
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
ENDPOINT:
s3 server endpoint. Default ENDPOINT is https://s3.amazonaws.com
EXAMPLES:
1. Start minio gateway server for AWS S3 backend
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accesskey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}secretkey
{{.Prompt}} {{.HelpName}}
2. Start minio gateway server for AWS S3 backend with edge caching enabled
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accesskey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}secretkey
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*,*.png"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85
{{.Prompt}} {{.HelpName}}
`
minio.RegisterGatewayCommand(cli.Command{
Name: s3Backend,
Usage: "Amazon Simple Storage Service (S3)",
Action: s3GatewayMain,
CustomHelpTemplate: s3GatewayTemplate,
HideHelpCommand: true,
})
}
// Handler for 'minio gateway s3' command line.
func s3GatewayMain(ctx *cli.Context) {
args := ctx.Args()
if !ctx.Args().Present() {
args = cli.Args{"https://s3.amazonaws.com"}
}
serverAddr := ctx.GlobalString("address")
if serverAddr == "" || serverAddr == ":"+minio.GlobalMinioDefaultPort {
serverAddr = ctx.String("address")
}
// Validate gateway arguments.
logger.FatalIf(minio.ValidateGatewayArguments(serverAddr, args.First()), "Invalid argument")
// Start the gateway..
minio.StartGateway(ctx, &S3{args.First()})
}
// S3 implements Gateway.
type S3 struct {
host string
}
// Name implements Gateway interface.
func (g *S3) Name() string {
return s3Backend
}
const letterBytes = "abcdefghijklmnopqrstuvwxyz01234569"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
// randString generates random names and prepends them with a known prefix.
func randString(n int, src rand.Source, prefix string) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return prefix + string(b[0:30-len(prefix)])
}
// Chains all credential types, in the following order:
// - AWS env vars (i.e. AWS_ACCESS_KEY_ID)
// - AWS creds file (i.e. AWS_SHARED_CREDENTIALS_FILE or ~/.aws/credentials)
// - Static credentials provided by user (i.e. MINIO_ACCESS_KEY)
var defaultProviders = []credentials.Provider{
&credentials.EnvAWS{},
&credentials.FileAWSCredentials{},
&credentials.EnvMinio{},
}
// Chains all credential types, in the following order:
// - AWS env vars (i.e. AWS_ACCESS_KEY_ID)
// - AWS creds file (i.e. AWS_SHARED_CREDENTIALS_FILE or ~/.aws/credentials)
// - IAM profile based credentials. (performs an HTTP
// call to a pre-defined endpoint, only valid inside
// configured ec2 instances)
var defaultAWSCredProviders = []credentials.Provider{
&credentials.EnvAWS{},
&credentials.FileAWSCredentials{},
&credentials.IAM{
Client: &http.Client{
Transport: minio.NewGatewayHTTPTransport(),
},
},
&credentials.EnvMinio{},
}
// newS3 - Initializes a new client by auto probing S3 server signature.
func newS3(urlStr string) (*miniogo.Core, error) {
if urlStr == "" {
urlStr = "https://s3.amazonaws.com"
}
u, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
// Override default params if the host is provided
endpoint, secure, err := minio.ParseGatewayEndpoint(urlStr)
if err != nil {
return nil, err
}
var creds *credentials.Credentials
if s3utils.IsAmazonEndpoint(*u) {
// If we see an Amazon S3 endpoint, then we use more ways to fetch backend credentials.
// Specifically IAM style rotating credentials are only supported with AWS S3 endpoint.
creds = credentials.NewChainCredentials(defaultAWSCredProviders)
} else {
creds = credentials.NewChainCredentials(defaultProviders)
}
options := miniogo.Options{
Creds: creds,
Secure: secure,
Region: s3utils.GetRegionFromURL(*u),
BucketLookup: miniogo.BucketLookupAuto,
}
clnt, err := miniogo.NewWithOptions(endpoint, &options)
if err != nil {
return nil, err
}
return &miniogo.Core{Client: clnt}, nil
}
// NewGatewayLayer returns s3 ObjectLayer.
func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
// creds are ignored here, since S3 gateway implements chaining
// all credentials.
clnt, err := newS3(g.host)
if err != nil {
return nil, err
}
metrics := minio.NewMetrics()
t := &minio.MetricsTransport{
Transport: minio.NewGatewayHTTPTransport(),
Metrics: metrics,
}
// Set custom transport
clnt.SetCustomTransport(t)
probeBucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "probe-bucket-sign-")
// Check if the provided keys are valid.
if _, err = clnt.BucketExists(probeBucketName); err != nil {
if miniogo.ToErrorResponse(err).Code != "AccessDenied" {
return nil, err
}
}
s := s3Objects{
Client: clnt,
Metrics: metrics,
HTTPClient: &http.Client{
Transport: t,
},
}
// Enables single encryption of KMS is configured.
if minio.GlobalKMS != nil {
encS := s3EncObjects{s}
// Start stale enc multipart uploads cleanup routine.
go encS.cleanupStaleEncMultipartUploads(minio.GlobalContext,
minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry)
return &encS, nil
}
return &s, nil
}
// Production - s3 gateway is production ready.
func (g *S3) Production() bool {
return true
}
// s3Objects implements gateway for MinIO and S3 compatible object storage servers.
type s3Objects struct {
minio.GatewayUnsupported
Client *miniogo.Core
HTTPClient *http.Client
Metrics *minio.Metrics
}
// GetMetrics returns this gateway's metrics
func (l *s3Objects) GetMetrics(ctx context.Context) (*minio.Metrics, error) {
return l.Metrics, nil
}
// Shutdown saves any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *s3Objects) Shutdown(ctx context.Context) error {
return nil
}
// StorageInfo is not relevant to S3 backend.
func (l *s3Objects) StorageInfo(ctx context.Context, _ bool) (si minio.StorageInfo, _ []error) {
si.Backend.Type = minio.BackendGateway
si.Backend.GatewayOnline = minio.IsBackendOnline(ctx, l.HTTPClient, l.Client.EndpointURL().String())
return si, nil
}
// MakeBucket creates a new container on S3 backend.
func (l *s3Objects) MakeBucketWithLocation(ctx context.Context, bucket, location string, lockEnabled bool) error {
if lockEnabled {
return minio.NotImplemented{}
}
// Verify if bucket name is valid.
// We are using a separate helper function here to validate bucket
// names instead of IsValidBucketName() because there is a possibility
// that certains users might have buckets which are non-DNS compliant
// in us-east-1 and we might severely restrict them by not allowing
// access to these buckets.
// Ref - http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
if s3utils.CheckValidBucketName(bucket) != nil {
return minio.BucketNameInvalid{Bucket: bucket}
}
err := l.Client.MakeBucket(bucket, location)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket)
}
return err
}
// GetBucketInfo gets bucket metadata..
func (l *s3Objects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, e error) {
buckets, err := l.Client.ListBuckets()
if err != nil {
// Listbuckets may be disallowed, proceed to check if
// bucket indeed exists, if yes return success.
var ok bool
if ok, err = l.Client.BucketExists(bucket); err != nil {
return bi, minio.ErrorRespToObjectError(err, bucket)
}
if !ok {
return bi, minio.BucketNotFound{Bucket: bucket}
}
return minio.BucketInfo{
Name: bi.Name,
Created: time.Now().UTC(),
}, nil
}
for _, bi := range buckets {
if bi.Name != bucket {
continue
}
return minio.BucketInfo{
Name: bi.Name,
Created: bi.CreationDate,
}, nil
}
return bi, minio.BucketNotFound{Bucket: bucket}
}
// ListBuckets lists all S3 buckets
func (l *s3Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error) {
buckets, err := l.Client.ListBuckets()
if err != nil {
return nil, minio.ErrorRespToObjectError(err)
}
b := make([]minio.BucketInfo, len(buckets))
for i, bi := range buckets {
b[i] = minio.BucketInfo{
Name: bi.Name,
Created: bi.CreationDate,
}
}
return b, err
}
// DeleteBucket deletes a bucket on S3
func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
err := l.Client.RemoveBucket(bucket)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket)
}
return nil
}
// ListObjects lists all blobs in S3 bucket filtered by prefix
func (l *s3Objects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, e error) {
result, err := l.Client.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return loi, minio.ErrorRespToObjectError(err, bucket)
}
return minio.FromMinioClientListBucketResult(bucket, result), nil
}
// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {
result, err := l.Client.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys, startAfter)
if err != nil {
return loi, minio.ErrorRespToObjectError(err, bucket)
}
return minio.FromMinioClientListBucketV2Result(bucket, result), nil
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = l.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket, object)
}
var startOffset, length int64
startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
if err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket, object)
}
pr, pw := io.Pipe()
go func() {
err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
pw.CloseWithError(err)
}()
// Setup cleanup function to cause the above go-routine to
// exit in case of partial read
pipeCloser := func() { pr.Close() }
return minio.NewGetObjectReaderFromReader(pr, objInfo, opts, pipeCloser)
}
// GetObject reads an object from S3. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *s3Objects) GetObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, o minio.ObjectOptions) error {
if length < 0 && length != -1 {
return minio.ErrorRespToObjectError(minio.InvalidRange{}, bucket, key)
}
opts := miniogo.GetObjectOptions{}
opts.ServerSideEncryption = o.ServerSideEncryption
if startOffset >= 0 && length >= 0 {
if err := opts.SetRange(startOffset, startOffset+length-1); err != nil {
return minio.ErrorRespToObjectError(err, bucket, key)
}
}
object, _, _, err := l.Client.GetObject(bucket, key, opts)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket, key)
}
defer object.Close()
if _, err := io.Copy(writer, object); err != nil {
return minio.ErrorRespToObjectError(err, bucket, key)
}
return nil
}
// GetObjectInfo reads object info and replies back ObjectInfo
func (l *s3Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
oi, err := l.Client.StatObject(bucket, object, miniogo.StatObjectOptions{
GetObjectOptions: miniogo.GetObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
},
})
if err != nil {
return minio.ObjectInfo{}, minio.ErrorRespToObjectError(err, bucket, object)
}
return minio.FromMinioClientObjectInfo(bucket, oi), nil
}
// PutObject creates a new object with the incoming data,
func (l *s3Objects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
data := r.Reader
var tagMap map[string]string
if tagstr, ok := opts.UserDefined[xhttp.AmzObjectTagging]; ok && tagstr != "" {
tagObj, err := tags.ParseObjectTags(tagstr)
if err != nil {
return objInfo, minio.ErrorRespToObjectError(err, bucket, object)
}
tagMap = tagObj.ToMap()
delete(opts.UserDefined, xhttp.AmzObjectTagging)
}
putOpts := miniogo.PutObjectOptions{
UserMetadata: opts.UserDefined,
ServerSideEncryption: opts.ServerSideEncryption,
UserTags: tagMap,
}
oi, err := l.Client.PutObject(bucket, object, data, data.Size(), data.MD5Base64String(), data.SHA256HexString(), putOpts)
if err != nil {
return objInfo, minio.ErrorRespToObjectError(err, bucket, object)
}
// On success, populate the key & metadata so they are present in the notification
oi.Key = object
oi.Metadata = minio.ToMinioClientObjectInfoMetadata(opts.UserDefined)
return minio.FromMinioClientObjectInfo(bucket, oi), nil
}
// CopyObject copies an object from source bucket to a destination bucket.
func (l *s3Objects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
if srcOpts.CheckCopyPrecondFn != nil && srcOpts.CheckCopyPrecondFn(srcInfo, "") {
return minio.ObjectInfo{}, minio.PreConditionFailed{}
}
// Set this header such that following CopyObject() always sets the right metadata on the destination.
// metadata input is already a trickled down value from interpreting x-amz-metadata-directive at
// handler layer. So what we have right now is supposed to be applied on the destination object anyways.
// So preserve it by adding "REPLACE" directive to save all the metadata set by CopyObject API.
srcInfo.UserDefined["x-amz-metadata-directive"] = "REPLACE"
srcInfo.UserDefined["x-amz-copy-source-if-match"] = srcInfo.ETag
header := make(http.Header)
if srcOpts.ServerSideEncryption != nil {
encrypt.SSECopy(srcOpts.ServerSideEncryption).Marshal(header)
}
if dstOpts.ServerSideEncryption != nil {
dstOpts.ServerSideEncryption.Marshal(header)
}
for k, v := range header {
srcInfo.UserDefined[k] = v[0]
}
if _, err = l.Client.CopyObject(srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined); err != nil {
return objInfo, minio.ErrorRespToObjectError(err, srcBucket, srcObject)
}
return l.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts)
}
// DeleteObject deletes a blob in bucket
func (l *s3Objects) DeleteObject(ctx context.Context, bucket string, object string) error {
err := l.Client.RemoveObject(bucket, object)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket, object)
}
return nil
}
func (l *s3Objects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
errs := make([]error, len(objects))
for idx, object := range objects {
errs[idx] = l.DeleteObject(ctx, bucket, object)
}
return errs, nil
}
// ListMultipartUploads lists all multipart uploads.
func (l *s3Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) {
result, err := l.Client.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
return lmi, err
}
return minio.FromMinioClientListMultipartsInfo(result), nil
}
// NewMultipartUpload upload object in multiple parts
func (l *s3Objects) NewMultipartUpload(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (uploadID string, err error) {
var tagMap map[string]string
if tagStr, ok := o.UserDefined[xhttp.AmzObjectTagging]; ok {
tagObj, err := tags.Parse(tagStr, true)
if err != nil {
return uploadID, minio.ErrorRespToObjectError(err, bucket, object)
}
tagMap = tagObj.ToMap()
delete(o.UserDefined, xhttp.AmzObjectTagging)
}
// Create PutObject options
opts := miniogo.PutObjectOptions{
UserMetadata: o.UserDefined,
ServerSideEncryption: o.ServerSideEncryption,
UserTags: tagMap,
}
uploadID, err = l.Client.NewMultipartUpload(bucket, object, opts)
if err != nil {
return uploadID, minio.ErrorRespToObjectError(err, bucket, object)
}
return uploadID, nil
}
// PutObjectPart puts a part of object in bucket
func (l *s3Objects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, e error) {
data := r.Reader
info, err := l.Client.PutObjectPart(bucket, object, uploadID, partID, data, data.Size(), data.MD5Base64String(), data.SHA256HexString(), opts.ServerSideEncryption)
if err != nil {
return pi, minio.ErrorRespToObjectError(err, bucket, object)
}
return minio.FromMinioClientObjectPart(info), nil
}
// CopyObjectPart creates a part in a multipart upload by copying
// existing object or a part of it.
func (l *s3Objects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (p minio.PartInfo, err error) {
if srcOpts.CheckCopyPrecondFn != nil && srcOpts.CheckCopyPrecondFn(srcInfo, "") {
return minio.PartInfo{}, minio.PreConditionFailed{}
}
srcInfo.UserDefined = map[string]string{
"x-amz-copy-source-if-match": srcInfo.ETag,
}
header := make(http.Header)
if srcOpts.ServerSideEncryption != nil {
encrypt.SSECopy(srcOpts.ServerSideEncryption).Marshal(header)
}
if dstOpts.ServerSideEncryption != nil {
dstOpts.ServerSideEncryption.Marshal(header)
}
for k, v := range header {
srcInfo.UserDefined[k] = v[0]
}
completePart, err := l.Client.CopyObjectPart(srcBucket, srcObject, destBucket, destObject,
uploadID, partID, startOffset, length, srcInfo.UserDefined)
if err != nil {
return p, minio.ErrorRespToObjectError(err, srcBucket, srcObject)
}
p.PartNumber = completePart.PartNumber
p.ETag = completePart.ETag
return p, nil
}
// GetMultipartInfo returns multipart info of the uploadId of the object
func (l *s3Objects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
return result, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *s3Objects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
result, err := l.Client.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return lpi, err
}
lpi = minio.FromMinioClientListPartsInfo(result)
if lpi.IsTruncated && maxParts > len(lpi.Parts) {
partNumberMarker = lpi.NextPartNumberMarker
for {
result, err = l.Client.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return lpi, err
}
nlpi := minio.FromMinioClientListPartsInfo(result)
partNumberMarker = nlpi.NextPartNumberMarker
lpi.Parts = append(lpi.Parts, nlpi.Parts...)
if !nlpi.IsTruncated {
break
}
}
}
return lpi, nil
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
err := l.Client.AbortMultipartUpload(bucket, object, uploadID)
return minio.ErrorRespToObjectError(err, bucket, object)
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
func (l *s3Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
etag, err := l.Client.CompleteMultipartUpload(bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts))
if err != nil {
return oi, minio.ErrorRespToObjectError(err, bucket, object)
}
return minio.ObjectInfo{Bucket: bucket, Name: object, ETag: strings.Trim(etag, "\"")}, nil
}
// SetBucketPolicy sets policy on bucket
func (l *s3Objects) SetBucketPolicy(ctx context.Context, bucket string, bucketPolicy *policy.Policy) error {
data, err := json.Marshal(bucketPolicy)
if err != nil {
// This should not happen.
logger.LogIf(ctx, err)
return minio.ErrorRespToObjectError(err, bucket)
}
if err := l.Client.SetBucketPolicy(bucket, string(data)); err != nil {
return minio.ErrorRespToObjectError(err, bucket)
}
return nil
}
// GetBucketPolicy will get policy on bucket
func (l *s3Objects) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) {
data, err := l.Client.GetBucketPolicy(bucket)
if err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket)
}
bucketPolicy, err := policy.ParseConfig(strings.NewReader(data), bucket)
return bucketPolicy, minio.ErrorRespToObjectError(err, bucket)
}
// DeleteBucketPolicy deletes all policies on bucket
func (l *s3Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error {
if err := l.Client.SetBucketPolicy(bucket, ""); err != nil {
return minio.ErrorRespToObjectError(err, bucket, "")
}
return nil
}
// GetObjectTags gets the tags set on the object
func (l *s3Objects) GetObjectTags(ctx context.Context, bucket string, object string) (*tags.Tags, error) {
var err error
var tagObj *tags.Tags
var tagStr string
var opts minio.ObjectOptions
if _, err = l.GetObjectInfo(ctx, bucket, object, opts); err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket, object)
}
if tagStr, err = l.Client.GetObjectTagging(bucket, object); err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket, object)
}
if tagObj, err = tags.ParseObjectXML(strings.NewReader(tagStr)); err != nil {
return nil, minio.ErrorRespToObjectError(err, bucket, object)
}
return tagObj, err
}
// PutObjectTags attaches the tags to the object
func (l *s3Objects) PutObjectTags(ctx context.Context, bucket, object string, tagStr string) error {
tagObj, err := tags.Parse(tagStr, true)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket, object)
}
if err = l.Client.PutObjectTagging(bucket, object, tagObj.ToMap()); err != nil {
return minio.ErrorRespToObjectError(err, bucket, object)
}
return nil
}
// DeleteObjectTags removes the tags attached to the object
func (l *s3Objects) DeleteObjectTags(ctx context.Context, bucket, object string) error {
if err := l.Client.RemoveObjectTagging(bucket, object); err != nil {
return minio.ErrorRespToObjectError(err, bucket, object)
}
return nil
}
// IsCompressionSupported returns whether compression is applicable for this layer.
func (l *s3Objects) IsCompressionSupported() bool {
return false
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
func (l *s3Objects) IsEncryptionSupported() bool {
return minio.GlobalKMS != nil || len(minio.GlobalGatewaySSE) > 0
}
// IsReady returns whether the layer is ready to take requests.
func (l *s3Objects) IsReady(ctx context.Context) bool {
return minio.IsBackendOnline(ctx, l.HTTPClient, l.Client.EndpointURL().String())
}
func (l *s3Objects) IsTaggingSupported() bool {
return true
}

View file

@ -1,124 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2017 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3
import (
"fmt"
"testing"
miniogo "github.com/minio/minio-go/v6"
"github.com/minio/minio/pkg/hash"
minio "github.com/minio/minio/cmd"
)
func errResponse(code string) miniogo.ErrorResponse {
return miniogo.ErrorResponse{
Code: code,
}
}
func TestS3ToObjectError(t *testing.T) {
testCases := []struct {
inputErr error
expectedErr error
bucket, object string
}{
{
inputErr: errResponse("BucketAlreadyOwnedByYou"),
expectedErr: minio.BucketAlreadyOwnedByYou{},
},
{
inputErr: errResponse("BucketNotEmpty"),
expectedErr: minio.BucketNotEmpty{},
},
{
inputErr: errResponse("InvalidBucketName"),
expectedErr: minio.BucketNameInvalid{},
},
{
inputErr: errResponse("InvalidPart"),
expectedErr: minio.InvalidPart{},
},
{
inputErr: errResponse("NoSuchBucketPolicy"),
expectedErr: minio.BucketPolicyNotFound{},
},
{
inputErr: errResponse("NoSuchBucket"),
expectedErr: minio.BucketNotFound{},
},
// with empty Object in miniogo.ErrorRepsonse, NoSuchKey
// is interpreted as BucketNotFound
{
inputErr: errResponse("NoSuchKey"),
expectedErr: minio.BucketNotFound{},
},
{
inputErr: errResponse("NoSuchUpload"),
expectedErr: minio.InvalidUploadID{},
},
{
inputErr: errResponse("XMinioInvalidObjectName"),
expectedErr: minio.ObjectNameInvalid{},
},
{
inputErr: errResponse("AccessDenied"),
expectedErr: minio.PrefixAccessDenied{},
},
{
inputErr: errResponse("XAmzContentSHA256Mismatch"),
expectedErr: hash.SHA256Mismatch{},
},
{
inputErr: errResponse("EntityTooSmall"),
expectedErr: minio.PartTooSmall{},
},
{
inputErr: nil,
expectedErr: nil,
},
// Special test case for NoSuchKey with object name
{
inputErr: miniogo.ErrorResponse{
Code: "NoSuchKey",
},
expectedErr: minio.ObjectNotFound{
Bucket: "bucket",
Object: "object",
},
bucket: "bucket",
object: "object",
},
// N B error values that aren't of expected types
// should be left untouched.
// Special test case for error that is not of type
// miniogo.ErrorResponse
{
inputErr: fmt.Errorf("not a ErrorResponse"),
expectedErr: fmt.Errorf("not a ErrorResponse"),
},
}
for i, tc := range testCases {
actualErr := minio.ErrorRespToObjectError(tc.inputErr, tc.bucket, tc.object)
if actualErr != nil && tc.expectedErr != nil && actualErr.Error() != tc.expectedErr.Error() {
t.Errorf("Test case %d: Expected error %v but received error %v", i+1, tc.expectedErr, actualErr)
}
}
}

5
go.mod
View file

@ -3,9 +3,7 @@ module github.com/minio/minio
go 1.13
require (
cloud.google.com/go/storage v1.0.0
contrib.go.opencensus.io/exporter/ocagent v0.5.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.1
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/go-autorest v11.7.1+incompatible // indirect
github.com/Shopify/sarama v1.24.1
@ -15,7 +13,6 @@ require (
github.com/beevik/ntp v0.2.0
github.com/cespare/xxhash/v2 v2.1.1
github.com/cheggaaa/pb v1.0.28
github.com/colinmarc/hdfs/v2 v2.1.1
github.com/coredns/coredns v1.4.0
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
@ -109,7 +106,7 @@ require (
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.3.0
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
gopkg.in/ldap.v3 v3.0.3
gopkg.in/olivere/elastic.v5 v5.0.80
gopkg.in/yaml.v2 v2.2.8

7
go.sum
View file

@ -81,8 +81,6 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/colinmarc/hdfs/v2 v2.1.1 h1:x0hw/m+o3UE20Scso/KCkvYNc9Di39TBlCfGMkJ1/a0=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/coredns/coredns v1.4.0 h1:RubBkYmkByUqZWWkjRHvNLnUHgkRVqAWgSMmRFvpE1A=
github.com/coredns/coredns v1.4.0/go.mod h1:zASH/MVDgR6XZTbxvOnsZfffS+31vg6Ackf/wo1+AM0=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
@ -164,7 +162,6 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
@ -253,7 +250,6 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v0.0.0-20180228145832-27454136f036/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
@ -279,7 +275,6 @@ github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKe
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf h1:WfD7VjIE6z8dIvMsI4/s+1qr5EL+zoIGev1BQj1eoJ8=
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
@ -440,7 +435,6 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
@ -587,7 +581,6 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
golang.org/x/arch v0.0.0-20190909030613-46d78d1859ac/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4=
golang.org/x/crypto v0.0.0-20180723164146-c126467f60eb/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181106171534-e4dc69e5b2fd/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=