EC put #1064

Merged
fyrchik merged 5 commits from dstepanov-yadro/frostfs-node:feat/ec_put into master 2024-04-09 07:08:55 +00:00
21 changed files with 802 additions and 92 deletions

View file

@ -276,15 +276,19 @@ env-up: all
echo "Frostfs contracts not found"; exit 1; \ echo "Frostfs contracts not found"; exit 1; \
fi fi
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH} ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet.json --gas 10.0 ${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet01.json --gas 10.0
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet02.json --gas 10.0
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet03.json --gas 10.0
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet04.json --gas 10.0
@if [ ! -f "$(LOCODE_DB_PATH)" ]; then \ @if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
make locode-download; \ make locode-download; \
fi fi
mkdir -p ./$(TMP_DIR)/state
mkdir -p ./$(TMP_DIR)/storage
# Shutdown dev environment # Shutdown dev environment
env-down: env-down:
docker compose -f dev/docker-compose.yml down docker compose -f dev/docker-compose.yml down
docker volume rm -f frostfs-node_neo-go docker volume rm -f frostfs-node_neo-go
rm -f ./.cache/.frostfs-ir-state rm -rf ./$(TMP_DIR)/state
rm -f ./.cache/.frostfs-node-state rm -rf ./$(TMP_DIR)/storage
rm -rf ./.cache/storage

View file

@ -171,6 +171,15 @@ func printHeader(cmd *cobra.Command, obj *objectSDK.Object) error {
cmd.Printf(" signature: %s\n", hex.EncodeToString(sigV2.GetSign())) cmd.Printf(" signature: %s\n", hex.EncodeToString(sigV2.GetSign()))
} }
if ecHeader := obj.ECHeader(); ecHeader != nil {
cmd.Print("EC header:\n")
cmd.Printf(" parent object ID: %s\n", ecHeader.Parent().EncodeToString())
cmd.Printf(" index: %d\n", ecHeader.Index())
cmd.Printf(" total: %d\n", ecHeader.Total())
cmd.Printf(" header length: %d\n", ecHeader.HeaderLength())
}
return printSplitHeader(cmd, obj) return printSplitHeader(cmd, obj)
} }

View file

@ -27,12 +27,12 @@
"FROSTFS_IR_NETMAP_CLEANER_THRESHOLD":"3", "FROSTFS_IR_NETMAP_CLEANER_THRESHOLD":"3",
"FROSTFS_IR_LOCODE_DB_PATH":"${workspaceFolder}/.cache/locode_db", "FROSTFS_IR_LOCODE_DB_PATH":"${workspaceFolder}/.cache/locode_db",
"FROSTFS_IR_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8090", "FROSTFS_IR_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8090",
"FROSTFS_IR_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-ir-state" "FROSTFS_IR_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-ir-state"
}, },
"postDebugTask": "env-down" "postDebugTask": "env-down"
}, },
{ {
"name": "Storage node", "name": "Storage node 1",
"type": "go", "type": "go",
"request": "launch", "request": "launch",
"mode": "debug", "mode": "debug",
@ -42,7 +42,8 @@
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s", "FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws", "FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0", "FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet.json", "FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet01.json",
"FROSTFS_NODE_WALLET_PASSWORD":"", "FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080", "FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080", "FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
@ -50,31 +51,187 @@
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a", "FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev", "FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW", "FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state", "FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s1-state",
"FROSTFS_TREE_ENABLED":"true", "FROSTFS_TREE_ENABLED":"true",
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10", "FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true", "FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/wc0", "FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s1/wc0",
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/meta0", "FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s1/meta0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/blobovnicza0", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s1/blobovnicza0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/fstree0", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s1/fstree0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2", "FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/pilorama0", "FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama0",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true", "FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/wc1", "FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s1/wc1",
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/meta1", "FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s1/meta1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/blobovnicza1", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s1/blobovnicza1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/fstree1", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s1/fstree1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2", "FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/pilorama1" "FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
},
"postDebugTask": "env-down"
},
{
"name": "Storage node 2",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "cmd/frostfs-node",
"env": {
"FROSTFS_LOGGER_LEVEL":"debug",
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet02.json",
"FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8082",
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8082",
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8083",
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s2-state",
"FROSTFS_TREE_ENABLED":"true",
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s2/wc0",
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s2/meta0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s2/blobovnicza0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s2/fstree0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama0",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s2/wc1",
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s2/meta1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s2/blobovnicza1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s2/fstree1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
},
"postDebugTask": "env-down"
},
{
"name": "Storage node 3",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "cmd/frostfs-node",
"env": {
"FROSTFS_LOGGER_LEVEL":"debug",
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet03.json",
"FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8084",
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8084",
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8085",
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s3-state",
"FROSTFS_TREE_ENABLED":"true",
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s3/wc0",
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s3/meta0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s3/blobovnicza0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s3/fstree0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama0",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s3/wc1",
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s3/meta1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s3/blobovnicza1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s3/fstree1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
},
"postDebugTask": "env-down"
},
{
"name": "Storage node 4",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "cmd/frostfs-node",
"env": {
"FROSTFS_LOGGER_LEVEL":"debug",
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet04.json",
"FROSTFS_NODE_WALLET_PASSWORD":"",
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8086",
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8086",
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8087",
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s4-state",
"FROSTFS_TREE_ENABLED":"true",
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s4/wc0",
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s4/meta0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s4/blobovnicza0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s4/fstree0",
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama0",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s4/wc1",
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s4/meta1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s4/blobovnicza1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s4/fstree1",
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
}, },
"postDebugTask": "env-down" "postDebugTask": "env-down"
} }
@ -82,7 +239,13 @@
"compounds": [ "compounds": [
{ {
"name": "IR+Storage node", "name": "IR+Storage node",
"configurations": ["IR", "Storage node"], "configurations": ["IR", "Storage node 1"],
"preLaunchTask": "env-up",
"stopAll": true
},
{
"name": "IR + 4 storage nodes",
"configurations": ["IR", "Storage node 1", "Storage node 2", "Storage node 3", "Storage node 4"],
"preLaunchTask": "env-up", "preLaunchTask": "env-up",
"stopAll": true "stopAll": true
} }

30
dev/storage/wallet02.json Normal file
View file

@ -0,0 +1,30 @@
{
"version":"3.0",
"accounts":[
{
"address":"NVXXy3hNTvwVEZa2dAibALyJB3Q86aiHvL",
"key":"6PYXd9hxMYfaCkgeZp3q1RoMB921RQFkRxYftcacTJ2S7MUwnivrxi6Yk5",
"label":"",
"contract":{
"script":"DCED/2W2rnkTSk3OnQ0504Uem6tO6Xq/hugeHFu8UM0oJq5BVuezJw==",
"parameters":[
{
"name":"parameter0",
"type":"Signature"
}
],
"deployed":false
},
"lock":false,
"isDefault":false
}
],
"scrypt":{
"n":16384,
"r":8,
"p":8
},
"extra":{
"Tokens":null
}
}

30
dev/storage/wallet03.json Normal file
View file

@ -0,0 +1,30 @@
{
"version":"3.0",
"accounts":[
{
"address":"NPTmih9X14Y7xLvmD6RVtDHdH1Y9qJwoTe",
"key":"6PYXNeQzge9fWztVnWYRbr5Mh9q1y4npKVARHYGb484Hct1iNd3vXGR1kk",
"label":"",
"contract":{
"script":"DCECrJIM198LYbKJBy5rlG4tpOGjG5qxxiG7R14w+kqxAsNBVuezJw==",
"parameters":[
{
"name":"parameter0",
"type":"Signature"
}
],
"deployed":false
},
"lock":false,
"isDefault":false
}
],
"scrypt":{
"n":16384,
"r":8,
"p":8
},
"extra":{
"Tokens":null
}
}

30
dev/storage/wallet04.json Normal file
View file

@ -0,0 +1,30 @@
{
"version":"3.0",
"accounts":[
{
"address":"Ne2DAQbWvP1s7TbtFc7BStKMnjKJdBaVRm",
"key":"6PYWCsGWx8uSVYK94tvK7Ccit8x8Z3f3dHADTFTgLhT9NBXTBqBECL8AyC",
"label":"",
"contract":{
"script":"DCEDjIYpWeVrQ+IPeRh8T+ngvHyMZsFgPmzw7H+Hq2sI3DVBVuezJw==",
"parameters":[
{
"name":"parameter0",
"type":"Signature"
}
],
"deployed":false
},
"lock":false,
"isDefault":false
}
],
"scrypt":{
"n":16384,
"r":8,
"p":8
},
"extra":{
"Tokens":null
}
}

1
go.mod
View file

@ -85,6 +85,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-cid v0.4.1 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -579,4 +579,6 @@ const (
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node" EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node" EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node" EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
ECFailedToSendToContainerNode = "failed to send EC object to container node"
ECFailedToSaveECPart = "failed to save EC part"
) )

11
pkg/core/container/ec.go Normal file
View file

@ -0,0 +1,11 @@
package container
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// IsECContainer returns True if container has erasure coding policy.
func IsECContainer(cnr containerSDK.Container) bool {
return policy.IsECPlacement(cnr.PlacementPolicy())
}

13
pkg/core/object/ec.go Normal file
View file

@ -0,0 +1,13 @@
package object
import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// IsECSupported returns True if EC supported for object.
//
// EC supported only for regular, not linking objects.
func IsECSupported(obj *objectSDK.Object) bool {
return obj.Type() == objectSDK.TypeRegular &&
len(obj.Children()) == 0
}

20
pkg/core/policy/ec.go Normal file
View file

@ -0,0 +1,20 @@
package policy
import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
// IsECPlacement returns True if policy is erasure coding policy.
func IsECPlacement(policy netmapSDK.PlacementPolicy) bool {
return policy.NumberOfReplicas() == 1 && policy.ReplicaDescriptor(0).GetECDataCount() > 0
}
// ECDataCount returns EC data count for EC placement policy.
func ECDataCount(policy netmapSDK.PlacementPolicy) int {
return int(policy.ReplicaDescriptor(0).GetECDataCount())
}
// ECParityCount returns EC parity count for EC placement policy.
func ECParityCount(policy netmapSDK.PlacementPolicy) int {
return int(policy.ReplicaDescriptor(0).GetECParityCount())
}

View file

@ -0,0 +1,54 @@
package putsvc
import (
"context"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
var _ transformer.ChunkedObjectWriter = (*inMemoryObjectBuilder)(nil)
type inMemoryObjectBuilder struct {
objectWriter transformer.ObjectWriter
payload *payload
obj *objectSDK.Object
}
func newInMemoryObjectBuilder(objectWriter transformer.ObjectWriter) *inMemoryObjectBuilder {
return &inMemoryObjectBuilder{
objectWriter: objectWriter,
payload: getPayload(),
}
}
func (b *inMemoryObjectBuilder) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
defer func() {
putPayload(b.payload)
b.payload = nil
}()
b.obj.SetPayload(b.payload.Data)
if err := b.objectWriter.WriteObject(ctx, b.obj); err != nil {
return nil, err
}
id, _ := b.obj.ID()
return &transformer.AccessIdentifiers{
SelfID: id,
}, nil
}
func (b *inMemoryObjectBuilder) Write(_ context.Context, p []byte) (int, error) {
b.payload.Data = append(b.payload.Data, p...)
return len(p), nil
}
func (b *inMemoryObjectBuilder) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
b.obj = obj
return nil
}

View file

@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
) )
type preparedObjectTarget interface { type preparedObjectTarget interface {
@ -16,15 +15,12 @@ type preparedObjectTarget interface {
type distributedTarget struct { type distributedTarget struct {
placementOpts []placement.Option placementOpts []placement.Option
extraBroadcastEnabled bool
obj *objectSDK.Object obj *objectSDK.Object
objMeta object.ContentMeta objMeta object.ContentMeta
*cfg *cfg
payload *payload
nodeTargetInitializer func(nodeDesc) preparedObjectTarget nodeTargetInitializer func(nodeDesc) preparedObjectTarget
relay func(context.Context, nodeDesc) error relay func(context.Context, nodeDesc) error
@ -91,36 +87,6 @@ func (x errIncompletePut) Error() string {
return commonMsg return commonMsg
} }
func (t *distributedTarget) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
t.obj = obj
return nil
}
func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) {
t.payload.Data = append(t.payload.Data, p...)
return len(p), nil
}
func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
defer func() {
putPayload(t.payload)
t.payload = nil
}()
t.obj.SetPayload(t.payload.Data)
if err := t.WriteObject(ctx, t.obj); err != nil {
return nil, err
}
id, _ := t.obj.ID()
return &transformer.AccessIdentifiers{
SelfID: id,
}, nil
}
// WriteObject implements the transformer.ObjectWriter interface. // WriteObject implements the transformer.ObjectWriter interface.
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
t.obj = obj t.obj = obj

View file

@ -0,0 +1,265 @@
package putsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var _ transformer.ObjectWriter = (*ecWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
type ecWriter struct {
cfg *cfg
placementOpts []placement.Option
container containerSDK.Container
key *ecdsa.PrivateKey
commonPrm *svcutil.CommonPrm
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
objMeta object.ContentMeta
objMetaValid bool
}
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx)
if err != nil {
return err
}
if relayed {
return nil
}
if !object.IsECSupported(obj) {
fyrchik marked this conversation as resolved Outdated

What do you mean by this comment?

What do you mean by this comment?

ec_writer supports only regular objects. Linking objects, tombstones and locks write should be handled by defaultWriter (aka repWriter).
The current implementation satisfies this condition. It's just an additional check and foolproof.

`ec_writer` supports only regular objects. Linking objects, tombstones and locks write should be handled by `defaultWriter` (aka `repWriter`). The current implementation satisfies this condition. It's just an additional check and foolproof.
// must be resolved by caller
return errUnsupportedECObject
}
if !e.objMetaValid {
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
}
e.objMetaValid = true
}
if obj.ECHeader() != nil {
return e.writeECPart(ctx, obj)
}
return e.writeRawObject(ctx, obj)
}
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
if e.relay == nil {
return false, nil
}
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
if err != nil {
return false, err
}
if currentNodeIsContainerNode {
// object can be splitted or saved local
return false, nil
}
if err := e.relayToContainerNode(ctx); err != nil {
return false, err
}
return true, nil
}
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return false, err
}
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for _, node := range nodes {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
return true, nil
}
}
}
return false, nil
}
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return err
}
var lastErr error
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for _, node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
c, err := e.cfg.clientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
defer close(completed)
err = e.relay(ctx, info, c)
}); poolErr != nil {
close(completed)
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
return poolErr
}
<-completed
if err == nil {
return nil
}
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
lastErr = err
}
}
if lastErr == nil {
return nil
}
return errIncompletePut{
singleErr: lastErr,
}
}
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
eg.Go(func() error {
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
})
}
if err := eg.Wait(); err != nil {
return errIncompletePut{
singleErr: err,
}
}
return nil
}
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
// now only single EC policy is supported
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
if err != nil {
return err
}
parts, err := c.Split(obj, e.key)
if err != nil {
return err
}
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for idx := range parts {
idx := idx
eg.Go(func() error {
return e.writePart(egCtx, parts[idx], idx, nodes)
})
}
}
if err := eg.Wait(); err != nil {
return errIncompletePut{
singleErr: err,
}
}
return nil
}
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
var err error
node := nodes[partIdx%len(nodes)]
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
fyrchik marked this conversation as resolved Outdated

We can't try all others, because this violates the assumption of how many nodes can fail. With this code it could happen that all chunks reside on one node.

I suggest to fail here: we already fail in a similar way for big objects (so there could be garbage).
Later we can improve this (probably i < len(nodes) should be replaced with something else, but not i < parityCount, so this is not obvious).

We can't try _all_ others, because this violates the assumption of how many nodes can fail. With this code it could happen that all chunks reside on one node. I suggest to fail here: we already fail in a similar way for big objects (so there could be garbage). Later we can improve this (probably `i < len(nodes)` should be replaced with something else, but not `i < parityCount`, so this is not obvious).

fixed

fixed
err = e.writePartLocal(ctx, obj)
} else {
err = e.writePartRemote(ctx, obj, node)
}
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err))
return err
}
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error
localTarget := localTarget{
storage: e.cfg.localStore,
}
completed := make(chan interface{})
if poolErr := e.cfg.localPool.Submit(func() {
defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.objMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
}
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
var clientNodeInfo client.NodeInfo
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteTarget{
privateKey: e.key,
clientConstructor: e.cfg.clientConstructor,
commonPrm: e.commonPrm,
nodeInfo: clientNodeInfo,
}
var err error
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
defer close(completed)
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
}

View file

@ -16,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -25,6 +26,7 @@ import (
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/tzhash/tz" "git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -32,7 +34,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum") var (
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
errInvalidECObject = errors.New("object must be splitted to EC parts")
)
type putSingleRequestSigner struct { type putSingleRequestSigner struct {
req *objectAPI.PutSingleRequest req *objectAPI.PutSingleRequest
@ -148,12 +153,20 @@ func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Ob
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
localOnly := req.GetMetaHeader().GetTTL() <= 1 localOnly := req.GetMetaHeader().GetTTL() <= 1
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
if err != nil { if err != nil {
return err return err
} }
iter := s.cfg.newNodeIterator(placementOptions) if placement.isEC {
return s.saveToECReplicas(ctx, placement, obj, req, meta)
}
return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta)
}
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
@ -167,38 +180,83 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
}) })
} }
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) { func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
var result []placement.Option if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
if len(copiesNumber) > 0 { return errInvalidECObject
result = append(result, placement.WithCopyNumbers(copiesNumber))
} }
commonPrm, err := svcutil.CommonPrmFromV2(req)
if err != nil {
return err
}
key, err := s.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
signer := &putSingleRequestSigner{
req: req,
keyStorage: s.keyStorage,
signer: &sync.Once{},
}
w := ecWriter{
cfg: s.cfg,
placementOpts: placement.placementOptions,
objMeta: meta,
objMetaValid: true,
commonPrm: commonPrm,
container: placement.container,
key: key,
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
},
}
return w.WriteObject(ctx, obj)
}
type putSinglePlacement struct {
placementOptions []placement.Option
isEC bool
container containerSDK.Container
}
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
var result putSinglePlacement
cnrID, ok := obj.ContainerID() cnrID, ok := obj.ContainerID()
if !ok { if !ok {
return nil, errors.New("missing container ID") return result, errors.New("missing container ID")
} }
cnrInfo, err := s.cnrSrc.Get(cnrID) cnrInfo, err := s.cnrSrc.Get(cnrID)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get container by ID: %w", err) return result, fmt.Errorf("could not get container by ID: %w", err)
} }
result = append(result, placement.ForContainer(cnrInfo.Value)) result.container = cnrInfo.Value
result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj)
if len(copiesNumber) > 0 && !result.isEC {
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
}
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
objID, ok := obj.ID() objID, ok := obj.ID()
if !ok { if !ok {
return nil, errors.New("missing object ID") return result, errors.New("missing object ID")
} }
result = append(result, placement.ForObject(objID)) if obj.ECHeader() != nil {
objID = obj.ECHeader().Parent()
}
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc) latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get latest network map: %w", err) return result, fmt.Errorf("could not get latest network map: %w", err)
} }
builder := placement.NewNetworkMapBuilder(latestNetmap) builder := placement.NewNetworkMapBuilder(latestNetmap)
if localOnly { if localOnly {
result = append(result, placement.SuccessAfter(1)) result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys) builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
} }
result = append(result, placement.UseBuilder(builder)) result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
return result, nil return result, nil
} }

View file

@ -7,12 +7,13 @@ import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
) )
@ -20,7 +21,7 @@ import (
type Streamer struct { type Streamer struct {
*cfg *cfg
sessionKey *ecdsa.PrivateKey privateKey *ecdsa.PrivateKey
target transformer.ChunkedObjectWriter target transformer.ChunkedObjectWriter
@ -77,9 +78,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
p.relay = prm.relay p.relay = prm.relay
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
p.privateKey = nodeKey
// prepare untrusted-Put object target // prepare untrusted-Put object target
p.target = &validatingPreparedTarget{ p.target = &validatingPreparedTarget{
nextTarget: p.newCommonTarget(prm), nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
fmt: p.fmtValidator, fmt: p.fmtValidator,
maxPayloadSz: p.maxPayloadSz, maxPayloadSz: p.maxPayloadSz,
@ -103,7 +110,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
} }
} }
sessionKey, err := p.keyStorage.GetKey(sessionInfo) key, err := p.keyStorage.GetKey(sessionInfo)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not receive session key: %w", p, err) return fmt.Errorf("(%T) could not receive session key: %w", p, err)
} }
@ -117,7 +124,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
if sToken == nil { if sToken == nil {
var ownerSession user.ID var ownerSession user.ID
user.IDFromKey(&ownerSession, sessionKey.PublicKey) user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) { if !ownerObj.Equals(ownerSession) {
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
@ -128,12 +135,12 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
} }
} }
p.sessionKey = sessionKey p.privateKey = key
p.target = &validatingTarget{ p.target = &validatingTarget{
fmt: p.fmtValidator, fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: sessionKey, Key: key,
NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) }, NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
NetworkState: p.networkState, NetworkState: p.networkState,
MaxSize: p.maxPayloadSz, MaxSize: p.maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
@ -172,7 +179,12 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
placement.ForContainer(prm.cnr), placement.ForContainer(prm.cnr),
) )
if id, ok := prm.hdr.ID(); ok { if ech := prm.hdr.ECHeader(); ech != nil {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.hdr.ID(); ok {
prm.traverseOpts = append(prm.traverseOpts, prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object // set identifier of the processing object
placement.ForObject(id), placement.ForObject(id),
@ -196,7 +208,14 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
return nil return nil
} }
func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget { func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error var relay func(context.Context, nodeDesc) error
if p.relay != nil { if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error { relay = func(ctx context.Context, node nodeDesc) error {
@ -213,16 +232,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
} }
} }
// enable additional container broadcast on non-local operation
// if object has TOMBSTONE or LOCK type.
typ := prm.hdr.Type()
withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock)
return &distributedTarget{ return &distributedTarget{
cfg: p.cfg, cfg: p.cfg,
placementOpts: prm.traverseOpts, placementOpts: prm.traverseOpts,
extraBroadcastEnabled: withBroadcast,
payload: getPayload(),
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local { if node.local {
return localTarget{ return localTarget{
@ -231,7 +243,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
} }
rt := &remoteTarget{ rt := &remoteTarget{
privateKey: p.sessionKey, privateKey: p.privateKey,
commonPrm: prm.common, commonPrm: prm.common,
clientConstructor: p.clientConstructor, clientConstructor: p.clientConstructor,
} }
@ -244,6 +256,20 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
} }
} }
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ecWriter{

What happens if I have EC 2+1 and 4 nodes in the container?

What happens if I have `EC 2+1` and 4 nodes in the container?

What exactly are you asking? EC chunks will be saved on 3 of 4 container nodes.

What exactly are you asking? EC chunks will be saved on 3 of 4 container nodes.
cfg: p.cfg,
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: prm.cnr,
key: p.privateKey,
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm),
}
}
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil { if p.target == nil {
return errNotInit return errNotInit

View file

@ -0,0 +1,23 @@
package putsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
var _ transformer.ObjectWriter = (*objectWriterDispatcher)(nil)
type objectWriterDispatcher struct {
ecWriter transformer.ObjectWriter
repWriter transformer.ObjectWriter
aarifullin marked this conversation as resolved Outdated
  1. I don't see any reason to keep repWriter within multiObjectWriter at least because you don't use newECWriter if container does not support EC policy. So, it seems it always skips m.repWriter.WriteObject(ctx, obj). Please, fix me, if I am incorrect

  2. multi- prefix sounds like the object writer is able to choose how to write an object to a container: either by EC placement or by REP although a container adheres to single placement. What do you think about the idea to use objectWriterDispatcher name instead?

1. I don't see any reason to keep `repWriter` within `multiObjectWriter` at least because you don't use `newECWriter` if container does not support EC policy. So, it seems it always skips `m.repWriter.WriteObject(ctx, obj)`. Please, fix me, if I am incorrect 2. `multi-` prefix sounds like the object writer is able to *choose* how to write an object to a container: either by `EC` placement or by `REP` although a container adheres to single placement. What do you think about the idea to use `objectWriterDispatcher` name instead?
  1. EC container stores some objects (tombstones, locks, linking objects) as regular objects: these objects must be saved on every container node without EC splitting.
  2. done
1. EC container stores some objects (tombstones, locks, linking objects) as regular objects: these objects must be saved on every container node without EC splitting. 2. done

EC container stores some objects (tombstones, locks, linking objects) as regular objects: these objects must be saved on every container node without EC splitting.

Thanks for explanation

UPD: btw, I incorrectly read your code

if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr)

I didn't notice you pass header to IsECSupported - that is why I've got confused. But for now it is OK 👍

> EC container stores some objects (tombstones, locks, linking objects) as regular objects: these objects must be saved on every container node without EC splitting. Thanks for explanation UPD: btw, I incorrectly read your code ```go if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) ``` I didn't notice you pass header to `IsECSupported` - that is why I've got confused. But for now it is OK 👍
}
func (m *objectWriterDispatcher) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
if object.IsECSupported(obj) {
return m.ecWriter.WriteObject(ctx, obj)
}
return m.repWriter.WriteObject(ctx, obj)
}

View file

@ -137,7 +137,7 @@ func defaultCopiesVector(policy netmap.PlacementPolicy) []int {
copyVector := make([]int, 0, replNum) copyVector := make([]int, 0, replNum)
for i := 0; i < replNum; i++ { for i := 0; i < replNum; i++ {
copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects())) copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()+policy.ReplicaDescriptor(i).GetECDataCount()+policy.ReplicaDescriptor(i).GetECParityCount()))
} }
return copyVector return copyVector

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -40,6 +41,10 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
} }
policy := cnr.Value.PlacementPolicy() policy := cnr.Value.PlacementPolicy()
if policycore.IsECPlacement(policy) {
// EC not supported yet by policer
return nil
}
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy) nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
if err != nil { if err != nil {