Update minio-go

This commit is contained in:
Alexander Neumann 2016-08-31 18:08:43 +02:00
parent 769f06cea2
commit a6eda344a4
9 changed files with 387 additions and 193 deletions

2
vendor/manifest vendored
View file

@ -28,7 +28,7 @@
{
"importpath": "github.com/minio/minio-go",
"repository": "https://github.com/minio/minio-go",
"revision": "9e734013294ab153b0bdbe182738bcddd46f1947",
"revision": "b1674741d196d5d79486d7c1645ed6ded902b712",
"branch": "master"
},
{

View file

@ -233,3 +233,13 @@ func ErrNoSuchBucketPolicy(message string) error {
RequestID: "minio",
}
}
// ErrAPINotSupported - API not supported response
// The specified API call is not supported
func ErrAPINotSupported(message string) error {
return ErrorResponse{
Code: "APINotSupported",
Message: message,
RequestID: "minio",
}
}

View file

@ -36,16 +36,13 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
return nil, err
}
// Start the request as soon Get is initiated.
httpReader, objectInfo, err := c.getObject(bucketName, objectName, 0, 0)
if err != nil {
return nil, err
}
var httpReader io.ReadCloser
var objectInfo ObjectInfo
var err error
// Create request channel.
reqCh := make(chan readRequest)
reqCh := make(chan getRequest)
// Create response channel.
resCh := make(chan readResponse)
resCh := make(chan getResponse)
// Create done channel.
doneCh := make(chan struct{})
@ -61,20 +58,84 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
case <-doneCh:
// Close the http response body before returning.
// This ends the connection with the server.
if httpReader != nil {
httpReader.Close()
}
return
// Request message.
// Gather incoming request.
case req := <-reqCh:
// If this is the first request we may not need to do a getObject request yet.
if req.isFirstReq {
// First request is a Read/ReadAt.
if req.isReadOp {
// Differentiate between wanting the whole object and just a range.
if req.isReadAt {
// If this is a ReadAt request only get the specified range.
// Range is set with respect to the offset and length of the buffer requested.
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
} else {
// First request is a Read request.
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
}
if err != nil {
resCh <- getResponse{
Error: err,
}
return
}
// Read at least firstReq.Buffer bytes, if not we have
// reached our EOF.
size, err := io.ReadFull(httpReader, req.Buffer)
if err == io.ErrUnexpectedEOF {
// If an EOF happens after reading some but not
// all the bytes ReadFull returns ErrUnexpectedEOF
err = io.EOF
}
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
Size: int(size),
Error: err,
didRead: true,
}
} else {
// First request is a Stat or Seek call.
// Only need to run a StatObject until an actual Read or ReadAt request comes through.
objectInfo, err = c.StatObject(bucketName, objectName)
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the go-routine.
return
}
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
}
}
} else {
// Offset changes fetch the new object at an Offset.
if req.DidOffsetChange {
// Because the httpReader may not be set by the first
// request if it was a stat or seek it must be checked
// if the object has been read or not to only initialize
// new ones when they haven't been already.
// All readAt requests are new requests.
if req.DidOffsetChange || !req.beenRead {
if httpReader != nil {
// Close previously opened http reader.
httpReader.Close()
}
// Read from offset.
// If this request is a readAt only get the specified range.
if req.isReadAt {
// Range is set with respect to the offset and length of the buffer requested.
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
} else {
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, 0)
}
if err != nil {
resCh <- readResponse{
resCh <- getResponse{
Error: err,
}
return
@ -90,29 +151,38 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
err = io.EOF
}
// Reply back how much was read.
resCh <- readResponse{
resCh <- getResponse{
Size: int(size),
Error: err,
didRead: true,
}
}
}
}
}()
// Return the readerAt backed by routine.
return newObject(reqCh, resCh, doneCh, objectInfo), nil
// Create a newObject through the information sent back by reqCh.
return newObject(reqCh, resCh, doneCh), nil
}
// Read response message container to reply back for the request.
type readResponse struct {
Size int
Error error
}
// Read request message container to communicate with internal
// get request message container to communicate with internal
// go-routine.
type readRequest struct {
type getRequest struct {
Buffer []byte
Offset int64 // readAt offset.
DidOffsetChange bool
DidOffsetChange bool // Tracks the offset changes for Seek requests.
beenRead bool // Determines if this is the first time an object is being read.
isReadAt bool // Determines if this request is a request to a specific range
isReadOp bool // Determines if this request is a Read or Read/At request.
isFirstReq bool // Determines if this request is the first time an object is being accessed.
}
// get response message container to reply back for the request.
type getResponse struct {
Size int
Error error
didRead bool // Lets subsequent calls know whether or not httpReader has been initiated.
objectInfo ObjectInfo // Used for the first request.
}
// Object represents an open object. It implements Read, ReadAt,
@ -122,8 +192,8 @@ type Object struct {
mutex *sync.Mutex
// User allocated and defined.
reqCh chan<- readRequest
resCh <-chan readResponse
reqCh chan<- getRequest
resCh <-chan getResponse
doneCh chan<- struct{}
prevOffset int64
currOffset int64
@ -132,8 +202,54 @@ type Object struct {
// Keeps track of closed call.
isClosed bool
// Keeps track of if this is the first call.
isStarted bool
// Previous error saved for future calls.
prevErr error
// Keeps track of if this object has been read yet.
beenRead bool
}
// doGetRequest - sends and blocks on the firstReqCh and reqCh of an object.
// Returns back the size of the buffer read, if anything was read, as well
// as any error encountered. For all first requests sent on the object
// it is also responsible for sending back the objectInfo.
func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
o.reqCh <- request
response := <-o.resCh
// This was the first request.
if !o.isStarted {
// Set objectInfo for first time.
o.objectInfo = response.objectInfo
// The object has been operated on.
o.isStarted = true
}
// Set beenRead only if it has not been set before.
if !o.beenRead {
o.beenRead = response.didRead
}
// Return any error to the top level.
if response.Error != nil {
return response, response.Error
}
return response, nil
}
// setOffset - handles the setting of offsets for
// Read/ReadAt/Seek requests.
func (o *Object) setOffset(bytesRead int64) error {
// Update the currentOffset.
o.currOffset += bytesRead
// Save the current offset as previous offset.
o.prevOffset = o.currOffset
if o.currOffset >= o.objectInfo.Size {
return io.EOF
}
return nil
}
// Read reads up to len(p) bytes into p. It returns the number of
@ -152,16 +268,17 @@ func (o *Object) Read(b []byte) (n int, err error) {
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
// If current offset has reached Size limit, return EOF.
if o.currOffset >= o.objectInfo.Size {
return 0, io.EOF
// Create a new request.
readReq := getRequest{
isReadOp: true,
beenRead: o.beenRead,
Buffer: b,
}
// Send current information over control channel to indicate we are ready.
reqMsg := readRequest{}
// Send the pointer to the buffer over the channel.
reqMsg.Buffer = b
// Alert that this is the first request.
if !o.isStarted {
readReq.isFirstReq = true
}
// Verify if offset has changed and currOffset is greater than
// previous offset. Perhaps due to Seek().
@ -171,42 +288,32 @@ func (o *Object) Read(b []byte) (n int, err error) {
}
if offsetChange > 0 {
// Fetch the new reader at the current offset again.
reqMsg.Offset = o.currOffset
reqMsg.DidOffsetChange = true
readReq.Offset = o.currOffset
readReq.DidOffsetChange = true
} else {
// No offset changes no need to fetch new reader, continue
// reading.
reqMsg.DidOffsetChange = false
reqMsg.Offset = 0
readReq.DidOffsetChange = false
readReq.Offset = 0
}
// Send read request over the control channel.
o.reqCh <- reqMsg
// Get data over the response channel.
dataMsg := <-o.resCh
// Send and receive from the first request.
response, err := o.doGetRequest(readReq)
if err != nil {
// Save the error.
o.prevErr = err
return response.Size, err
}
// Bytes read.
bytesRead := int64(dataMsg.Size)
bytesRead := int64(response.Size)
// Update current offset.
o.currOffset += bytesRead
// Save the current offset as previous offset.
o.prevOffset = o.currOffset
if dataMsg.Error == nil {
// If currOffset read is equal to objectSize
// We have reached end of file, we return io.EOF.
if o.currOffset >= o.objectInfo.Size {
return dataMsg.Size, io.EOF
// Set the new offset.
err = o.setOffset(bytesRead)
if err != nil {
return response.Size, err
}
return dataMsg.Size, nil
}
// Save any error.
o.prevErr = dataMsg.Error
return dataMsg.Size, dataMsg.Error
return response.Size, nil
}
// Stat returns the ObjectInfo structure describing object.
@ -222,6 +329,22 @@ func (o *Object) Stat() (ObjectInfo, error) {
return ObjectInfo{}, o.prevErr
}
// This is the first request.
if !o.isStarted {
statReq := getRequest{
isReadOp: false, // This is a Stat not a Read/ReadAt.
Offset: 0,
isFirstReq: true,
}
// Send the request and get the response.
_, err := o.doGetRequest(statReq)
if err != nil {
o.prevErr = err
return ObjectInfo{}, err
}
}
return o.objectInfo, nil
}
@ -242,57 +365,46 @@ func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
// if offset is greater than or equal to object size we return io.EOF.
// If offset is negative then we return io.EOF.
if offset < 0 || offset >= o.objectInfo.Size {
// Can only compare offsets to size when size has been set.
if o.isStarted {
// If offset is negative than we return io.EOF.
// If offset is greater than or equal to object size we return io.EOF.
if offset >= o.objectInfo.Size || offset < 0 {
return 0, io.EOF
}
// Send current information over control channel to indicate we
// are ready.
reqMsg := readRequest{}
// Send the offset and pointer to the buffer over the channel.
reqMsg.Buffer = b
// For ReadAt offset always changes, minor optimization where
// offset same as currOffset we don't change the offset.
reqMsg.DidOffsetChange = offset != o.currOffset
if reqMsg.DidOffsetChange {
// Set new offset.
reqMsg.Offset = offset
// Save new offset as current offset.
o.currOffset = offset
}
// Send read request over the control channel.
o.reqCh <- reqMsg
// Get data over the response channel.
dataMsg := <-o.resCh
// Create the new readAt request.
readAtReq := getRequest{
isReadOp: true,
isReadAt: true,
DidOffsetChange: true, // Offset always changes.
beenRead: o.beenRead, // Set if this is the first request to try and read.
Offset: offset, // Set the offset.
Buffer: b,
}
// Alert that this is the first request.
if !o.isStarted {
readAtReq.isFirstReq = true
}
// Send and receive from the first request.
response, err := o.doGetRequest(readAtReq)
if err != nil {
// Save the error.
o.prevErr = err
return 0, err
}
// Bytes read.
bytesRead := int64(dataMsg.Size)
bytesRead := int64(response.Size)
// Update current offset.
o.currOffset += bytesRead
// Save current offset as previous offset before returning.
o.prevOffset = o.currOffset
if dataMsg.Error == nil {
// If currentOffset is equal to objectSize
// we have reached end of file, we return io.EOF.
if o.currOffset >= o.objectInfo.Size {
return dataMsg.Size, io.EOF
}
return dataMsg.Size, nil
// Update the offsets.
err = o.setOffset(bytesRead)
if err != nil {
return response.Size, err
}
// Save any error.
o.prevErr = dataMsg.Error
return dataMsg.Size, dataMsg.Error
return response.Size, nil
}
// Seek sets the offset for the next Read or Write to offset,
@ -325,6 +437,23 @@ func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
}
// This is the first request. So before anything else
// get the ObjectInfo.
if !o.isStarted {
// Create the new Seek request.
seekReq := getRequest{
isReadOp: false,
Offset: offset,
isFirstReq: true,
}
// Send and receive from the seek request.
_, err := o.doGetRequest(seekReq)
if err != nil {
// Save the error.
o.prevErr = err
return 0, err
}
}
// Save current offset as previous offset.
o.prevOffset = o.currOffset
@ -391,13 +520,13 @@ func (o *Object) Close() (err error) {
}
// newObject instantiates a new *minio.Object*
func newObject(reqCh chan<- readRequest, resCh <-chan readResponse, doneCh chan<- struct{}, objectInfo ObjectInfo) *Object {
// ObjectInfo will be set by setObjectInfo
func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
return &Object{
mutex: &sync.Mutex{},
reqCh: reqCh,
resCh: resCh,
doneCh: doneCh,
objectInfo: objectInfo,
}
}
@ -419,6 +548,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
customHeader := make(http.Header)
// Set ranges if length and offset are valid.
// See https://tools.ietf.org/html/rfc7233#section-3.1 for reference.
if length > 0 && offset >= 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
} else if offset > 0 && length == 0 {

View file

@ -134,7 +134,15 @@ func (c Client) ListenBucketNotification(bucketName string, accountArn Arn, done
return
}
// Continuously run and listen on bucket notification.
// Check ARN partition to verify if listening bucket is supported
if accountArn.Partition != "minio" {
notificationInfoCh <- NotificationInfo{
Err: ErrAPINotSupported("Listening bucket notification is specific only to `minio` partitions"),
}
return
}
// Continously run and listen on bucket notification.
for {
urlValues := make(url.Values)
urlValues.Set("notificationARN", accountArn.String())

View file

@ -59,7 +59,7 @@ func TestMakeBucketErrorV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'eu-west-1'.
if err = c.MakeBucket(bucketName, "eu-west-1"); err != nil {
@ -105,7 +105,7 @@ func TestGetObjectClosedTwiceV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -122,7 +122,7 @@ func TestGetObjectClosedTwiceV2(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -190,7 +190,7 @@ func TestRemovePartiallyUploadedV2(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -257,7 +257,7 @@ func TestResumablePutObjectV2(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -368,7 +368,7 @@ func TestFPutObjectV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -516,7 +516,7 @@ func TestResumableFPutObjectV2(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -593,7 +593,7 @@ func TestMakeBucketRegionsV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'eu-central-1'.
if err = c.MakeBucket(bucketName, "eu-west-1"); err != nil {
@ -644,7 +644,7 @@ func TestGetObjectReadSeekFunctionalV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -661,7 +661,7 @@ func TestGetObjectReadSeekFunctionalV2(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -782,7 +782,7 @@ func TestGetObjectReadAtFunctionalV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -799,7 +799,7 @@ func TestGetObjectReadAtFunctionalV2(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -923,7 +923,7 @@ func TestCopyObjectV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'us-east-1' (source bucket).
err = c.MakeBucket(bucketName, "us-east-1")
@ -946,7 +946,7 @@ func TestCopyObjectV2(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -1045,7 +1045,7 @@ func TestFunctionalV2(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -1054,7 +1054,7 @@ func TestFunctionalV2(t *testing.T) {
}
// Generate a random file name.
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
fileName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
file, err := os.Create(fileName)
if err != nil {
t.Fatal("Error:", err)

View file

@ -40,7 +40,8 @@ const (
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
func randString(n int, src rand.Source) string {
// randString generates random names and prepends them with a known prefix.
func randString(n int, src rand.Source, prefix string) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
@ -54,7 +55,7 @@ func randString(n int, src rand.Source) string {
cache >>= letterIdxBits
remain--
}
return string(b[0:30])
return prefix + string(b[0:30-len(prefix)])
}
// Tests bucket re-create errors.
@ -84,7 +85,7 @@ func TestMakeBucketError(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'eu-central-1'.
if err = c.MakeBucket(bucketName, "eu-central-1"); err != nil {
@ -130,7 +131,7 @@ func TestMakeBucketRegions(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'eu-central-1'.
if err = c.MakeBucket(bucketName, "eu-central-1"); err != nil {
@ -181,7 +182,7 @@ func TestPutObjectReadAt(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -193,7 +194,7 @@ func TestPutObjectReadAt(t *testing.T) {
buf := make([]byte, minPartSize*4)
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -261,7 +262,7 @@ func TestListPartiallyUploaded(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -337,7 +338,7 @@ func TestGetOjectSeekEnd(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -354,7 +355,7 @@ func TestGetOjectSeekEnd(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -438,7 +439,7 @@ func TestGetObjectClosedTwice(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -455,7 +456,7 @@ func TestGetObjectClosedTwice(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -523,7 +524,7 @@ func TestRemovePartiallyUploaded(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -593,7 +594,7 @@ func TestResumablePutObject(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -703,7 +704,7 @@ func TestResumableFPutObject(t *testing.T) {
// c.TraceOn(os.Stderr)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -783,7 +784,7 @@ func TestFPutObjectMultipart(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -862,7 +863,7 @@ func TestFPutObject(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -1010,7 +1011,7 @@ func TestGetObjectReadSeekFunctional(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -1027,7 +1028,7 @@ func TestGetObjectReadSeekFunctional(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -1148,7 +1149,7 @@ func TestGetObjectReadAtFunctional(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -1165,7 +1166,7 @@ func TestGetObjectReadAtFunctional(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -1289,7 +1290,7 @@ func TestPresignedPostPolicy(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'us-east-1' (source bucket).
err = c.MakeBucket(bucketName, "us-east-1")
@ -1306,7 +1307,7 @@ func TestPresignedPostPolicy(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -1389,7 +1390,7 @@ func TestCopyObject(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'us-east-1' (source bucket).
err = c.MakeBucket(bucketName, "us-east-1")
@ -1412,7 +1413,7 @@ func TestCopyObject(t *testing.T) {
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
@ -1575,6 +1576,13 @@ func TestBucketNotification(t *testing.T) {
bNotification := BucketNotification{}
bNotification.AddTopic(topicConfig)
// Add the same topicConfig again, should have no effect
// because it is duplicated
bNotification.AddTopic(topicConfig)
if len(bNotification.TopicConfigs) != 1 {
t.Fatal("Error: duplicated entry added")
}
// Add and remove a queue config
bNotification.AddQueue(queueConfig)
bNotification.RemoveQueueByArn(queueArn)
@ -1629,7 +1637,7 @@ func TestFunctional(t *testing.T) {
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
@ -1638,7 +1646,7 @@ func TestFunctional(t *testing.T) {
}
// Generate a random file name.
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
fileName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
file, err := os.Create(fileName)
if err != nil {
t.Fatal("Error:", err)

View file

@ -78,22 +78,41 @@ func TestGetReaderSize(t *testing.T) {
}
// Create request channel.
reqCh := make(chan readRequest)
reqCh := make(chan getRequest, 1)
// Create response channel.
resCh := make(chan readResponse)
resCh := make(chan getResponse, 1)
// Create done channel.
doneCh := make(chan struct{})
// objectInfo.
objectInfo := ObjectInfo{Size: 10}
objectReader := newObject(reqCh, resCh, doneCh, objectInfo)
defer objectReader.Close()
size, err = getReaderSize(objectReader)
objectInfo := ObjectInfo{Size: 10}
// Create the first request.
firstReq := getRequest{
isReadOp: false, // Perform only a HEAD object to get objectInfo.
isFirstReq: true,
}
// Create the expected response.
firstRes := getResponse{
objectInfo: objectInfo,
}
// Send the expected response.
resCh <- firstRes
// Test setting size on the first request.
objectReaderFirstReq := newObject(reqCh, resCh, doneCh)
defer objectReaderFirstReq.Close()
// Not checking the response here...just that the reader size is correct.
_, err = objectReaderFirstReq.doGetRequest(firstReq)
if err != nil {
t.Fatal("Error:", err)
}
// Validate that the reader size is the objectInfo size.
size, err = getReaderSize(objectReaderFirstReq)
if err != nil {
t.Fatal("Error:", err)
}
if size != int64(10) {
t.Fatalf("Reader length doesn't match got: %v, want: %v", size, 10)
t.Fatalf("Reader length doesn't match got: %d, wanted %d", size, objectInfo.Size)
}
fileReader, err := ioutil.TempFile(os.TempDir(), "prefix")

View file

@ -18,6 +18,7 @@ package minio
import (
"encoding/xml"
"reflect"
)
// NotificationEventType is a S3 notification event associated to the bucket notification configuration
@ -160,18 +161,36 @@ type BucketNotification struct {
// AddTopic adds a given topic config to the general bucket notification config
func (b *BucketNotification) AddTopic(topicConfig NotificationConfig) {
newTopicConfig := TopicConfig{NotificationConfig: topicConfig, Topic: topicConfig.Arn.String()}
for _, n := range b.TopicConfigs {
if reflect.DeepEqual(n, newTopicConfig) {
// Avoid adding duplicated entry
return
}
}
b.TopicConfigs = append(b.TopicConfigs, newTopicConfig)
}
// AddQueue adds a given queue config to the general bucket notification config
func (b *BucketNotification) AddQueue(queueConfig NotificationConfig) {
newQueueConfig := QueueConfig{NotificationConfig: queueConfig, Queue: queueConfig.Arn.String()}
for _, n := range b.QueueConfigs {
if reflect.DeepEqual(n, newQueueConfig) {
// Avoid adding duplicated entry
return
}
}
b.QueueConfigs = append(b.QueueConfigs, newQueueConfig)
}
// AddLambda adds a given lambda config to the general bucket notification config
func (b *BucketNotification) AddLambda(lambdaConfig NotificationConfig) {
newLambdaConfig := LambdaConfig{NotificationConfig: lambdaConfig, Lambda: lambdaConfig.Arn.String()}
for _, n := range b.LambdaConfigs {
if reflect.DeepEqual(n, newLambdaConfig) {
// Avoid adding duplicated entry
return
}
}
b.LambdaConfigs = append(b.LambdaConfigs, newLambdaConfig)
}

View file

@ -911,7 +911,7 @@ __Example__
```go
topicArn := NewArn("aws", "s3", "us-east-1", "804605494417", "PhotoUpdate")
topicArn := NewArn("aws", "sns", "us-east-1", "804605494417", "PhotoUpdate")
topicConfig := NewNotificationConfig(topicArn)
topicConfig.AddEvents(minio.ObjectCreatedAll, minio.ObjectRemovedAll)