diff --git a/.circleci/config.yml b/.circleci/config.yml index 24f7d9cff7..590d9cbe19 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -443,11 +443,20 @@ jobs: - checkout - setup_remote_docker: docker_layer_caching: true + # Build zenithdb/compute-tools:latest image and push it to Docker hub + # TODO: this should probably also use versioned tag, not just :latest. + # XXX: but should it? We build and use it only locally now. + - run: + name: Build and push compute-tools Docker image + command: | + echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin + docker build -t zenithdb/compute-tools:latest ./compute_tools/ + docker push zenithdb/compute-tools:latest - run: name: Init postgres submodule command: git submodule update --init --depth 1 - run: - name: Build and push Docker image + name: Build and push compute-node Docker image command: | echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin DOCKER_TAG=$(git log --oneline|wc -l) @@ -562,6 +571,55 @@ jobs: } }" + # + # + # compute-tools jobs + # TODO: unify with main build_and_test pipeline + # + # + compute-tools-test: + executor: zenith-build-executor + working_directory: ~/repo/compute_tools + steps: + - checkout: + path: ~/repo + + - restore_cache: + name: Restore rust cache + keys: + # Require an exact match. While an out of date cache might speed up the build, + # there's no way to clean out old packages, so the cache grows every time something + # changes. 
+ - v03-rust-cache-deps-debug-{{ checksum "Cargo.lock" }} + + # Build the rust code, including test binaries + - run: + name: Rust build + environment: + CARGO_INCREMENTAL: 0 + command: cargo build --bins --tests + + - save_cache: + name: Save rust cache + key: v03-rust-cache-deps-debug-{{ checksum "Cargo.lock" }} + paths: + - ~/.cargo/registry + - ~/.cargo/git + - target + + # Run Rust formatting checks + - run: + name: cargo fmt check + command: cargo fmt --all -- --check + + # Run Rust linter (clippy) + - run: + name: cargo clippy check + command: cargo clippy --all --all-targets -- -Dwarnings -Drust-2018-idioms + + # Run Rust integration and unittests + - run: cargo test + workflows: build_and_test: jobs: @@ -610,6 +668,7 @@ workflows: requires: # TODO: consider adding more - other-tests-debug + - compute-tools-test - docker-image: # Context gives an ability to login context: Docker Hub @@ -632,6 +691,7 @@ workflows: requires: - pg_regress-tests-release - other-tests-release + - compute-tools-test - deploy-staging: # Context gives an ability to login context: Docker Hub diff --git a/compute_tools/.dockerignore b/compute_tools/.dockerignore new file mode 100644 index 0000000000..eb5a316cbd --- /dev/null +++ b/compute_tools/.dockerignore @@ -0,0 +1 @@ +target diff --git a/compute_tools/.gitignore b/compute_tools/.gitignore new file mode 100644 index 0000000000..eb5a316cbd --- /dev/null +++ b/compute_tools/.gitignore @@ -0,0 +1 @@ +target diff --git a/compute_tools/Cargo.lock b/compute_tools/Cargo.lock new file mode 100644 index 0000000000..6c8ef30d1f --- /dev/null +++ b/compute_tools/Cargo.lock @@ -0,0 +1,1161 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. 
+version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + +[[package]] +name = "anyhow" +version = "1.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" + +[[package]] +name = "async-trait" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" 
+dependencies = [ + "generic-array", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi", +] + +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "compute_tools" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "env_logger", + "hyper", + "libc", + "log", + "postgres", + "regex", + "serde", + "serde_json", + "tar", + "tokio", +] + +[[package]] +name = "cpufeatures" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-mac" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bff07008ec701e8028e2ceb8f83f0e4274ee62bd2dbdc4fefff2e9a91824081a" +dependencies = [ + "generic-array", + "subtle", +] + +[[package]] +name = "digest" +version = "0.9.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + +[[package]] +name = "env_logger" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "filetime" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "winapi", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" + +[[package]] +name = "futures-executor" 
+version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" + +[[package]] +name = "futures-macro" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb" +dependencies = [ + "autocfg", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11" + +[[package]] +name = "futures-task" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" + +[[package]] +name = "futures-util" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" +dependencies = [ + "autocfg", + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "h2" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c06815895acec637cd6ed6e9662c935b866d20a106f8361892893a7d9234964" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "hmac" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" +dependencies = [ + "crypto-mac", + "digest", +] + +[[package]] +name = "http" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" + +[[package]] +name = "httpdate" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "hyper" +version = "0.14.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "indexmap" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "instant" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" + +[[package]] +name = "lock_api" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +dependencies = [ + 
"scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", + "serde", +] + +[[package]] +name = "md-5" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +dependencies = [ + "block-buffer", + "digest", + "opaque-debug", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "mio" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" + +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "phf" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +dependencies = [ + "siphasher", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "postgres" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7871ee579860d8183f542e387b176a25f2656b9fb5211e045397f745a68d1c2" +dependencies = [ + "bytes", + "fallible-iterator", + "futures", + "log", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" +dependencies = [ + "base64", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" + +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + +[[package]] +name = "proc-macro2" +version = "1.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" +dependencies = [ + "rand_core", +] + +[[package]] +name = "redox_syscall" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "serde" +version = "1.0.130" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.130" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7f9e390c27c3c0ce8bc5d725f6e4d30a29d26659494aa4b17535f7522c5c950" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sha2" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9204c41a1597a8c5af23c82d1c921cb01ec0a4c59e07a9c7306062829a3903f3" +dependencies = [ + "block-buffer", + "cfg-if", + "cpufeatures", + "digest", + "opaque-debug", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "siphasher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b" + +[[package]] +name = "slab" +version = "0.4.4" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" + +[[package]] +name = "smallvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" + +[[package]] +name = "socket2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + +[[package]] +name = "syn" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f58f7e8eaa0009c5fec437aabf511bd9933e4b2d7407bd05273c01a8906ea7" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "tar" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6f5515d3add52e0bbdcad7b83c388bb36ba7b754dda3b5f5bc2d38640cdba5c" +dependencies = [ + "filetime", + "libc", + "xattr", +] + +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] 
+name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "tinyvec" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "848a1e1181b9f6753b5e96a092749e29b11d19ede67dfbbd6c7dc7e0f49b5338" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + +[[package]] +name = "tokio" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4efe6fc2395938c8155973d7be49fe8d03a843726e285e100a8a383cc0154ce" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "154794c8f499c2619acd19e839294703e9e32e7630ef5f46ea80d4ef0fbee5eb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-postgres" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2", + 
"tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "typenum" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" + +[[package]] +name = "unicode-bidi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "246f4c42e67e7a4e3c6106ff716a5d067d4132a642840b242e357e468a2a0085" + +[[package]] +name = "unicode-normalization" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-width" +version = "0.1.8" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "version_check" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" + +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "xattr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" +dependencies = [ + "libc", +] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml new file mode 100644 index 0000000000..7116ca73c9 --- /dev/null +++ b/compute_tools/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "compute_tools" +version = "0.1.0" +authors = ["Alexey Kondratov "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[workspace] +# TODO: make it a part of global zenith workspace + +[dependencies] +libc = "0.2" +anyhow = "1.0" +chrono = "0.4" +clap = "2.33" +env_logger = "0.8" +hyper = { version = "0.14", features = ["full"] } +log = { version = "0.4", features = ["std", "serde"] } +postgres = "0.19" +regex = "1" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tar = "0.4" +tokio = { version = "1", features = ["full"] } + +[profile.release] +debug = true diff --git a/compute_tools/Dockerfile b/compute_tools/Dockerfile new file mode 100644 index 0000000000..4a1efaf16c --- /dev/null +++ b/compute_tools/Dockerfile @@ -0,0 +1,14 @@ +# First transient image to build compute_tools binaries +FROM rust:slim-buster AS rust-build + +RUN mkdir /compute_tools +WORKDIR /compute_tools + +COPY . 
/compute_tools/ + +RUN cargo build --release + +# Final image that only has one binary +FROM debian:buster-slim + +COPY --from=rust-build /compute_tools/target/release/zenith_ctl /usr/local/bin/zenith_ctl diff --git a/compute_tools/README.md b/compute_tools/README.md new file mode 100644 index 0000000000..ccae3d2842 --- /dev/null +++ b/compute_tools/README.md @@ -0,0 +1,81 @@ +# Compute node tools + +Postgres wrapper (`zenith_ctl`) is intended to be run as a Docker entrypoint or as a `systemd` +`ExecStart` option. It will handle all the `zenith` specifics during compute node +initialization: +- `zenith_ctl` accepts cluster (compute node) specification as a JSON file. +- Every start is a fresh start, so the data directory is removed and + initialized again on each run. +- Next it will put configuration files into the `PGDATA` directory. +- Sync safekeepers and get commit LSN. +- Get `basebackup` from pageserver using the returned on the previous step LSN. +- Try to start `postgres` and wait until it is ready to accept connections. +- Check and alter/drop/create roles and databases. +- Hang waiting on the `postmaster` process to exit. + +Also `zenith_ctl` spawns two separate service threads: +- `compute-monitor` checks the last Postgres activity timestamp and saves it + into the shared `ComputeState`; +- `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the + last activity requests. + +Usage example: +```sh +zenith_ctl -D /var/db/postgres/compute \ + -C 'postgresql://zenith_admin@localhost/postgres' \ + -S /var/db/postgres/specs/current.json \ + -b /usr/local/bin/postgres +``` + +## Tests + +Cargo formatter: +```sh +cargo fmt +``` + +Run tests: +```sh +cargo test +``` + +Clippy linter: +```sh +cargo clippy --all --all-targets -- -Dwarnings -Drust-2018-idioms +``` + +## Cross-platform compilation + +Imagine that you are on macOS (x86) and you want a Linux GNU (`x86_64-unknown-linux-gnu` platform in `rust` terminology) executable. 
+ +### Using docker + +You can use a throw-away Docker container ([rustlang/rust](https://hub.docker.com/r/rustlang/rust/) image) for doing that: +```sh +docker run --rm \ + -v $(pwd):/compute_tools \ + -w /compute_tools \ + -t rustlang/rust:nightly cargo build --release --target=x86_64-unknown-linux-gnu +``` +or one-line: +```sh +docker run --rm -v $(pwd):/compute_tools -w /compute_tools -t rust:latest cargo build --release --target=x86_64-unknown-linux-gnu +``` + +### Using rust native cross-compilation + +Another way is to add `x86_64-unknown-linux-gnu` target on your host system: +```sh +rustup target add x86_64-unknown-linux-gnu +``` + +Install macOS cross-compiler toolchain: +```sh +brew tap SergioBenitez/osxct +brew install x86_64-unknown-linux-gnu +``` + +And finally run `cargo build`: +```sh +CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=x86_64-unknown-linux-gnu-gcc cargo build --target=x86_64-unknown-linux-gnu --release +``` diff --git a/compute_tools/rustfmt.toml b/compute_tools/rustfmt.toml new file mode 100644 index 0000000000..758d4179d7 --- /dev/null +++ b/compute_tools/rustfmt.toml @@ -0,0 +1 @@ +max_width = 100 diff --git a/compute_tools/src/bin/zenith_ctl.rs b/compute_tools/src/bin/zenith_ctl.rs new file mode 100644 index 0000000000..41288130fc --- /dev/null +++ b/compute_tools/src/bin/zenith_ctl.rs @@ -0,0 +1,251 @@ +//! +//! Postgres wrapper (`zenith_ctl`) is intended to be run as a Docker entrypoint or as a `systemd` +//! `ExecStart` option. It will handle all the `zenith` specifics during compute node +//! initialization: +//! - `zenith_ctl` accepts cluster (compute node) specification as a JSON file. +//! - Every start is a fresh start, so the data directory is removed and +//! initialized again on each run. +//! - Next it will put configuration files into the `PGDATA` directory. +//! - Sync safekeepers and get commit LSN. +//! - Get `basebackup` from pageserver using the returned on the previous step LSN. +//! 
- Try to start `postgres` and wait until it is ready to accept connections. +//! - Check and alter/drop/create roles and databases. +//! - Hang waiting on the `postmaster` process to exit. +//! +//! Also `zenith_ctl` spawns two separate service threads: +//! - `compute-monitor` checks the last Postgres activity timestamp and saves it +//! into the shared `ComputeState`; +//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the +//! last activity requests. +//! +//! Usage example: +//! ```sh +//! zenith_ctl -D /var/db/postgres/compute \ +//! -C 'postgresql://zenith_admin@localhost/postgres' \ +//! -S /var/db/postgres/specs/current.json \ +//! -b /usr/local/bin/postgres +//! ``` +//! +use std::fs::File; +use std::path::Path; +use std::process::{exit, Command, ExitStatus}; +use std::sync::{Arc, RwLock}; +use std::{env, panic}; + +use anyhow::Result; +use chrono::Utc; +use libc::{prctl, PR_SET_PDEATHSIG, SIGINT}; +use log::info; +use postgres::{Client, NoTls}; + +use compute_tools::config; +use compute_tools::http_api::launch_http_server; +use compute_tools::logger::*; +use compute_tools::monitor::launch_monitor; +use compute_tools::params::*; +use compute_tools::pg_helpers::*; +use compute_tools::spec::*; +use compute_tools::zenith::*; + +/// Do all the preparations like PGDATA directory creation, configuration, +/// safekeepers sync, basebackup, etc. 
+fn prepare_pgdata(state: &Arc>) -> Result<()> { + let state = state.read().unwrap(); + let spec = &state.spec; + let pgdata_path = Path::new(&state.pgdata); + let pageserver_connstr = spec + .cluster + .settings + .find("zenith.page_server_connstring") + .expect("pageserver connstr should be provided"); + let tenant = spec + .cluster + .settings + .find("zenith.zenith_tenant") + .expect("tenant id should be provided"); + let timeline = spec + .cluster + .settings + .find("zenith.zenith_timeline") + .expect("tenant id should be provided"); + + info!( + "applying spec for cluster #{}, operation #{}", + spec.cluster.cluster_id, + spec.operation_uuid.as_ref().unwrap() + ); + + // Remove/create an empty pgdata directory and put configuration there. + create_pgdata(&state.pgdata)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + + info!("starting safekeepers syncing"); + let lsn = sync_safekeepers(&state.pgdata, &state.pgbin)?; + info!("safekeepers synced at LSN {}", lsn); + + get_basebackup(&state.pgdata, &pageserver_connstr, &tenant, &timeline, &lsn)?; + // Update pg_hba.conf received with basebackup. + update_pg_hba(pgdata_path)?; + + Ok(()) +} + +/// Start Postgres as a child process and manage DBs/roles. +/// After that this will hang waiting on the postmaster process to exit. +fn run_compute(state: &Arc>) -> Result { + let read_state = state.read().unwrap(); + let pgdata_path = Path::new(&read_state.pgdata); + + // Run postgres as a child process. 
+ let mut pg = Command::new(&read_state.pgbin) + .args(&["-D", &read_state.pgdata]) + .spawn() + .expect("cannot start postgres process"); + + // Try default Postgres port if it is not provided + let port = read_state + .spec + .cluster + .settings + .find("port") + .unwrap_or_else(|| "5432".to_string()); + wait_for_postgres(&port, pgdata_path)?; + + let mut client = Client::connect(&read_state.connstr, NoTls)?; + + handle_roles(&read_state.spec, &mut client)?; + handle_databases(&read_state.spec, &mut client)?; + + // 'Close' connection + drop(client); + + info!( + "finished configuration of cluster #{}", + read_state.spec.cluster.cluster_id + ); + + // Release the read lock. + drop(read_state); + + // Get the write lock, update state and release the lock, so HTTP API + // was able to serve requests, while we are blocked waiting on + // Postgres. + let mut state = state.write().unwrap(); + state.ready = true; + drop(state); + + // Wait for child postgres process basically forever. In this state Ctrl+C + // will be propagated to postgres and it will be shut down as well. + let ecode = pg.wait().expect("failed to wait on postgres"); + + Ok(ecode) +} + +fn main() -> Result<()> { + // During configuration we are starting Postgres as a child process. If we + // fail we do not want to leave it running. PR_SET_PDEATHSIG sets the signal + // that will be sent to the child process when the parent dies. NB: this is + // cleared for the child of a fork(). SIGINT means fast shutdown for Postgres. + // This does not matter much for Docker, where `zenith_ctl` is an entrypoint, + // so the whole container will exit if it exits. But could be useful when + // `zenith_ctl` is used in e.g. systemd. 
+ unsafe { + prctl(PR_SET_PDEATHSIG, SIGINT); + } + + // TODO: re-use `zenith_utils::logging` later + init_logger(DEFAULT_LOG_LEVEL)?; + + let matches = clap::App::new("zenith_ctl") + .version("0.1.0") + .arg( + clap::Arg::with_name("connstr") + .short("C") + .long("connstr") + .value_name("DATABASE_URL") + .required(true), + ) + .arg( + clap::Arg::with_name("pgdata") + .short("D") + .long("pgdata") + .value_name("DATADIR") + .required(true), + ) + .arg( + clap::Arg::with_name("pgbin") + .short("b") + .long("pgbin") + .value_name("POSTGRES_PATH"), + ) + .arg( + clap::Arg::with_name("spec") + .short("s") + .long("spec") + .value_name("SPEC_JSON"), + ) + .arg( + clap::Arg::with_name("spec-path") + .short("S") + .long("spec-path") + .value_name("SPEC_PATH"), + ) + .get_matches(); + + let pgdata = matches.value_of("pgdata").expect("PGDATA path is required"); + let connstr = matches + .value_of("connstr") + .expect("Postgres connection string is required"); + let spec = matches.value_of("spec"); + let spec_path = matches.value_of("spec-path"); + + // Try to use just 'postgres' if no path is provided + let pgbin = matches.value_of("pgbin").unwrap_or("postgres"); + + let spec: ClusterSpec = match spec { + // First, try to get cluster spec from the cli argument + Some(json) => serde_json::from_str(json)?, + None => { + // Second, try to read it from the file if path is provided + if let Some(sp) = spec_path { + let path = Path::new(sp); + let file = File::open(path)?; + serde_json::from_reader(file)? + } else { + // Finally, try to fetch it from the env + // XXX: not tested well and kept as a backup option for k8s, Docker, etc. 
+ // TODO: remove later + match env::var("CLUSTER_SPEC") { + Ok(json) => serde_json::from_str(&json)?, + Err(_) => panic!("cluster spec should be provided via --spec, --spec-path or env variable CLUSTER_SPEC") + } + } + } + }; + + let compute_state = ComputeState { + connstr: connstr.to_string(), + pgdata: pgdata.to_string(), + pgbin: pgbin.to_string(), + spec, + ready: false, + last_active: Utc::now(), + }; + let compute_state = Arc::new(RwLock::new(compute_state)); + + // Launch service threads first, so we were able to serve availability + // requests, while configuration is still in progress. + let mut _threads = vec![ + launch_http_server(&compute_state).expect("cannot launch compute monitor thread"), + launch_monitor(&compute_state).expect("cannot launch http endpoint thread"), + ]; + + prepare_pgdata(&compute_state)?; + + // Run compute (Postgres) and hang waiting on it. Panic if any error happens, + // it will help us to trigger unwind and kill postmaster as well. + match run_compute(&compute_state) { + Ok(ec) => exit(ec.success() as i32), + Err(error) => panic!("cannot start compute node, error: {}", error), + } +} diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs new file mode 100644 index 0000000000..22134db0f8 --- /dev/null +++ b/compute_tools/src/config.rs @@ -0,0 +1,51 @@ +use std::fs::{File, OpenOptions}; +use std::io; +use std::io::prelude::*; +use std::path::Path; + +use anyhow::Result; + +use crate::pg_helpers::PgOptionsSerialize; +use crate::zenith::ClusterSpec; + +/// Check that `line` is inside a text file and put it there if it is not. +/// Create file if it doesn't exist. +pub fn line_in_file(path: &Path, line: &str) -> Result { + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .append(false) + .open(path)?; + let buf = io::BufReader::new(&file); + let mut count: usize = 0; + + for l in buf.lines() { + if l? 
== line { + return Ok(false); + } + count = 1; + } + + write!(file, "{}{}", "\n".repeat(count), line)?; + Ok(true) +} + +/// Create or completely rewrite configuration file specified by `path` +pub fn write_postgres_conf(path: &Path, spec: &ClusterSpec) -> Result<()> { + // File::create() destroys the file content if it exists. + let mut postgres_conf = File::create(path)?; + + write_zenith_managed_block(&mut postgres_conf, &spec.cluster.settings.as_pg_settings())?; + + Ok(()) +} + +// Write Postgres config block wrapped with generated comment section +fn write_zenith_managed_block(file: &mut File, buf: &str) -> Result<()> { + writeln!(file, "# Managed by Zenith: begin")?; + writeln!(file, "{}", buf)?; + writeln!(file, "# Managed by Zenith: end")?; + + Ok(()) +} diff --git a/compute_tools/src/http_api.rs b/compute_tools/src/http_api.rs new file mode 100644 index 0000000000..02fab08a6e --- /dev/null +++ b/compute_tools/src/http_api.rs @@ -0,0 +1,73 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; +use std::thread; + +use anyhow::Result; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use log::{error, info}; + +use crate::zenith::*; + +// Service function to handle all available routes. +fn routes(req: Request, state: Arc>) -> Response { + match (req.method(), req.uri().path()) { + // Timestamp of the last Postgres activity in the plain text. + (&Method::GET, "/last_activity") => { + info!("serving /last_active GET request"); + let state = state.read().unwrap(); + + // Use RFC3339 format for consistency. + Response::new(Body::from(state.last_active.to_rfc3339())) + } + + // Has compute setup process finished? -> true/false + (&Method::GET, "/ready") => { + info!("serving /ready GET request"); + let state = state.read().unwrap(); + Response::new(Body::from(format!("{}", state.ready))) + } + + // Return the `404 Not Found` for any other routes. 
+ _ => { + let mut not_found = Response::new(Body::from("404 Not Found")); + *not_found.status_mut() = StatusCode::NOT_FOUND; + not_found + } + } +} + +// Main Hyper HTTP server function that runs it and blocks waiting on it forever. +#[tokio::main] +async fn serve(state: Arc>) { + let addr = SocketAddr::from(([0, 0, 0, 0], 3080)); + + let make_service = make_service_fn(move |_conn| { + let state = state.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req: Request| { + let state = state.clone(); + async move { Ok::<_, Infallible>(routes(req, state)) } + })) + } + }); + + info!("starting HTTP server on {}", addr); + + let server = Server::bind(&addr).serve(make_service); + + // Run this server forever + if let Err(e) = server.await { + error!("server error: {}", e); + } +} + +/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`. +pub fn launch_http_server(state: &Arc>) -> Result> { + let state = Arc::clone(state); + + Ok(thread::Builder::new() + .name("http-endpoint".into()) + .spawn(move || serve(state))?) +} diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs new file mode 100644 index 0000000000..592011d95e --- /dev/null +++ b/compute_tools/src/lib.rs @@ -0,0 +1,13 @@ +//! +//! Various tools and helpers to handle cluster / compute node (Postgres) +//! configuration. +//! +pub mod config; +pub mod http_api; +#[macro_use] +pub mod logger; +pub mod monitor; +pub mod params; +pub mod pg_helpers; +pub mod spec; +pub mod zenith; diff --git a/compute_tools/src/logger.rs b/compute_tools/src/logger.rs new file mode 100644 index 0000000000..dde0a950f8 --- /dev/null +++ b/compute_tools/src/logger.rs @@ -0,0 +1,43 @@ +use std::io::Write; + +use anyhow::Result; +use chrono::Utc; +use env_logger::{Builder, Env}; + +macro_rules! info_println { + ($($tts:tt)*) => { + if log_enabled!(Level::Info) { + println!($($tts)*); + } + } +} + +macro_rules! 
info_print { + ($($tts:tt)*) => { + if log_enabled!(Level::Info) { + print!($($tts)*); + } + } +} + +/// Initialize `env_logger` using either `default_level` or +/// `RUST_LOG` environment variable as default log level. +pub fn init_logger(default_level: &str) -> Result<()> { + let env = Env::default().filter_or("RUST_LOG", default_level); + + Builder::from_env(env) + .format(|buf, record| { + let thread_handle = std::thread::current(); + writeln!( + buf, + "{} [{}] {}: {}", + Utc::now().format("%Y-%m-%d %H:%M:%S%.3f %Z"), + thread_handle.name().unwrap_or("main"), + record.level(), + record.args() + ) + }) + .init(); + + Ok(()) +} diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs new file mode 100644 index 0000000000..596981b2d2 --- /dev/null +++ b/compute_tools/src/monitor.rs @@ -0,0 +1,109 @@ +use std::sync::{Arc, RwLock}; +use std::{thread, time}; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use log::{debug, info}; +use postgres::{Client, NoTls}; + +use crate::zenith::ComputeState; + +const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds + +// Spin in a loop and figure out the last activity time in the Postgres. +// Then update it in the shared state. This function never errors out. +// XXX: the only expected panic is at `RwLock` unwrap(). +fn watch_compute_activity(state: &Arc>) { + // Suppose that `connstr` doesn't change + let connstr = state.read().unwrap().connstr.clone(); + // Define `client` outside of the loop to reuse existing connection if it's active. + let mut client = Client::connect(&connstr, NoTls); + let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL); + + info!("watching Postgres activity at {}", connstr); + + loop { + // Should be outside of the write lock to allow others to read while we sleep. + thread::sleep(timeout); + + match &mut client { + Ok(cli) => { + if cli.is_closed() { + info!("connection to postgres closed, trying to reconnect"); + + // Connection is closed, reconnect and try again. 
+ client = Client::connect(&connstr, NoTls); + continue; + } + + // Get all running client backends except ourself, use RFC3339 DateTime format. + let backends = cli + .query( + "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change + FROM pg_stat_activity + WHERE backend_type = 'client backend' + AND pid != pg_backend_pid() + AND usename != 'zenith_admin';", // XXX: find a better way to filter other monitors? + &[], + ); + let mut last_active = state.read().unwrap().last_active; + + if let Ok(backs) = backends { + let mut idle_backs: Vec> = vec![]; + + for b in backs.into_iter() { + let state: String = b.get("state"); + let change: String = b.get("state_change"); + + if state == "idle" { + let change = DateTime::parse_from_rfc3339(&change); + match change { + Ok(t) => idle_backs.push(t.with_timezone(&Utc)), + Err(e) => { + info!("cannot parse backend state_change DateTime: {}", e); + continue; + } + } + } else { + // Found non-idle backend, so the last activity is NOW. + // Save it and exit the for loop. Also clear the idle backend + // `state_change` timestamps array as it doesn't matter now. + last_active = Utc::now(); + idle_backs.clear(); + break; + } + } + + // Sort idle backend `state_change` timestamps. The last one corresponds + // to the last activity. + idle_backs.sort(); + if let Some(last) = idle_backs.last() { + last_active = *last; + } + } + + // Update the last activity in the shared state if we got a more recent one. + let mut state = state.write().unwrap(); + if last_active > state.last_active { + state.last_active = last_active; + debug!("set the last compute activity time to: {}", last_active); + } + } + Err(e) => { + info!("cannot connect to postgres: {}, retrying", e); + + // Establish a new connection and try again. + client = Client::connect(&connstr, NoTls); + } + } + } +} + +/// Launch a separate compute monitor thread and return its `JoinHandle`. 
+pub fn launch_monitor(state: &Arc>) -> Result> { + let state = Arc::clone(state); + + Ok(thread::Builder::new() + .name("compute-monitor".into()) + .spawn(move || watch_compute_activity(&state))?) +} diff --git a/compute_tools/src/params.rs b/compute_tools/src/params.rs new file mode 100644 index 0000000000..925a2f8ef3 --- /dev/null +++ b/compute_tools/src/params.rs @@ -0,0 +1,3 @@ +pub const DEFAULT_LOG_LEVEL: &str = "info"; +pub const DEFAULT_CONNSTRING: &str = "host=localhost user=postgres"; +pub const PG_HBA_ALL_MD5: &str = "host\tall\t\tall\t\t0.0.0.0/0\t\tmd5"; diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs new file mode 100644 index 0000000000..940e3adebc --- /dev/null +++ b/compute_tools/src/pg_helpers.rs @@ -0,0 +1,264 @@ +use std::net::{SocketAddr, TcpStream}; +use std::os::unix::fs::PermissionsExt; +use std::path::Path; +use std::process::Command; +use std::str::FromStr; +use std::{fs, thread, time}; + +use anyhow::{anyhow, Result}; +use postgres::{Client, Transaction}; +use serde::Deserialize; + +const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds + +/// Rust representation of Postgres role info with only those fields +/// that matter for us. +#[derive(Clone, Deserialize)] +pub struct Role { + pub name: PgIdent, + pub encrypted_password: Option, + pub options: GenericOptions, +} + +/// Rust representation of Postgres database info with only those fields +/// that matter for us. +#[derive(Clone, Deserialize)] +pub struct Database { + pub name: PgIdent, + pub owner: PgIdent, + pub options: GenericOptions, +} + +/// Common type representing both SQL statement params with or without value, +/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config +/// options like `wal_level = logical`. +#[derive(Clone, Deserialize)] +pub struct GenericOption { + pub name: String, + pub value: Option, + pub vartype: String, +} + +/// Optional collection of `GenericOption`'s. 
Type alias allows us to +/// declare a `trait` on it. +pub type GenericOptions = Option>; + +impl GenericOption { + /// Represent `GenericOption` as SQL statement parameter. + pub fn to_pg_option(&self) -> String { + if let Some(val) = &self.value { + match self.vartype.as_ref() { + "string" => format!("{} '{}'", self.name, val), + _ => format!("{} {}", self.name, val), + } + } else { + self.name.to_owned() + } + } + + /// Represent `GenericOption` as configuration option. + pub fn to_pg_setting(&self) -> String { + if let Some(val) = &self.value { + match self.vartype.as_ref() { + "string" => format!("{} = '{}'", self.name, val), + _ => format!("{} = {}", self.name, val), + } + } else { + self.name.to_owned() + } + } +} + +pub trait PgOptionsSerialize { + fn as_pg_options(&self) -> String; + fn as_pg_settings(&self) -> String; +} + +impl PgOptionsSerialize for GenericOptions { + /// Serialize an optional collection of `GenericOption`'s to + /// Postgres SQL statement arguments. + fn as_pg_options(&self) -> String { + if let Some(ops) = &self { + ops.iter() + .map(|op| op.to_pg_option()) + .collect::>() + .join(" ") + } else { + "".to_string() + } + } + + /// Serialize an optional collection of `GenericOption`'s to + /// `postgresql.conf` compatible format. + fn as_pg_settings(&self) -> String { + if let Some(ops) = &self { + ops.iter() + .map(|op| op.to_pg_setting()) + .collect::>() + .join("\n") + } else { + "".to_string() + } + } +} + +pub trait GenericOptionsSearch { + fn find(&self, name: &str) -> Option; +} + +impl GenericOptionsSearch for GenericOptions { + /// Lookup option by name + fn find(&self, name: &str) -> Option { + match &self { + Some(ops) => { + let op = ops.iter().find(|s| s.name == name); + match op { + Some(op) => op.value.clone(), + None => None, + } + } + None => None, + } + } +} + +impl Role { + /// Serialize a list of role parameters into a Postgres-acceptable + /// string of arguments. 
+ pub fn to_pg_options(&self) -> String { + // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in Rails. + // For now we do not use generic `options` for roles. Once used, add + // `self.options.as_pg_options()` somewhere here. + let mut params: String = "LOGIN".to_string(); + + if let Some(pass) = &self.encrypted_password { + params.push_str(&format!(" PASSWORD 'md5{}'", pass)); + } else { + params.push_str(" PASSWORD NULL"); + } + + params + } +} + +impl Database { + /// Serialize a list of database parameters into a Postgres-acceptable + /// string of arguments. + /// NB: `TEMPLATE` is actually also an identifier, but so far we only need + /// to use `template0` and `template1`, so it is not a problem. Yet in the future + /// it may require a proper quoting too. + pub fn to_pg_options(&self) -> String { + let mut params: String = self.options.as_pg_options(); + params.push_str(&format!(" OWNER {}", &self.owner.quote())); + + params + } +} + +/// String type alias representing Postgres identifier and +/// intended to be used for DB / role names. +pub type PgIdent = String; + +/// Generic trait used to provide quoting for strings used in the +/// Postgres SQL queries. Currently used only to implement quoting +/// of identifiers, but could be used for literals in the future. +pub trait PgQuote { + fn quote(&self) -> String; +} + +impl PgQuote for PgIdent { + /// This is intended to mimic Postgres quote_ident(), but for simplicity it + /// always quotes provided string with `""` and escapes every `"`. Not idempotent, + /// i.e. if string is already escaped it will be escaped again. + fn quote(&self) -> String { + let result = format!("\"{}\"", self.replace("\"", "\"\"")); + result + } +} + +/// Build a list of existing Postgres roles +pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result> { + let postgres_roles = xact + .query("SELECT rolname, rolpassword FROM pg_catalog.pg_authid", &[])? 
+ .iter() + .map(|row| Role { + name: row.get("rolname"), + encrypted_password: row.get("rolpassword"), + options: None, + }) + .collect(); + + Ok(postgres_roles) +} + +/// Build a list of existing Postgres databases +pub fn get_existing_dbs(client: &mut Client) -> Result> { + let postgres_dbs = client + .query( + "SELECT datname, datdba::regrole::text as owner + FROM pg_catalog.pg_database;", + &[], + )? + .iter() + .map(|row| Database { + name: row.get("datname"), + owner: row.get("owner"), + options: None, + }) + .collect(); + + Ok(postgres_dbs) +} + +/// Wait for Postgres to become ready to accept connections: +/// - state should be `ready` in the `pgdata/postmaster.pid` +/// - and we should be able to connect to 127.0.0.1:5432 +pub fn wait_for_postgres(port: &str, pgdata: &Path) -> Result<()> { + let pid_path = pgdata.join("postmaster.pid"); + let mut slept: u64 = 0; // ms + let pause = time::Duration::from_millis(100); + + let timeout = time::Duration::from_millis(200); + let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap(); + + loop { + // Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout, + // but postgres starts listening almost immediately, even if it is not really + // ready to accept connections). + if slept >= POSTGRES_WAIT_TIMEOUT { + return Err(anyhow!("timed out while waiting for Postgres to start")); + } + + if pid_path.exists() { + // XXX: dumb and the simplest way to get the last line in a text file + // TODO: better use `.lines().last()` later + let stdout = Command::new("tail") + .args(&["-n1", pid_path.to_str().unwrap()]) + .output()? 
+ .stdout; + let status = String::from_utf8(stdout)?; + let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok(); + + // Now Postgres is ready to accept connections + if status.trim() == "ready" && can_connect { + break; + } + } + + thread::sleep(pause); + slept += 100; + } + + Ok(()) +} + +/// Remove `pgdata` directory and create it again with right permissions. +pub fn create_pgdata(pgdata: &str) -> Result<()> { + // Ignore removal error, likely it is a 'No such file or directory (os error 2)'. + // If it is something different then create_dir() will error out anyway. + let _ok = fs::remove_dir_all(pgdata); + fs::create_dir(pgdata)?; + fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?; + + Ok(()) +} diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs new file mode 100644 index 0000000000..41e4174bf0 --- /dev/null +++ b/compute_tools/src/spec.rs @@ -0,0 +1,246 @@ +use std::path::Path; + +use anyhow::Result; +use log::{info, log_enabled, warn, Level}; +use postgres::Client; + +use crate::config; +use crate::params::PG_HBA_ALL_MD5; +use crate::pg_helpers::*; +use crate::zenith::ClusterSpec; + +/// It takes cluster specification and does the following: +/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file. +/// - Update `pg_hba.conf` to allow external connections. +pub fn handle_configuration(spec: &ClusterSpec, pgdata_path: &Path) -> Result<()> { + // File `postgresql.conf` is no longer included into `basebackup`, so just + // always write all config into it creating new file. + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + + update_pg_hba(pgdata_path)?; + + Ok(()) +} + +/// Check `pg_hba.conf` and update if needed to allow external connections. 
+pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> { + // XXX: consider making it a part of spec.json + info!("checking pg_hba.conf"); + let pghba_path = pgdata_path.join("pg_hba.conf"); + + if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? { + info!("updated pg_hba.conf to allow external connections"); + } else { + info!("pg_hba.conf is up-to-date"); + } + + Ok(()) +} + +/// Given a cluster spec json and open transaction it handles roles creation, +/// deletion and update. +pub fn handle_roles(spec: &ClusterSpec, client: &mut Client) -> Result<()> { + let mut xact = client.transaction()?; + let existing_roles: Vec = get_existing_roles(&mut xact)?; + + // Print a list of existing Postgres roles (only in debug mode) + info!("postgres roles:"); + for r in &existing_roles { + info_println!( + "{} - {}:{}", + " ".repeat(27 + 5), + r.name, + if r.encrypted_password.is_some() { + "[FILTERED]" + } else { + "(null)" + } + ); + } + + // Process delta operations first + if let Some(ops) = &spec.delta_operations { + info!("processing delta operations on roles"); + for op in ops { + match op.action.as_ref() { + // We do not check either role exists or not, + // Postgres will take care of it for us + "delete_role" => { + let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.quote()); + + warn!("deleting role '{}'", &op.name); + xact.execute(query.as_str(), &[])?; + } + // Renaming role drops its password, since tole name is + // used as a salt there. It is important that this role + // is recorded with a new `name` in the `roles` list. + // Follow up roles update will set the new password. 
+ "rename_role" => { + let new_name = op.new_name.as_ref().unwrap(); + + // XXX: with a limited number of roles it is fine, but consider making it a HashMap + if existing_roles.iter().any(|r| r.name == op.name) { + let query: String = format!( + "ALTER ROLE {} RENAME TO {}", + op.name.quote(), + new_name.quote() + ); + + warn!("renaming role '{}' to '{}'", op.name, new_name); + xact.execute(query.as_str(), &[])?; + } + } + _ => {} + } + } + } + + // Refresh Postgres roles info to handle possible roles renaming + let existing_roles: Vec = get_existing_roles(&mut xact)?; + + info!("cluster spec roles:"); + for role in &spec.cluster.roles { + let name = &role.name; + + info_print!( + "{} - {}:{}", + " ".repeat(27 + 5), + name, + if role.encrypted_password.is_some() { + "[FILTERED]" + } else { + "(null)" + } + ); + + // XXX: with a limited number of roles it is fine, but consider making it a HashMap + let pg_role = existing_roles.iter().find(|r| r.name == *name); + + if let Some(r) = pg_role { + let mut update_role = false; + + if (r.encrypted_password.is_none() && role.encrypted_password.is_some()) + || (r.encrypted_password.is_some() && role.encrypted_password.is_none()) + { + update_role = true; + } else if let Some(pg_pwd) = &r.encrypted_password { + // Check whether password changed or not (trim 'md5:' prefix first) + update_role = pg_pwd[3..] 
!= *role.encrypted_password.as_ref().unwrap(); + } + + if update_role { + let mut query: String = format!("ALTER ROLE {} ", name.quote()); + info_print!(" -> update"); + + query.push_str(&role.to_pg_options()); + xact.execute(query.as_str(), &[])?; + } + } else { + info!("role name {}", &name); + let mut query: String = format!("CREATE ROLE {} ", name.quote()); + info!("role create query {}", &query); + info_print!(" -> create"); + + query.push_str(&role.to_pg_options()); + xact.execute(query.as_str(), &[])?; + } + + info_print!("\n"); + } + + xact.commit()?; + + Ok(()) +} + +/// It follows mostly the same logic as `handle_roles()` excepting that we +/// does not use an explicit transactions block, since major database operations +/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level +/// atomicity should be enough here due to the order of operations and various checks, +/// which together provide us idempotency. +pub fn handle_databases(spec: &ClusterSpec, client: &mut Client) -> Result<()> { + let existing_dbs: Vec = get_existing_dbs(client)?; + + // Print a list of existing Postgres databases (only in debug mode) + info!("postgres databases:"); + for r in &existing_dbs { + info_println!("{} - {}:{}", " ".repeat(27 + 5), r.name, r.owner); + } + + // Process delta operations first + if let Some(ops) = &spec.delta_operations { + info!("processing delta operations on databases"); + for op in ops { + match op.action.as_ref() { + // We do not check either DB exists or not, + // Postgres will take care of it for us + "delete_db" => { + let query: String = format!("DROP DATABASE IF EXISTS {}", &op.name.quote()); + + warn!("deleting database '{}'", &op.name); + client.execute(query.as_str(), &[])?; + } + "rename_db" => { + let new_name = op.new_name.as_ref().unwrap(); + + // XXX: with a limited number of roles it is fine, but consider making it a HashMap + if existing_dbs.iter().any(|r| r.name == op.name) { + let query: String = format!( + 
"ALTER DATABASE {} RENAME TO {}", + op.name.quote(), + new_name.quote() + ); + + warn!("renaming database '{}' to '{}'", op.name, new_name); + client.execute(query.as_str(), &[])?; + } + } + _ => {} + } + } + } + + // Refresh Postgres databases info to handle possible renames + let existing_dbs: Vec = get_existing_dbs(client)?; + + info!("cluster spec databases:"); + for db in &spec.cluster.databases { + let name = &db.name; + + info_print!("{} - {}:{}", " ".repeat(27 + 5), db.name, db.owner); + + // XXX: with a limited number of databases it is fine, but consider making it a HashMap + let pg_db = existing_dbs.iter().find(|r| r.name == *name); + + if let Some(r) = pg_db { + // XXX: db owner name is returned as quoted string from Postgres, + // when quoting is needed. + let new_owner = if r.owner.starts_with('\"') { + db.owner.quote() + } else { + db.owner.clone() + }; + + if new_owner != r.owner { + let query: String = format!( + "ALTER DATABASE {} OWNER TO {}", + name.quote(), + db.owner.quote() + ); + info_print!(" -> update"); + + client.execute(query.as_str(), &[])?; + } + } else { + let mut query: String = format!("CREATE DATABASE {} ", name.quote()); + info_print!(" -> create"); + + query.push_str(&db.to_pg_options()); + client.execute(query.as_str(), &[])?; + } + + info_print!("\n"); + } + + Ok(()) +} diff --git a/compute_tools/src/zenith.rs b/compute_tools/src/zenith.rs new file mode 100644 index 0000000000..96aef00016 --- /dev/null +++ b/compute_tools/src/zenith.rs @@ -0,0 +1,107 @@ +use std::process::{Command, Stdio}; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use postgres::{Client, NoTls}; +use serde::Deserialize; + +use crate::pg_helpers::*; + +/// Compute node state shared across several `zenith_ctl` threads. +/// Should be used under `RwLock` to allow HTTP API server to serve +/// status requests, while configuration is in progress. 
+pub struct ComputeState { + pub connstr: String, + pub pgdata: String, + pub pgbin: String, + pub spec: ClusterSpec, + /// Compute setup process has finished + pub ready: bool, + /// Timestamp of the last Postgres activity + pub last_active: DateTime, +} + +/// Cluster spec or configuration represented as an optional number of +/// delta operations + final cluster state description. +#[derive(Clone, Deserialize)] +pub struct ClusterSpec { + pub format_version: f32, + pub timestamp: String, + pub operation_uuid: Option, + /// Expected cluster state at the end of transition process. + pub cluster: Cluster, + pub delta_operations: Option>, +} + +/// Cluster state seen from the perspective of the external tools +/// like Rails web console. +#[derive(Clone, Deserialize)] +pub struct Cluster { + pub cluster_id: i64, + pub name: String, + pub state: Option, + pub roles: Vec, + pub databases: Vec, + pub settings: GenericOptions, +} + +/// Single cluster state changing operation that could not be represented as +/// a static `Cluster` structure. For example: +/// - DROP DATABASE +/// - DROP ROLE +/// - ALTER ROLE name RENAME TO new_name +/// - ALTER DATABASE name RENAME TO new_name +#[derive(Clone, Deserialize)] +pub struct DeltaOp { + pub action: String, + pub name: PgIdent, + pub new_name: Option, +} + +/// Get basebackup from the libpq connection to pageserver using `connstr` and +/// unarchive it to `pgdata` directory overriding all its previous content. 
+pub fn get_basebackup( + pgdata: &str, + connstr: &str, + tenant: &str, + timeline: &str, + lsn: &str, +) -> Result<()> { + let mut client = Client::connect(connstr, NoTls)?; + let basebackup_cmd = match lsn { + "0/0" => format!("basebackup {} {}", tenant, timeline), // First start of the compute + _ => format!("basebackup {} {} {}", tenant, timeline, lsn), + }; + let copyreader = client.copy_out(basebackup_cmd.as_str())?; + let mut ar = tar::Archive::new(copyreader); + + ar.unpack(&pgdata)?; + + Ok(()) +} + +/// Run `postgres` in a special mode with `--sync-safekeepers` argument +/// and return the reported LSN back to the caller. +pub fn sync_safekeepers(pgdata: &str, pgbin: &str) -> Result { + let sync_handle = Command::new(&pgbin) + .args(&["--sync-safekeepers"]) + .env("PGDATA", &pgdata) // we cannot use -D in this mode + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("postgres --sync-safekeepers failed to start"); + + let sync_output = sync_handle + .wait_with_output() + .expect("postgres --sync-safekeepers failed"); + if !sync_output.status.success() { + anyhow::bail!( + "postgres --sync-safekeepers failed: '{}'", + String::from_utf8_lossy(&sync_output.stderr) + ); + } + + let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim()); + + Ok(lsn) +} diff --git a/compute_tools/tests/cluster_spec.json b/compute_tools/tests/cluster_spec.json new file mode 100644 index 0000000000..e0034415f1 --- /dev/null +++ b/compute_tools/tests/cluster_spec.json @@ -0,0 +1,205 @@ +{ + "format_version": 1.0, + + "timestamp": "2021-05-23T18:25:43.511Z", + "operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b", + + "cluster": { + "cluster_id": 42, + "name": "Zenith Test", + "state": "restarted", + "roles": [ + { + "name": "postgres", + "encrypted_password": "6b1d16b78004bbd51fa06af9eda75972", + "options": null + }, + { + "name": "alexk", + "encrypted_password": null, + "options": null + }, + { + "name": "zenith \"new\"", + "encrypted_password": 
"5b1d16b78004bbd51fa06af9eda75972", + "options": null + }, + { + "name": "zen", + "encrypted_password": "9b1d16b78004bbd51fa06af9eda75972" + }, + { + "name": "\"name\";\\n select 1;", + "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972" + }, + { + "name": "MyRole", + "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972" + } + ], + "databases": [ + { + "name": "DB2", + "owner": "alexk", + "options": [ + { + "name": "LC_COLLATE", + "value": "C", + "vartype": "string" + }, + { + "name": "LC_CTYPE", + "value": "C", + "vartype": "string" + }, + { + "name": "TEMPLATE", + "value": "template0", + "vartype": "enum" + } + ] + }, + { + "name": "zenith", + "owner": "MyRole" + }, + { + "name": "zen", + "owner": "zen" + } + ], + "settings": [ + { + "name": "fsync", + "value": "off", + "vartype": "bool" + }, + { + "name": "wal_level", + "value": "replica", + "vartype": "enum" + }, + { + "name": "hot_standby", + "value": "on", + "vartype": "bool" + }, + { + "name": "wal_acceptors", + "value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501", + "vartype": "string" + }, + { + "name": "wal_log_hints", + "value": "on", + "vartype": "bool" + }, + { + "name": "log_connections", + "value": "on", + "vartype": "bool" + }, + { + "name": "shared_buffers", + "value": "32768", + "vartype": "integer" + }, + { + "name": "port", + "value": "55432", + "vartype": "integer" + }, + { + "name": "max_connections", + "value": "100", + "vartype": "integer" + }, + { + "name": "max_wal_senders", + "value": "10", + "vartype": "integer" + }, + { + "name": "listen_addresses", + "value": "0.0.0.0", + "vartype": "string" + }, + { + "name": "wal_sender_timeout", + "value": "0", + "vartype": "integer" + }, + { + "name": "password_encryption", + "value": "md5", + "vartype": "enum" + }, + { + "name": "maintenance_work_mem", + "value": "65536", + "vartype": "integer" + }, + { + "name": "max_parallel_workers", + "value": "8", + "vartype": "integer" + }, + { + "name": "max_worker_processes", + "value": "8", + 
#[cfg(test)]
mod config_tests {

    use std::fs;
    use std::path::Path;

    use compute_tools::config::*;

    /// Create (or truncate) `path` and fill it with `content`.
    fn write_test_file(path: &Path, content: &str) {
        fs::write(path, content).unwrap();
    }

    /// Assert that the file at `path` holds exactly `expected_content`.
    fn check_file_content(path: &Path, expected_content: &str) {
        let actual = fs::read_to_string(path).unwrap();
        assert_eq!(actual, expected_content);
    }

    #[test]
    fn test_line_in_file() {
        let initial = "line1\nline2.1\t line2.2\nline3";
        let path = Path::new("./tests/tmp/config_test.txt");
        write_test_file(path, initial);

        // The line is already present: nothing is added, `false` is returned.
        assert!(!line_in_file(path, "line2.1\t line2.2").unwrap());
        check_file_content(path, initial);

        // The line is missing: it is appended, `true` is returned.
        assert!(line_in_file(path, "line4").unwrap());
        check_file_content(path, "line1\nline2.1\t line2.2\nline3\nline4");

        fs::remove_file(path).unwrap();

        // The file itself is missing: it ends up containing only the line.
        let path = Path::new("./tests/tmp/new_config_test.txt");
        assert!(line_in_file(path, "line4").unwrap());
        check_file_content(path, "line4");

        fs::remove_file(path).unwrap();
    }
}

#[cfg(test)]
mod pg_helpers_tests {

    use std::fs::File;

    use compute_tools::pg_helpers::*;
    use compute_tools::zenith::ClusterSpec;

    /// Parse the cluster spec fixture shared by the serialization tests.
    fn load_spec() -> ClusterSpec {
        let file = File::open("tests/cluster_spec.json").unwrap();
        serde_json::from_reader(file).unwrap()
    }

    #[test]
    fn params_serialize() {
        let spec = load_spec();
        let first_db = spec.cluster.databases.first().unwrap();
        let first_role = spec.cluster.roles.first().unwrap();

        assert_eq!(
            first_db.to_pg_options(),
            "LC_COLLATE 'C' LC_CTYPE 'C' TEMPLATE template0 OWNER \"alexk\""
        );
        assert_eq!(
            first_role.to_pg_options(),
            "LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
        );
    }

    #[test]
    fn settings_serialize() {
        let spec = load_spec();

        assert_eq!(
            spec.cluster.settings.as_pg_settings(),
            "fsync = off\nwal_level = replica\nhot_standby = on\nwal_acceptors = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'\nwal_log_hints = on\nlog_connections = on\nshared_buffers = 32768\nport = 55432\nmax_connections = 100\nmax_wal_senders = 10\nlisten_addresses = '0.0.0.0'\nwal_sender_timeout = 0\npassword_encryption = md5\nmaintenance_work_mem = 65536\nmax_parallel_workers = 8\nmax_worker_processes = 8\nzenith.zenith_tenant = 'b0554b632bd4d547a63b86c3630317e8'\nmax_replication_slots = 10\nzenith.zenith_timeline = '2414a61ffc94e428f14b5758fe308e13'\nshared_preload_libraries = 'zenith'\nsynchronous_standby_names = 'walproposer'\nzenith.page_server_connstring = 'host=127.0.0.1 port=6400'"
        );
    }

    #[test]
    fn quote_ident() {
        // Embedded double quotes must be doubled and the whole identifier wrapped in quotes.
        let ident: PgIdent = PgIdent::from("\"name\";\\n select 1;");
        let quoted = ident.quote();

        assert_eq!(quoted, "\"\"\"name\"\";\\n select 1;\"");
    }
}