Re-add the async buffer on seek - fixes #1137

This commit is contained in:
Nick Craig-Wood 2017-02-15 22:39:07 +00:00
parent 3f778d70f7
commit f15c6b68b6
2 changed files with 35 additions and 12 deletions

View file

@ -343,12 +343,8 @@ func NewAccount(in io.ReadCloser, obj Object) *Account {
return NewAccountSizeName(in, obj.Size(), obj.Remote()) return NewAccountSizeName(in, obj.Size(), obj.Remote())
} }
// NewAccountSizeNameWithBuffer makes a Account reader for an io.ReadCloser of // bufferReadCloser returns a buffered version of in if necessary
// the given size and name func bufferReadCloser(in io.ReadCloser, size int64, name string) io.ReadCloser {
//
// If the file is above a certain size it adds an Async reader
func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account {
const bufSize = 1024 * 1024
var buffers int var buffers int
if size >= int64(Config.BufferSize) { if size >= int64(Config.BufferSize) {
buffers = int(int64(Config.BufferSize) / asyncBufferSize) buffers = int(int64(Config.BufferSize) / asyncBufferSize)
@ -364,7 +360,15 @@ func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Ac
in = newIn in = newIn
} }
} }
return NewAccountSizeName(in, size, name) return in
}
// NewAccountSizeNameWithBuffer makes a Account reader for an io.ReadCloser of
// the given size and name
//
// If the file is above a certain size it adds an Async reader
func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account {
return NewAccountSizeName(bufferReadCloser(in, size, name), size, name)
} }
// NewAccountWithBuffer makes a Account reader for an object // NewAccountWithBuffer makes a Account reader for an object
@ -384,7 +388,10 @@ func (acc *Account) GetReader() io.ReadCloser {
// UpdateReader updates the underlying io.ReadCloser // UpdateReader updates the underlying io.ReadCloser
func (acc *Account) UpdateReader(in io.ReadCloser) { func (acc *Account) UpdateReader(in io.ReadCloser) {
acc.mu.Lock() acc.mu.Lock()
acc.in = in if asyncIn, ok := acc.in.(*asyncReader); ok {
asyncIn.Abandon()
}
acc.in = bufferReadCloser(in, acc.size, acc.name)
acc.mu.Unlock() acc.mu.Unlock()
} }

View file

@ -152,9 +152,10 @@ func (a *asyncReader) WriteTo(w io.Writer) (n int64, err error) {
} }
} }
// Close will ensure that the underlying async reader is shut down. // close will ensure that the underlying async reader is shut down.
// It will also close the input supplied on newAsyncReader. // If closeIn is set it will also close the input supplied on
func (a *asyncReader) Close() (err error) { // newAsyncReader.
func (a *asyncReader) close(closeIn bool) (err error) {
// Return if already closed // Return if already closed
select { select {
case <-a.exit: case <-a.exit:
@ -171,7 +172,22 @@ func (a *asyncReader) Close() (err error) {
for b := range a.ready { for b := range a.ready {
a.putBuffer(b) a.putBuffer(b)
} }
return a.in.Close() if closeIn {
return a.in.Close()
}
return nil
}
// Close will ensure that the underlying async reader is shut down.
// It will also close the input supplied on newAsyncReader.
func (a *asyncReader) Close() (err error) {
return a.close(true)
}
// Abandon will ensure that the underlying async reader is shut down.
// It will NOT close the input supplied on newAsyncReader.
func (a *asyncReader) Abandon() {
_ = a.close(false)
} }
// Internal buffer // Internal buffer