forked from TrueCloudLab/restic
commit
5bc7f150f8
9 changed files with 387 additions and 193 deletions
2
vendor/manifest
vendored
2
vendor/manifest
vendored
|
@ -28,7 +28,7 @@
|
||||||
{
|
{
|
||||||
"importpath": "github.com/minio/minio-go",
|
"importpath": "github.com/minio/minio-go",
|
||||||
"repository": "https://github.com/minio/minio-go",
|
"repository": "https://github.com/minio/minio-go",
|
||||||
"revision": "9e734013294ab153b0bdbe182738bcddd46f1947",
|
"revision": "b1674741d196d5d79486d7c1645ed6ded902b712",
|
||||||
"branch": "master"
|
"branch": "master"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -233,3 +233,13 @@ func ErrNoSuchBucketPolicy(message string) error {
|
||||||
RequestID: "minio",
|
RequestID: "minio",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrAPINotSupported - API not supported response
|
||||||
|
// The specified API call is not supported
|
||||||
|
func ErrAPINotSupported(message string) error {
|
||||||
|
return ErrorResponse{
|
||||||
|
Code: "APINotSupported",
|
||||||
|
Message: message,
|
||||||
|
RequestID: "minio",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -36,16 +36,13 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the request as soon Get is initiated.
|
var httpReader io.ReadCloser
|
||||||
httpReader, objectInfo, err := c.getObject(bucketName, objectName, 0, 0)
|
var objectInfo ObjectInfo
|
||||||
if err != nil {
|
var err error
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create request channel.
|
// Create request channel.
|
||||||
reqCh := make(chan readRequest)
|
reqCh := make(chan getRequest)
|
||||||
// Create response channel.
|
// Create response channel.
|
||||||
resCh := make(chan readResponse)
|
resCh := make(chan getResponse)
|
||||||
// Create done channel.
|
// Create done channel.
|
||||||
doneCh := make(chan struct{})
|
doneCh := make(chan struct{})
|
||||||
|
|
||||||
|
@ -61,58 +58,131 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
// Close the http response body before returning.
|
// Close the http response body before returning.
|
||||||
// This ends the connection with the server.
|
// This ends the connection with the server.
|
||||||
httpReader.Close()
|
if httpReader != nil {
|
||||||
|
httpReader.Close()
|
||||||
|
}
|
||||||
return
|
return
|
||||||
// Request message.
|
|
||||||
case req := <-reqCh:
|
|
||||||
// Offset changes fetch the new object at an Offset.
|
|
||||||
if req.DidOffsetChange {
|
|
||||||
if httpReader != nil {
|
|
||||||
// Close previously opened http reader.
|
|
||||||
httpReader.Close()
|
|
||||||
}
|
|
||||||
// Read from offset.
|
|
||||||
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, 0)
|
|
||||||
if err != nil {
|
|
||||||
resCh <- readResponse{
|
|
||||||
Error: err,
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read at least req.Buffer bytes, if not we have
|
// Gather incoming request.
|
||||||
// reached our EOF.
|
case req := <-reqCh:
|
||||||
size, err := io.ReadFull(httpReader, req.Buffer)
|
// If this is the first request we may not need to do a getObject request yet.
|
||||||
if err == io.ErrUnexpectedEOF {
|
if req.isFirstReq {
|
||||||
// If an EOF happens after reading some but not
|
// First request is a Read/ReadAt.
|
||||||
// all the bytes ReadFull returns ErrUnexpectedEOF
|
if req.isReadOp {
|
||||||
err = io.EOF
|
// Differentiate between wanting the whole object and just a range.
|
||||||
}
|
if req.isReadAt {
|
||||||
// Reply back how much was read.
|
// If this is a ReadAt request only get the specified range.
|
||||||
resCh <- readResponse{
|
// Range is set with respect to the offset and length of the buffer requested.
|
||||||
Size: int(size),
|
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
|
||||||
Error: err,
|
} else {
|
||||||
|
// First request is a Read request.
|
||||||
|
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
resCh <- getResponse{
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Read at least firstReq.Buffer bytes, if not we have
|
||||||
|
// reached our EOF.
|
||||||
|
size, err := io.ReadFull(httpReader, req.Buffer)
|
||||||
|
if err == io.ErrUnexpectedEOF {
|
||||||
|
// If an EOF happens after reading some but not
|
||||||
|
// all the bytes ReadFull returns ErrUnexpectedEOF
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
// Send back the first response.
|
||||||
|
resCh <- getResponse{
|
||||||
|
objectInfo: objectInfo,
|
||||||
|
Size: int(size),
|
||||||
|
Error: err,
|
||||||
|
didRead: true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// First request is a Stat or Seek call.
|
||||||
|
// Only need to run a StatObject until an actual Read or ReadAt request comes through.
|
||||||
|
objectInfo, err = c.StatObject(bucketName, objectName)
|
||||||
|
if err != nil {
|
||||||
|
resCh <- getResponse{
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
// Exit the go-routine.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Send back the first response.
|
||||||
|
resCh <- getResponse{
|
||||||
|
objectInfo: objectInfo,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Offset changes fetch the new object at an Offset.
|
||||||
|
// Because the httpReader may not be set by the first
|
||||||
|
// request if it was a stat or seek it must be checked
|
||||||
|
// if the object has been read or not to only initialize
|
||||||
|
// new ones when they haven't been already.
|
||||||
|
// All readAt requests are new requests.
|
||||||
|
if req.DidOffsetChange || !req.beenRead {
|
||||||
|
if httpReader != nil {
|
||||||
|
// Close previously opened http reader.
|
||||||
|
httpReader.Close()
|
||||||
|
}
|
||||||
|
// If this request is a readAt only get the specified range.
|
||||||
|
if req.isReadAt {
|
||||||
|
// Range is set with respect to the offset and length of the buffer requested.
|
||||||
|
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
|
||||||
|
} else {
|
||||||
|
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, 0)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
resCh <- getResponse{
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read at least req.Buffer bytes, if not we have
|
||||||
|
// reached our EOF.
|
||||||
|
size, err := io.ReadFull(httpReader, req.Buffer)
|
||||||
|
if err == io.ErrUnexpectedEOF {
|
||||||
|
// If an EOF happens after reading some but not
|
||||||
|
// all the bytes ReadFull returns ErrUnexpectedEOF
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
// Reply back how much was read.
|
||||||
|
resCh <- getResponse{
|
||||||
|
Size: int(size),
|
||||||
|
Error: err,
|
||||||
|
didRead: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// Return the readerAt backed by routine.
|
|
||||||
return newObject(reqCh, resCh, doneCh, objectInfo), nil
|
// Create a newObject through the information sent back by reqCh.
|
||||||
|
return newObject(reqCh, resCh, doneCh), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read response message container to reply back for the request.
|
// get request message container to communicate with internal
|
||||||
type readResponse struct {
|
|
||||||
Size int
|
|
||||||
Error error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read request message container to communicate with internal
|
|
||||||
// go-routine.
|
// go-routine.
|
||||||
type readRequest struct {
|
type getRequest struct {
|
||||||
Buffer []byte
|
Buffer []byte
|
||||||
Offset int64 // readAt offset.
|
Offset int64 // readAt offset.
|
||||||
DidOffsetChange bool
|
DidOffsetChange bool // Tracks the offset changes for Seek requests.
|
||||||
|
beenRead bool // Determines if this is the first time an object is being read.
|
||||||
|
isReadAt bool // Determines if this request is a request to a specific range
|
||||||
|
isReadOp bool // Determines if this request is a Read or Read/At request.
|
||||||
|
isFirstReq bool // Determines if this request is the first time an object is being accessed.
|
||||||
|
}
|
||||||
|
|
||||||
|
// get response message container to reply back for the request.
|
||||||
|
type getResponse struct {
|
||||||
|
Size int
|
||||||
|
Error error
|
||||||
|
didRead bool // Lets subsequent calls know whether or not httpReader has been initiated.
|
||||||
|
objectInfo ObjectInfo // Used for the first request.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object represents an open object. It implements Read, ReadAt,
|
// Object represents an open object. It implements Read, ReadAt,
|
||||||
|
@ -122,8 +192,8 @@ type Object struct {
|
||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
|
|
||||||
// User allocated and defined.
|
// User allocated and defined.
|
||||||
reqCh chan<- readRequest
|
reqCh chan<- getRequest
|
||||||
resCh <-chan readResponse
|
resCh <-chan getResponse
|
||||||
doneCh chan<- struct{}
|
doneCh chan<- struct{}
|
||||||
prevOffset int64
|
prevOffset int64
|
||||||
currOffset int64
|
currOffset int64
|
||||||
|
@ -132,8 +202,54 @@ type Object struct {
|
||||||
// Keeps track of closed call.
|
// Keeps track of closed call.
|
||||||
isClosed bool
|
isClosed bool
|
||||||
|
|
||||||
|
// Keeps track of if this is the first call.
|
||||||
|
isStarted bool
|
||||||
|
|
||||||
// Previous error saved for future calls.
|
// Previous error saved for future calls.
|
||||||
prevErr error
|
prevErr error
|
||||||
|
|
||||||
|
// Keeps track of if this object has been read yet.
|
||||||
|
beenRead bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// doGetRequest - sends and blocks on the firstReqCh and reqCh of an object.
|
||||||
|
// Returns back the size of the buffer read, if anything was read, as well
|
||||||
|
// as any error encountered. For all first requests sent on the object
|
||||||
|
// it is also responsible for sending back the objectInfo.
|
||||||
|
func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
|
||||||
|
o.reqCh <- request
|
||||||
|
response := <-o.resCh
|
||||||
|
// This was the first request.
|
||||||
|
if !o.isStarted {
|
||||||
|
// Set objectInfo for first time.
|
||||||
|
o.objectInfo = response.objectInfo
|
||||||
|
// The object has been operated on.
|
||||||
|
o.isStarted = true
|
||||||
|
}
|
||||||
|
// Set beenRead only if it has not been set before.
|
||||||
|
if !o.beenRead {
|
||||||
|
o.beenRead = response.didRead
|
||||||
|
}
|
||||||
|
// Return any error to the top level.
|
||||||
|
if response.Error != nil {
|
||||||
|
return response, response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// setOffset - handles the setting of offsets for
|
||||||
|
// Read/ReadAt/Seek requests.
|
||||||
|
func (o *Object) setOffset(bytesRead int64) error {
|
||||||
|
// Update the currentOffset.
|
||||||
|
o.currOffset += bytesRead
|
||||||
|
// Save the current offset as previous offset.
|
||||||
|
o.prevOffset = o.currOffset
|
||||||
|
|
||||||
|
if o.currOffset >= o.objectInfo.Size {
|
||||||
|
return io.EOF
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read reads up to len(p) bytes into p. It returns the number of
|
// Read reads up to len(p) bytes into p. It returns the number of
|
||||||
|
@ -152,16 +268,17 @@ func (o *Object) Read(b []byte) (n int, err error) {
|
||||||
if o.prevErr != nil || o.isClosed {
|
if o.prevErr != nil || o.isClosed {
|
||||||
return 0, o.prevErr
|
return 0, o.prevErr
|
||||||
}
|
}
|
||||||
|
// Create a new request.
|
||||||
// If current offset has reached Size limit, return EOF.
|
readReq := getRequest{
|
||||||
if o.currOffset >= o.objectInfo.Size {
|
isReadOp: true,
|
||||||
return 0, io.EOF
|
beenRead: o.beenRead,
|
||||||
|
Buffer: b,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send current information over control channel to indicate we are ready.
|
// Alert that this is the first request.
|
||||||
reqMsg := readRequest{}
|
if !o.isStarted {
|
||||||
// Send the pointer to the buffer over the channel.
|
readReq.isFirstReq = true
|
||||||
reqMsg.Buffer = b
|
}
|
||||||
|
|
||||||
// Verify if offset has changed and currOffset is greater than
|
// Verify if offset has changed and currOffset is greater than
|
||||||
// previous offset. Perhaps due to Seek().
|
// previous offset. Perhaps due to Seek().
|
||||||
|
@ -171,42 +288,32 @@ func (o *Object) Read(b []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
if offsetChange > 0 {
|
if offsetChange > 0 {
|
||||||
// Fetch the new reader at the current offset again.
|
// Fetch the new reader at the current offset again.
|
||||||
reqMsg.Offset = o.currOffset
|
readReq.Offset = o.currOffset
|
||||||
reqMsg.DidOffsetChange = true
|
readReq.DidOffsetChange = true
|
||||||
} else {
|
} else {
|
||||||
// No offset changes no need to fetch new reader, continue
|
// No offset changes no need to fetch new reader, continue
|
||||||
// reading.
|
// reading.
|
||||||
reqMsg.DidOffsetChange = false
|
readReq.DidOffsetChange = false
|
||||||
reqMsg.Offset = 0
|
readReq.Offset = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send read request over the control channel.
|
// Send and receive from the first request.
|
||||||
o.reqCh <- reqMsg
|
response, err := o.doGetRequest(readReq)
|
||||||
|
if err != nil {
|
||||||
// Get data over the response channel.
|
// Save the error.
|
||||||
dataMsg := <-o.resCh
|
o.prevErr = err
|
||||||
|
return response.Size, err
|
||||||
|
}
|
||||||
|
|
||||||
// Bytes read.
|
// Bytes read.
|
||||||
bytesRead := int64(dataMsg.Size)
|
bytesRead := int64(response.Size)
|
||||||
|
|
||||||
// Update current offset.
|
// Set the new offset.
|
||||||
o.currOffset += bytesRead
|
err = o.setOffset(bytesRead)
|
||||||
|
if err != nil {
|
||||||
// Save the current offset as previous offset.
|
return response.Size, err
|
||||||
o.prevOffset = o.currOffset
|
|
||||||
|
|
||||||
if dataMsg.Error == nil {
|
|
||||||
// If currOffset read is equal to objectSize
|
|
||||||
// We have reached end of file, we return io.EOF.
|
|
||||||
if o.currOffset >= o.objectInfo.Size {
|
|
||||||
return dataMsg.Size, io.EOF
|
|
||||||
}
|
|
||||||
return dataMsg.Size, nil
|
|
||||||
}
|
}
|
||||||
|
return response.Size, nil
|
||||||
// Save any error.
|
|
||||||
o.prevErr = dataMsg.Error
|
|
||||||
return dataMsg.Size, dataMsg.Error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns the ObjectInfo structure describing object.
|
// Stat returns the ObjectInfo structure describing object.
|
||||||
|
@ -222,6 +329,22 @@ func (o *Object) Stat() (ObjectInfo, error) {
|
||||||
return ObjectInfo{}, o.prevErr
|
return ObjectInfo{}, o.prevErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is the first request.
|
||||||
|
if !o.isStarted {
|
||||||
|
statReq := getRequest{
|
||||||
|
isReadOp: false, // This is a Stat not a Read/ReadAt.
|
||||||
|
Offset: 0,
|
||||||
|
isFirstReq: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the request and get the response.
|
||||||
|
_, err := o.doGetRequest(statReq)
|
||||||
|
if err != nil {
|
||||||
|
o.prevErr = err
|
||||||
|
return ObjectInfo{}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return o.objectInfo, nil
|
return o.objectInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,57 +365,46 @@ func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
|
||||||
if o.prevErr != nil || o.isClosed {
|
if o.prevErr != nil || o.isClosed {
|
||||||
return 0, o.prevErr
|
return 0, o.prevErr
|
||||||
}
|
}
|
||||||
|
// Can only compare offsets to size when size has been set.
|
||||||
// if offset is greater than or equal to object size we return io.EOF.
|
if o.isStarted {
|
||||||
// If offset is negative then we return io.EOF.
|
// If offset is negative than we return io.EOF.
|
||||||
if offset < 0 || offset >= o.objectInfo.Size {
|
// If offset is greater than or equal to object size we return io.EOF.
|
||||||
return 0, io.EOF
|
if offset >= o.objectInfo.Size || offset < 0 {
|
||||||
}
|
return 0, io.EOF
|
||||||
|
|
||||||
// Send current information over control channel to indicate we
|
|
||||||
// are ready.
|
|
||||||
reqMsg := readRequest{}
|
|
||||||
|
|
||||||
// Send the offset and pointer to the buffer over the channel.
|
|
||||||
reqMsg.Buffer = b
|
|
||||||
|
|
||||||
// For ReadAt offset always changes, minor optimization where
|
|
||||||
// offset same as currOffset we don't change the offset.
|
|
||||||
reqMsg.DidOffsetChange = offset != o.currOffset
|
|
||||||
if reqMsg.DidOffsetChange {
|
|
||||||
// Set new offset.
|
|
||||||
reqMsg.Offset = offset
|
|
||||||
// Save new offset as current offset.
|
|
||||||
o.currOffset = offset
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send read request over the control channel.
|
|
||||||
o.reqCh <- reqMsg
|
|
||||||
|
|
||||||
// Get data over the response channel.
|
|
||||||
dataMsg := <-o.resCh
|
|
||||||
|
|
||||||
// Bytes read.
|
|
||||||
bytesRead := int64(dataMsg.Size)
|
|
||||||
|
|
||||||
// Update current offset.
|
|
||||||
o.currOffset += bytesRead
|
|
||||||
|
|
||||||
// Save current offset as previous offset before returning.
|
|
||||||
o.prevOffset = o.currOffset
|
|
||||||
|
|
||||||
if dataMsg.Error == nil {
|
|
||||||
// If currentOffset is equal to objectSize
|
|
||||||
// we have reached end of file, we return io.EOF.
|
|
||||||
if o.currOffset >= o.objectInfo.Size {
|
|
||||||
return dataMsg.Size, io.EOF
|
|
||||||
}
|
}
|
||||||
return dataMsg.Size, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save any error.
|
// Create the new readAt request.
|
||||||
o.prevErr = dataMsg.Error
|
readAtReq := getRequest{
|
||||||
return dataMsg.Size, dataMsg.Error
|
isReadOp: true,
|
||||||
|
isReadAt: true,
|
||||||
|
DidOffsetChange: true, // Offset always changes.
|
||||||
|
beenRead: o.beenRead, // Set if this is the first request to try and read.
|
||||||
|
Offset: offset, // Set the offset.
|
||||||
|
Buffer: b,
|
||||||
|
}
|
||||||
|
// Alert that this is the first request.
|
||||||
|
if !o.isStarted {
|
||||||
|
readAtReq.isFirstReq = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send and receive from the first request.
|
||||||
|
response, err := o.doGetRequest(readAtReq)
|
||||||
|
if err != nil {
|
||||||
|
// Save the error.
|
||||||
|
o.prevErr = err
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
// Bytes read.
|
||||||
|
bytesRead := int64(response.Size)
|
||||||
|
|
||||||
|
// Update the offsets.
|
||||||
|
err = o.setOffset(bytesRead)
|
||||||
|
if err != nil {
|
||||||
|
return response.Size, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek sets the offset for the next Read or Write to offset,
|
// Seek sets the offset for the next Read or Write to offset,
|
||||||
|
@ -325,6 +437,23 @@ func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
|
||||||
return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
|
return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is the first request. So before anything else
|
||||||
|
// get the ObjectInfo.
|
||||||
|
if !o.isStarted {
|
||||||
|
// Create the new Seek request.
|
||||||
|
seekReq := getRequest{
|
||||||
|
isReadOp: false,
|
||||||
|
Offset: offset,
|
||||||
|
isFirstReq: true,
|
||||||
|
}
|
||||||
|
// Send and receive from the seek request.
|
||||||
|
_, err := o.doGetRequest(seekReq)
|
||||||
|
if err != nil {
|
||||||
|
// Save the error.
|
||||||
|
o.prevErr = err
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
// Save current offset as previous offset.
|
// Save current offset as previous offset.
|
||||||
o.prevOffset = o.currOffset
|
o.prevOffset = o.currOffset
|
||||||
|
|
||||||
|
@ -391,13 +520,13 @@ func (o *Object) Close() (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newObject instantiates a new *minio.Object*
|
// newObject instantiates a new *minio.Object*
|
||||||
func newObject(reqCh chan<- readRequest, resCh <-chan readResponse, doneCh chan<- struct{}, objectInfo ObjectInfo) *Object {
|
// ObjectInfo will be set by setObjectInfo
|
||||||
|
func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
|
||||||
return &Object{
|
return &Object{
|
||||||
mutex: &sync.Mutex{},
|
mutex: &sync.Mutex{},
|
||||||
reqCh: reqCh,
|
reqCh: reqCh,
|
||||||
resCh: resCh,
|
resCh: resCh,
|
||||||
doneCh: doneCh,
|
doneCh: doneCh,
|
||||||
objectInfo: objectInfo,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,6 +548,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
|
||||||
|
|
||||||
customHeader := make(http.Header)
|
customHeader := make(http.Header)
|
||||||
// Set ranges if length and offset are valid.
|
// Set ranges if length and offset are valid.
|
||||||
|
// See https://tools.ietf.org/html/rfc7233#section-3.1 for reference.
|
||||||
if length > 0 && offset >= 0 {
|
if length > 0 && offset >= 0 {
|
||||||
customHeader.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
|
customHeader.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
|
||||||
} else if offset > 0 && length == 0 {
|
} else if offset > 0 && length == 0 {
|
||||||
|
|
|
@ -134,7 +134,15 @@ func (c Client) ListenBucketNotification(bucketName string, accountArn Arn, done
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Continuously run and listen on bucket notification.
|
// Check ARN partition to verify if listening bucket is supported
|
||||||
|
if accountArn.Partition != "minio" {
|
||||||
|
notificationInfoCh <- NotificationInfo{
|
||||||
|
Err: ErrAPINotSupported("Listening bucket notification is specific only to `minio` partitions"),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Continously run and listen on bucket notification.
|
||||||
for {
|
for {
|
||||||
urlValues := make(url.Values)
|
urlValues := make(url.Values)
|
||||||
urlValues.Set("notificationARN", accountArn.String())
|
urlValues.Set("notificationARN", accountArn.String())
|
||||||
|
|
|
@ -59,7 +59,7 @@ func TestMakeBucketErrorV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'eu-west-1'.
|
// Make a new bucket in 'eu-west-1'.
|
||||||
if err = c.MakeBucket(bucketName, "eu-west-1"); err != nil {
|
if err = c.MakeBucket(bucketName, "eu-west-1"); err != nil {
|
||||||
|
@ -105,7 +105,7 @@ func TestGetObjectClosedTwiceV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -122,7 +122,7 @@ func TestGetObjectClosedTwiceV2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -190,7 +190,7 @@ func TestRemovePartiallyUploadedV2(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// make a new bucket.
|
// make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -257,7 +257,7 @@ func TestResumablePutObjectV2(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -368,7 +368,7 @@ func TestFPutObjectV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -516,7 +516,7 @@ func TestResumableFPutObjectV2(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// make a new bucket.
|
// make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -593,7 +593,7 @@ func TestMakeBucketRegionsV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'eu-central-1'.
|
// Make a new bucket in 'eu-central-1'.
|
||||||
if err = c.MakeBucket(bucketName, "eu-west-1"); err != nil {
|
if err = c.MakeBucket(bucketName, "eu-west-1"); err != nil {
|
||||||
|
@ -644,7 +644,7 @@ func TestGetObjectReadSeekFunctionalV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -661,7 +661,7 @@ func TestGetObjectReadSeekFunctionalV2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -782,7 +782,7 @@ func TestGetObjectReadAtFunctionalV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -799,7 +799,7 @@ func TestGetObjectReadAtFunctionalV2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -923,7 +923,7 @@ func TestCopyObjectV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'us-east-1' (source bucket).
|
// Make a new bucket in 'us-east-1' (source bucket).
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -946,7 +946,7 @@ func TestCopyObjectV2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -1045,7 +1045,7 @@ func TestFunctionalV2(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1054,7 +1054,7 @@ func TestFunctionalV2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate a random file name.
|
// Generate a random file name.
|
||||||
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
fileName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
file, err := os.Create(fileName)
|
file, err := os.Create(fileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err)
|
t.Fatal("Error:", err)
|
||||||
|
|
|
@ -40,7 +40,8 @@ const (
|
||||||
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
|
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
|
||||||
)
|
)
|
||||||
|
|
||||||
func randString(n int, src rand.Source) string {
|
// randString generates random names and prepends them with a known prefix.
|
||||||
|
func randString(n int, src rand.Source, prefix string) string {
|
||||||
b := make([]byte, n)
|
b := make([]byte, n)
|
||||||
// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
|
// A rand.Int63() generates 63 random bits, enough for letterIdxMax letters!
|
||||||
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
|
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
|
||||||
|
@ -54,7 +55,7 @@ func randString(n int, src rand.Source) string {
|
||||||
cache >>= letterIdxBits
|
cache >>= letterIdxBits
|
||||||
remain--
|
remain--
|
||||||
}
|
}
|
||||||
return string(b[0:30])
|
return prefix + string(b[0:30-len(prefix)])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests bucket re-create errors.
|
// Tests bucket re-create errors.
|
||||||
|
@ -84,7 +85,7 @@ func TestMakeBucketError(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'eu-central-1'.
|
// Make a new bucket in 'eu-central-1'.
|
||||||
if err = c.MakeBucket(bucketName, "eu-central-1"); err != nil {
|
if err = c.MakeBucket(bucketName, "eu-central-1"); err != nil {
|
||||||
|
@ -130,7 +131,7 @@ func TestMakeBucketRegions(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'eu-central-1'.
|
// Make a new bucket in 'eu-central-1'.
|
||||||
if err = c.MakeBucket(bucketName, "eu-central-1"); err != nil {
|
if err = c.MakeBucket(bucketName, "eu-central-1"); err != nil {
|
||||||
|
@ -181,7 +182,7 @@ func TestPutObjectReadAt(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -193,7 +194,7 @@ func TestPutObjectReadAt(t *testing.T) {
|
||||||
buf := make([]byte, minPartSize*4)
|
buf := make([]byte, minPartSize*4)
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -261,7 +262,7 @@ func TestListPartiallyUploaded(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -337,7 +338,7 @@ func TestGetOjectSeekEnd(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -354,7 +355,7 @@ func TestGetOjectSeekEnd(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -438,7 +439,7 @@ func TestGetObjectClosedTwice(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -455,7 +456,7 @@ func TestGetObjectClosedTwice(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -523,7 +524,7 @@ func TestRemovePartiallyUploaded(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -593,7 +594,7 @@ func TestResumablePutObject(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -703,7 +704,7 @@ func TestResumableFPutObject(t *testing.T) {
|
||||||
// c.TraceOn(os.Stderr)
|
// c.TraceOn(os.Stderr)
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -783,7 +784,7 @@ func TestFPutObjectMultipart(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -862,7 +863,7 @@ func TestFPutObject(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1010,7 +1011,7 @@ func TestGetObjectReadSeekFunctional(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1027,7 +1028,7 @@ func TestGetObjectReadSeekFunctional(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -1148,7 +1149,7 @@ func TestGetObjectReadAtFunctional(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1165,7 +1166,7 @@ func TestGetObjectReadAtFunctional(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -1289,7 +1290,7 @@ func TestPresignedPostPolicy(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'us-east-1' (source bucket).
|
// Make a new bucket in 'us-east-1' (source bucket).
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1306,7 +1307,7 @@ func TestPresignedPostPolicy(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -1389,7 +1390,7 @@ func TestCopyObject(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket in 'us-east-1' (source bucket).
|
// Make a new bucket in 'us-east-1' (source bucket).
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1412,7 +1413,7 @@ func TestCopyObject(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the data
|
// Save the data
|
||||||
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err, bucketName, objectName)
|
t.Fatal("Error:", err, bucketName, objectName)
|
||||||
|
@ -1575,6 +1576,13 @@ func TestBucketNotification(t *testing.T) {
|
||||||
bNotification := BucketNotification{}
|
bNotification := BucketNotification{}
|
||||||
bNotification.AddTopic(topicConfig)
|
bNotification.AddTopic(topicConfig)
|
||||||
|
|
||||||
|
// Add the same topicConfig again, should have no effect
|
||||||
|
// because it is duplicated
|
||||||
|
bNotification.AddTopic(topicConfig)
|
||||||
|
if len(bNotification.TopicConfigs) != 1 {
|
||||||
|
t.Fatal("Error: duplicated entry added")
|
||||||
|
}
|
||||||
|
|
||||||
// Add and remove a queue config
|
// Add and remove a queue config
|
||||||
bNotification.AddQueue(queueConfig)
|
bNotification.AddQueue(queueConfig)
|
||||||
bNotification.RemoveQueueByArn(queueArn)
|
bNotification.RemoveQueueByArn(queueArn)
|
||||||
|
@ -1629,7 +1637,7 @@ func TestFunctional(t *testing.T) {
|
||||||
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
|
||||||
|
|
||||||
// Generate a new random bucket name.
|
// Generate a new random bucket name.
|
||||||
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
|
||||||
|
|
||||||
// Make a new bucket.
|
// Make a new bucket.
|
||||||
err = c.MakeBucket(bucketName, "us-east-1")
|
err = c.MakeBucket(bucketName, "us-east-1")
|
||||||
|
@ -1638,7 +1646,7 @@ func TestFunctional(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate a random file name.
|
// Generate a random file name.
|
||||||
fileName := randString(60, rand.NewSource(time.Now().UnixNano()))
|
fileName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
|
||||||
file, err := os.Create(fileName)
|
file, err := os.Create(fileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err)
|
t.Fatal("Error:", err)
|
||||||
|
|
|
@ -78,22 +78,41 @@ func TestGetReaderSize(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create request channel.
|
// Create request channel.
|
||||||
reqCh := make(chan readRequest)
|
reqCh := make(chan getRequest, 1)
|
||||||
// Create response channel.
|
// Create response channel.
|
||||||
resCh := make(chan readResponse)
|
resCh := make(chan getResponse, 1)
|
||||||
// Create done channel.
|
// Create done channel.
|
||||||
doneCh := make(chan struct{})
|
doneCh := make(chan struct{})
|
||||||
// objectInfo.
|
|
||||||
objectInfo := ObjectInfo{Size: 10}
|
|
||||||
objectReader := newObject(reqCh, resCh, doneCh, objectInfo)
|
|
||||||
defer objectReader.Close()
|
|
||||||
|
|
||||||
size, err = getReaderSize(objectReader)
|
objectInfo := ObjectInfo{Size: 10}
|
||||||
|
// Create the first request.
|
||||||
|
firstReq := getRequest{
|
||||||
|
isReadOp: false, // Perform only a HEAD object to get objectInfo.
|
||||||
|
isFirstReq: true,
|
||||||
|
}
|
||||||
|
// Create the expected response.
|
||||||
|
firstRes := getResponse{
|
||||||
|
objectInfo: objectInfo,
|
||||||
|
}
|
||||||
|
// Send the expected response.
|
||||||
|
resCh <- firstRes
|
||||||
|
|
||||||
|
// Test setting size on the first request.
|
||||||
|
objectReaderFirstReq := newObject(reqCh, resCh, doneCh)
|
||||||
|
defer objectReaderFirstReq.Close()
|
||||||
|
// Not checking the response here...just that the reader size is correct.
|
||||||
|
_, err = objectReaderFirstReq.doGetRequest(firstReq)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Error:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate that the reader size is the objectInfo size.
|
||||||
|
size, err = getReaderSize(objectReaderFirstReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error:", err)
|
t.Fatal("Error:", err)
|
||||||
}
|
}
|
||||||
if size != int64(10) {
|
if size != int64(10) {
|
||||||
t.Fatalf("Reader length doesn't match got: %v, want: %v", size, 10)
|
t.Fatalf("Reader length doesn't match got: %d, wanted %d", size, objectInfo.Size)
|
||||||
}
|
}
|
||||||
|
|
||||||
fileReader, err := ioutil.TempFile(os.TempDir(), "prefix")
|
fileReader, err := ioutil.TempFile(os.TempDir(), "prefix")
|
||||||
|
|
|
@ -18,6 +18,7 @@ package minio
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
"reflect"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NotificationEventType is a S3 notification event associated to the bucket notification configuration
|
// NotificationEventType is a S3 notification event associated to the bucket notification configuration
|
||||||
|
@ -160,18 +161,36 @@ type BucketNotification struct {
|
||||||
// AddTopic adds a given topic config to the general bucket notification config
|
// AddTopic adds a given topic config to the general bucket notification config
|
||||||
func (b *BucketNotification) AddTopic(topicConfig NotificationConfig) {
|
func (b *BucketNotification) AddTopic(topicConfig NotificationConfig) {
|
||||||
newTopicConfig := TopicConfig{NotificationConfig: topicConfig, Topic: topicConfig.Arn.String()}
|
newTopicConfig := TopicConfig{NotificationConfig: topicConfig, Topic: topicConfig.Arn.String()}
|
||||||
|
for _, n := range b.TopicConfigs {
|
||||||
|
if reflect.DeepEqual(n, newTopicConfig) {
|
||||||
|
// Avoid adding duplicated entry
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
b.TopicConfigs = append(b.TopicConfigs, newTopicConfig)
|
b.TopicConfigs = append(b.TopicConfigs, newTopicConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddQueue adds a given queue config to the general bucket notification config
|
// AddQueue adds a given queue config to the general bucket notification config
|
||||||
func (b *BucketNotification) AddQueue(queueConfig NotificationConfig) {
|
func (b *BucketNotification) AddQueue(queueConfig NotificationConfig) {
|
||||||
newQueueConfig := QueueConfig{NotificationConfig: queueConfig, Queue: queueConfig.Arn.String()}
|
newQueueConfig := QueueConfig{NotificationConfig: queueConfig, Queue: queueConfig.Arn.String()}
|
||||||
|
for _, n := range b.QueueConfigs {
|
||||||
|
if reflect.DeepEqual(n, newQueueConfig) {
|
||||||
|
// Avoid adding duplicated entry
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
b.QueueConfigs = append(b.QueueConfigs, newQueueConfig)
|
b.QueueConfigs = append(b.QueueConfigs, newQueueConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddLambda adds a given lambda config to the general bucket notification config
|
// AddLambda adds a given lambda config to the general bucket notification config
|
||||||
func (b *BucketNotification) AddLambda(lambdaConfig NotificationConfig) {
|
func (b *BucketNotification) AddLambda(lambdaConfig NotificationConfig) {
|
||||||
newLambdaConfig := LambdaConfig{NotificationConfig: lambdaConfig, Lambda: lambdaConfig.Arn.String()}
|
newLambdaConfig := LambdaConfig{NotificationConfig: lambdaConfig, Lambda: lambdaConfig.Arn.String()}
|
||||||
|
for _, n := range b.LambdaConfigs {
|
||||||
|
if reflect.DeepEqual(n, newLambdaConfig) {
|
||||||
|
// Avoid adding duplicated entry
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
b.LambdaConfigs = append(b.LambdaConfigs, newLambdaConfig)
|
b.LambdaConfigs = append(b.LambdaConfigs, newLambdaConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -911,7 +911,7 @@ __Example__
|
||||||
|
|
||||||
|
|
||||||
```go
|
```go
|
||||||
topicArn := NewArn("aws", "s3", "us-east-1", "804605494417", "PhotoUpdate")
|
topicArn := NewArn("aws", "sns", "us-east-1", "804605494417", "PhotoUpdate")
|
||||||
|
|
||||||
topicConfig := NewNotificationConfig(topicArn)
|
topicConfig := NewNotificationConfig(topicArn)
|
||||||
topicConfig.AddEvents(minio.ObjectCreatedAll, minio.ObjectRemovedAll)
|
topicConfig.AddEvents(minio.ObjectCreatedAll, minio.ObjectRemovedAll)
|
||||||
|
|
Loading…
Reference in a new issue