diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index ab255849..8c0830a2 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -88,6 +88,11 @@ "ImportPath": "github.com/jlhawn/go-crypto", "Rev": "cd738dde20f0b3782516181b0866c9bb9db47401" }, + { + "ImportPath": "github.com/noahdesu/go-ceph/rados", + "Comment": "v.0.3.0-29-gb15639c", + "Rev": "b15639c44c05368348355229070361395d9152ee" + }, { "ImportPath": "github.com/yvasiyarov/go-metrics", "Rev": "57bccd1ccd43f94bb17fdd8bf3007059b802f85e" diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/conn.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/conn.go new file mode 100644 index 00000000..af3cfebe --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/conn.go @@ -0,0 +1,300 @@ +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +import "C" + +import "unsafe" +import "bytes" + +// ClusterStat represents Ceph cluster statistics. +type ClusterStat struct { + Kb uint64 + Kb_used uint64 + Kb_avail uint64 + Num_objects uint64 +} + +// Conn is a connection handle to a Ceph cluster. +type Conn struct { + cluster C.rados_t +} + +// PingMonitor sends a ping to a monitor and returns the reply. +func (c *Conn) PingMonitor(id string) (string, error) { + c_id := C.CString(id) + defer C.free(unsafe.Pointer(c_id)) + + var strlen C.size_t + var strout *C.char + + ret := C.rados_ping_monitor(c.cluster, c_id, &strout, &strlen) + defer C.rados_buffer_free(strout) + + if ret == 0 { + reply := C.GoStringN(strout, (C.int)(strlen)) + return reply, nil + } else { + return "", RadosError(int(ret)) + } +} + +// Connect establishes a connection to a RADOS cluster. It returns an error, +// if any. +func (c *Conn) Connect() error { + ret := C.rados_connect(c.cluster) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Shutdown disconnects from the cluster. +func (c *Conn) Shutdown() { + C.rados_shutdown(c.cluster) +} + +// ReadConfigFile configures the connection using a Ceph configuration file. +func (c *Conn) ReadConfigFile(path string) error { + c_path := C.CString(path) + defer C.free(unsafe.Pointer(c_path)) + ret := C.rados_conf_read_file(c.cluster, c_path) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// ReadDefaultConfigFile configures the connection using a Ceph configuration +// file located at default locations. +func (c *Conn) ReadDefaultConfigFile() error { + ret := C.rados_conf_read_file(c.cluster, nil) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +func (c *Conn) OpenIOContext(pool string) (*IOContext, error) { + c_pool := C.CString(pool) + defer C.free(unsafe.Pointer(c_pool)) + ioctx := &IOContext{} + ret := C.rados_ioctx_create(c.cluster, c_pool, &ioctx.ioctx) + if ret == 0 { + return ioctx, nil + } else { + return nil, RadosError(int(ret)) + } +} + +// ListPools returns the names of all existing pools. +func (c *Conn) ListPools() (names []string, err error) { + buf := make([]byte, 4096) + for { + ret := int(C.rados_pool_list(c.cluster, + (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) + if ret < 0 { + return nil, RadosError(int(ret)) + } + + if ret > len(buf) { + buf = make([]byte, ret) + continue + } + + tmp := bytes.SplitAfter(buf[:ret-1], []byte{0}) + for _, s := range tmp { + if len(s) > 0 { + name := C.GoString((*C.char)(unsafe.Pointer(&s[0]))) + names = append(names, name) + } + } + + return names, nil + } +} + +// SetConfigOption sets the value of the configuration option identified by +// the given name. +func (c *Conn) SetConfigOption(option, value string) error { + c_opt, c_val := C.CString(option), C.CString(value) + defer C.free(unsafe.Pointer(c_opt)) + defer C.free(unsafe.Pointer(c_val)) + ret := C.rados_conf_set(c.cluster, c_opt, c_val) + if ret < 0 { + return RadosError(int(ret)) + } else { + return nil + } +} + +// GetConfigOption returns the value of the Ceph configuration option +// identified by the given name. +func (c *Conn) GetConfigOption(name string) (value string, err error) { + buf := make([]byte, 4096) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_name)) + ret := int(C.rados_conf_get(c.cluster, c_name, + (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) + // FIXME: ret may be -ENAMETOOLONG if the buffer is not large enough. We + // can handle this case, but we need a reliable way to test for + // -ENAMETOOLONG constant. Will the syscall/Errno stuff in Go help? + if ret == 0 { + value = C.GoString((*C.char)(unsafe.Pointer(&buf[0]))) + return value, nil + } else { + return "", RadosError(ret) + } +} + +// WaitForLatestOSDMap blocks the caller until the latest OSD map has been +// retrieved. +func (c *Conn) WaitForLatestOSDMap() error { + ret := C.rados_wait_for_latest_osdmap(c.cluster) + if ret < 0 { + return RadosError(int(ret)) + } else { + return nil + } +} + +// GetClusterStat returns statistics about the cluster associated with the +// connection. +func (c *Conn) GetClusterStats() (stat ClusterStat, err error) { + c_stat := C.struct_rados_cluster_stat_t{} + ret := C.rados_cluster_stat(c.cluster, &c_stat) + if ret < 0 { + return ClusterStat{}, RadosError(int(ret)) + } else { + return ClusterStat{ + Kb: uint64(c_stat.kb), + Kb_used: uint64(c_stat.kb_used), + Kb_avail: uint64(c_stat.kb_avail), + Num_objects: uint64(c_stat.num_objects), + }, nil + } +} + +// ParseCmdLineArgs configures the connection from command line arguments. +func (c *Conn) ParseCmdLineArgs(args []string) error { + // add an empty element 0 -- Ceph treats the array as the actual contents + // of argv and skips the first element (the executable name) + argc := C.int(len(args) + 1) + argv := make([]*C.char, argc) + + // make the first element a string just in case it is ever examined + argv[0] = C.CString("placeholder") + defer C.free(unsafe.Pointer(argv[0])) + + for i, arg := range args { + argv[i+1] = C.CString(arg) + defer C.free(unsafe.Pointer(argv[i+1])) + } + + ret := C.rados_conf_parse_argv(c.cluster, argc, &argv[0]) + if ret < 0 { + return RadosError(int(ret)) + } else { + return nil + } +} + +// ParseDefaultConfigEnv configures the connection from the default Ceph +// environment variable(s). +func (c *Conn) ParseDefaultConfigEnv() error { + ret := C.rados_conf_parse_env(c.cluster, nil) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// GetFSID returns the fsid of the cluster as a hexadecimal string. The fsid +// is a unique identifier of an entire Ceph cluster. +func (c *Conn) GetFSID() (fsid string, err error) { + buf := make([]byte, 37) + ret := int(C.rados_cluster_fsid(c.cluster, + (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) + // FIXME: the success case isn't documented correctly in librados.h + if ret == 36 { + fsid = C.GoString((*C.char)(unsafe.Pointer(&buf[0]))) + return fsid, nil + } else { + return "", RadosError(int(ret)) + } +} + +// GetInstanceID returns a globally unique identifier for the cluster +// connection instance. +func (c *Conn) GetInstanceID() uint64 { + // FIXME: are there any error cases for this? + return uint64(C.rados_get_instance_id(c.cluster)) +} + +// MakePool creates a new pool with default settings. +func (c *Conn) MakePool(name string) error { + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_name)) + ret := int(C.rados_pool_create(c.cluster, c_name)) + if ret == 0 { + return nil + } else { + return RadosError(ret) + } +} + +// DeletePool deletes a pool and all the data inside the pool. +func (c *Conn) DeletePool(name string) error { + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_name)) + ret := int(C.rados_pool_delete(c.cluster, c_name)) + if ret == 0 { + return nil + } else { + return RadosError(ret) + } +} + +// MonCommand sends a command to one of the monitors +func (c *Conn) MonCommand(args []byte) (buffer []byte, info string, err error) { + argv := make([]*C.char, len(args)) + for i, _ := range args { + argv[i] = (*C.char)(unsafe.Pointer(&args[i])) + } + + var ( + outs, outbuf *C.char + outslen, outbuflen C.size_t + ) + inbuf := C.CString("") + defer C.free(unsafe.Pointer(inbuf)) + + ret := C.rados_mon_command(c.cluster, + &argv[0], C.size_t(len(args)), + inbuf, // bulk input (e.g. crush map) + C.size_t(0), // length inbuf + &outbuf, // buffer + &outbuflen, // buffer length + &outs, // status string + &outslen) + + if outslen > 0 { + info = C.GoStringN(outs, C.int(outslen)) + C.free(unsafe.Pointer(outs)) + } + if outbuflen > 0 { + buffer = C.GoBytes(unsafe.Pointer(outbuf), C.int(outbuflen)) + C.free(unsafe.Pointer(outbuf)) + } + if ret != 0 { + err = RadosError(int(ret)) + return nil, info, err + } + + return +} diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/doc.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/doc.go new file mode 100644 index 00000000..14babe93 --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/doc.go @@ -0,0 +1,4 @@ +/* +Set of wrappers around librados API. +*/ +package rados diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/ioctx.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/ioctx.go new file mode 100644 index 00000000..ef67b4fb --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/ioctx.go @@ -0,0 +1,547 @@ +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +import "C" + +import "unsafe" +import "time" + +// PoolStat represents Ceph pool statistics. +type PoolStat struct { + // space used in bytes + Num_bytes uint64 + // space used in KB + Num_kb uint64 + // number of objects in the pool + Num_objects uint64 + // number of clones of objects + Num_object_clones uint64 + // num_objects * num_replicas + Num_object_copies uint64 + Num_objects_missing_on_primary uint64 + // number of objects found on no OSDs + Num_objects_unfound uint64 + // number of objects replicated fewer times than they should be + // (but found on at least one OSD) + Num_objects_degraded uint64 + Num_rd uint64 + Num_rd_kb uint64 + Num_wr uint64 + Num_wr_kb uint64 +} + +// ObjectStat represents an object stat information +type ObjectStat struct { + // current length in bytes + Size uint64 + // last modification time + ModTime time.Time +} + +// IOContext represents a context for performing I/O within a pool. +type IOContext struct { + ioctx C.rados_ioctx_t +} + +// Pointer returns a uintptr representation of the IOContext. +func (ioctx *IOContext) Pointer() uintptr { + return uintptr(ioctx.ioctx) +} + +// Write writes len(data) bytes to the object with key oid starting at byte +// offset offset. It returns an error, if any. +func (ioctx *IOContext) Write(oid string, data []byte, offset uint64) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_write(ioctx.ioctx, c_oid, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data)), + (C.uint64_t)(offset)) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Read reads up to len(data) bytes from the object with key oid starting at byte +// offset offset. It returns the number of bytes read and an error, if any. +func (ioctx *IOContext) Read(oid string, data []byte, offset uint64) (int, error) { + if len(data) == 0 { + return 0, nil + } + + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_read( + ioctx.ioctx, + c_oid, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data)), + (C.uint64_t)(offset)) + + if ret >= 0 { + return int(ret), nil + } else { + return 0, RadosError(int(ret)) + } +} + +// Delete deletes the object with key oid. It returns an error, if any. +func (ioctx *IOContext) Delete(oid string) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_remove(ioctx.ioctx, c_oid) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Truncate resizes the object with key oid to size size. If the operation +// enlarges the object, the new area is logically filled with zeroes. If the +// operation shrinks the object, the excess data is removed. It returns an +// error, if any. +func (ioctx *IOContext) Truncate(oid string, size uint64) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_trunc(ioctx.ioctx, c_oid, (C.uint64_t)(size)) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Destroy informs librados that the I/O context is no longer in use. +// Resources associated with the context may not be freed immediately, and the +// context should not be used again after calling this method. +func (ioctx *IOContext) Destroy() { + C.rados_ioctx_destroy(ioctx.ioctx) +} + +// Stat returns a set of statistics about the pool associated with this I/O +// context. +func (ioctx *IOContext) GetPoolStats() (stat PoolStat, err error) { + c_stat := C.struct_rados_pool_stat_t{} + ret := C.rados_ioctx_pool_stat(ioctx.ioctx, &c_stat) + if ret < 0 { + return PoolStat{}, RadosError(int(ret)) + } else { + return PoolStat{ + Num_bytes: uint64(c_stat.num_bytes), + Num_kb: uint64(c_stat.num_kb), + Num_objects: uint64(c_stat.num_objects), + Num_object_clones: uint64(c_stat.num_object_clones), + Num_object_copies: uint64(c_stat.num_object_copies), + Num_objects_missing_on_primary: uint64(c_stat.num_objects_missing_on_primary), + Num_objects_unfound: uint64(c_stat.num_objects_unfound), + Num_objects_degraded: uint64(c_stat.num_objects_degraded), + Num_rd: uint64(c_stat.num_rd), + Num_rd_kb: uint64(c_stat.num_rd_kb), + Num_wr: uint64(c_stat.num_wr), + Num_wr_kb: uint64(c_stat.num_wr_kb), + }, nil + } +} + +// GetPoolName returns the name of the pool associated with the I/O context. +func (ioctx *IOContext) GetPoolName() (name string, err error) { + buf := make([]byte, 128) + for { + ret := C.rados_ioctx_get_pool_name(ioctx.ioctx, + (*C.char)(unsafe.Pointer(&buf[0])), C.unsigned(len(buf))) + if ret == -34 { // FIXME + buf = make([]byte, len(buf)*2) + continue + } else if ret < 0 { + return "", RadosError(ret) + } + name = C.GoStringN((*C.char)(unsafe.Pointer(&buf[0])), ret) + return name, nil + } +} + +// ObjectListFunc is the type of the function called for each object visited +// by ListObjects. +type ObjectListFunc func(oid string) + +// ListObjects lists all of the objects in the pool associated with the I/O +// context, and called the provided listFn function for each object, passing +// to the function the name of the object. +func (ioctx *IOContext) ListObjects(listFn ObjectListFunc) error { + var ctx C.rados_list_ctx_t + ret := C.rados_objects_list_open(ioctx.ioctx, &ctx) + if ret < 0 { + return RadosError(ret) + } + defer func() { C.rados_objects_list_close(ctx) }() + + for { + var c_entry *C.char + ret := C.rados_objects_list_next(ctx, &c_entry, nil) + if ret == -2 { // FIXME + return nil + } else if ret < 0 { + return RadosError(ret) + } + listFn(C.GoString(c_entry)) + } + + panic("invalid state") +} + +// Stat returns the size of the object and its last modification time +func (ioctx *IOContext) Stat(object string) (stat ObjectStat, err error) { + var c_psize C.uint64_t + var c_pmtime C.time_t + c_object := C.CString(object) + defer C.free(unsafe.Pointer(c_object)) + + ret := C.rados_stat( + ioctx.ioctx, + c_object, + &c_psize, + &c_pmtime) + + if ret < 0 { + return ObjectStat{}, RadosError(int(ret)) + } else { + return ObjectStat{ + Size: uint64(c_psize), + ModTime: time.Unix(int64(c_pmtime), 0), + }, nil + } +} + +// GetXattr gets an xattr with key `name`, it returns the length of +// the key read or an error if not successful +func (ioctx *IOContext) GetXattr(object string, name string, data []byte) (int, error) { + c_object := C.CString(object) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_object)) + defer C.free(unsafe.Pointer(c_name)) + + ret := C.rados_getxattr( + ioctx.ioctx, + c_object, + c_name, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data))) + + if ret >= 0 { + return int(ret), nil + } else { + return 0, RadosError(int(ret)) + } +} + +// Sets an xattr for an object with key `name` with value as `data` +func (ioctx *IOContext) SetXattr(object string, name string, data []byte) error { + c_object := C.CString(object) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_object)) + defer C.free(unsafe.Pointer(c_name)) + + ret := C.rados_setxattr( + ioctx.ioctx, + c_object, + c_name, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data))) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// function that lists all the xattrs for an object, since xattrs are +// a k-v pair, this function returns a map of k-v pairs on +// success, error code on failure +func (ioctx *IOContext) ListXattrs(oid string) (map[string][]byte, error) { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + var it C.rados_xattrs_iter_t + + ret := C.rados_getxattrs(ioctx.ioctx, c_oid, &it) + if ret < 0 { + return nil, RadosError(ret) + } + defer func() { C.rados_getxattrs_end(it) }() + m := make(map[string][]byte) + for { + var c_name, c_val *C.char + var c_len C.size_t + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_val)) + + ret := C.rados_getxattrs_next(it, &c_name, &c_val, &c_len) + if ret < 0 { + return nil, RadosError(int(ret)) + } + // rados api returns a null name,val & 0-length upon + // end of iteration + if c_name == nil { + return m, nil // stop iteration + } + m[C.GoString(c_name)] = C.GoBytes(unsafe.Pointer(c_val), (C.int)(c_len)) + } +} + +// Remove an xattr with key `name` from object `oid` +func (ioctx *IOContext) RmXattr(oid string, name string) error { + c_oid := C.CString(oid) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + + ret := C.rados_rmxattr( + ioctx.ioctx, + c_oid, + c_name) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Append the map `pairs` to the omap `oid` +func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + var s C.size_t + var c *C.char + ptrSize := unsafe.Sizeof(c) + + c_keys := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize)) + c_values := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize)) + c_lengths := C.malloc(C.size_t(len(pairs)) * C.size_t(unsafe.Sizeof(s))) + + defer C.free(unsafe.Pointer(c_keys)) + defer C.free(unsafe.Pointer(c_values)) + defer C.free(unsafe.Pointer(c_lengths)) + + i := 0 + for key, value := range pairs { + // key + c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize)) + *c_key_ptr = C.CString(key) + defer C.free(unsafe.Pointer(*c_key_ptr)) + + // value and its length + c_value_ptr := (**C.char)(unsafe.Pointer(uintptr(c_values) + uintptr(i) * ptrSize)) + + var c_length C.size_t + if len(value) > 0 { + *c_value_ptr = (*C.char)(unsafe.Pointer(&value[0])) + c_length = C.size_t(len(value)) + } else { + *c_value_ptr = nil + c_length = C.size_t(0) + } + + c_length_ptr := (*C.size_t)(unsafe.Pointer(uintptr(c_lengths) + uintptr(i) * ptrSize)) + *c_length_ptr = c_length + + i++ + } + + op := C.rados_create_write_op() + C.rados_write_op_omap_set( + op, + (**C.char)(c_keys), + (**C.char)(c_values), + (*C.size_t)(c_lengths), + C.size_t(len(pairs))) + + ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0) + C.rados_release_write_op(op) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// OmapListFunc is the type of the function called for each omap key +// visited by ListOmapValues +type OmapListFunc func(key string, value []byte) + +// Iterate on a set of keys and their values from an omap +// `startAfter`: iterate only on the keys after this specified one +// `filterPrefix`: iterate only on the keys beginning with this prefix +// `maxReturn`: iterate no more than `maxReturn` key/value pairs +// `listFn`: the function called at each iteration +func (ioctx *IOContext) ListOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64, listFn OmapListFunc) error { + c_oid := C.CString(oid) + c_start_after := C.CString(startAfter) + c_filter_prefix := C.CString(filterPrefix) + c_max_return := C.uint64_t(maxReturn) + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_start_after)) + defer C.free(unsafe.Pointer(c_filter_prefix)) + + op := C.rados_create_read_op() + + var c_iter C.rados_omap_iter_t + var c_prval C.int + C.rados_read_op_omap_get_vals( + op, + c_start_after, + c_filter_prefix, + c_max_return, + &c_iter, + &c_prval, + ) + + ret := C.rados_read_op_operate(op, ioctx.ioctx, c_oid, 0) + + if int(c_prval) != 0 { + return RadosError(int(c_prval)) + } else if int(ret) != 0 { + return RadosError(int(ret)) + } + + for { + var c_key *C.char + var c_val *C.char + var c_len C.size_t + + ret = C.rados_omap_get_next(c_iter, &c_key, &c_val, &c_len) + + if int(ret) != 0 { + return RadosError(int(ret)) + } + + if c_key == nil { + break + } + + listFn(C.GoString(c_key), C.GoBytes(unsafe.Pointer(c_val), C.int(c_len))) + } + + C.rados_omap_get_end(c_iter) + C.rados_release_read_op(op) + + return nil +} + +// Fetch a set of keys and their values from an omap and returns then as a map +// `startAfter`: retrieve only the keys after this specified one +// `filterPrefix`: retrieve only the keys beginning with this prefix +// `maxReturn`: retrieve no more than `maxReturn` key/value pairs +func (ioctx *IOContext) GetOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64) (map[string][]byte, error) { + omap := map[string][]byte{} + + err := ioctx.ListOmapValues( + oid, startAfter, filterPrefix, maxReturn, + func(key string, value []byte) { + omap[key] = value + }, + ) + + return omap, err +} + +// Fetch all the keys and their values from an omap and returns then as a map +// `startAfter`: retrieve only the keys after this specified one +// `filterPrefix`: retrieve only the keys beginning with this prefix +// `iteratorSize`: internal number of keys to fetch during a read operation +func (ioctx *IOContext) GetAllOmapValues(oid string, startAfter string, filterPrefix string, iteratorSize int64) (map[string][]byte, error) { + omap := map[string][]byte{} + omapSize := 0 + + for { + err := ioctx.ListOmapValues( + oid, startAfter, filterPrefix, iteratorSize, + func (key string, value []byte) { + omap[key] = value + startAfter = key + }, + ) + + if err != nil { + return omap, err + } + + // End of omap + if len(omap) == omapSize { + break + } + + omapSize = len(omap) + } + + return omap, nil +} + +// Remove the specified `keys` from the omap `oid` +func (ioctx *IOContext) RmOmapKeys(oid string, keys []string) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + var c *C.char + ptrSize := unsafe.Sizeof(c) + + c_keys := C.malloc(C.size_t(len(keys)) * C.size_t(ptrSize)) + defer C.free(unsafe.Pointer(c_keys)) + + i := 0 + for _, key := range keys { + c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize)) + *c_key_ptr = C.CString(key) + defer C.free(unsafe.Pointer(*c_key_ptr)) + i++ + } + + op := C.rados_create_write_op() + C.rados_write_op_omap_rm_keys( + op, + (**C.char)(c_keys), + C.size_t(len(keys))) + + ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0) + C.rados_release_write_op(op) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Clear the omap `oid` +func (ioctx *IOContext) CleanOmap(oid string) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + op := C.rados_create_write_op() + C.rados_write_op_omap_clear(op) + + ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0) + C.rados_release_write_op(op) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados.go new file mode 100644 index 00000000..935bc248 --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados.go @@ -0,0 +1,54 @@ +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +import "C" + +import ( + "fmt" + "unsafe" +) + +type RadosError int + +func (e RadosError) Error() string { + return fmt.Sprintf("rados: ret=%d", e) +} + +// Version returns the major, minor, and patch components of the version of +// the RADOS library linked against. +func Version() (int, int, int) { + var c_major, c_minor, c_patch C.int + C.rados_version(&c_major, &c_minor, &c_patch) + return int(c_major), int(c_minor), int(c_patch) +} + +// NewConn creates a new connection object. It returns the connection and an +// error, if any. +func NewConn() (*Conn, error) { + conn := &Conn{} + ret := C.rados_create(&conn.cluster, nil) + + if ret == 0 { + return conn, nil + } else { + return nil, RadosError(int(ret)) + } +} + +// NewConnWithUser creates a new connection object with a custom username. +// It returns the connection and an error, if any. +func NewConnWithUser(user string) (*Conn, error) { + c_user := C.CString(user) + defer C.free(unsafe.Pointer(c_user)) + + conn := &Conn{} + ret := C.rados_create(&conn.cluster, c_user) + + if ret == 0 { + return conn, nil + } else { + return nil, RadosError(int(ret)) + } +} diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados_test.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados_test.go new file mode 100644 index 00000000..a31c1872 --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados_test.go @@ -0,0 +1,703 @@ +package rados_test + +import "testing" + +//import "bytes" +import "github.com/noahdesu/go-ceph/rados" +import "github.com/stretchr/testify/assert" +import "os" +import "os/exec" +import "io" +import "io/ioutil" +import "time" +import "net" +import "fmt" +import "sort" +import "encoding/json" + +func GetUUID() string { + out, _ := exec.Command("uuidgen").Output() + return string(out[:36]) +} + +func TestVersion(t *testing.T) { + var major, minor, patch = rados.Version() + assert.False(t, major < 0 || major > 1000, "invalid major") + assert.False(t, minor < 0 || minor > 1000, "invalid minor") + assert.False(t, patch < 0 || patch > 1000, "invalid patch") +} + +func TestGetSetConfigOption(t *testing.T) { + conn, _ := rados.NewConn() + + // rejects invalid options + err := conn.SetConfigOption("wefoijweojfiw", "welfkwjelkfj") + assert.Error(t, err, "Invalid option") + + // verify SetConfigOption changes a values + log_file_val, err := conn.GetConfigOption("log_file") + assert.NotEqual(t, log_file_val, "/dev/null") + + err = conn.SetConfigOption("log_file", "/dev/null") + assert.NoError(t, err, "Invalid option") + + log_file_val, err = conn.GetConfigOption("log_file") + assert.Equal(t, log_file_val, "/dev/null") +} + +func TestParseDefaultConfigEnv(t *testing.T) { + conn, _ := rados.NewConn() + + log_file_val, _ := conn.GetConfigOption("log_file") + assert.NotEqual(t, log_file_val, "/dev/null") + + err := os.Setenv("CEPH_ARGS", "--log-file /dev/null") + assert.NoError(t, err) + + err = conn.ParseDefaultConfigEnv() + assert.NoError(t, err) + + log_file_val, _ = conn.GetConfigOption("log_file") + assert.Equal(t, log_file_val, "/dev/null") +} + +func TestParseCmdLineArgs(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + + mon_host_val, _ := conn.GetConfigOption("mon_host") + assert.NotEqual(t, mon_host_val, "1.1.1.1") + + args := []string{"--mon-host", "1.1.1.1"} + err := conn.ParseCmdLineArgs(args) + assert.NoError(t, err) + + mon_host_val, _ = conn.GetConfigOption("mon_host") + assert.Equal(t, mon_host_val, "1.1.1.1") +} + +func TestGetClusterStats(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + // grab current stats + prev_stat, err := conn.GetClusterStats() + fmt.Printf("prev_stat: %+v\n", prev_stat) + assert.NoError(t, err) + + // make some changes to the cluster + buf := make([]byte, 1<<20) + for i := 0; i < 10; i++ { + objname := GetUUID() + pool.Write(objname, buf, 0) + } + + // wait a while for the stats to change + for i := 0; i < 30; i++ { + stat, err := conn.GetClusterStats() + assert.NoError(t, err) + + // wait for something to change + if stat == prev_stat { + fmt.Printf("curr_stat: %+v (trying again...)\n", stat) + time.Sleep(time.Second) + } else { + // success + fmt.Printf("curr_stat: %+v (change detected)\n", stat) + conn.Shutdown() + return + } + } + + pool.Destroy() + conn.Shutdown() + t.Error("Cluster stats aren't changing") +} + +func TestGetFSID(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + fsid, err := conn.GetFSID() + assert.NoError(t, err) + assert.NotEqual(t, fsid, "") + + conn.Shutdown() +} + +func TestGetInstanceID(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + id := conn.GetInstanceID() + assert.NotEqual(t, id, 0) + + conn.Shutdown() +} + +func TestMakeDeletePool(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // get current list of pool + pools, err := conn.ListPools() + assert.NoError(t, err) + + // check that new pool name is unique + new_name := GetUUID() + for _, poolname := range pools { + if new_name == poolname { + t.Error("Random pool name exists!") + return + } + } + + // create pool + err = conn.MakePool(new_name) + assert.NoError(t, err) + + // get updated list of pools + pools, err = conn.ListPools() + assert.NoError(t, err) + + // verify that the new pool name exists + found := false + for _, poolname := range pools { + if new_name == poolname { + found = true + } + } + + if !found { + t.Error("Cannot find newly created pool") + } + + // delete the pool + err = conn.DeletePool(new_name) + assert.NoError(t, err) + + // verify that it is gone + + // get updated list of pools + pools, err = conn.ListPools() + assert.NoError(t, err) + + // verify that the new pool name exists + found = false + for _, poolname := range pools { + if new_name == poolname { + found = true + } + } + + if found { + t.Error("Deleted pool still exists") + } + + conn.Shutdown() +} + +func TestPingMonitor(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // mon id that should work with vstart.sh + reply, err := conn.PingMonitor("a") + if err == nil { + assert.NotEqual(t, reply, "") + return + } + + // mon id that should work with micro-osd.sh + reply, err = conn.PingMonitor("0") + if err == nil { + assert.NotEqual(t, reply, "") + return + } + + // try to use a hostname as the monitor id + mon_addr, _ := conn.GetConfigOption("mon_host") + hosts, _ := net.LookupAddr(mon_addr) + for _, host := range hosts { + reply, err := conn.PingMonitor(host) + if err == nil { + assert.NotEqual(t, reply, "") + return + } + } + + t.Error("Could not find a valid monitor id") + + conn.Shutdown() +} + +func TestReadConfigFile(t *testing.T) { + conn, _ := rados.NewConn() + + // check current log_file value + log_file_val, err := conn.GetConfigOption("log_file") + assert.NoError(t, err) + assert.NotEqual(t, log_file_val, "/dev/null") + + // create a temporary ceph.conf file that changes the log_file conf + // option. + file, err := ioutil.TempFile("/tmp", "go-rados") + assert.NoError(t, err) + + _, err = io.WriteString(file, "[global]\nlog_file = /dev/null\n") + assert.NoError(t, err) + + // parse the config file + err = conn.ReadConfigFile(file.Name()) + assert.NoError(t, err) + + // check current log_file value + log_file_val, err = conn.GetConfigOption("log_file") + assert.NoError(t, err) + assert.Equal(t, log_file_val, "/dev/null") + + // cleanup + file.Close() + os.Remove(file.Name()) +} + +func TestWaitForLatestOSDMap(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + err := conn.WaitForLatestOSDMap() + assert.NoError(t, err) + + conn.Shutdown() +} + +func TestReadWrite(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // make pool + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + bytes_out := make([]byte, len(bytes_in)) + n_out, err := pool.Read("obj", bytes_out, 0) + + assert.Equal(t, n_out, len(bytes_in)) + assert.Equal(t, bytes_in, bytes_out) + + pool.Destroy() +} + +func TestObjectStat(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + stat, err := pool.Stat("obj") + assert.Equal(t, uint64(len(bytes_in)), stat.Size) + assert.NotNil(t, stat.ModTime) + + pool.Destroy() + conn.Shutdown() +} + +func TestGetPoolStats(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + // grab current stats + prev_stat, err := pool.GetPoolStats() + fmt.Printf("prev_stat: %+v\n", prev_stat) + assert.NoError(t, err) + + // make some changes to the cluster + buf := make([]byte, 1<<20) + for i := 0; i < 10; i++ { + objname := GetUUID() + pool.Write(objname, buf, 0) + } + + // wait a while for the stats to change + for i := 0; i < 30; i++ { + stat, err := pool.GetPoolStats() + assert.NoError(t, err) + + // wait for something to change + if stat == prev_stat { + fmt.Printf("curr_stat: %+v (trying again...)\n", stat) + time.Sleep(time.Second) + } else { + // success + fmt.Printf("curr_stat: %+v (change detected)\n", stat) + conn.Shutdown() + return + } + } + + pool.Destroy() + conn.Shutdown() + t.Error("Pool stats aren't changing") +} + +func TestGetPoolName(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + ioctx, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + poolname_ret, err := ioctx.GetPoolName() + assert.NoError(t, err) + + assert.Equal(t, poolname, poolname_ret) + + ioctx.Destroy() + conn.Shutdown() +} + +func TestMonCommand(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + command, err := json.Marshal(map[string]string{"prefix": "df", "format": "json"}) + assert.NoError(t, err) + + buf, info, err := conn.MonCommand(command) + assert.NoError(t, err) + assert.Equal(t, info, "") + + var message map[string]interface{} + err = json.Unmarshal(buf, &message) + assert.NoError(t, err) + + conn.Shutdown() +} + +func TestObjectIterator(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + ioctx, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + objectList := []string{} + err = ioctx.ListObjects(func(oid string) { + objectList = append(objectList, oid) + }) + assert.NoError(t, err) + assert.True(t, len(objectList) == 0) + + createdList := []string{} + for i := 0; i < 200; i++ { + oid := GetUUID() + bytes_in := []byte("input data") + err = ioctx.Write(oid, bytes_in, 0) + assert.NoError(t, err) + createdList = append(createdList, oid) + } + assert.True(t, len(createdList) == 200) + + err = ioctx.ListObjects(func(oid string) { + objectList = append(objectList, oid) + }) + assert.NoError(t, err) + assert.Equal(t, len(objectList), len(createdList)) + + sort.Strings(objectList) + sort.Strings(createdList) + + assert.Equal(t, objectList, createdList) +} + +func TestNewConnWithUser(t *testing.T) { + _, err := rados.NewConnWithUser("admin") + assert.Equal(t, err, nil) +} + +func TestReadWriteXattr(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // make pool + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + my_xattr_in := []byte("my_value") + err = pool.SetXattr("obj", "my_key", my_xattr_in) + assert.NoError(t, err) + + my_xattr_out := make([]byte, len(my_xattr_in)) + n_out, err := pool.GetXattr("obj", "my_key", my_xattr_out) + + assert.Equal(t, n_out, len(my_xattr_in)) + assert.Equal(t, my_xattr_in, my_xattr_out) + + pool.Destroy() +} + +func TestListXattrs(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // make pool + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + input_xattrs := make(map[string][]byte) + for i := 0; i < 200; i++ { + name := fmt.Sprintf("key_%d", i) + data := []byte(GetUUID()) + err = pool.SetXattr("obj", name, data) + assert.NoError(t, err) + input_xattrs[name] = data + } + + output_xattrs := make(map[string][]byte) + output_xattrs, err = pool.ListXattrs("obj") + assert.NoError(t, err) + assert.Equal(t, len(input_xattrs), len(output_xattrs)) + assert.Equal(t, input_xattrs, output_xattrs) + + pool.Destroy() +} + +func TestRmXattr(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + key := "key1" + val := []byte("val1") + err = pool.SetXattr("obj", key, val) + assert.NoError(t, err) + + key = "key2" + val = []byte("val2") + err = pool.SetXattr("obj", key, val) + assert.NoError(t, err) + + xattr_list := make(map[string][]byte) + xattr_list, err = pool.ListXattrs("obj") + assert.NoError(t, err) + assert.Equal(t, len(xattr_list), 2) + + pool.RmXattr("obj", "key2") + xattr_list, err = pool.ListXattrs("obj") + assert.NoError(t, err) + assert.Equal(t, len(xattr_list), 1) + + found := false + for key, _ = range xattr_list { + if key == "key2" { + found = true + } + + } + + if found { + t.Error("Deleted pool still exists") + } + + pool.Destroy() +} + +func TestReadWriteOmap(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + // Set + orig := map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "prefixed-key3": []byte("value3"), + "empty": []byte(""), + } + + err = pool.SetOmap("obj", orig) + assert.NoError(t, err) + + // List + remaining := map[string][]byte{} + for k, v := range orig { + remaining[k] = v + } + + err = pool.ListOmapValues("obj", "", "", 4, func(key string, value []byte) { + assert.Equal(t, remaining[key], value) + delete(remaining, key) + }) + assert.NoError(t, err) + assert.Equal(t, 0, len(remaining)) + + // Get (with a fixed number of keys) + fetched, err := pool.GetOmapValues("obj", "", "", 4) + assert.NoError(t, err) + assert.Equal(t, orig, fetched) + + // Get All (with an iterator size bigger than the map size) + fetched, err = pool.GetAllOmapValues("obj", "", "", 100) + assert.NoError(t, err) + assert.Equal(t, orig, fetched) + + // Get All (with an iterator size smaller than the map size) + fetched, err = pool.GetAllOmapValues("obj", "", "", 1) + assert.NoError(t, err) + assert.Equal(t, orig, fetched) + + // Remove + err = pool.RmOmapKeys("obj", []string{"key1", "prefixed-key3"}) + assert.NoError(t, err) + + fetched, err = pool.GetOmapValues("obj", "", "", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "key2": []byte("value2"), + "empty": []byte(""), + }, fetched) + + // Clear + err = pool.CleanOmap("obj") + assert.NoError(t, err) + + fetched, err = pool.GetOmapValues("obj", "", "", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{}, fetched) + + pool.Destroy() +} + +func TestReadFilterOmap(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + orig := map[string][]byte{ + "key1": []byte("value1"), + "prefixed-key3": []byte("value3"), + "key2": []byte("value2"), + } + + err = pool.SetOmap("obj", orig) + assert.NoError(t, err) + + // filter by prefix + fetched, err := pool.GetOmapValues("obj", "", "prefixed", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "prefixed-key3": []byte("value3"), + }, fetched) + + // "start_after" a key + fetched, err = pool.GetOmapValues("obj", "key1", "", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "prefixed-key3": []byte("value3"), + "key2": []byte("value2"), + }, fetched) + + // maxReturn + fetched, err = pool.GetOmapValues("obj", "", "key", 1) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "key1": []byte("value1"), + }, fetched) + + pool.Destroy() +} + diff --git a/Makefile b/Makefile index 974d0191..32cd23aa 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ version/version.go: ${PREFIX}/bin/registry: version/version.go $(shell find . -type f -name '*.go') @echo "+ $@" - @go build -o $@ ${GO_LDFLAGS} ./cmd/registry + @go build -tags "${DOCKER_BUILDTAGS}" -o $@ ${GO_LDFLAGS} ./cmd/registry ${PREFIX}/bin/registry-api-descriptor-template: version/version.go $(shell find . -type f -name '*.go') @echo "+ $@" @@ -46,11 +46,11 @@ lint: build: @echo "+ $@" - @go build -v ${GO_LDFLAGS} ./... + @go build -tags "${DOCKER_BUILDTAGS}" -v ${GO_LDFLAGS} ./... test: @echo "+ $@" - @go test -test.short ./... + @go test -test.short -tags "${DOCKER_BUILDTAGS}" ./... test-full: @echo "+ $@" diff --git a/circle.yml b/circle.yml index b841cdec..18f95148 100644 --- a/circle.yml +++ b/circle.yml @@ -3,6 +3,9 @@ machine: pre: # Install gvm - bash < <(curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/1.0.22/binscripts/gvm-installer) + # Install ceph to test rados driver & create pool + - sudo -i ~/distribution/contrib/ceph/ci-setup.sh + - ceph osd pool create docker-distribution 1 post: # Install many go versions @@ -18,8 +21,11 @@ machine: BASE_OLD: ../../../$HOME/.gvm/pkgsets/old/global/$BASE_DIR BASE_STABLE: ../../../$HOME/.gvm/pkgsets/stable/global/$BASE_DIR # BASE_BLEED: ../../../$HOME/.gvm/pkgsets/bleed/global/$BASE_DIR + DOCKER_BUILDTAGS: "include_rados" # Workaround Circle parsing dumb bugs and/or YAML wonkyness CIRCLE_PAIN: "mode: set" + # Ceph config + RADOS_POOL: "docker-distribution" hosts: # Not used yet @@ -95,7 +101,7 @@ test: - gvm use stable; go list ./... | xargs -L 1 -I{} rm -f $GOPATH/src/{}/coverage.out: pwd: $BASE_STABLE - - gvm use stable; go list ./... | xargs -L 1 -I{} godep go test -test.short -coverprofile=$GOPATH/src/{}/coverage.out {}: + - gvm use stable; go list ./... | xargs -L 1 -I{} godep go test -tags "$DOCKER_BUILDTAGS" -test.short -coverprofile=$GOPATH/src/{}/coverage.out {}: timeout: 600 pwd: $BASE_STABLE diff --git a/cmd/registry/rados.go b/cmd/registry/rados.go new file mode 100644 index 00000000..e7ea770a --- /dev/null +++ b/cmd/registry/rados.go @@ -0,0 +1,5 @@ +// +build include_rados + +package main + +import _ "github.com/docker/distribution/registry/storage/driver/rados" diff --git a/contrib/ceph/ci-setup.sh b/contrib/ceph/ci-setup.sh new file mode 100755 index 00000000..200d69d9 --- /dev/null +++ b/contrib/ceph/ci-setup.sh @@ -0,0 +1,119 @@ +#! /bin/bash +# +# Ceph cluster setup in Circle CI +# + +set -x +set -e +set -u + +NODE=$(hostname) +CEPHDIR=/tmp/ceph + +mkdir cluster +pushd cluster + +# Install +retries=0 +until [ $retries -ge 5 ]; do + pip install ceph-deploy && break + retries=$[$retries+1] + sleep 30 +done + +retries=0 +until [ $retries -ge 5 ]; do + ceph-deploy install --release hammer $NODE && break + retries=$[$retries+1] + sleep 30 +done + +retries=0 +until [ $retries -ge 5 ]; do + ceph-deploy pkg --install librados-dev $NODE && break + retries=$[$retries+1] + sleep 30 +done + +echo $(ip route get 1 | awk '{print $NF;exit}') $(hostname) >> /etc/hosts +ssh-keygen -t rsa -f ~/.ssh/id_rsa -q -N "" +cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys +ssh-keyscan $NODE >> ~/.ssh/known_hosts +ceph-deploy new $NODE + +cat >> ceph.conf < +### rados + +This storage backend uses [Ceph Object Storage](http://ceph.com/docs/master/rados/). + + + + + + + + + + + + + + + + + + + + + + +
ParameterRequiredDescription
+ poolname + + yes + + Ceph pool name. +
+ username + + no + + Ceph cluster user to connect as (i.e. admin, not client.admin). +
+ chunksize + + no + + Size of the written RADOS objects. Default value is 4MB (4194304). +
+ ### S3 diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 84465820..c888ed4e 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -10,4 +10,5 @@ - ['registry/storage-drivers/azure.md', '**HIDDEN**' ] - ['registry/storage-drivers/filesystem.md', '**HIDDEN**' ] - ['registry/storage-drivers/inmemory.md', '**HIDDEN**' ] +- ['registry/storage-drivers/rados.md', '**HIDDEN**' ] - ['registry/storage-drivers/s3.md','**HIDDEN**' ] diff --git a/docs/storage-drivers/rados.md b/docs/storage-drivers/rados.md new file mode 100644 index 00000000..41f51c33 --- /dev/null +++ b/docs/storage-drivers/rados.md @@ -0,0 +1,37 @@ + + +# Ceph RADOS storage driver + +An implementation of the `storagedriver.StorageDriver` interface which uses +[Ceph RADOS Object Storage][rados] for storage backend. + +## Parameters + +The following parameters must be used to configure the storage driver +(case-sensitive): + +* `poolname`: Name of the Ceph pool +* `username` *optional*: The user to connect as (i.e. admin, not client.admin) +* `chunksize` *optional*: Size of the written RADOS objects. Default value is +4MB (4194304). + +This drivers loads the [Ceph client configuration][rados-config] from the +following regular paths (the first found is used): + +* `$CEPH_CONF` (environment variable) +* `/etc/ceph/ceph.conf` +* `~/.ceph/config` +* `ceph.conf` (in the current working directory) + +## Developing + +To include this driver when building Docker Distribution, use the build tag +`include_rados`. Please see the [building documentation][building] for details. + +[rados]: http://ceph.com/docs/master/rados/ +[rados-config]: http://ceph.com/docs/master/rados/configuration/ceph-conf/ +[building]: https://github.com/docker/distribution/blob/master/docs/building.md#optional-build-tags diff --git a/docs/storagedrivers.md b/docs/storagedrivers.md index e476457d..d18ea73b 100644 --- a/docs/storagedrivers.md +++ b/docs/storagedrivers.md @@ -16,6 +16,7 @@ This storage driver package comes bundled with several drivers: - [filesystem](storage-drivers/filesystem.md): A local storage driver configured to use a directory tree in the local filesystem. - [s3](storage-drivers/s3.md): A driver storing objects in an Amazon Simple Storage Solution (S3) bucket. - [azure](storage-drivers/azure.md): A driver storing objects in [Microsoft Azure Blob Storage](http://azure.microsoft.com/en-us/services/storage/). +- [rados](storage-drivers/rados.md): A driver storing objects in a [Ceph Object Storage](http://ceph.com/docs/master/rados/) pool. ## Storage Driver API diff --git a/registry/storage/driver/rados/rados.go b/registry/storage/driver/rados/rados.go new file mode 100644 index 00000000..999b06b0 --- /dev/null +++ b/registry/storage/driver/rados/rados.go @@ -0,0 +1,628 @@ +package rados + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "path" + "strconv" + + "code.google.com/p/go-uuid/uuid" + log "github.com/Sirupsen/logrus" + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/base" + "github.com/docker/distribution/registry/storage/driver/factory" + "github.com/noahdesu/go-ceph/rados" +) + +const driverName = "rados" + +// Prefix all the stored blob +const objectBlobPrefix = "blob:" + +// Stripes objects size to 4M +const defaultChunkSize = 4 << 20 +const defaultXattrTotalSizeName = "total-size" + +// Max number of keys fetched from omap at each read operation +const defaultKeysFetched = 1 + +//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set +type DriverParameters struct { + poolname string + username string + chunksize uint64 +} + +func init() { + factory.Register(driverName, &radosDriverFactory{}) +} + +// radosDriverFactory implements the factory.StorageDriverFactory interface +type radosDriverFactory struct{} + +func (factory *radosDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + return FromParameters(parameters) +} + +type driver struct { + Conn *rados.Conn + Ioctx *rados.IOContext + chunksize uint64 +} + +type baseEmbed struct { + base.Base +} + +// Driver is a storagedriver.StorageDriver implementation backed by Ceph RADOS +// Objects are stored at absolute keys in the provided bucket. +type Driver struct { + baseEmbed +} + +// FromParameters constructs a new Driver with a given parameters map +// Required parameters: +// - poolname: the ceph pool name +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + + pool, ok := parameters["poolname"] + if !ok { + return nil, fmt.Errorf("No poolname parameter provided") + } + + username, ok := parameters["username"] + if !ok { + username = "" + } + + chunksize := uint64(defaultChunkSize) + chunksizeParam, ok := parameters["chunksize"] + if ok { + chunksize, ok = chunksizeParam.(uint64) + if !ok { + return nil, fmt.Errorf("The chunksize parameter should be a number") + } + } + + params := DriverParameters{ + fmt.Sprint(pool), + fmt.Sprint(username), + chunksize, + } + + return New(params) +} + +// New constructs a new Driver +func New(params DriverParameters) (*Driver, error) { + var conn *rados.Conn + var err error + + if params.username != "" { + log.Infof("Opening connection to pool %s using user %s", params.poolname, params.username) + conn, err = rados.NewConnWithUser(params.username) + } else { + log.Infof("Opening connection to pool %s", params.poolname) + conn, err = rados.NewConn() + } + + if err != nil { + return nil, err + } + + err = conn.ReadDefaultConfigFile() + if err != nil { + return nil, err + } + + err = conn.Connect() + if err != nil { + return nil, err + } + + log.Infof("Connected") + + ioctx, err := conn.OpenIOContext(params.poolname) + + log.Infof("Connected to pool %s", params.poolname) + + if err != nil { + return nil, err + } + + d := &driver{ + Ioctx: ioctx, + Conn: conn, + chunksize: params.chunksize, + } + + return &Driver{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: d, + }, + }, + }, nil +} + +// Implement the storagedriver.StorageDriver interface + +func (d *driver) Name() string { + return driverName +} + +// GetContent retrieves the content stored at "path" as a []byte. +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { + rc, err := d.ReadStream(ctx, path, 0) + if err != nil { + return nil, err + } + defer rc.Close() + + p, err := ioutil.ReadAll(rc) + if err != nil { + return nil, err + } + + return p, nil +} + +// PutContent stores the []byte content at a location designated by "path". +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { + if _, err := d.WriteStream(ctx, path, 0, bytes.NewReader(contents)); err != nil { + return err + } + + return nil +} + +// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// given byte offset. +type readStreamReader struct { + driver *driver + oid string + size uint64 + offset uint64 +} + +func (r *readStreamReader) Read(b []byte) (n int, err error) { + // Determine the part available to read + bufferOffset := uint64(0) + bufferSize := uint64(len(b)) + + // End of the object, read less than the buffer size + if bufferSize > r.size-r.offset { + bufferSize = r.size - r.offset + } + + // Fill `b` + for bufferOffset < bufferSize { + // Get the offset in the object chunk + chunkedOid, chunkedOffset := r.driver.getChunkNameFromOffset(r.oid, r.offset) + + // Determine the best size to read + bufferEndOffset := bufferSize + if bufferEndOffset-bufferOffset > r.driver.chunksize-chunkedOffset { + bufferEndOffset = bufferOffset + (r.driver.chunksize - chunkedOffset) + } + + // Read the chunk + n, err = r.driver.Ioctx.Read(chunkedOid, b[bufferOffset:bufferEndOffset], chunkedOffset) + + if err != nil { + return int(bufferOffset), err + } + + bufferOffset += uint64(n) + r.offset += uint64(n) + } + + // EOF if the offset is at the end of the object + if r.offset == r.size { + return int(bufferOffset), io.EOF + } + + return int(bufferOffset), nil +} + +func (r *readStreamReader) Close() error { + return nil +} + +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + // get oid from filename + oid, err := d.getOid(path) + + if err != nil { + return nil, err + } + + // get object stat + stat, err := d.Stat(ctx, path) + + if err != nil { + return nil, err + } + + if offset > stat.Size() { + return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + return &readStreamReader{ + driver: d, + oid: oid, + size: uint64(stat.Size()), + offset: uint64(offset), + }, nil +} + +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { + buf := make([]byte, d.chunksize) + totalRead = 0 + + oid, err := d.getOid(path) + if err != nil { + switch err.(type) { + // Trying to write new object, generate new blob identifier for it + case storagedriver.PathNotFoundError: + oid = d.generateOid() + err = d.putOid(path, oid) + if err != nil { + return 0, err + } + default: + return 0, err + } + } else { + // Check total object size only for existing ones + totalSize, err := d.getXattrTotalSize(ctx, oid) + if err != nil { + return 0, err + } + + // If offset if after the current object size, fill the gap with zeros + for totalSize < uint64(offset) { + sizeToWrite := d.chunksize + if totalSize-uint64(offset) < sizeToWrite { + sizeToWrite = totalSize - uint64(offset) + } + + chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(totalSize)) + err = d.Ioctx.Write(chunkName, buf[:sizeToWrite], uint64(chunkOffset)) + if err != nil { + return totalRead, err + } + + totalSize += sizeToWrite + } + } + + // Writer + for { + // Align to chunk size + sizeRead := uint64(0) + sizeToRead := uint64(offset+totalRead) % d.chunksize + if sizeToRead == 0 { + sizeToRead = d.chunksize + } + + // Read from `reader` + for sizeRead < sizeToRead { + nn, err := reader.Read(buf[sizeRead:sizeToRead]) + sizeRead += uint64(nn) + + if err != nil { + if err != io.EOF { + return totalRead, err + } + + break + } + } + + // End of file and nothing was read + if sizeRead == 0 { + break + } + + // Write chunk object + chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(offset+totalRead)) + err = d.Ioctx.Write(chunkName, buf[:sizeRead], uint64(chunkOffset)) + + if err != nil { + return totalRead, err + } + + // Update total object size as xattr in the first chunk of the object + err = d.setXattrTotalSize(oid, uint64(offset+totalRead)+sizeRead) + if err != nil { + return totalRead, err + } + + totalRead += int64(sizeRead) + + // End of file + if sizeRead < sizeToRead { + break + } + } + + return totalRead, nil +} + +// Stat retrieves the FileInfo for the given path, including the current size +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + // get oid from filename + oid, err := d.getOid(path) + + if err != nil { + return nil, err + } + + // the path is a virtual directory? + if oid == "" { + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + Path: path, + Size: 0, + IsDir: true, + }, + }, nil + } + + // stat first chunk + stat, err := d.Ioctx.Stat(oid + "-0") + + if err != nil { + return nil, err + } + + // get total size of chunked object + totalSize, err := d.getXattrTotalSize(ctx, oid) + + if err != nil { + return nil, err + } + + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + Path: path, + Size: int64(totalSize), + ModTime: stat.ModTime, + }, + }, nil +} + +// List returns a list of the objects that are direct descendants of the given path. +func (d *driver) List(ctx context.Context, dirPath string) ([]string, error) { + files, err := d.listDirectoryOid(dirPath) + + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(files)) + for k := range files { + keys = append(keys, path.Join(dirPath, k)) + } + + return keys, nil +} + +// Move moves an object stored at sourcePath to destPath, removing the original +// object. +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { + // Get oid + oid, err := d.getOid(sourcePath) + + if err != nil { + return err + } + + // Move reference + err = d.putOid(destPath, oid) + + if err != nil { + return err + } + + // Delete old reference + err = d.deleteOid(sourcePath) + + if err != nil { + return err + } + + return nil +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (d *driver) Delete(ctx context.Context, objectPath string) error { + // Get oid + oid, err := d.getOid(objectPath) + + if err != nil { + return err + } + + // Deleting virtual directory + if oid == "" { + objects, err := d.listDirectoryOid(objectPath) + if err != nil { + return err + } + + for object := range objects { + err = d.Delete(ctx, path.Join(objectPath, object)) + if err != nil { + return err + } + } + } else { + // Delete object chunks + totalSize, err := d.getXattrTotalSize(ctx, oid) + + if err != nil { + return err + } + + for offset := uint64(0); offset < totalSize; offset += d.chunksize { + chunkName, _ := d.getChunkNameFromOffset(oid, offset) + + err = d.Ioctx.Delete(chunkName) + if err != nil { + return err + } + } + + // Delete reference + err = d.deleteOid(objectPath) + if err != nil { + return err + } + } + + return nil +} + +// URLFor returns a URL which may be used to retrieve the content stored at the given path. +// May return an UnsupportedMethodErr in certain StorageDriver implementations. +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + return "", storagedriver.ErrUnsupportedMethod +} + +// Generate a blob identifier +func (d *driver) generateOid() string { + return objectBlobPrefix + uuid.New() +} + +// Reference a object and its hierarchy +func (d *driver) putOid(objectPath string, oid string) error { + directory := path.Dir(objectPath) + base := path.Base(objectPath) + createParentReference := true + + // After creating this reference, skip the parents referencing since the + // hierarchy already exists + if oid == "" { + firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) + if (err == nil) && (len(firstReference) > 0) { + createParentReference = false + } + } + + oids := map[string][]byte{ + base: []byte(oid), + } + + // Reference object + err := d.Ioctx.SetOmap(directory, oids) + if err != nil { + return err + } + + // Esure parent virtual directories + if createParentReference && directory != "/" { + return d.putOid(directory, "") + } + + return nil +} + +// Get the object identifier from an object name +func (d *driver) getOid(objectPath string) (string, error) { + directory := path.Dir(objectPath) + base := path.Base(objectPath) + + files, err := d.Ioctx.GetOmapValues(directory, "", base, 1) + + if (err != nil) || (files[base] == nil) { + return "", storagedriver.PathNotFoundError{Path: objectPath} + } + + return string(files[base]), nil +} + +// List the objects of a virtual directory +func (d *driver) listDirectoryOid(path string) (list map[string][]byte, err error) { + return d.Ioctx.GetAllOmapValues(path, "", "", defaultKeysFetched) +} + +// Remove a file from the files hierarchy +func (d *driver) deleteOid(objectPath string) error { + // Remove object reference + directory := path.Dir(objectPath) + base := path.Base(objectPath) + err := d.Ioctx.RmOmapKeys(directory, []string{base}) + + if err != nil { + return err + } + + // Remove virtual directory if empty (no more references) + firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) + + if err != nil { + return err + } + + if len(firstReference) == 0 { + // Delete omap + err := d.Ioctx.Delete(directory) + + if err != nil { + return err + } + + // Remove reference on parent omaps + if directory != "/" { + return d.deleteOid(directory) + } + } + + return nil +} + +// Takes an offset in an chunked object and return the chunk name and a new +// offset in this chunk object +func (d *driver) getChunkNameFromOffset(oid string, offset uint64) (string, uint64) { + chunkID := offset / d.chunksize + chunkedOid := oid + "-" + strconv.FormatInt(int64(chunkID), 10) + chunkedOffset := offset % d.chunksize + return chunkedOid, chunkedOffset +} + +// Set the total size of a chunked object `oid` +func (d *driver) setXattrTotalSize(oid string, size uint64) error { + // Convert uint64 `size` to []byte + xattr := make([]byte, binary.MaxVarintLen64) + binary.LittleEndian.PutUint64(xattr, size) + + // Save the total size as a xattr in the first chunk + return d.Ioctx.SetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) +} + +// Get the total size of the chunked object `oid` stored as xattr +func (d *driver) getXattrTotalSize(ctx context.Context, oid string) (uint64, error) { + // Fetch xattr as []byte + xattr := make([]byte, binary.MaxVarintLen64) + xattrLength, err := d.Ioctx.GetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) + + if err != nil { + return 0, err + } + + if xattrLength != len(xattr) { + context.GetLogger(ctx).Errorf("object %s xattr length mismatch: %d != %d", oid, xattrLength, len(xattr)) + return 0, storagedriver.PathNotFoundError{Path: oid} + } + + // Convert []byte as uint64 + totalSize := binary.LittleEndian.Uint64(xattr) + + return totalSize, nil +} diff --git a/registry/storage/driver/rados/rados_test.go b/registry/storage/driver/rados/rados_test.go new file mode 100644 index 00000000..29486e89 --- /dev/null +++ b/registry/storage/driver/rados/rados_test.go @@ -0,0 +1,38 @@ +package rados + +import ( + "os" + "testing" + + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/testsuites" + + "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +func init() { + poolname := os.Getenv("RADOS_POOL") + username := os.Getenv("RADOS_USER") + + driverConstructor := func() (storagedriver.StorageDriver, error) { + parameters := DriverParameters{ + poolname, + username, + defaultChunkSize, + } + + return New(parameters) + } + + skipCheck := func() string { + if poolname == "" { + return "RADOS_POOL must be set to run Rado tests" + } + return "" + } + + testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) +}