Allow to resume badger migration using a given key
This commit is contained in:
parent
f7c33d0878
commit
f7da9a6f30
1 changed files with 131 additions and 128 deletions
|
@ -2,11 +2,11 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
@ -62,6 +62,7 @@ func main() {
|
||||||
var v1, v2 bool
|
var v1, v2 bool
|
||||||
var dir, valueDir string
|
var dir, valueDir string
|
||||||
var typ, database string
|
var typ, database string
|
||||||
|
var key string
|
||||||
|
|
||||||
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
|
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
|
||||||
|
|
||||||
|
@ -71,26 +72,34 @@ func main() {
|
||||||
fs.StringVar(&valueDir, "value-dir", "", "badger database value directory")
|
fs.StringVar(&valueDir, "value-dir", "", "badger database value directory")
|
||||||
fs.StringVar(&typ, "type", "", "the destination database type to use")
|
fs.StringVar(&typ, "type", "", "the destination database type to use")
|
||||||
fs.StringVar(&database, "database", "", "the destination driver-specific data source name")
|
fs.StringVar(&database, "database", "", "the destination driver-specific data source name")
|
||||||
|
fs.StringVar(&key, "key", "", "the key used to resume the migration")
|
||||||
fs.Usage = func() { usage(fs) }
|
fs.Usage = func() { usage(fs) }
|
||||||
fs.Parse(os.Args[1:])
|
fs.Parse(os.Args[1:])
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case v1 == v2:
|
case v1 == v2:
|
||||||
fatal("flag --v1 or --v2 are required")
|
fatal("flag -v1 or -v2 are required")
|
||||||
case dir == "":
|
case dir == "":
|
||||||
fatal("flag --dir is required")
|
fatal("flag -dir is required")
|
||||||
case typ != "postgresql" && typ != "mysql":
|
case typ != "postgresql" && typ != "mysql":
|
||||||
fatal(`flag --type must be "postgresql" or "mysql"`)
|
fatal(`flag -type must be "postgresql" or "mysql"`)
|
||||||
case database == "":
|
case database == "":
|
||||||
fatal("flag --database required")
|
fatal("flag --database required")
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
v1DB *badgerv1.DB
|
v1DB *badgerv1.DB
|
||||||
v2DB *badgerv2.DB
|
v2DB *badgerv2.DB
|
||||||
|
lastKey []byte
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if key != "" {
|
||||||
|
if lastKey, err = base64.StdEncoding.DecodeString(key); err != nil {
|
||||||
|
fatal("error decoding key: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if v1 {
|
if v1 {
|
||||||
if v1DB, err = badgerV1Open(dir, valueDir); err != nil {
|
if v1DB, err = badgerV1Open(dir, valueDir); err != nil {
|
||||||
fatal("error opening badger v1 database: %v", err)
|
fatal("error opening badger v1 database: %v", err)
|
||||||
|
@ -109,30 +118,55 @@ func main() {
|
||||||
allTables := append([]string{}, authorityTables...)
|
allTables := append([]string{}, authorityTables...)
|
||||||
allTables = append(allTables, acmeTables...)
|
allTables = append(allTables, acmeTables...)
|
||||||
|
|
||||||
for _, table := range allTables {
|
// Convert prefix names to badger key prefixes
|
||||||
|
badgerKeys := make([][]byte, len(allTables))
|
||||||
|
for i, name := range allTables {
|
||||||
|
badgerKeys[i], err = badgerEncode([]byte(name))
|
||||||
|
if err != nil {
|
||||||
|
fatal("error encoding table %s: %v", name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, prefix := range badgerKeys {
|
||||||
|
table := allTables[i]
|
||||||
|
|
||||||
|
// With a key flag, resume from that table and prefix
|
||||||
|
if lastKey != nil {
|
||||||
|
bucket, _ := parseBadgerEncode(lastKey)
|
||||||
|
if table != string(bucket) {
|
||||||
|
fmt.Printf("skipping table %s\n", table)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Continue with a new prefix
|
||||||
|
prefix = lastKey
|
||||||
|
lastKey = nil
|
||||||
|
}
|
||||||
|
|
||||||
var n int64
|
var n int64
|
||||||
fmt.Printf("migrating %s ...\n", table)
|
fmt.Printf("migrating %s ...", table)
|
||||||
if err := db.CreateTable([]byte(table)); err != nil {
|
if err := db.CreateTable([]byte(table)); err != nil {
|
||||||
fatal("error creating table %s: %v", table, err)
|
fatal("error creating table %s: %v", table, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v1 {
|
if v1 {
|
||||||
if err := badgerV1Iterate(v1DB, []byte(table), func(bucket, key, value []byte) error {
|
if badgerKey, err := badgerV1Iterate(v1DB, prefix, func(bucket, key, value []byte) error {
|
||||||
n++
|
n++
|
||||||
return db.Set(bucket, key, value)
|
return db.Set(bucket, key, value)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
fatal("error inserting into %s: %v", table, err)
|
fmt.Println()
|
||||||
|
fatal("error inserting into %s: %v\nLast key: %s", table, err, base64.StdEncoding.EncodeToString(badgerKey))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := badgerV2Iterate(v2DB, []byte(table), func(bucket, key, value []byte) error {
|
if badgerKey, err := badgerV2Iterate(v2DB, prefix, func(bucket, key, value []byte) error {
|
||||||
n++
|
n++
|
||||||
return db.Set(bucket, key, value)
|
return db.Set(bucket, key, value)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
fatal("error inserting into %s: %v", table, err)
|
fmt.Println()
|
||||||
|
fatal("error inserting into %s: %v\nLast key: %s", table, err, base64.StdEncoding.EncodeToString(badgerKey))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("%d rows\n", n)
|
fmt.Printf(" %d rows\n", n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,95 +192,70 @@ func badgerV2Open(dir, valueDir string) (*badgerv2.DB, error) {
|
||||||
return badgerv2.Open(opts)
|
return badgerv2.Open(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func badgerV1Iterate(db *badgerv1.DB, table []byte, fn func(table, key, value []byte) error) error {
|
type Iterator interface {
|
||||||
return db.View(func(txn *badgerv1.Txn) error {
|
Seek([]byte)
|
||||||
var tableExists bool
|
ValidForPrefix([]byte) bool
|
||||||
|
Next()
|
||||||
it := txn.NewIterator(badgerv1.DefaultIteratorOptions)
|
|
||||||
defer it.Close()
|
|
||||||
|
|
||||||
prefix, err := badgerEncode(table)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
|
|
||||||
tableExists = true
|
|
||||||
item := it.Item()
|
|
||||||
bk := item.KeyCopy(nil)
|
|
||||||
if isBadgerTable(bk) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
bucket, key, err := fromBadgerKey(bk)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error converting from badger key %s", bk)
|
|
||||||
}
|
|
||||||
if !bytes.Equal(table, bucket) {
|
|
||||||
return fmt.Errorf("bucket names do not match; want %s, but got %s", table, bucket)
|
|
||||||
}
|
|
||||||
|
|
||||||
v, err := item.ValueCopy(nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error retrieving contents from database value: %w", err)
|
|
||||||
}
|
|
||||||
value := cloneBytes(v)
|
|
||||||
|
|
||||||
if err := fn(bucket, key, value); err != nil {
|
|
||||||
return fmt.Errorf("error exporting %s[%s]=%v", table, key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !tableExists {
|
|
||||||
fmt.Printf("bucket %s not found\n", table)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func badgerV2Iterate(db *badgerv2.DB, table []byte, fn func(table, key, value []byte) error) error {
|
type Item interface {
|
||||||
return db.View(func(txn *badgerv2.Txn) error {
|
KeyCopy([]byte) []byte
|
||||||
var tableExists bool
|
ValueCopy([]byte) ([]byte, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func badgerV1Iterate(db *badgerv1.DB, prefix []byte, fn func(bucket, key, value []byte) error) (badgerKey []byte, err error) {
|
||||||
|
err = db.View(func(txn *badgerv1.Txn) error {
|
||||||
|
it := txn.NewIterator(badgerv1.DefaultIteratorOptions)
|
||||||
|
defer it.Close()
|
||||||
|
badgerKey, err = badgerIterate(it, prefix, fn)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func badgerV2Iterate(db *badgerv2.DB, prefix []byte, fn func(bucket, key, value []byte) error) (badgerKey []byte, err error) {
|
||||||
|
err = db.View(func(txn *badgerv2.Txn) error {
|
||||||
it := txn.NewIterator(badgerv2.DefaultIteratorOptions)
|
it := txn.NewIterator(badgerv2.DefaultIteratorOptions)
|
||||||
defer it.Close()
|
defer it.Close()
|
||||||
|
badgerKey, err = badgerIterate(it, prefix, fn)
|
||||||
prefix, err := badgerEncode(table)
|
return err
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
|
|
||||||
tableExists = true
|
|
||||||
item := it.Item()
|
|
||||||
bk := item.KeyCopy(nil)
|
|
||||||
if isBadgerTable(bk) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
bucket, key, err := fromBadgerKey(bk)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error converting from badgerKey %s: %w", bk, err)
|
|
||||||
}
|
|
||||||
if !bytes.Equal(table, bucket) {
|
|
||||||
return fmt.Errorf("bucket names do not match; want %s, but got %s", table, bucket)
|
|
||||||
}
|
|
||||||
|
|
||||||
v, err := item.ValueCopy(nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error retrieving contents from database value: %w", err)
|
|
||||||
}
|
|
||||||
value := cloneBytes(v)
|
|
||||||
|
|
||||||
if err := fn(bucket, key, value); err != nil {
|
|
||||||
return fmt.Errorf("error exporting %s[%s]=%v", table, key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !tableExists {
|
|
||||||
log.Printf("bucket %s not found", table)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func badgerIterate(it Iterator, prefix []byte, fn func(bucket, key, value []byte) error) ([]byte, error) {
|
||||||
|
var badgerKey []byte
|
||||||
|
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
|
||||||
|
var item Item
|
||||||
|
switch itt := it.(type) {
|
||||||
|
case *badgerv1.Iterator:
|
||||||
|
item = itt.Item()
|
||||||
|
case *badgerv2.Iterator:
|
||||||
|
item = itt.Item()
|
||||||
|
default:
|
||||||
|
return badgerKey, fmt.Errorf("unexpected iterator type %T", it)
|
||||||
|
}
|
||||||
|
|
||||||
|
badgerKey = item.KeyCopy(nil)
|
||||||
|
if isBadgerTable(badgerKey) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, key, err := fromBadgerKey(badgerKey)
|
||||||
|
if err != nil {
|
||||||
|
return badgerKey, fmt.Errorf("error converting from badger key %s", badgerKey)
|
||||||
|
}
|
||||||
|
value, err := item.ValueCopy(nil)
|
||||||
|
if err != nil {
|
||||||
|
return badgerKey, fmt.Errorf("error retrieving contents from database value: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fn(bucket, key, value); err != nil {
|
||||||
|
return badgerKey, fmt.Errorf("error exporting %s[%s]=%x", bucket, key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return badgerKey, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// badgerEncode encodes a byte slice into a section of a BadgerKey.
|
// badgerEncode encodes a byte slice into a section of a BadgerKey.
|
||||||
|
@ -267,6 +276,31 @@ func badgerEncode(val []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseBadgerEncode decodes the badger key and returns the bucket and the rest.
|
||||||
|
func parseBadgerEncode(bk []byte) (value, rest []byte) {
|
||||||
|
var (
|
||||||
|
keyLen uint16
|
||||||
|
start = uint16(2)
|
||||||
|
length = uint16(len(bk))
|
||||||
|
)
|
||||||
|
if uint16(len(bk)) < start {
|
||||||
|
return nil, bk
|
||||||
|
}
|
||||||
|
// First 2 bytes stores the length of the value.
|
||||||
|
if err := binary.Read(bytes.NewReader(bk[:2]), binary.LittleEndian, &keyLen); err != nil {
|
||||||
|
return nil, bk
|
||||||
|
}
|
||||||
|
end := start + keyLen
|
||||||
|
switch {
|
||||||
|
case length < end:
|
||||||
|
return nil, bk
|
||||||
|
case length == end:
|
||||||
|
return bk[start:end], nil
|
||||||
|
default:
|
||||||
|
return bk[start:end], bk[end:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// isBadgerTable returns True if the slice is a badgerTable token, false otherwise.
|
// isBadgerTable returns True if the slice is a badgerTable token, false otherwise.
|
||||||
// badgerTable means that the slice contains only the [size|value] of one section
|
// badgerTable means that the slice contains only the [size|value] of one section
|
||||||
// of a badgerKey and no remainder. A badgerKey is [buket|key], while a badgerTable
|
// of a badgerKey and no remainder. A badgerKey is [buket|key], while a badgerTable
|
||||||
|
@ -293,34 +327,3 @@ func fromBadgerKey(bk []byte) ([]byte, []byte, error) {
|
||||||
|
|
||||||
return bucket, key, nil
|
return bucket, key, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// cloneBytes returns a copy of a given slice.
|
|
||||||
func cloneBytes(v []byte) []byte {
|
|
||||||
var clone = make([]byte, len(v))
|
|
||||||
copy(clone, v)
|
|
||||||
return clone
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseBadgerEncode(bk []byte) (value, rest []byte) {
|
|
||||||
var (
|
|
||||||
keyLen uint16
|
|
||||||
start = uint16(2)
|
|
||||||
length = uint16(len(bk))
|
|
||||||
)
|
|
||||||
if uint16(len(bk)) < start {
|
|
||||||
return nil, bk
|
|
||||||
}
|
|
||||||
// First 2 bytes stores the length of the value.
|
|
||||||
if err := binary.Read(bytes.NewReader(bk[:2]), binary.LittleEndian, &keyLen); err != nil {
|
|
||||||
return nil, bk
|
|
||||||
}
|
|
||||||
end := start + keyLen
|
|
||||||
switch {
|
|
||||||
case length < end:
|
|
||||||
return nil, bk
|
|
||||||
case length == end:
|
|
||||||
return bk[start:end], nil
|
|
||||||
default:
|
|
||||||
return bk[start:end], bk[end:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue