EC put #1064
12
Makefile
|
@ -276,15 +276,19 @@ env-up: all
|
||||||
echo "Frostfs contracts not found"; exit 1; \
|
echo "Frostfs contracts not found"; exit 1; \
|
||||||
fi
|
fi
|
||||||
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
|
||||||
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet.json --gas 10.0
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet01.json --gas 10.0
|
||||||
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet02.json --gas 10.0
|
||||||
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet03.json --gas 10.0
|
||||||
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet04.json --gas 10.0
|
||||||
@if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
|
@if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
|
||||||
make locode-download; \
|
make locode-download; \
|
||||||
fi
|
fi
|
||||||
|
mkdir -p ./$(TMP_DIR)/state
|
||||||
|
mkdir -p ./$(TMP_DIR)/storage
|
||||||
|
|
||||||
# Shutdown dev environment
|
# Shutdown dev environment
|
||||||
env-down:
|
env-down:
|
||||||
docker compose -f dev/docker-compose.yml down
|
docker compose -f dev/docker-compose.yml down
|
||||||
docker volume rm -f frostfs-node_neo-go
|
docker volume rm -f frostfs-node_neo-go
|
||||||
rm -f ./.cache/.frostfs-ir-state
|
rm -rf ./$(TMP_DIR)/state
|
||||||
rm -f ./.cache/.frostfs-node-state
|
rm -rf ./$(TMP_DIR)/storage
|
||||||
rm -rf ./.cache/storage
|
|
||||||
|
|
|
@ -171,6 +171,15 @@ func printHeader(cmd *cobra.Command, obj *objectSDK.Object) error {
|
||||||
cmd.Printf(" signature: %s\n", hex.EncodeToString(sigV2.GetSign()))
|
cmd.Printf(" signature: %s\n", hex.EncodeToString(sigV2.GetSign()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||||
|
cmd.Print("EC header:\n")
|
||||||
|
|
||||||
|
cmd.Printf(" parent object ID: %s\n", ecHeader.Parent().EncodeToString())
|
||||||
|
cmd.Printf(" index: %d\n", ecHeader.Index())
|
||||||
|
cmd.Printf(" total: %d\n", ecHeader.Total())
|
||||||
|
cmd.Printf(" header length: %d\n", ecHeader.HeaderLength())
|
||||||
|
}
|
||||||
|
|
||||||
return printSplitHeader(cmd, obj)
|
return printSplitHeader(cmd, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,12 +27,12 @@
|
||||||
"FROSTFS_IR_NETMAP_CLEANER_THRESHOLD":"3",
|
"FROSTFS_IR_NETMAP_CLEANER_THRESHOLD":"3",
|
||||||
"FROSTFS_IR_LOCODE_DB_PATH":"${workspaceFolder}/.cache/locode_db",
|
"FROSTFS_IR_LOCODE_DB_PATH":"${workspaceFolder}/.cache/locode_db",
|
||||||
"FROSTFS_IR_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8090",
|
"FROSTFS_IR_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8090",
|
||||||
"FROSTFS_IR_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-ir-state"
|
"FROSTFS_IR_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-ir-state"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "Storage node",
|
"name": "Storage node 1",
|
||||||
"type": "go",
|
"type": "go",
|
||||||
"request": "launch",
|
"request": "launch",
|
||||||
"mode": "debug",
|
"mode": "debug",
|
||||||
|
@ -42,7 +42,8 @@
|
||||||
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet.json",
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet01.json",
|
||||||
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
||||||
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
||||||
|
@ -50,31 +51,187 @@
|
||||||
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s1-state",
|
||||||
"FROSTFS_TREE_ENABLED":"true",
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/wc0",
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s1/wc0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/meta0",
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s1/meta0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/blobovnicza0",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s1/blobovnicza0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/fstree0",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s1/fstree0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/pilorama0",
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama0",
|
||||||
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/wc1",
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s1/wc1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/meta1",
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s1/meta1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/blobovnicza1",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s1/blobovnicza1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/fstree1",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s1/fstree1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/pilorama1"
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
|
},
|
||||||
|
"postDebugTask": "env-down"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Storage node 2",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "cmd/frostfs-node",
|
||||||
|
"env": {
|
||||||
|
"FROSTFS_LOGGER_LEVEL":"debug",
|
||||||
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet02.json",
|
||||||
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8082",
|
||||||
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8082",
|
||||||
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8083",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s2-state",
|
||||||
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s2/wc0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s2/meta0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s2/blobovnicza0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s2/fstree0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s2/wc1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s2/meta1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s2/blobovnicza1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s2/fstree1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
|
},
|
||||||
|
"postDebugTask": "env-down"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Storage node 3",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "cmd/frostfs-node",
|
||||||
|
"env": {
|
||||||
|
"FROSTFS_LOGGER_LEVEL":"debug",
|
||||||
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet03.json",
|
||||||
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8084",
|
||||||
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8084",
|
||||||
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8085",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s3-state",
|
||||||
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s3/wc0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s3/meta0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s3/blobovnicza0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s3/fstree0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s3/wc1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s3/meta1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s3/blobovnicza1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s3/fstree1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
|
},
|
||||||
|
"postDebugTask": "env-down"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Storage node 4",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "cmd/frostfs-node",
|
||||||
|
"env": {
|
||||||
|
"FROSTFS_LOGGER_LEVEL":"debug",
|
||||||
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet04.json",
|
||||||
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8086",
|
||||||
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8086",
|
||||||
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8087",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s4-state",
|
||||||
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s4/wc0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s4/meta0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s4/blobovnicza0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s4/fstree0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s4/wc1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s4/meta1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s4/blobovnicza1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s4/fstree1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
}
|
}
|
||||||
|
@ -82,7 +239,13 @@
|
||||||
"compounds": [
|
"compounds": [
|
||||||
{
|
{
|
||||||
"name": "IR+Storage node",
|
"name": "IR+Storage node",
|
||||||
"configurations": ["IR", "Storage node"],
|
"configurations": ["IR", "Storage node 1"],
|
||||||
|
"preLaunchTask": "env-up",
|
||||||
|
"stopAll": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "IR + 4 storage nodes",
|
||||||
|
"configurations": ["IR", "Storage node 1", "Storage node 2", "Storage node 3", "Storage node 4"],
|
||||||
"preLaunchTask": "env-up",
|
"preLaunchTask": "env-up",
|
||||||
"stopAll": true
|
"stopAll": true
|
||||||
}
|
}
|
||||||
|
|
30
dev/storage/wallet02.json
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{
|
||||||
|
"version":"3.0",
|
||||||
|
"accounts":[
|
||||||
|
{
|
||||||
|
"address":"NVXXy3hNTvwVEZa2dAibALyJB3Q86aiHvL",
|
||||||
|
"key":"6PYXd9hxMYfaCkgeZp3q1RoMB921RQFkRxYftcacTJ2S7MUwnivrxi6Yk5",
|
||||||
|
"label":"",
|
||||||
|
"contract":{
|
||||||
|
"script":"DCED/2W2rnkTSk3OnQ0504Uem6tO6Xq/hugeHFu8UM0oJq5BVuezJw==",
|
||||||
|
"parameters":[
|
||||||
|
{
|
||||||
|
"name":"parameter0",
|
||||||
|
"type":"Signature"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"deployed":false
|
||||||
|
},
|
||||||
|
"lock":false,
|
||||||
|
"isDefault":false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"scrypt":{
|
||||||
|
"n":16384,
|
||||||
|
"r":8,
|
||||||
|
"p":8
|
||||||
|
},
|
||||||
|
"extra":{
|
||||||
|
"Tokens":null
|
||||||
|
}
|
||||||
|
}
|
30
dev/storage/wallet03.json
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{
|
||||||
|
"version":"3.0",
|
||||||
|
"accounts":[
|
||||||
|
{
|
||||||
|
"address":"NPTmih9X14Y7xLvmD6RVtDHdH1Y9qJwoTe",
|
||||||
|
"key":"6PYXNeQzge9fWztVnWYRbr5Mh9q1y4npKVARHYGb484Hct1iNd3vXGR1kk",
|
||||||
|
"label":"",
|
||||||
|
"contract":{
|
||||||
|
"script":"DCECrJIM198LYbKJBy5rlG4tpOGjG5qxxiG7R14w+kqxAsNBVuezJw==",
|
||||||
|
"parameters":[
|
||||||
|
{
|
||||||
|
"name":"parameter0",
|
||||||
|
"type":"Signature"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"deployed":false
|
||||||
|
},
|
||||||
|
"lock":false,
|
||||||
|
"isDefault":false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"scrypt":{
|
||||||
|
"n":16384,
|
||||||
|
"r":8,
|
||||||
|
"p":8
|
||||||
|
},
|
||||||
|
"extra":{
|
||||||
|
"Tokens":null
|
||||||
|
}
|
||||||
|
}
|
30
dev/storage/wallet04.json
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{
|
||||||
|
"version":"3.0",
|
||||||
|
"accounts":[
|
||||||
|
{
|
||||||
|
"address":"Ne2DAQbWvP1s7TbtFc7BStKMnjKJdBaVRm",
|
||||||
|
"key":"6PYWCsGWx8uSVYK94tvK7Ccit8x8Z3f3dHADTFTgLhT9NBXTBqBECL8AyC",
|
||||||
|
"label":"",
|
||||||
|
"contract":{
|
||||||
|
"script":"DCEDjIYpWeVrQ+IPeRh8T+ngvHyMZsFgPmzw7H+Hq2sI3DVBVuezJw==",
|
||||||
|
"parameters":[
|
||||||
|
{
|
||||||
|
"name":"parameter0",
|
||||||
|
"type":"Signature"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"deployed":false
|
||||||
|
},
|
||||||
|
"lock":false,
|
||||||
|
"isDefault":false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"scrypt":{
|
||||||
|
"n":16384,
|
||||||
|
"r":8,
|
||||||
|
"p":8
|
||||||
|
},
|
||||||
|
"extra":{
|
||||||
|
"Tokens":null
|
||||||
|
}
|
||||||
|
}
|
1
go.mod
|
@ -85,6 +85,7 @@ require (
|
||||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||||
github.com/josharian/intern v1.0.0 // indirect
|
github.com/josharian/intern v1.0.0 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||||
|
github.com/klauspost/reedsolomon v1.12.1 // indirect
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/magiconair/properties v1.8.7 // indirect
|
||||||
github.com/mailru/easyjson v0.7.7 // indirect
|
github.com/mailru/easyjson v0.7.7 // indirect
|
||||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||||
|
|
BIN
go.sum
|
@ -579,4 +579,6 @@ const (
|
||||||
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
||||||
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
||||||
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
||||||
|
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||||
|
ECFailedToSaveECPart = "failed to save EC part"
|
||||||
)
|
)
|
||||||
|
|
11
pkg/core/container/ec.go
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsECContainer returns True if container has erasure coding policy.
|
||||||
|
func IsECContainer(cnr containerSDK.Container) bool {
|
||||||
|
return policy.IsECPlacement(cnr.PlacementPolicy())
|
||||||
|
}
|
13
pkg/core/object/ec.go
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsECSupported returns True if EC supported for object.
|
||||||
|
//
|
||||||
|
// EC supported only for regular, not linking objects.
|
||||||
|
func IsECSupported(obj *objectSDK.Object) bool {
|
||||||
|
return obj.Type() == objectSDK.TypeRegular &&
|
||||||
|
len(obj.Children()) == 0
|
||||||
|
}
|
20
pkg/core/policy/ec.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package policy
|
||||||
|
|
||||||
|
import (
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsECPlacement returns True if policy is erasure coding policy.
|
||||||
|
func IsECPlacement(policy netmapSDK.PlacementPolicy) bool {
|
||||||
|
return policy.NumberOfReplicas() == 1 && policy.ReplicaDescriptor(0).GetECDataCount() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECDataCount returns EC data count for EC placement policy.
|
||||||
|
func ECDataCount(policy netmapSDK.PlacementPolicy) int {
|
||||||
|
return int(policy.ReplicaDescriptor(0).GetECDataCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECParityCount returns EC parity count for EC placement policy.
|
||||||
|
func ECParityCount(policy netmapSDK.PlacementPolicy) int {
|
||||||
|
return int(policy.ReplicaDescriptor(0).GetECParityCount())
|
||||||
|
}
|
54
pkg/services/object/put/builder.go
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ChunkedObjectWriter = (*inMemoryObjectBuilder)(nil)
|
||||||
|
|
||||||
|
type inMemoryObjectBuilder struct {
|
||||||
|
objectWriter transformer.ObjectWriter
|
||||||
|
payload *payload
|
||||||
|
|
||||||
|
obj *objectSDK.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInMemoryObjectBuilder(objectWriter transformer.ObjectWriter) *inMemoryObjectBuilder {
|
||||||
|
return &inMemoryObjectBuilder{
|
||||||
|
objectWriter: objectWriter,
|
||||||
|
payload: getPayload(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||||
|
defer func() {
|
||||||
|
putPayload(b.payload)
|
||||||
|
b.payload = nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
b.obj.SetPayload(b.payload.Data)
|
||||||
|
|
||||||
|
if err := b.objectWriter.WriteObject(ctx, b.obj); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _ := b.obj.ID()
|
||||||
|
return &transformer.AccessIdentifiers{
|
||||||
|
SelfID: id,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) Write(_ context.Context, p []byte) (int, error) {
|
||||||
|
b.payload.Data = append(b.payload.Data, p...)
|
||||||
|
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
|
||||||
|
b.obj = obj
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type preparedObjectTarget interface {
|
type preparedObjectTarget interface {
|
||||||
|
@ -16,15 +15,12 @@ type preparedObjectTarget interface {
|
||||||
|
|
||||||
type distributedTarget struct {
|
type distributedTarget struct {
|
||||||
placementOpts []placement.Option
|
placementOpts []placement.Option
|
||||||
extraBroadcastEnabled bool
|
|
||||||
|
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
objMeta object.ContentMeta
|
objMeta object.ContentMeta
|
||||||
|
|
||||||
*cfg
|
*cfg
|
||||||
|
|
||||||
payload *payload
|
|
||||||
|
|
||||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||||
|
|
||||||
relay func(context.Context, nodeDesc) error
|
relay func(context.Context, nodeDesc) error
|
||||||
|
@ -91,36 +87,6 @@ func (x errIncompletePut) Error() string {
|
||||||
return commonMsg
|
return commonMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
|
|
||||||
t.obj = obj
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) {
|
|
||||||
t.payload.Data = append(t.payload.Data, p...)
|
|
||||||
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
|
||||||
defer func() {
|
|
||||||
putPayload(t.payload)
|
|
||||||
t.payload = nil
|
|
||||||
}()
|
|
||||||
|
|
||||||
t.obj.SetPayload(t.payload.Data)
|
|
||||||
|
|
||||||
if err := t.WriteObject(ctx, t.obj); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
id, _ := t.obj.ID()
|
|
||||||
return &transformer.AccessIdentifiers{
|
|
||||||
SelfID: id,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteObject implements the transformer.ObjectWriter interface.
|
// WriteObject implements the transformer.ObjectWriter interface.
|
||||||
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
t.obj = obj
|
t.obj = obj
|
||||||
|
|
265
pkg/services/object/put/ec.go
Normal file
|
@ -0,0 +1,265 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ObjectWriter = (*ecWriter)(nil)
|
||||||
|
|
||||||
|
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||||
|
|
||||||
|
type ecWriter struct {
|
||||||
|
cfg *cfg
|
||||||
|
placementOpts []placement.Option
|
||||||
|
container containerSDK.Container
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
commonPrm *svcutil.CommonPrm
|
||||||
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||||
|
|
||||||
|
objMeta object.ContentMeta
|
||||||
|
objMetaValid bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
relayed, err := e.relayIfNotContainerNode(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if relayed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !object.IsECSupported(obj) {
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
// must be resolved by caller
|
||||||
|
return errUnsupportedECObject
|
||||||
|
}
|
||||||
|
|
||||||
|
if !e.objMetaValid {
|
||||||
|
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
|
||||||
|
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
|
||||||
|
}
|
||||||
|
e.objMetaValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj.ECHeader() != nil {
|
||||||
|
return e.writeECPart(ctx, obj)
|
||||||
|
}
|
||||||
|
return e.writeRawObject(ctx, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
|
||||||
|
if e.relay == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if currentNodeIsContainerNode {
|
||||||
|
// object can be splitted or saved local
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err := e.relayToContainerNode(ctx); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, node := range nodes {
|
||||||
|
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var lastErr error
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, node := range nodes {
|
||||||
|
var info client.NodeInfo
|
||||||
|
client.NodeInfoFromNetmapElement(&info, node)
|
||||||
|
|
||||||
|
c, err := e.cfg.clientConstructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
completed := make(chan interface{})
|
||||||
|
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||||
|
defer close(completed)
|
||||||
|
err = e.relay(ctx, info, c)
|
||||||
|
}); poolErr != nil {
|
||||||
|
close(completed)
|
||||||
|
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
|
||||||
|
return poolErr
|
||||||
|
}
|
||||||
|
<-completed
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastErr == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: lastErr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
// now only single EC policy is supported
|
||||||
|
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
parts, err := c.Split(obj, e.key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx := range parts {
|
||||||
|
idx := idx
|
||||||
|
eg.Go(func() error {
|
||||||
|
return e.writePart(egCtx, parts[idx], idx, nodes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
|
||||||
|
var err error
|
||||||
|
node := nodes[partIdx%len(nodes)]
|
||||||
|
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We can't try all others, because this violates the assumption of how many nodes can fail. With this code it could happen that all chunks reside on one node. I suggest to fail here: we already fail in a similar way for big objects (so there could be garbage). We can't try _all_ others, because this violates the assumption of how many nodes can fail. With this code it could happen that all chunks reside on one node.
I suggest to fail here: we already fail in a similar way for big objects (so there could be garbage).
Later we can improve this (probably `i < len(nodes)` should be replaced with something else, but not `i < parityCount`, so this is not obvious).
dstepanov-yadro
commented
fixed fixed
|
|||||||
|
err = e.writePartLocal(ctx, obj)
|
||||||
|
} else {
|
||||||
|
err = e.writePartRemote(ctx, obj, node)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
var err error
|
||||||
|
localTarget := localTarget{
|
||||||
|
storage: e.cfg.localStore,
|
||||||
|
}
|
||||||
|
completed := make(chan interface{})
|
||||||
|
if poolErr := e.cfg.localPool.Submit(func() {
|
||||||
|
defer close(completed)
|
||||||
|
err = localTarget.WriteObject(ctx, obj, e.objMeta)
|
||||||
|
}); poolErr != nil {
|
||||||
|
close(completed)
|
||||||
|
return poolErr
|
||||||
|
}
|
||||||
|
<-completed
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||||
|
var clientNodeInfo client.NodeInfo
|
||||||
|
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
||||||
|
|
||||||
|
remoteTaget := remoteTarget{
|
||||||
|
privateKey: e.key,
|
||||||
|
clientConstructor: e.cfg.clientConstructor,
|
||||||
|
commonPrm: e.commonPrm,
|
||||||
|
nodeInfo: clientNodeInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
completed := make(chan interface{})
|
||||||
|
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||||
|
defer close(completed)
|
||||||
|
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
|
||||||
|
}); poolErr != nil {
|
||||||
|
close(completed)
|
||||||
|
return poolErr
|
||||||
|
}
|
||||||
|
<-completed
|
||||||
|
return err
|
||||||
|
}
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -25,6 +26,7 @@ import (
|
||||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -32,7 +34,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
var (
|
||||||
|
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||||
|
errInvalidECObject = errors.New("object must be splitted to EC parts")
|
||||||
|
)
|
||||||
|
|
||||||
type putSingleRequestSigner struct {
|
type putSingleRequestSigner struct {
|
||||||
req *objectAPI.PutSingleRequest
|
req *objectAPI.PutSingleRequest
|
||||||
|
@ -148,12 +153,20 @@ func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Ob
|
||||||
|
|
||||||
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
||||||
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
iter := s.cfg.newNodeIterator(placementOptions)
|
if placement.isEC {
|
||||||
|
return s.saveToECReplicas(ctx, placement, obj, req, meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
|
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||||
|
|
||||||
signer := &putSingleRequestSigner{
|
signer := &putSingleRequestSigner{
|
||||||
|
@ -167,38 +180,83 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
|
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
var result []placement.Option
|
if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
|
||||||
if len(copiesNumber) > 0 {
|
return errInvalidECObject
|
||||||
result = append(result, placement.WithCopyNumbers(copiesNumber))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
key, err := s.cfg.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
signer := &putSingleRequestSigner{
|
||||||
|
req: req,
|
||||||
|
keyStorage: s.keyStorage,
|
||||||
|
signer: &sync.Once{},
|
||||||
|
}
|
||||||
|
|
||||||
|
w := ecWriter{
|
||||||
|
cfg: s.cfg,
|
||||||
|
placementOpts: placement.placementOptions,
|
||||||
|
objMeta: meta,
|
||||||
|
objMetaValid: true,
|
||||||
|
commonPrm: commonPrm,
|
||||||
|
container: placement.container,
|
||||||
|
key: key,
|
||||||
|
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
||||||
|
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return w.WriteObject(ctx, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
type putSinglePlacement struct {
|
||||||
|
placementOptions []placement.Option
|
||||||
|
isEC bool
|
||||||
|
container containerSDK.Container
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
||||||
|
var result putSinglePlacement
|
||||||
|
|
||||||
cnrID, ok := obj.ContainerID()
|
cnrID, ok := obj.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("missing container ID")
|
return result, errors.New("missing container ID")
|
||||||
}
|
}
|
||||||
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get container by ID: %w", err)
|
return result, fmt.Errorf("could not get container by ID: %w", err)
|
||||||
}
|
}
|
||||||
result = append(result, placement.ForContainer(cnrInfo.Value))
|
result.container = cnrInfo.Value
|
||||||
|
result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj)
|
||||||
|
if len(copiesNumber) > 0 && !result.isEC {
|
||||||
|
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
|
||||||
|
}
|
||||||
|
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||||
|
|
||||||
objID, ok := obj.ID()
|
objID, ok := obj.ID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("missing object ID")
|
return result, errors.New("missing object ID")
|
||||||
}
|
}
|
||||||
result = append(result, placement.ForObject(objID))
|
if obj.ECHeader() != nil {
|
||||||
|
objID = obj.ECHeader().Parent()
|
||||||
|
}
|
||||||
|
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
|
||||||
|
|
||||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get latest network map: %w", err)
|
return result, fmt.Errorf("could not get latest network map: %w", err)
|
||||||
}
|
}
|
||||||
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
||||||
if localOnly {
|
if localOnly {
|
||||||
result = append(result, placement.SuccessAfter(1))
|
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
|
||||||
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
||||||
}
|
}
|
||||||
result = append(result, placement.UseBuilder(builder))
|
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,12 +7,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
)
|
)
|
||||||
|
@ -20,7 +21,7 @@ import (
|
||||||
type Streamer struct {
|
type Streamer struct {
|
||||||
*cfg
|
*cfg
|
||||||
|
|
||||||
sessionKey *ecdsa.PrivateKey
|
privateKey *ecdsa.PrivateKey
|
||||||
|
|
||||||
target transformer.ChunkedObjectWriter
|
target transformer.ChunkedObjectWriter
|
||||||
|
|
||||||
|
@ -77,9 +78,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||||
p.relay = prm.relay
|
p.relay = prm.relay
|
||||||
|
|
||||||
|
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.privateKey = nodeKey
|
||||||
|
|
||||||
// prepare untrusted-Put object target
|
// prepare untrusted-Put object target
|
||||||
p.target = &validatingPreparedTarget{
|
p.target = &validatingPreparedTarget{
|
||||||
nextTarget: p.newCommonTarget(prm),
|
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
|
|
||||||
maxPayloadSz: p.maxPayloadSz,
|
maxPayloadSz: p.maxPayloadSz,
|
||||||
|
@ -103,7 +110,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sessionKey, err := p.keyStorage.GetKey(sessionInfo)
|
key, err := p.keyStorage.GetKey(sessionInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
||||||
}
|
}
|
||||||
|
@ -117,7 +124,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
|
|
||||||
if sToken == nil {
|
if sToken == nil {
|
||||||
var ownerSession user.ID
|
var ownerSession user.ID
|
||||||
user.IDFromKey(&ownerSession, sessionKey.PublicKey)
|
user.IDFromKey(&ownerSession, key.PublicKey)
|
||||||
|
|
||||||
if !ownerObj.Equals(ownerSession) {
|
if !ownerObj.Equals(ownerSession) {
|
||||||
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
||||||
|
@ -128,12 +135,12 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.sessionKey = sessionKey
|
p.privateKey = key
|
||||||
p.target = &validatingTarget{
|
p.target = &validatingTarget{
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
Key: sessionKey,
|
Key: key,
|
||||||
NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) },
|
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
|
||||||
NetworkState: p.networkState,
|
NetworkState: p.networkState,
|
||||||
MaxSize: p.maxPayloadSz,
|
MaxSize: p.maxPayloadSz,
|
||||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
||||||
|
@ -172,7 +179,12 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
placement.ForContainer(prm.cnr),
|
placement.ForContainer(prm.cnr),
|
||||||
)
|
)
|
||||||
|
|
||||||
if id, ok := prm.hdr.ID(); ok {
|
if ech := prm.hdr.ECHeader(); ech != nil {
|
||||||
|
prm.traverseOpts = append(prm.traverseOpts,
|
||||||
|
// set identifier of the processing object
|
||||||
|
placement.ForObject(ech.Parent()),
|
||||||
|
)
|
||||||
|
} else if id, ok := prm.hdr.ID(); ok {
|
||||||
prm.traverseOpts = append(prm.traverseOpts,
|
prm.traverseOpts = append(prm.traverseOpts,
|
||||||
// set identifier of the processing object
|
// set identifier of the processing object
|
||||||
placement.ForObject(id),
|
placement.ForObject(id),
|
||||||
|
@ -196,7 +208,14 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
|
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
|
||||||
|
return p.newECWriter(prm)
|
||||||
|
}
|
||||||
|
return p.newDefaultObjectWriter(prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
var relay func(context.Context, nodeDesc) error
|
var relay func(context.Context, nodeDesc) error
|
||||||
if p.relay != nil {
|
if p.relay != nil {
|
||||||
relay = func(ctx context.Context, node nodeDesc) error {
|
relay = func(ctx context.Context, node nodeDesc) error {
|
||||||
|
@ -213,16 +232,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable additional container broadcast on non-local operation
|
|
||||||
// if object has TOMBSTONE or LOCK type.
|
|
||||||
typ := prm.hdr.Type()
|
|
||||||
withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock)
|
|
||||||
|
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
cfg: p.cfg,
|
cfg: p.cfg,
|
||||||
placementOpts: prm.traverseOpts,
|
placementOpts: prm.traverseOpts,
|
||||||
extraBroadcastEnabled: withBroadcast,
|
|
||||||
payload: getPayload(),
|
|
||||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||||
if node.local {
|
if node.local {
|
||||||
return localTarget{
|
return localTarget{
|
||||||
|
@ -231,7 +243,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
|
|
||||||
rt := &remoteTarget{
|
rt := &remoteTarget{
|
||||||
privateKey: p.sessionKey,
|
privateKey: p.privateKey,
|
||||||
commonPrm: prm.common,
|
commonPrm: prm.common,
|
||||||
clientConstructor: p.clientConstructor,
|
clientConstructor: p.clientConstructor,
|
||||||
}
|
}
|
||||||
|
@ -244,6 +256,20 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
|
return &objectWriterDispatcher{
|
||||||
|
ecWriter: &ecWriter{
|
||||||
fyrchik
commented
What happens if I have What happens if I have `EC 2+1` and 4 nodes in the container?
dstepanov-yadro
commented
What exactly are you asking? EC chunks will be saved on 3 of 4 container nodes. What exactly are you asking? EC chunks will be saved on 3 of 4 container nodes.
|
|||||||
|
cfg: p.cfg,
|
||||||
|
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||||
|
container: prm.cnr,
|
||||||
|
key: p.privateKey,
|
||||||
|
commonPrm: prm.common,
|
||||||
|
relay: p.relay,
|
||||||
|
},
|
||||||
|
repWriter: p.newDefaultObjectWriter(prm),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
||||||
if p.target == nil {
|
if p.target == nil {
|
||||||
return errNotInit
|
return errNotInit
|
||||||
|
|
23
pkg/services/object/put/writer.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ObjectWriter = (*objectWriterDispatcher)(nil)
|
||||||
|
|
||||||
|
type objectWriterDispatcher struct {
|
||||||
|
ecWriter transformer.ObjectWriter
|
||||||
|
repWriter transformer.ObjectWriter
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
1. I don't see any reason to keep `repWriter` within `multiObjectWriter` at least because you don't use `newECWriter` if container does not support EC policy. So, it seems it always skips `m.repWriter.WriteObject(ctx, obj)`. Please, fix me, if I am incorrect
2. `multi-` prefix sounds like the object writer is able to *choose* how to write an object to a container: either by `EC` placement or by `REP` although a container adheres to single placement. What do you think about the idea to use `objectWriterDispatcher` name instead?
dstepanov-yadro
commented
1. EC container stores some objects (tombstones, locks, linking objects) as regular objects: these objects must be saved on every container node without EC splitting.
2. done
aarifullin
commented
Thanks for explanation UPD: btw, I incorrectly read your code
I didn't notice you pass header to > EC container stores some objects (tombstones, locks, linking objects) as regular objects: these objects must be saved on every container node without EC splitting.
Thanks for explanation
UPD: btw, I incorrectly read your code
```go
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr)
```
I didn't notice you pass header to `IsECSupported` - that is why I've got confused. But for now it is OK 👍
|
|||||||
|
}
|
||||||
|
|
||||||
|
func (m *objectWriterDispatcher) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
if object.IsECSupported(obj) {
|
||||||
|
return m.ecWriter.WriteObject(ctx, obj)
|
||||||
|
}
|
||||||
|
return m.repWriter.WriteObject(ctx, obj)
|
||||||
|
}
|
|
@ -137,7 +137,7 @@ func defaultCopiesVector(policy netmap.PlacementPolicy) []int {
|
||||||
copyVector := make([]int, 0, replNum)
|
copyVector := make([]int, 0, replNum)
|
||||||
|
|
||||||
for i := 0; i < replNum; i++ {
|
for i := 0; i < replNum; i++ {
|
||||||
copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()))
|
copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()+policy.ReplicaDescriptor(i).GetECDataCount()+policy.ReplicaDescriptor(i).GetECParityCount()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return copyVector
|
return copyVector
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
@ -40,6 +41,10 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
}
|
}
|
||||||
|
|
||||||
policy := cnr.Value.PlacementPolicy()
|
policy := cnr.Value.PlacementPolicy()
|
||||||
|
if policycore.IsECPlacement(policy) {
|
||||||
|
// EC not supported yet by policer
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
What do you mean by this comment?
ec_writer
supports only regular objects. Linking objects, tombstones and locks write should be handled bydefaultWriter
(akarepWriter
).The current implementation satisfies this condition. It's just an additional check and foolproof.