Merge pull request #53 from stevvooe/spool-uploads-remotely

Spool uploads remotely
This commit is contained in:
Stephen Day 2015-01-09 15:40:22 -08:00
commit 21a69f53b5
16 changed files with 692 additions and 476 deletions

View file

@ -29,8 +29,6 @@ type App struct {
// services contains the main services instance for the application.
services *storage.Services
tokenProvider tokenProvider
layerHandler storage.LayerHandler
accessController auth.AccessController
@ -66,8 +64,6 @@ func NewApp(configuration configuration.Configuration) *App {
app.driver = driver
app.services = storage.NewServices(app.driver)
app.tokenProvider = newHMACTokenProvider(configuration.HTTP.Secret)
authType := configuration.Auth.Type()
if authType != "" {

72
registry/hmac.go Normal file
View file

@ -0,0 +1,72 @@
package registry
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"time"
)
// layerUploadState captures the state serializable state of the layer upload.
type layerUploadState struct {
// name is the primary repository under which the layer will be linked.
Name string
// UUID identifies the upload.
UUID string
// offset contains the current progress of the upload.
Offset int64
// StartedAt is the original start time of the upload.
StartedAt time.Time
}
type hmacKey string
// unpackUploadState unpacks and validates the layer upload state from the
// token, using the hmacKey secret.
func (secret hmacKey) unpackUploadState(token string) (layerUploadState, error) {
var state layerUploadState
tokenBytes, err := base64.URLEncoding.DecodeString(token)
if err != nil {
return state, err
}
mac := hmac.New(sha256.New, []byte(secret))
if len(tokenBytes) < mac.Size() {
return state, fmt.Errorf("Invalid token")
}
macBytes := tokenBytes[:mac.Size()]
messageBytes := tokenBytes[mac.Size():]
mac.Write(messageBytes)
if !hmac.Equal(mac.Sum(nil), macBytes) {
return state, fmt.Errorf("Invalid token")
}
if err := json.Unmarshal(messageBytes, &state); err != nil {
return state, err
}
return state, nil
}
// packUploadState packs the upload state signed with and hmac digest using
// the hmacKey secret, encoding to url safe base64. The resulting token can be
// used to share data with minimized risk of external tampering.
func (secret hmacKey) packUploadState(lus layerUploadState) (string, error) {
mac := hmac.New(sha256.New, []byte(secret))
p, err := json.Marshal(lus)
if err != nil {
return "", err
}
mac.Write(p)
return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), p...)), nil
}

View file

@ -1,12 +1,8 @@
package registry
import (
"testing"
import "testing"
"github.com/docker/distribution/storage"
)
var layerUploadStates = []storage.LayerUploadState{
var layerUploadStates = []layerUploadState{
{
Name: "hello",
UUID: "abcd-1234-qwer-0987",
@ -47,15 +43,15 @@ var secrets = []string{
// TestLayerUploadTokens constructs stateTokens from LayerUploadStates and
// validates that the tokens can be used to reconstruct the proper upload state.
func TestLayerUploadTokens(t *testing.T) {
tokenProvider := newHMACTokenProvider("supersecret")
secret := hmacKey("supersecret")
for _, testcase := range layerUploadStates {
token, err := tokenProvider.layerUploadStateToToken(testcase)
token, err := secret.packUploadState(testcase)
if err != nil {
t.Fatal(err)
}
lus, err := tokenProvider.layerUploadStateFromToken(token)
lus, err := secret.unpackUploadState(token)
if err != nil {
t.Fatal(err)
}
@ -68,39 +64,39 @@ func TestLayerUploadTokens(t *testing.T) {
// only if they share the same secret.
func TestHMACValidation(t *testing.T) {
for _, secret := range secrets {
tokenProvider1 := newHMACTokenProvider(secret)
tokenProvider2 := newHMACTokenProvider(secret)
badTokenProvider := newHMACTokenProvider("DifferentSecret")
secret1 := hmacKey(secret)
secret2 := hmacKey(secret)
badSecret := hmacKey("DifferentSecret")
for _, testcase := range layerUploadStates {
token, err := tokenProvider1.layerUploadStateToToken(testcase)
token, err := secret1.packUploadState(testcase)
if err != nil {
t.Fatal(err)
}
lus, err := tokenProvider2.layerUploadStateFromToken(token)
lus, err := secret2.unpackUploadState(token)
if err != nil {
t.Fatal(err)
}
assertLayerUploadStateEquals(t, testcase, lus)
_, err = badTokenProvider.layerUploadStateFromToken(token)
_, err = badSecret.unpackUploadState(token)
if err == nil {
t.Fatalf("Expected token provider to fail at retrieving state from token: %s", token)
}
badToken, err := badTokenProvider.layerUploadStateToToken(testcase)
badToken, err := badSecret.packUploadState(lus)
if err != nil {
t.Fatal(err)
}
_, err = tokenProvider1.layerUploadStateFromToken(badToken)
_, err = secret1.unpackUploadState(badToken)
if err == nil {
t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken)
}
_, err = tokenProvider2.layerUploadStateFromToken(badToken)
_, err = secret2.unpackUploadState(badToken)
if err == nil {
t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken)
}
@ -108,7 +104,7 @@ func TestHMACValidation(t *testing.T) {
}
}
func assertLayerUploadStateEquals(t *testing.T, expected storage.LayerUploadState, received storage.LayerUploadState) {
func assertLayerUploadStateEquals(t *testing.T, expected layerUploadState, received layerUploadState) {
if expected.Name != received.Name {
t.Fatalf("Expected Name=%q, Received Name=%q", expected.Name, received.Name)
}

View file

@ -5,7 +5,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/api/v2"
@ -33,26 +33,57 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
if luh.UUID != "" {
luh.log = luh.log.WithField("uuid", luh.UUID)
state, err := ctx.tokenProvider.layerUploadStateFromToken(r.FormValue("_state"))
state, err := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state"))
if err != nil {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logrus.Infof("error resolving upload: %v", err)
w.WriteHeader(http.StatusInternalServerError)
luh.Errors.Push(v2.ErrorCodeUnknown, err)
ctx.log.Infof("error resolving upload: %v", err)
w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err)
})
}
luh.State = state
if state.UUID != luh.UUID {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx.log.Infof("mismatched uuid in upload state: %q != %q", state.UUID, luh.UUID)
w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err)
})
}
layers := ctx.services.Layers()
upload, err := layers.Resume(state)
upload, err := layers.Resume(luh.Name, luh.UUID)
if err != nil && err != storage.ErrLayerUploadUnknown {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logrus.Infof("error resolving upload: %v", err)
w.WriteHeader(http.StatusInternalServerError)
luh.Errors.Push(v2.ErrorCodeUnknown, err)
ctx.log.Errorf("error resolving upload: %v", err)
w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeBlobUploadUnknown, err)
})
}
luh.Upload = upload
if state.Offset > 0 {
// Seek the layer upload to the correct spot if it's non-zero.
// These error conditions should be rare and demonstrate really
// problems. We basically cancel the upload and tell the client to
// start over.
if nn, err := upload.Seek(luh.State.Offset, os.SEEK_SET); err != nil {
ctx.log.Infof("error seeking layer upload: %v", err)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err)
upload.Cancel()
})
} else if nn != luh.State.Offset {
ctx.log.Infof("seek to wrong offest: %d != %d", nn, luh.State.Offset)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err)
upload.Cancel()
})
}
}
handler = closeResources(handler, luh.Upload)
}
@ -67,6 +98,8 @@ type layerUploadHandler struct {
UUID string
Upload storage.LayerUpload
State layerUploadState
}
// StartLayerUpload begins the layer upload process and allocates a server-
@ -171,14 +204,30 @@ func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.
// chunk responses. This sets the correct headers but the response status is
// left to the caller.
func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error {
values := make(url.Values)
stateToken, err := luh.Context.tokenProvider.layerUploadStateToToken(storage.LayerUploadState{Name: luh.Upload.Name(), UUID: luh.Upload.UUID(), Offset: luh.Upload.Offset()})
offset, err := luh.Upload.Seek(0, os.SEEK_CUR)
if err != nil {
luh.log.Errorf("unable get current offset of layer upload: %v", err)
return err
}
// TODO(stevvooe): Need a better way to manage the upload state automatically.
luh.State.Name = luh.Name
luh.State.UUID = luh.Upload.UUID()
luh.State.Offset = offset
luh.State.StartedAt = luh.Upload.StartedAt()
token, err := hmacKey(luh.Config.HTTP.Secret).packUploadState(luh.State)
if err != nil {
logrus.Infof("error building upload state token: %s", err)
return err
}
values.Set("_state", stateToken)
uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID(), values)
uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(
luh.Upload.Name(), luh.Upload.UUID(),
url.Values{
"_state": []string{token},
})
if err != nil {
logrus.Infof("error building upload url: %s", err)
return err
@ -186,7 +235,7 @@ func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *htt
w.Header().Set("Location", uploadURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset()))
w.Header().Set("Range", fmt.Sprintf("0-%d", luh.State.Offset))
return nil
}
@ -198,7 +247,6 @@ var errNotReadyToComplete = fmt.Errorf("not ready to complete upload")
func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error {
// If we get a digest and length, we can finish the upload.
dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters!
sizeStr := r.FormValue("size")
if dgstStr == "" {
return errNotReadyToComplete
@ -209,23 +257,13 @@ func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *htt
return err
}
var size int64
if sizeStr != "" {
size, err = strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
return err
}
} else {
size = -1
}
luh.completeUpload(w, r, size, dgst)
luh.completeUpload(w, r, dgst)
return nil
}
// completeUpload finishes out the upload with the correct response.
func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) {
layer, err := luh.Upload.Finish(size, dgst)
func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, dgst digest.Digest) {
layer, err := luh.Upload.Finish(dgst)
if err != nil {
luh.Errors.Push(v2.ErrorCodeUnknown, err)
w.WriteHeader(http.StatusInternalServerError)

View file

@ -1,65 +0,0 @@
package registry
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/docker/distribution/storage"
)
// tokenProvider contains methods for serializing and deserializing state from token strings.
type tokenProvider interface {
// layerUploadStateFromToken retrieves the LayerUploadState for a given state token.
layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error)
// layerUploadStateToToken returns a token string representing the given LayerUploadState.
layerUploadStateToToken(layerUploadState storage.LayerUploadState) (string, error)
}
type hmacTokenProvider struct {
secret string
}
func newHMACTokenProvider(secret string) tokenProvider {
return &hmacTokenProvider{secret: secret}
}
// layerUploadStateFromToken deserializes the given HMAC stateToken and validates the prefix HMAC
func (ts *hmacTokenProvider) layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) {
var lus storage.LayerUploadState
tokenBytes, err := base64.URLEncoding.DecodeString(stateToken)
if err != nil {
return lus, err
}
mac := hmac.New(sha256.New, []byte(ts.secret))
if len(tokenBytes) < mac.Size() {
return lus, fmt.Errorf("Invalid token")
}
macBytes := tokenBytes[:mac.Size()]
messageBytes := tokenBytes[mac.Size():]
mac.Write(messageBytes)
if !hmac.Equal(mac.Sum(nil), macBytes) {
return lus, fmt.Errorf("Invalid token")
}
if err := json.Unmarshal(messageBytes, &lus); err != nil {
return lus, err
}
return lus, nil
}
// layerUploadStateToToken serializes the given LayerUploadState to JSON with an HMAC prepended
func (ts *hmacTokenProvider) layerUploadStateToToken(lus storage.LayerUploadState) (string, error) {
mac := hmac.New(sha256.New, []byte(ts.secret))
stateJSON := fmt.Sprintf("{\"Name\": \"%s\", \"UUID\": \"%s\", \"Offset\": %d}", lus.Name, lus.UUID, lus.Offset)
mac.Write([]byte(stateJSON))
return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), stateJSON...)), nil
}

153
storage/filewriter.go Normal file
View file

@ -0,0 +1,153 @@
package storage
import (
"bytes"
"fmt"
"io"
"os"
"github.com/docker/distribution/storagedriver"
)
// fileWriter implements a remote file writer backed by a storage driver.
type fileWriter struct {
driver storagedriver.StorageDriver
// identifying fields
path string
// mutable fields
size int64 // size of the file, aka the current end
offset int64 // offset is the current write offset
err error // terminal error, if set, reader is closed
}
// fileWriterInterface makes the desired io compliant interface that the
// filewriter should implement.
type fileWriterInterface interface {
io.WriteSeeker
io.WriterAt
io.ReaderFrom
io.Closer
}
var _ fileWriterInterface = &fileWriter{}
// newFileWriter returns a prepared fileWriter for the driver and path. This
// could be considered similar to an "open" call on a regular filesystem.
func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) {
fw := fileWriter{
driver: driver,
path: path,
}
if fi, err := driver.Stat(path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// ignore, offset is zero
default:
return nil, err
}
} else {
if fi.IsDir() {
return nil, fmt.Errorf("cannot write to a directory")
}
fw.size = fi.Size()
}
return &fw, nil
}
// Write writes the buffer p at the current write offset.
func (fw *fileWriter) Write(p []byte) (n int, err error) {
nn, err := fw.readFromAt(bytes.NewReader(p), -1)
return int(nn), err
}
// WriteAt writes p at the specified offset. The underlying offset does not
// change.
func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) {
nn, err := fw.readFromAt(bytes.NewReader(p), offset)
return int(nn), err
}
// ReadFrom reads reader r until io.EOF writing the contents at the current
// offset.
func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) {
return fw.readFromAt(r, -1)
}
// Seek moves the write position do the requested offest based on the whence
// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET.
func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
if fw.err != nil {
return 0, fw.err
}
var err error
newOffset := fw.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = fw.size + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else if newOffset > fw.size {
fw.offset = newOffset
fw.size = newOffset
} else {
// No problems, set the offset.
fw.offset = newOffset
}
return fw.offset, err
}
// Close closes the fileWriter for writing.
func (fw *fileWriter) Close() error {
if fw.err != nil {
return fw.err
}
fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
return fw.err
}
// readFromAt writes to fw from r at the specified offset. If offset is less
// than zero, the value of fw.offset is used and updated after the operation.
func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) {
if fw.err != nil {
return 0, fw.err
}
var updateOffset bool
if offset < 0 {
offset = fw.offset
updateOffset = true
}
nn, err := fw.driver.WriteStream(fw.path, offset, r)
if updateOffset {
// We should forward the offset, whether or not there was an error.
// Basically, we keep the filewriter in sync with the reader's head. If an
// error is encountered, the whole thing should be retried but we proceed
// from an expected offset, even if the data didn't make it to the
// backend.
fw.offset += nn
if fw.offset > fw.size {
fw.size = fw.offset
}
}
return nn, err
}

148
storage/filewriter_test.go Normal file
View file

@ -0,0 +1,148 @@
package storage
import (
"bytes"
"crypto/rand"
"io"
"os"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver/inmemory"
)
// TestSimpleWrite takes the fileWriter through common write operations
// ensuring data integrity.
func TestSimpleWrite(t *testing.T) {
content := make([]byte, 1<<20)
n, err := rand.Read(content)
if err != nil {
t.Fatalf("unexpected error building random data: %v", err)
}
if n != len(content) {
t.Fatalf("random read did't fill buffer")
}
dgst, err := digest.FromReader(bytes.NewReader(content))
if err != nil {
t.Fatalf("unexpected error digesting random content: %v", err)
}
driver := inmemory.New()
path := "/random"
fw, err := newFileWriter(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileWriter: %v", err)
}
defer fw.Close()
n, err = fw.Write(content)
if err != nil {
t.Fatalf("unexpected error writing content: %v", err)
}
if n != len(content) {
t.Fatalf("unexpected write length: %d != %d", n, len(content))
}
fr, err := newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
verifier := digest.NewDigestVerifier(dgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify write data")
}
// Check the seek position is equal to the content length
end, err := fw.Seek(0, os.SEEK_END)
if err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
if end != int64(len(content)) {
t.Fatalf("write did not advance offset: %d != %d", end, len(content))
}
// Double the content, but use the WriteAt method
doubled := append(content, content...)
doubledgst, err := digest.FromReader(bytes.NewReader(doubled))
if err != nil {
t.Fatalf("unexpected error digesting doubled content: %v", err)
}
n, err = fw.WriteAt(content, end)
if err != nil {
t.Fatalf("unexpected error writing content at %d: %v", end, err)
}
if n != len(content) {
t.Fatalf("writeat was short: %d != %d", n, len(content))
}
fr, err = newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
verifier = digest.NewDigestVerifier(doubledgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify write data")
}
// Check that WriteAt didn't update the offset.
end, err = fw.Seek(0, os.SEEK_END)
if err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
if end != int64(len(content)) {
t.Fatalf("write did not advance offset: %d != %d", end, len(content))
}
// Now, we copy from one path to another, running the data through the
// fileReader to fileWriter, rather than the driver.Move command to ensure
// everything is working correctly.
fr, err = newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
fw, err = newFileWriter(driver, "/copied")
if err != nil {
t.Fatalf("unexpected error creating fileWriter: %v", err)
}
defer fw.Close()
nn, err := io.Copy(fw, fr)
if err != nil {
t.Fatalf("unexpected error copying data: %v", err)
}
if nn != int64(len(doubled)) {
t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled))
}
fr, err = newFileReader(driver, "/copied")
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
verifier = digest.NewDigestVerifier(doubledgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify write data")
}
}

View file

@ -24,8 +24,7 @@ type Layer interface {
// layers.
Digest() digest.Digest
// CreatedAt returns the time this layer was created. Until we implement
// Stat call on storagedriver, this just returns the zero time.
// CreatedAt returns the time this layer was created.
CreatedAt() time.Time
}
@ -33,26 +32,22 @@ type Layer interface {
// Instances can be obtained from the LayerService.Upload and
// LayerService.Resume.
type LayerUpload interface {
io.WriteCloser
// UUID returns the identifier for this upload.
UUID() string
io.WriteSeeker
io.Closer
// Name of the repository under which the layer will be linked.
Name() string
// Offset returns the position of the last byte written to this layer.
Offset() int64
// UUID returns the identifier for this upload.
UUID() string
// TODO(stevvooe): Consider completely removing the size check from this
// interface. The digest check may be adequate and we are making it
// optional in the HTTP API.
// StartedAt returns the time this layer upload was started.
StartedAt() time.Time
// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and digest are validated against the
// contents of the uploaded layer. If the size is negative, only the
// digest will be checked.
Finish(size int64, digest digest.Digest) (Layer, error)
// uploaded layer. The digest is validated against the contents of the
// uploaded layer.
Finish(digest digest.Digest) (Layer, error)
// Cancel the layer upload process.
Cancel() error
@ -84,11 +79,11 @@ func (err ErrUnknownLayer) Error() string {
// ErrLayerInvalidDigest returned when tarsum check fails.
type ErrLayerInvalidDigest struct {
FSLayer manifest.FSLayer
Digest digest.Digest
}
func (err ErrLayerInvalidDigest) Error() string {
return fmt.Sprintf("invalid digest for referenced layer: %v", err.FSLayer.BlobSum)
return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest)
}
// ErrLayerInvalidSize returned when length check fails.

View file

@ -26,21 +26,18 @@ func TestSimpleLayerUpload(t *testing.T) {
dgst := digest.Digest(tarSumStr)
uploadStore, err := newTemporaryLocalFSLayerUploadStore()
if err != nil {
t.Fatalf("error allocating upload store: %v", err)
}
imageName := "foo/bar"
driver := inmemory.New()
ls := &layerStore{
driver: driver,
driver: inmemory.New(),
pathMapper: &pathMapper{
root: "/storage/testing",
version: storagePathVersion,
},
uploadStore: uploadStore,
}
h := sha256.New()
@ -58,7 +55,7 @@ func TestSimpleLayerUpload(t *testing.T) {
}
// Do a resume, get unknown upload
layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()})
layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID())
if err != ErrLayerUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
}
@ -84,26 +81,31 @@ func TestSimpleLayerUpload(t *testing.T) {
t.Fatalf("layer data write incomplete")
}
if layerUpload.Offset() != nn {
t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn)
offset, err := layerUpload.Seek(0, os.SEEK_CUR)
if err != nil {
t.Fatalf("unexpected error seeking layer upload: %v", err)
}
if offset != nn {
t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn)
}
layerUpload.Close()
// Do a resume, for good fun
layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()})
layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID())
if err != nil {
t.Fatalf("unexpected error resuming upload: %v", err)
}
sha256Digest := digest.NewDigest("sha256", h)
layer, err := layerUpload.Finish(randomDataSize, dgst)
layer, err := layerUpload.Finish(dgst)
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// After finishing an upload, it should no longer exist.
if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown {
if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err)
}

View file

@ -1,6 +1,9 @@
package storage
import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
@ -9,7 +12,6 @@ import (
type layerStore struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
uploadStore layerUploadStore
}
func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) {
@ -66,31 +68,86 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
// the same two layers. Should it be disallowed? For now, we allow both
// parties to proceed and the the first one uploads the layer.
lus, err := ls.uploadStore.New(name)
uuid := uuid.New()
startedAt := time.Now().UTC()
path, err := ls.pathMapper.path(uploadDataPathSpec{
name: name,
uuid: uuid,
})
if err != nil {
return nil, err
}
return ls.newLayerUpload(lus), nil
startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
name: name,
uuid: uuid,
})
if err != nil {
return nil, err
}
// Write a startedat file for this upload
if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
return nil, err
}
return ls.newLayerUpload(name, uuid, path, startedAt)
}
// Resume continues an in progress layer upload, returning the current
// state of the upload.
func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) {
_, err := ls.uploadStore.GetState(lus.UUID)
func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
name: name,
uuid: uuid,
})
if err != nil {
return nil, err
}
return ls.newLayerUpload(lus), nil
startedAtBytes, err := ls.driver.GetContent(startedAtPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return nil, ErrLayerUploadUnknown
default:
return nil, err
}
}
startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
if err != nil {
return nil, err
}
path, err := ls.pathMapper.path(uploadDataPathSpec{
name: name,
uuid: uuid,
})
if err != nil {
return nil, err
}
return ls.newLayerUpload(name, uuid, path, startedAt)
}
// newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload {
func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) {
fw, err := newFileWriter(ls.driver, path)
if err != nil {
return nil, err
}
return &layerUploadController{
LayerUploadState: lus,
layerStore: ls,
uploadStore: ls.uploadStore,
}
name: name,
uuid: uuid,
startedAt: startedAt,
fileWriter: *fw,
}, nil
}

View file

@ -1,229 +1,84 @@
package storage
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"code.google.com/p/go-uuid/uuid"
"path"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/docker/pkg/tarsum"
)
// LayerUploadState captures the state serializable state of the layer upload.
type LayerUploadState struct {
// name is the primary repository under which the layer will be linked.
Name string
// UUID identifies the upload.
UUID string
// offset contains the current progress of the upload.
Offset int64
}
// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
LayerUploadState
layerStore *layerStore
uploadStore layerUploadStore
fp layerFile
err error // terminal error, if set, controller is closed
}
// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
io.WriteSeeker
io.Reader
io.Closer
name string
uuid string
startedAt time.Time
// Sync commits the contents of the writer to storage.
Sync() (err error)
}
// layerUploadStore provides storage for temporary files and upload state of
// layers. This is be used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move the layer.go when it's ready to go.
type layerUploadStore interface {
New(name string) (LayerUploadState, error)
Open(uuid string) (layerFile, error)
GetState(uuid string) (LayerUploadState, error)
// TODO: factor this method back in
// SaveState(lus LayerUploadState) error
DeleteState(uuid string) error
fileWriter
}
var _ LayerUpload = &layerUploadController{}
// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
return luc.LayerUploadState.Name
return luc.name
}
// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
return luc.LayerUploadState.UUID
return luc.uuid
}
// Offset returns the position of the last byte written to this layer.
func (luc *layerUploadController) Offset() int64 {
return luc.LayerUploadState.Offset
func (luc *layerUploadController) StartedAt() time.Time {
return luc.startedAt
}
// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
// This section is going to be pretty ugly now. We will have to read the
// file twice. First, to get the tarsum and checksum. When those are
// available, and validated, we will upload it to the blob store and link
// it into the repository. In the future, we need to use resumable hash
// calculations for tarsum and checksum that can be calculated during the
// upload. This will allow us to cut the data directly into a temporary
// directory in the storage backend.
fp, err := luc.file()
if err != nil {
// Cleanup?
return nil, err
}
digest, err = luc.validateLayer(fp, size, digest)
func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) {
canonical, err := luc.validateLayer(digest)
if err != nil {
return nil, err
}
if nn, err := luc.writeLayer(fp, digest); err != nil {
// Cleanup?
return nil, err
} else if size >= 0 && nn != size {
// TODO(stevvooe): Short write. Will have to delete the location and
// report an error. This error needs to be reported to the client.
return nil, fmt.Errorf("short write writing layer")
}
// Yes! We have written some layer data. Let's make it visible. Link the
// layer blob into the repository.
if err := luc.linkLayer(digest); err != nil {
if err := luc.moveLayer(canonical); err != nil {
// TODO(stevvooe): Cleanup?
return nil, err
}
// Ok, the upload has completed and finished. Delete the state.
if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
// Can we ignore this error?
// Link the layer blob into the repository.
if err := luc.linkLayer(canonical); err != nil {
return nil, err
}
return luc.layerStore.Fetch(luc.Name(), digest)
if err := luc.removeResources(); err != nil {
return nil, err
}
return luc.layerStore.Fetch(luc.Name(), canonical)
}
// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error {
if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
if err := luc.removeResources(); err != nil {
return err
}
return luc.Close()
luc.Close()
return nil
}
func (luc *layerUploadController) Write(p []byte) (int, error) {
wr, err := luc.file()
if err != nil {
return 0, err
}
n, err := wr.Write(p)
// Because we expect the reported offset to be consistent with the storage
// state, unfortunately, we need to Sync on every call to write.
if err := wr.Sync(); err != nil {
// Effectively, ignore the write state if the Sync fails. Report that
// no bytes were written and seek back to the starting offset.
offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
if seekErr != nil {
// What do we do here? Quite disasterous.
luc.reset()
return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr)
}
if offset != luc.Offset() {
return 0, fmt.Errorf("unexpected offset after seek")
}
return 0, err
}
luc.LayerUploadState.Offset += int64(n)
return n, err
}
func (luc *layerUploadController) Close() error {
if luc.err != nil {
return luc.err
}
if luc.fp != nil {
luc.err = luc.fp.Close()
}
return luc.err
}
func (luc *layerUploadController) file() (layerFile, error) {
if luc.fp != nil {
return luc.fp, nil
}
fp, err := luc.uploadStore.Open(luc.UUID())
if err != nil {
return nil, err
}
// TODO(stevvooe): We may need a more aggressive check here to ensure that
// the file length is equal to the current offset. We may want to sync the
// offset before return the layer upload to the client so it can be
// validated before proceeding with any writes.
// Seek to the current layer offset for good measure.
if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
return nil, err
}
luc.fp = fp
return luc.fp, nil
}
// reset closes and drops the current writer.
func (luc *layerUploadController) reset() {
if luc.fp != nil {
luc.fp.Close()
luc.fp = nil
}
}
// validateLayer runs several checks on the layer file to ensure its validity.
// This is currently very expensive and relies on fast io and fast seek on the
// local host. If successful, the latest digest is returned, which should be
// used over the passed in value.
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
// validateLayer checks the layer data against the digest, returning an error
// if it does not match. The canonical digest is returned.
func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) {
// First, check the incoming tarsum version of the digest.
version, err := tarsum.GetVersionFromTarsum(dgst.String())
if err != nil {
@ -239,87 +94,65 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
}
digestVerifier := digest.NewDigestVerifier(dgst)
lengthVerifier := digest.NewLengthVerifier(size)
// First, seek to the end of the file, checking the size is as expected.
end, err := fp.Seek(0, os.SEEK_END)
// TODO(stevvooe): Store resumable hash calculations in upload directory
// in driver. Something like a file at path <uuid>/resumablehash/<offest>
// with the hash state up to that point would be perfect. The hasher would
// then only have to fetch the difference.
// Read the file from the backend driver and validate it.
fr, err := newFileReader(luc.fileWriter.driver, luc.path)
if err != nil {
return "", err
}
// Only check size if it is greater than
if size >= 0 && end != size {
// Fast path length check.
return "", ErrLayerInvalidSize{Size: size}
}
// Now seek back to start and take care of the digest.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
return "", err
}
tr := io.TeeReader(fp, digestVerifier)
// Only verify the size if a positive size argument has been passed.
if size >= 0 {
tr = io.TeeReader(tr, lengthVerifier)
}
tr := io.TeeReader(fr, digestVerifier)
// TODO(stevvooe): This is one of the places we need a Digester write
// sink. Instead, its read driven. This migth be okay.
// sink. Instead, its read driven. This might be okay.
// Calculate an updated digest with the latest version.
dgst, err = digest.FromReader(tr)
canonical, err := digest.FromReader(tr)
if err != nil {
return "", err
}
if size >= 0 && !lengthVerifier.Verified() {
return "", ErrLayerInvalidSize{Size: size}
}
if !digestVerifier.Verified() {
return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: dgst}}
return "", ErrLayerInvalidDigest{Digest: dgst}
}
return dgst, nil
return canonical, nil
}
// writeLayer actually writes the the layer file into its final destination,
// moveLayer moves the data into its final, hash-qualified destination,
// identified by dgst. The layer should be validated before commencing the
// write.
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
// move.
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
digest: dgst,
})
if err != nil {
return 0, err
return err
}
// Check for existence
if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
// TODO(stevvooe): This check is kind of problematic and very racy.
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
default:
// TODO(stevvooe): This isn't actually an error: the blob store is
// content addressable and we should just use this to ensure we
// have it written. Although, we do need to verify that the
// content that is there is the correct length.
return 0, err
return err
}
} else {
// If the path exists, we can assume that the content has already
// been uploaded, since the blob storage is content-addressable.
// While it may be corrupted, detection of such corruption belongs
// elsewhere.
return nil
}
// Seek our local layer file back now.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
// Cleanup?
return 0, err
}
// Okay: we can write the file to the blob store.
return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
return luc.driver.Move(luc.path, blobPath)
}
// linkLayer links a valid, written layer blob into the registry under the
@ -337,85 +170,35 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest))
}
// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so we'll store and read locally for now. By GO-beta,
// this should be fully implemented on top of the backend storagedriver.
//
// For now, the directory layout is as follows:
//
// /<temp dir>/registry-layer-upload/
// <uuid>/
// -> state.json
// -> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
root string
}
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
path, err := ioutil.TempDir("", "registry-layer-upload")
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (luc *layerUploadController) removeResources() error {
dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{
name: luc.name,
uuid: luc.uuid,
})
if err != nil {
return nil, err
}
return &localFSLayerUploadStore{
root: path,
}, nil
}
func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
lus := LayerUploadState{
Name: name,
UUID: uuid.New(),
}
if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
return lus, err
}
return lus, nil
}
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
return fp, nil
}
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
var lus LayerUploadState
if _, err := os.Stat(llufs.path(uuid, "")); err != nil {
if os.IsNotExist(err) {
return lus, ErrLayerUploadUnknown
}
return lus, err
}
return lus, nil
}
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
if os.IsNotExist(err) {
return ErrLayerUploadUnknown
}
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := luc.driver.Delete(dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became unaccessible.
logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
return filepath.Join(llufs.root, uuid, file)
}

View file

@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) {
panic("not implemented")
}
func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) {
func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) {
panic("not implemented")
}

View file

@ -23,20 +23,26 @@ const storagePathVersion = "v2"
// <manifests by tag name>
// -> layers/
// <layer links to blob store>
// -> uploads/<uuid>
// data
// startedat
// -> blob/<algorithm>
// <split directory content addressable storage>
//
// There are few important components to this path layout. First, we have the
// repository store identified by name. This contains the image manifests and
// a layer store with links to CAS blob ids. Outside of the named repo area,
// we have the the blob store. It contains the actual layer data and any other
// data that can be referenced by a CAS id.
// a layer store with links to CAS blob ids. Upload coordination data is also
// stored here. Outside of the named repo area, we have the the blob store. It
// contains the actual layer data and any other data that can be referenced by
// a CAS id.
//
// We cover the path formats implemented by this path mapper below.
//
// manifestPathSpec: <root>/v2/repositories/<name>/manifests/<tag>
// layerLinkPathSpec: <root>/v2/repositories/<name>/layers/tarsum/<tarsum version>/<tarsum hash alg>/<tarsum hash>
// blobPathSpec: <root>/v2/blob/<algorithm>/<first two hex bytes of digest>/<hex digest>
// uploadDataPathSpec: <root>/v2/repositories/<name>/uploads/<uuid>/data
// uploadStartedAtPathSpec: <root>/v2/repositories/<name>/uploads/<uuid>/startedat
//
// For more information on the semantic meaning of each path and their
// contents, please see the path spec documentation.
@ -103,6 +109,10 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) {
blobPathPrefix := append(rootPrefix, "blob")
return path.Join(append(blobPathPrefix, components...)...), nil
case uploadDataPathSpec:
return path.Join(append(repoPrefix, v.name, "uploads", v.uuid, "data")...), nil
case uploadStartedAtPathSpec:
return path.Join(append(repoPrefix, v.name, "uploads", v.uuid, "startedat")...), nil
default:
// TODO(sday): This is an internal error. Ensure it doesn't escape (panic?).
return "", fmt.Errorf("unknown path spec: %#v", v)
@ -170,6 +180,29 @@ type blobPathSpec struct {
func (blobPathSpec) pathSpec() {}
// uploadDataPathSpec defines the path parameters of the data file for
// uploads.
type uploadDataPathSpec struct {
name string
uuid string
}
func (uploadDataPathSpec) pathSpec() {}
// uploadDataPathSpec defines the path parameters for the file that stores the
// start time of an uploads. If it is missing, the upload is considered
// unknown. Admittedly, the presence of this file is an ugly hack to make sure
// we have a way to cleanup old or stalled uploads that doesn't rely on driver
// FileInfo behavior. If we come up with a more clever way to do this, we
// should remove this file immediately and rely on the startetAt field from
// the client to enforce time out policies.
type uploadStartedAtPathSpec struct {
name string
uuid string
}
func (uploadStartedAtPathSpec) pathSpec() {}
// digestPathComoponents provides a consistent path breakdown for a given
// digest. For a generic digest, it will be as follows:
//

View file

@ -43,10 +43,18 @@ func TestPathMapper(t *testing.T) {
expected: "/pathmapper-test/blob/tarsum/v1/sha256/ab/abcdefabcdefabcdef908909909",
},
{
spec: blobPathSpec{
digest: digest.Digest("tarsum+sha256:abcdefabcdefabcdef908909909"),
spec: uploadDataPathSpec{
name: "foo/bar",
uuid: "asdf-asdf-asdf-adsf",
},
expected: "/pathmapper-test/blob/tarsum/v0/sha256/ab/abcdefabcdefabcdef908909909",
expected: "/pathmapper-test/repositories/foo/bar/uploads/asdf-asdf-asdf-adsf/data",
},
{
spec: uploadStartedAtPathSpec{
name: "foo/bar",
uuid: "asdf-asdf-asdf-adsf",
},
expected: "/pathmapper-test/repositories/foo/bar/uploads/asdf-asdf-asdf-adsf/startedat",
},
} {
p, err := pm.path(testcase.spec)

View file

@ -11,26 +11,16 @@ import (
type Services struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
layerUploadStore layerUploadStore
}
// NewServices creates a new Services object to access docker objects stored
// in the underlying driver.
func NewServices(driver storagedriver.StorageDriver) *Services {
layerUploadStore, err := newTemporaryLocalFSLayerUploadStore()
if err != nil {
// TODO(stevvooe): This failure needs to be understood in the context
// of the lifecycle of the services object, which is uncertain at this
// point.
panic("unable to allocate layerUploadStore: " + err.Error())
}
return &Services{
driver: driver,
// TODO(sday): This should be configurable.
pathMapper: defaultPathMapper,
layerUploadStore: layerUploadStore,
}
}
@ -38,7 +28,7 @@ func NewServices(driver storagedriver.StorageDriver) *Services {
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (ss *Services) Layers() LayerService {
return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore}
return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper}
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
@ -78,7 +68,8 @@ type LayerService interface {
// returning a handle.
Upload(name string) (LayerUpload, error)
// Resume continues an in progress layer upload, returning the current
// state of the upload.
Resume(layerUploadState LayerUploadState) (LayerUpload, error)
// Resume continues an in progress layer upload, returning a handle to the
// upload. The caller should seek to the latest desired upload location
// before proceeding.
Resume(name, uuid string) (LayerUpload, error)
}

View file

@ -133,6 +133,11 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn in
return 0, fmt.Errorf("not a file")
}
// Unlock while we are reading from the source, in case we are reading
// from the same mfs instance. This can be fixed by a more granular
// locking model.
d.mutex.Unlock()
d.mutex.RLock() // Take the readlock to block other writers.
var buf bytes.Buffer
nn, err = buf.ReadFrom(reader)
@ -142,9 +147,13 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn in
// backend. What is the correct return value? Really, the caller needs
// to know that the reader has been advanced and reattempting the
// operation is incorrect.
d.mutex.RUnlock()
d.mutex.Lock()
return nn, err
}
d.mutex.RUnlock()
d.mutex.Lock()
f.WriteAt(buf.Bytes(), offset)
return nn, err
}