diff --git a/.circleci/config.yml b/.circleci/config.yml index 45c0f0df57..63816eee56 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ executors: zenith-build-executor: resource_class: xlarge docker: - - image: cimg/rust:1.55.0 + - image: cimg/rust:1.56.1 zenith-python-executor: docker: - image: cimg/python:3.7.10 # Oldest available 3.7 with Ubuntu 20.04 (for GLIBC and Rust) at CirlceCI @@ -14,7 +14,6 @@ jobs: executor: zenith-build-executor steps: - checkout - - run: name: rustfmt when: always @@ -81,6 +80,8 @@ jobs: build_type: type: enum enum: ["debug", "release"] + environment: + BUILD_TYPE: << parameters.build_type >> steps: - run: name: apt install dependencies @@ -116,16 +117,17 @@ jobs: - run: name: Rust build << parameters.build_type >> command: | - export CARGO_INCREMENTAL=0 - BUILD_TYPE="<< parameters.build_type >>" if [[ $BUILD_TYPE == "debug" ]]; then - echo "Build in debug mode" - cargo build --bins --tests + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + CARGO_FLAGS= elif [[ $BUILD_TYPE == "release" ]]; then - echo "Build in release mode" - cargo build --release --bins --tests + cov_prefix=() + CARGO_FLAGS=--release fi + export CARGO_INCREMENTAL=0 + "${cov_prefix[@]}" cargo build $CARGO_FLAGS --bins --tests + - save_cache: name: Save rust cache key: v04-rust-cache-deps-<< parameters.build_type >>-{{ checksum "Cargo.lock" }} @@ -138,45 +140,77 @@ jobs: # has to run separately from cargo fmt section # since needs to run with dependencies - run: - name: clippy + name: cargo clippy command: | - ./run_clippy.sh + if [[ $BUILD_TYPE == "debug" ]]; then + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + elif [[ $BUILD_TYPE == "release" ]]; then + cov_prefix=() + fi + + "${cov_prefix[@]}" ./run_clippy.sh # Run rust unit tests - - run: cargo test + - run: + name: cargo test + command: | + if [[ $BUILD_TYPE == "debug" ]]; then + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + elif [[ $BUILD_TYPE == "release" ]]; then + cov_prefix=() + fi + + "${cov_prefix[@]}" cargo test # Install the rust binaries, for use by test jobs - # `--locked` is required; otherwise, `cargo install` will ignore Cargo.lock. - # FIXME: this is a really silly way to install; maybe we should just output - # a tarball as an artifact? Or a .deb package? - run: - name: cargo install + name: Install rust binaries command: | - export CARGO_INCREMENTAL=0 - BUILD_TYPE="<< parameters.build_type >>" if [[ $BUILD_TYPE == "debug" ]]; then - echo "Install debug mode" - CARGO_FLAGS="--debug" + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) elif [[ $BUILD_TYPE == "release" ]]; then - echo "Install release mode" - # The default is release mode; there is no --release flag. - CARGO_FLAGS="" + cov_prefix=() + fi + + binaries=$( + "${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps | + jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name' + ) + + test_exe_paths=$( + "${cov_prefix[@]}" cargo test --message-format=json --no-run | + jq -r '.executable | select(. != null)' + ) + + mkdir -p /tmp/zenith/bin + mkdir -p /tmp/zenith/test_bin + mkdir -p /tmp/zenith/etc + + # Install target binaries + for bin in $binaries; do + SRC=target/$BUILD_TYPE/$bin + DST=/tmp/zenith/bin/$bin + cp $SRC $DST + echo $DST >> /tmp/zenith/etc/binaries.list + done + + # Install test executables (for code coverage) + if [[ $BUILD_TYPE == "debug" ]]; then + for bin in $test_exe_paths; do + SRC=$bin + DST=/tmp/zenith/test_bin/$(basename $bin) + cp $SRC $DST + echo $DST >> /tmp/zenith/etc/binaries.list + done fi - cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path pageserver - cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path walkeeper - cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path zenith # Install the postgres binaries, for use by test jobs - # FIXME: this is a silly way to do "install"; maybe just output a standard - # postgres package, whatever the favored form is (tarball? .deb package?) - # Note that pg_regress needs some build artifacts that probably aren't - # in the usual package...? - run: - name: postgres install + name: Install postgres binaries command: | cp -a tmp_install /tmp/zenith/pg_install - # Save the rust output binaries for other jobs in this workflow. + # Save the rust binaries and coverage data for other jobs in this workflow. - persist_to_workspace: root: /tmp/zenith paths: @@ -228,6 +262,8 @@ jobs: save_perf_report: type: boolean default: false + environment: + BUILD_TYPE: << parameters.build_type >> steps: - attach_workspace: at: /tmp/zenith @@ -241,21 +277,22 @@ jobs: command: pipenv --python 3.7 install - run: name: Run pytest - working_directory: test_runner # pytest doesn't output test logs in real time, so CI job may fail with # `Too long with no output` error, if a test is running for a long time. - # In that case, tests should have internal timeouts that are less than + # In that case, tests should have internal timeouts that are less than # no_output_timeout, specified here. no_output_timeout: 10m environment: - ZENITH_BIN: /tmp/zenith/bin - POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install - TEST_OUTPUT: /tmp/test_output - # this variable will be embedded in perf test report + # this variable will be embedded in perf test report # and is needed to distinguish different environments - PLATFORM: zenith-local-ci command: | - TEST_SELECTION="<< parameters.test_selection >>" + PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)" + + TEST_SELECTION="test_runner/<< parameters.test_selection >>" EXTRA_PARAMS="<< parameters.extra_params >>" if [ -z "$TEST_SELECTION" ]; then echo "test_selection must be set" @@ -263,16 +300,22 @@ jobs: fi if << parameters.run_in_parallel >>; then EXTRA_PARAMS="-n4 $EXTRA_PARAMS" - fi; + fi if << parameters.save_perf_report >>; then if [[ $CIRCLE_BRANCH == "main" ]]; then - mkdir -p perf-report-local - EXTRA_PARAMS="--out-dir perf-report-local $EXTRA_PARAMS" - fi; - fi; + mkdir -p "$PERF_REPORT_DIR" + EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS" + fi + fi export GITHUB_SHA=$CIRCLE_SHA1 + if [[ $BUILD_TYPE == "debug" ]]; then + cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run) + elif [[ $BUILD_TYPE == "release" ]]; then + cov_prefix=() + fi + # Run the tests. # # The junit.xml file allows CircleCI to display more fine-grained test information @@ -283,14 +326,21 @@ jobs: # -n4 uses four processes to run tests via pytest-xdist # -s is not used to prevent pytest from capturing output, because tests are running # in parallel and logs are mixed between different tests - pipenv run pytest --junitxml=$TEST_OUTPUT/junit.xml --tb=short --verbose -m "not remote_cluster" -rA $TEST_SELECTION $EXTRA_PARAMS + "${cov_prefix[@]}" pipenv run pytest \ + --junitxml=$TEST_OUTPUT/junit.xml \ + --tb=short \ + --verbose \ + -m "not remote_cluster" \ + -rA $TEST_SELECTION $EXTRA_PARAMS if << parameters.save_perf_report >>; then if [[ $CIRCLE_BRANCH == "main" ]]; then - REPORT_FROM=$(realpath perf-report-local) REPORT_TO=local ../scripts/generate_and_push_perf_report.sh - fi; - fi; - + # TODO: reuse scripts/git-upload + export REPORT_FROM="$PERF_REPORT_DIR" + export REPORT_TO=local + scripts/generate_and_push_perf_report.sh + fi + fi - run: # CircleCI artifacts are preserved one file at a time, so skipping # this step isn't a good idea. If you want to extract the @@ -306,6 +356,65 @@ jobs: # The store_test_results step tells CircleCI where to find the junit.xml file. - store_test_results: path: /tmp/test_output + # Save coverage data (if any) + - persist_to_workspace: + root: /tmp/zenith + paths: + - "*" + + coverage-report: + executor: zenith-build-executor + steps: + - attach_workspace: + at: /tmp/zenith + - checkout + - restore_cache: + name: Restore rust cache + keys: + # Require an exact match. While an out of date cache might speed up the build, + # there's no way to clean out old packages, so the cache grows every time something + # changes. + - v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }} + - run: + name: Install llvm-tools + command: | + # TODO: install a proper symbol demangler, e.g. rustfilt + # TODO: we should embed this into a docker image + rustup component add llvm-tools-preview + - run: + name: Build coverage report + command: | + COMMIT_URL=https://github.com/zenithdb/zenith/commit/$CIRCLE_SHA1 + + scripts/coverage \ + --dir=/tmp/zenith/coverage report \ + --input-objects=/tmp/zenith/etc/binaries.list \ + --commit-url=$COMMIT_URL \ + --format=github + - run: + name: Upload coverage report + command: | + LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME + REPORT_URL=https://zenithdb.github.io/zenith-coverage-data/$CIRCLE_SHA1 + COMMIT_URL=https://github.com/zenithdb/zenith/commit/$CIRCLE_SHA1 + + scripts/git-upload \ + --repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/zenithdb/zenith-coverage-data.git \ + --message="Add code coverage for $COMMIT_URL" \ + copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE + + # Add link to the coverage report to the commit + curl -f -X POST \ + https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \ + -H "Accept: application/vnd.github.v3+json" \ + --user "$CI_ACCESS_TOKEN" \ + --data \ + "{ + \"state\": \"success\", + \"context\": \"zenith-coverage\", + \"description\": \"Coverage report is ready\", + \"target_url\": \"$REPORT_URL\" + }" # Build zenithdb/zenith:latest image and push it to Docker hub docker-image: @@ -410,6 +519,12 @@ workflows: save_perf_report: true requires: - build-zenith-release + - coverage-report: + # Context passes credentials for gh api + context: CI_ACCESS_TOKEN + requires: + # TODO: consider adding more + - other-tests-debug - docker-image: # Context gives an ability to login context: Docker Hub diff --git a/.gitignore b/.gitignore index afcc79fd20..2ecdaa2053 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,7 @@ test_output/ .vscode /.zenith /integration_tests/.zenith + +# Coverage +*.profraw +*.profdata diff --git a/Cargo.lock b/Cargo.lock index a7b90e18b7..59facab172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1888,7 +1888,6 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" dependencies = [ - "cc", "libc", "signal-hook-registry", ] @@ -2360,6 +2359,7 @@ dependencies = [ "rust-s3", "serde", "serde_json", + "signal-hook", "tempfile", "tokio", "tokio-stream", @@ -2611,6 +2611,7 @@ dependencies = [ "rustls-split", "serde", "serde_json", + "signal-hook", "tempfile", "thiserror", "tokio", diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 85f6c45ece..841cf050c7 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -290,8 +290,12 @@ impl PostgresNode { conf.append("max_wal_size", "100GB"); conf.append("fsync", "off"); conf.append("max_connections", "100"); - conf.append("wal_sender_timeout", "0"); conf.append("wal_level", "replica"); + // wal_sender_timeout is the maximum time to wait for WAL replication. + // It also defines how often the walreciever will send a feedback message to the wal sender. + conf.append("wal_sender_timeout", "5s"); + conf.append("max_replication_flush_lag", "160MB"); + conf.append("max_replication_apply_lag", "1500MB"); conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("port", &self.address.port().to_string()); diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index ca8dbf38dd..fcbf840397 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -130,9 +130,8 @@ impl SafekeeperNode { let listen_pg = format!("localhost:{}", self.conf.pg_port); let listen_http = format!("localhost:{}", self.conf.http_port); - let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?); - cmd = cmd - .args(&["-D", self.datadir_path().to_str().unwrap()]) + let mut cmd = Command::new(self.env.safekeeper_bin()?); + cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--pageserver", &pageserver_conn]) @@ -141,13 +140,18 @@ impl SafekeeperNode { .env_clear() .env("RUST_BACKTRACE", "1"); if !self.conf.sync { - cmd = cmd.arg("--no-sync"); + cmd.arg("--no-sync"); } if self.env.pageserver.auth_type == AuthType::ZenithJWT { cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token); } + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); + } + if !cmd.status()?.success() { bail!( "Safekeeper failed to start. See '{}' for details.", diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index f4cef0a72b..b8118d7f4b 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -5,7 +5,7 @@ use std::process::Command; use std::time::Duration; use std::{io, result, thread}; -use anyhow::{anyhow, bail}; +use anyhow::bail; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; @@ -97,7 +97,6 @@ impl PageServerNode { } pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> { - let mut cmd = Command::new(self.env.pageserver_bin()?); let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port); let listen_http = format!("localhost:{}", self.env.pageserver.http_port); let mut args = vec![ @@ -122,18 +121,19 @@ impl PageServerNode { args.extend(&["--create-tenant", tenantid]) } - let status = cmd - .args(args) - .env_clear() - .env("RUST_BACKTRACE", "1") - .status() - .expect("pageserver init failed"); + let mut cmd = Command::new(self.env.pageserver_bin()?); + cmd.args(args).env_clear().env("RUST_BACKTRACE", "1"); - if status.success() { - Ok(()) - } else { - Err(anyhow!("pageserver init failed")) + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); } + + if !cmd.status()?.success() { + bail!("pageserver init failed"); + } + + Ok(()) } pub fn repo_path(&self) -> PathBuf { @@ -158,6 +158,11 @@ impl PageServerNode { .env_clear() .env("RUST_BACKTRACE", "1"); + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); + } + if !cmd.status()?.success() { bail!( "Pageserver failed to start. See '{}' for details.", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 6cb26404c5..7bb6d1c945 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -35,7 +35,7 @@ scopeguard = "1.1.0" async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" -signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } +signal-hook = "0.3.10" url = "2" nix = "0.23" once_cell = "1.8.0" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 63de235003..e237afbd16 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -14,14 +14,6 @@ use tracing::*; use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION}; use anyhow::{bail, ensure, Context, Result}; -use signal_hook::consts::signal::*; -use signal_hook::consts::TERM_SIGNALS; -use signal_hook::flag; -use signal_hook::iterator::exfiltrator::WithOrigin; -use signal_hook::iterator::SignalsInfo; -use std::process::exit; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; @@ -32,6 +24,8 @@ use pageserver::{ }; use zenith_utils::http::endpoint; use zenith_utils::postgres_backend; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals::{self, Signal}; use const_format::formatcp; @@ -524,17 +518,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { info!("version: {}", GIT_VERSION); - let term_now = Arc::new(AtomicBool::new(false)); - for sig in TERM_SIGNALS { - // When terminated by a second term signal, exit with exit code 1. - // This will do nothing the first time (because term_now is false). - flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; - // But this will "arm" the above for the second time, by setting it to true. - // The order of registering these is important, if you put this one first, it will - // first arm and then terminate ‒ all in the first round. - flag::register(*sig, Arc::clone(&term_now))?; - } - // TODO: Check that it looks like a valid repository before going further // bind sockets before daemonizing so we report errors early and do not return until we are listening @@ -550,6 +533,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { ); let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -564,18 +548,21 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { .stdout(stdout) .stderr(stderr); - match daemonize.start() { + // XXX: The parent process should exit abruptly right after + // it has spawned a child to prevent coverage machinery from + // dumping stats into a `profraw` file now owned by the child. + // Otherwise, the coverage data will be damaged. + match daemonize.exit_action(|| exit_now(0)).start() { Ok(_) => info!("Success, daemonized"), Err(err) => error!(%err, "could not daemonize"), } } - // keep join handles for spawned threads - // don't spawn threads before daemonizing - let mut join_handles = Vec::new(); + let signals = signals::install_shutdown_handlers()?; + let mut threads = vec![]; if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? { - join_handles.push(handle); + threads.push(handle); } // Initialize tenant manager. tenant_mgr::init(conf); @@ -594,61 +581,55 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // Spawn a new thread for the http endpoint // bind before launching separate thread so the error reported before startup exits let cloned = auth.clone(); - let http_endpoint_thread = thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(move || { - let router = http::make_router(conf, cloned); - endpoint::serve_thread_main(router, http_listener) - })?; - - join_handles.push(http_endpoint_thread); + threads.push( + thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(move || { + let router = http::make_router(conf, cloned); + endpoint::serve_thread_main(router, http_listener) + })?, + ); // Spawn a thread to listen for connections. It will spawn further threads // for each connection. - let page_service_thread = thread::Builder::new() - .name("Page Service thread".into()) - .spawn(move || { - page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) - })?; + threads.push( + thread::Builder::new() + .name("Page Service thread".into()) + .spawn(move || { + page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) + })?, + ); - for info in SignalsInfo::::new(TERM_SIGNALS)?.into_iter() { - match info.signal { - SIGQUIT => { - info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode"); - exit(111); - } - SIGINT | SIGTERM => { - info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode"); - // Terminate postgres backends - postgres_backend::set_pgbackend_shutdown_requested(); - // Stop all tenants and flush their data - tenant_mgr::shutdown_all_tenants()?; - // Wait for pageservice thread to complete the job - page_service_thread + signals.handle(|signal| match signal { + Signal::Quit => { + info!( + "Got {}. Terminating in immediate shutdown mode", + signal.name() + ); + std::process::exit(111); + } + + Signal::Interrupt | Signal::Terminate => { + info!( + "Got {}. Terminating gracefully in fast shutdown mode", + signal.name() + ); + + postgres_backend::set_pgbackend_shutdown_requested(); + tenant_mgr::shutdown_all_tenants()?; + endpoint::shutdown(); + + for handle in std::mem::take(&mut threads) { + handle .join() .expect("thread panicked") .expect("thread exited with an error"); - - // Shut down http router - endpoint::shutdown(); - - // Wait for all threads - for handle in join_handles.into_iter() { - handle - .join() - .expect("thread panicked") - .expect("thread exited with an error"); - } - info!("Pageserver shut down successfully completed"); - exit(0); - } - unknown_signal => { - debug!("Unknown signal {}", unknown_signal); } + + info!("Shut down successfully completed"); + std::process::exit(0); } - } - - Ok(()) + }) } #[cfg(test)] diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index af046536e0..3124cb3488 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -238,16 +238,22 @@ impl Repository for LayeredRepository { } fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> { - { - let timelines = self.timelines.lock().unwrap(); + // Scan through the hashmap and collect a list of all the timelines, + // while holding the lock. Then drop the lock and actually perform the + // checkpoints. We don't want to block everything else while the + // checkpoint runs. + let timelines = self.timelines.lock().unwrap(); + let timelines_to_checkpoint: Vec<(ZTimelineId, Arc)> = timelines + .iter() + .map(|(timelineid, timeline)| (*timelineid, timeline.clone())) + .collect(); + drop(timelines); - for (timelineid, timeline) in timelines.iter() { - let _entered = - info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid) - .entered(); + for (timelineid, timeline) in timelines_to_checkpoint.iter() { + let _entered = + info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered(); - timeline.checkpoint(cconf)?; - } + timeline.checkpoint(cconf)?; } Ok(()) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 43db88f74e..f335ddfbdb 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -297,6 +297,11 @@ impl Layer for DeltaLayer { inner.page_version_metas = VecMap::default(); inner.relsizes = VecMap::default(); inner.loaded = false; + + // Note: we keep the Book open. Is that a good idea? The virtual file + // machinery has its own rules for closing the file descriptor if it's not + // needed, but the Book struct uses up some memory, too. + Ok(()) } @@ -410,7 +415,7 @@ impl DeltaLayer { end_lsn, dropped, inner: Mutex::new(DeltaLayerInner { - loaded: true, + loaded: false, book: None, page_version_metas: VecMap::default(), relsizes, @@ -495,8 +500,13 @@ impl DeltaLayer { } let path = self.path(); - let file = VirtualFile::open(&path)?; - let book = Book::new(file)?; + + // Open the file if it's not open already. + if inner.book.is_none() { + let file = VirtualFile::open(&path)?; + inner.book = Some(Book::new(file)?); + } + let book = inner.book.as_ref().unwrap(); match &self.path_or_conf { PathOrConf::Conf(_) => { @@ -531,12 +541,9 @@ impl DeltaLayer { debug!("loaded from {}", &path.display()); - *inner = DeltaLayerInner { - loaded: true, - book: Some(book), - page_version_metas, - relsizes, - }; + inner.page_version_metas = page_version_metas; + inner.relsizes = relsizes; + inner.loaded = true; Ok(inner) } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 1f84ed8507..ba4f1aa1e5 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -321,14 +321,24 @@ fn walreceiver_main( }; if let Some(last_lsn) = status_update { - // TODO: More thought should go into what values are sent here. let last_lsn = PgLsn::from(u64::from(last_lsn)); - // We are using disk consistent LSN as `write_lsn`, i.e. LSN at which page server - // may guarantee persistence of all received data. Safekeeper is not free to remove - // WAL preceding `write_lsn`: it should not be requested by this page server. - let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); - let flush_lsn = last_lsn; - let apply_lsn = PgLsn::from(0); + + // The last LSN we processed. It is not guaranteed to survive pageserver crash. + let write_lsn = last_lsn; + // This value doesn't guarantee data durability, but it's ok. + // In setup with WAL service, pageserver durability is guaranteed by safekeepers. + // In setup without WAL service, we just don't care. + let flush_lsn = write_lsn; + // `disk_consistent_lsn` is the LSN at which page server guarantees persistence of all received data + // Depending on the setup we recieve WAL directly from Compute Node or + // from a WAL service. + // + // Senders use the feedback to determine if we are caught up: + // - Safekeepers are free to remove WAL preceding `apply_lsn`, + // as it will never be requested by this page server. + // - Compute Node uses 'apply_lsn' to calculate a lag for back pressure mechanism + // (delay WAL inserts to avoid lagging pageserver responses and WAL overflow). + let apply_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 7d21b2fc51..7fe4f40158 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -187,8 +187,13 @@ fn find_end_of_wal_segment( let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize; if xl_tot_len == 0 { info!( - "find_end_of_wal_segment reached zeros at {:?}", - Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)) + "find_end_of_wal_segment reached zeros at {:?}, last records ends at {:?}", + Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)), + Lsn(XLogSegNoOffsetToRecPtr( + segno, + last_valid_rec_pos as u32, + wal_seg_size + )) ); break; // zeros, reached the end } @@ -303,12 +308,17 @@ pub fn find_end_of_wal( high_segno, ); } + let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno { + start_lsn.segment_offset(wal_seg_size) + } else { + 0 + }; high_offs = find_end_of_wal_segment( data_dir, high_segno, high_tli, wal_seg_size, - start_lsn.segment_offset(wal_seg_size), + start_offset, )?; } let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); diff --git a/scripts/coverage b/scripts/coverage new file mode 100755 index 0000000000..9ad282a7b2 --- /dev/null +++ b/scripts/coverage @@ -0,0 +1,510 @@ +#!/usr/bin/env python3 + +# Here'a good link in case you're interested in learning more +# about current deficiencies of rust code coverage story: +# https://github.com/rust-lang/rust/issues?q=is%3Aissue+is%3Aopen+instrument-coverage+label%3AA-code-coverage +# +# Also a couple of inspirational tools which I deliberately ended up not using: +# * https://github.com/mozilla/grcov +# * https://github.com/taiki-e/cargo-llvm-cov +# * https://github.com/llvm/llvm-project/tree/main/llvm/test/tools/llvm-cov + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path +from tempfile import TemporaryDirectory +from textwrap import dedent +from typing import Any, Iterable, List, Optional + +import argparse +import json +import os +import shutil +import subprocess +import sys + + +def intersperse(sep: Any, iterable: Iterable[Any]): + fst = True + for item in iterable: + if not fst: + yield sep + fst = False + yield item + + +def find_demangler(demangler=None): + known_tools = ['c++filt', 'rustfilt', 'llvm-cxxfilt'] + + if demangler: + # Explicit argument has precedence over `known_tools` + demanglers = [demangler] + else: + demanglers = known_tools + + for demangler in demanglers: + if shutil.which(demangler): + return demangler + + raise Exception(' '.join([ + 'Failed to find symbol demangler.', + 'Please install it or provide another tool', + f"(e.g. {', '.join(known_tools)})", + ])) + + +class Cargo: + def __init__(self, cwd: Path): + self.cwd = cwd + self.target_dir = Path(os.environ.get('CARGO_TARGET_DIR', cwd / 'target')).resolve() + self._rustlib_dir = None + + @property + def rustlib_dir(self): + if not self._rustlib_dir: + cmd = [ + 'cargo', + '-Zunstable-options', + 'rustc', + '--print=target-libdir', + ] + self._rustlib_dir = Path(subprocess.check_output(cmd, cwd=self.cwd, text=True)).parent + + return self._rustlib_dir + + def binaries(self, profile: str) -> List[str]: + executables = [] + + # This will emit json messages containing test binaries names + cmd = [ + 'cargo', + 'test', + '--no-run', + '--message-format=json', + ] + env = dict(os.environ, PROFILE=profile) + output = subprocess.check_output(cmd, cwd=self.cwd, env=env, text=True) + + for line in output.splitlines(keepends=False): + meta = json.loads(line) + exe = meta.get('executable') + if exe: + executables.append(exe) + + # Metadata contains crate names, which can be used + # to recover names of executables, e.g. `pageserver` + cmd = [ + 'cargo', + 'metadata', + '--format-version=1', + '--no-deps', + ] + meta = json.loads(subprocess.check_output(cmd, cwd=self.cwd)) + + for pkg in meta.get('packages', []): + for target in pkg.get('targets', []): + if 'bin' in target['kind']: + exe = self.target_dir / profile / target['name'] + if exe.exists(): + executables.append(str(exe)) + + return executables + + +@dataclass +class LLVM: + cargo: Cargo + + def resolve_tool(self, name: str) -> str: + exe = self.cargo.rustlib_dir / 'bin' / name + if exe.exists(): + return str(exe) + + if not shutil.which(name): + # Show a user-friendly warning + raise Exception(' '.join([ + f"It appears that you don't have `{name}` installed.", + "Please execute `rustup component add llvm-tools-preview`,", + "or install it via your package manager of choice.", + "LLVM tools should be the same version as LLVM in `rustc --version --verbose`.", + ])) + + return name + + def profdata(self, input_dir: Path, output_profdata: Path): + profraws = [f for f in input_dir.iterdir() if f.suffix == '.profraw'] + if not profraws: + raise Exception(f'No profraw files found at {input_dir}') + + with open(input_dir / 'profraw.list', 'w') as input_files: + profraw_mtime = 0 + for profraw in profraws: + profraw_mtime = max(profraw_mtime, profraw.stat().st_mtime_ns) + print(profraw, file=input_files) + input_files.flush() + + try: + profdata_mtime = output_profdata.stat().st_mtime_ns + except FileNotFoundError: + profdata_mtime = 0 + + # An obvious make-ish optimization + if profraw_mtime >= profdata_mtime: + subprocess.check_call([ + self.resolve_tool('llvm-profdata'), + 'merge', + '-sparse', + f'-input-files={input_files.name}', + f'-output={output_profdata}', + ]) + + def _cov(self, + *extras, + subcommand: str, + profdata: Path, + objects: List[str], + sources: List[str], + demangler: Optional[str] = None) -> None: + + cwd = self.cargo.cwd + objects = list(intersperse('-object', objects)) + extras = list(extras) + + # For some reason `rustc` produces relative paths to src files, + # so we force it to cut the $PWD prefix. + # see: https://github.com/rust-lang/rust/issues/34701#issuecomment-739809584 + if sources: + extras.append(f'-path-equivalence=.,{cwd.resolve()}') + + if demangler: + extras.append(f'-Xdemangler={demangler}') + + cmd = [ + self.resolve_tool('llvm-cov'), + subcommand, # '-dump-collected-paths', # classified debug flag + '-instr-profile', + str(profdata), + *extras, + *objects, + *sources, + ] + subprocess.check_call(cmd, cwd=cwd) + + def cov_report(self, **kwargs) -> None: + self._cov(subcommand='report', **kwargs) + + def cov_export(self, *, kind: str, **kwargs) -> None: + extras = [f'-format={kind}'] + self._cov(subcommand='export', *extras, **kwargs) + + def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None: + extras = [f'-format={kind}'] + if output_dir: + extras.append(f'-output-dir={output_dir}') + + self._cov(subcommand='show', *extras, **kwargs) + + +@dataclass +class Report(ABC): + """ Common properties of a coverage report """ + + llvm: LLVM + demangler: str + profdata: Path + objects: List[str] + sources: List[str] + + def _common_kwargs(self): + return dict(profdata=self.profdata, + objects=self.objects, + sources=self.sources, + demangler=self.demangler) + + @abstractmethod + def generate(self): + pass + + def open(self): + # Do nothing by default + pass + + +class SummaryReport(Report): + def generate(self): + self.llvm.cov_report(**self._common_kwargs()) + + +class TextReport(Report): + def generate(self): + self.llvm.cov_show(kind='text', **self._common_kwargs()) + + +class LcovReport(Report): + def generate(self): + self.llvm.cov_export(kind='lcov', **self._common_kwargs()) + + +@dataclass +class HtmlReport(Report): + output_dir: Path + + def generate(self): + self.llvm.cov_show(kind='html', output_dir=self.output_dir, **self._common_kwargs()) + print(f'HTML report is located at `{self.output_dir}`') + + def open(self): + tool = dict(linux='xdg-open', darwin='open').get(sys.platform) + if not tool: + raise Exception(f'Unknown platform {sys.platform}') + + subprocess.check_call([tool, self.output_dir / 'index.html'], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + + +@dataclass +class GithubPagesReport(HtmlReport): + output_dir: Path + commit_url: str + + def generate(self): + def index_path(path): + return path / 'index.html' + + common = self._common_kwargs() + # Provide default sources if there's none + common.setdefault('sources', ['.']) + + self.llvm.cov_show(kind='html', output_dir=self.output_dir, **common) + shutil.copy(index_path(self.output_dir), self.output_dir / 'local.html') + + with TemporaryDirectory() as tmp: + output_dir = Path(tmp) + args = dict(common, sources=[]) + self.llvm.cov_show(kind='html', output_dir=output_dir, **args) + shutil.copy(index_path(output_dir), self.output_dir / 'all.html') + + with open(index_path(self.output_dir), 'w') as index: + commit_sha = self.commit_url.rsplit('/', maxsplit=1)[-1][:10] + + html = f""" + + + + Coverage ({commit_sha}) + + +

+ Coverage report for commit + + {commit_sha} + +

+ +

+ + Show only local sources + +

+ +

+ + Show all sources (including dependencies) + +

+ + + """ + index.write(dedent(html)) + + print(f'HTML report is located at `{self.output_dir}`') + + +class State: + def __init__(self, cwd: Path, top_dir: Optional[Path], profraw_prefix: Optional[str]): + # Use hostname by default + profraw_prefix = profraw_prefix or '%h' + + self.cwd = cwd + self.cargo = Cargo(self.cwd) + self.llvm = LLVM(self.cargo) + + self.top_dir = top_dir or self.cargo.target_dir / 'coverage' + self.report_dir = self.top_dir / 'report' + + # Directory for raw coverage data emitted by executables + self.profraw_dir = self.top_dir / 'profraw' + self.profraw_dir.mkdir(parents=True, exist_ok=True) + + # Aggregated coverage data + self.profdata_file = self.top_dir / 'coverage.profdata' + + # Dump all coverage data files into a dedicated directory. + # Each filename is parameterized by PID & executable's signature. + os.environ['LLVM_PROFILE_FILE'] = str(self.profraw_dir / + f'cov-{profraw_prefix}-%p-%m.profraw') + + os.environ['RUSTFLAGS'] = ' '.join([ + os.environ.get('RUSTFLAGS', ''), + # Enable LLVM's source-based coverage + # see: https://clang.llvm.org/docs/SourceBasedCodeCoverage.html + # see: https://blog.rust-lang.org/inside-rust/2020/11/12/source-based-code-coverage.html + '-Zinstrument-coverage', + # Link every bit of code to prevent "holes" in coverage report + # see: https://doc.rust-lang.org/rustc/codegen-options/index.html#link-dead-code + '-Clink-dead-code', + # Some of the paths that `rustc` embeds into binaries are absolute, others are relative. + # The point is, we can't have both, because depending on `-path-equivalence`, `llvm-cov` + # either will cripple absolute paths or won't be able to show relative paths at all. + # There's no way to turn relative paths into absolute, so we strip $PWD prefix. + # Only source files of deps (e.g. `$HOME/.cargo`) will keep their absolute paths, + # but we won't include them in report by default (but see `--all`). + f'--remap-path-prefix {self.cwd}=', + ]) + + # XXX: God, have mercy on our souls... + # see: https://github.com/rust-lang/rust/pull/90132 + os.environ['RUSTC_BOOTSTRAP'] = '1' + + def do_run(self, args): + subprocess.check_call([*args.command, *args.args]) + + def do_report(self, args): + if args.all and args.sources: + raise Exception('--all should not be used with sources') + + # see man for `llvm-cov show [sources]` + if args.all: + sources = [] + elif not args.sources: + sources = ['.'] + else: + sources = args.sources + + print('* Merging profraw files') + self.llvm.profdata(self.profraw_dir, self.profdata_file) + + objects = [] + if args.input_objects: + print('* Collecting object files using --input-objects') + with open(args.input_objects) as f: + objects.extend(f.read().splitlines(keepends=False)) + if args.cargo_objects == 'true' or (args.cargo_objects == 'auto' + and not args.input_objects): + print('* Collecting object files using cargo') + objects.extend(self.cargo.binaries(args.profile)) + + params = dict(llvm=self.llvm, + demangler=find_demangler(args.demangler), + profdata=self.profdata_file, + objects=objects, + sources=sources) + + formats = { + 'html': + lambda: HtmlReport(**params, output_dir=self.report_dir), + 'text': + lambda: TextReport(**params), + 'lcov': + lambda: LcovReport(**params), + 'summary': + lambda: SummaryReport(**params), + 'github': + lambda: GithubPagesReport( + **params, output_dir=self.report_dir, commit_url=args.commit_url), + } + + report = formats.get(args.format)() + if not report: + raise Exception('Format `{args.format}` is not supported') + + print(f'* Rendering coverage report ({args.format})') + report.generate() + + if args.open: + print('* Opening the report') + report.open() + + def do_clean(self, args): + # Wipe everything if no filters have been provided + if not (args.report or args.prof): + shutil.rmtree(self.top_dir, ignore_errors=True) + else: + if args.report: + shutil.rmtree(self.report_dir, ignore_errors=True) + if args.prof: + self.profdata_file.unlink(missing_ok=True) + + +def main(): + app = sys.argv[0] + example = f""" +prerequisites: + # alternatively, install a system package for `llvm-tools` + rustup component add llvm-tools-preview + +self-contained example: + {app} run make + {app} run pipenv run pytest test_runner + {app} run cargo test + {app} report --open + """ + + parser = argparse.ArgumentParser(description='Coverage report builder', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=example) + parser.add_argument('--dir', type=Path, help='output directory') + parser.add_argument('--profraw-prefix', metavar='STRING', type=str) + + commands = parser.add_subparsers(title='commands', dest='subparser_name') + + p_run = commands.add_parser('run', help='run a command with magic env') + p_run.add_argument('command', nargs=1) + p_run.add_argument('args', nargs=argparse.REMAINDER) + + p_report = commands.add_parser('report', help='generate a coverage report') + p_report.add_argument('--profile', + default='debug', + choices=('debug', 'release'), + help='cargo build profile') + p_report.add_argument('--format', + default='html', + choices=('html', 'text', 'summary', 'lcov', 'github'), + help='report format') + p_report.add_argument('--input-objects', + metavar='FILE', + type=Path, + help='file containing list of binaries') + p_report.add_argument('--cargo-objects', + default='auto', + choices=('auto', 'true', 'false'), + help='use cargo for auto discovery of binaries') + p_report.add_argument('--commit-url', type=str, help='required for --format=github') + p_report.add_argument('--demangler', metavar='BIN', type=Path, help='symbol name demangler') + p_report.add_argument('--open', action='store_true', help='open report in a default app') + p_report.add_argument('--all', action='store_true', help='show everything, e.g. deps') + p_report.add_argument('sources', nargs='*', type=Path, help='source file or directory') + + p_clean = commands.add_parser('clean', help='wipe coverage artifacts') + p_clean.add_argument('--report', action='store_true', help='pick generated report') + p_clean.add_argument('--prof', action='store_true', help='pick *.profdata & *.profraw') + + args = parser.parse_args() + state = State(cwd=Path.cwd(), top_dir=args.dir, profraw_prefix=args.profraw_prefix) + + commands = { + 'run': state.do_run, + 'report': state.do_report, + 'clean': state.do_clean, + } + + action = commands.get(args.subparser_name) + if action: + action(args) + else: + parser.print_help() + + +if __name__ == '__main__': + main() diff --git a/scripts/git-upload b/scripts/git-upload new file mode 100755 index 0000000000..5298b693af --- /dev/null +++ b/scripts/git-upload @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 + +from contextlib import contextmanager +from tempfile import TemporaryDirectory +from pathlib import Path + +import argparse +import os +import shutil +import subprocess +import sys + + +def absolute_path(path): + return Path(path).resolve() + + +def relative_path(path): + path = Path(path) + if path.is_absolute(): + raise Exception(f'path `{path}` must be relative!') + return path + + +@contextmanager +def chdir(cwd: Path): + old = os.getcwd() + os.chdir(cwd) + try: + yield cwd + finally: + os.chdir(old) + + +def run(cmd, *args, **kwargs): + print('$', ' '.join(cmd)) + subprocess.check_call(cmd, *args, **kwargs) + + +class GitRepo: + def __init__(self, url): + self.url = url + self.cwd = TemporaryDirectory() + + subprocess.check_call([ + 'git', + 'clone', + str(url), + self.cwd.name, + ]) + + def is_dirty(self): + res = subprocess.check_output(['git', 'status', '--porcelain'], text=True).strip() + return bool(res) + + def update(self, message, action, branch=None): + with chdir(self.cwd.name): + if not branch: + cmd = ['git', 'branch', '--show-current'] + branch = subprocess.check_output(cmd, text=True).strip() + + # Run action in repo's directory + action() + + run(['git', 'add', '.']) + + if not self.is_dirty(): + print('No changes detected, quitting') + return + + run([ + 'git', + '-c', + 'user.name=vipvap', + '-c', + 'user.email=vipvap@zenith.tech', + 'commit', + '--author="vipvap "', + f'--message={message}', + ]) + + for _ in range(5): + try: + run(['git', 'fetch', 'origin', branch]) + run(['git', 'rebase', f'origin/{branch}']) + run(['git', 'push', 'origin', branch]) + return + + except subprocess.CalledProcessError as e: + print(f'failed to update branch `{branch}`: {e}', file=sys.stderr) + + raise Exception(f'failed to update branch `{branch}`') + + +def do_copy(args): + src = args.src + dst = args.dst + + try: + if src.is_dir(): + shutil.copytree(src, dst) + else: + shutil.copy(src, dst) + except FileExistsError: + if args.forbid_overwrite: + raise + + +def main(): + parser = argparse.ArgumentParser(description='Git upload tool') + parser.add_argument('--repo', type=str, metavar='URL', required=True, help='git repo url') + parser.add_argument('--message', type=str, metavar='TEXT', help='commit message') + + commands = parser.add_subparsers(title='commands', dest='subparser_name') + + p_copy = commands.add_parser('copy', help='copy file into the repo') + p_copy.add_argument('src', type=absolute_path, help='source path') + p_copy.add_argument('dst', type=relative_path, help='relative dest path') + p_copy.add_argument('--forbid-overwrite', action='store_true', help='do not allow overwrites') + + args = parser.parse_args() + + commands = { + 'copy': do_copy, + } + + action = commands.get(args.subparser_name) + if action: + message = args.message or 'update' + GitRepo(args.repo).update(message, lambda: action(args)) + else: + parser.print_usage() + + +if __name__ == '__main__': + main() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 57cf379a96..120e6c769b 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -392,6 +392,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin): "lm_prefix": "prefix", "lm_message": "message", "set_commit_lsn": True, + "send_proposer_elected": True, "term": 2, "begin_lsn": begin_lsn, "epoch_start_lsn": epoch_start_lsn, diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 326cb060d1..954e9d75ff 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -509,6 +509,12 @@ sync = false # Disable fsyncs to make the tests go faster env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir) env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir) + # Pass coverage settings + var = 'LLVM_PROFILE_FILE' + val = os.environ.get(var) + if val: + env_vars[var] = val + # Intercept CalledProcessError and print more info try: res = subprocess.run(args, diff --git a/vendor/postgres b/vendor/postgres index 08878b19d3..be8bdba074 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 08878b19d3cae5a1bd765bf7396187b6b806c6ac +Subproject commit be8bdba074baf2a4c7f8fb2cc701c2b3fac9342f diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 539a925ebd..69d5f681c5 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -28,6 +28,7 @@ anyhow = "1.0" crc32c = "0.6.0" humantime = "2.1.0" walkdir = "2" +signal-hook = "0.3.10" serde = { version = "1.0", features = ["derive"] } hex = "0.4.3" const_format = "0.2.21" diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index e85d49a8c6..0f06983574 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -8,14 +8,15 @@ use daemonize::Daemonize; use log::*; use std::path::{Path, PathBuf}; use std::thread; -use zenith_utils::http::endpoint; -use zenith_utils::{logging, tcp_listener, GIT_VERSION}; - use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use walkeeper::http; use walkeeper::s3_offload; use walkeeper::wal_service; use walkeeper::SafeKeeperConf; +use zenith_utils::http::endpoint; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals; +use zenith_utils::{logging, tcp_listener, GIT_VERSION}; fn main() -> Result<()> { zenith_metrics::set_common_metrics_prefix("safekeeper"); @@ -131,6 +132,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { e })?; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -145,51 +147,59 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .stdout(stdout) .stderr(stderr); - match daemonize.start() { + // XXX: The parent process should exit abruptly right after + // it has spawned a child to prevent coverage machinery from + // dumping stats into a `profraw` file now owned by the child. + // Otherwise, the coverage data will be damaged. + match daemonize.exit_action(|| exit_now(0)).start() { Ok(_) => info!("Success, daemonized"), Err(e) => error!("Error, {}", e), } } - let mut threads = Vec::new(); + let signals = signals::install_shutdown_handlers()?; + let mut threads = vec![]; - let conf_cloned = conf.clone(); - let http_endpoint_thread = thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(|| { - // TODO authentication - let router = http::make_router(conf_cloned); - endpoint::serve_thread_main(router, http_listener).unwrap(); - }) - .unwrap(); - threads.push(http_endpoint_thread); + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(|| { + // TODO authentication + let router = http::make_router(conf_); + endpoint::serve_thread_main(router, http_listener).unwrap(); + })?, + ); if conf.ttl.is_some() { - let s3_conf = conf.clone(); - let s3_offload_thread = thread::Builder::new() - .name("S3 offload thread".into()) + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("S3 offload thread".into()) + .spawn(|| { + s3_offload::thread_main(conf_); + })?, + ); + } + + threads.push( + thread::Builder::new() + .name("WAL acceptor thread".into()) .spawn(|| { - // thread code - s3_offload::thread_main(s3_conf); - }) - .unwrap(); - threads.push(s3_offload_thread); - } + let thread_result = wal_service::thread_main(conf, pg_listener); + if let Err(e) = thread_result { + info!("wal_service thread terminated: {}", e); + } + })?, + ); - let wal_acceptor_thread = thread::Builder::new() - .name("WAL acceptor thread".into()) - .spawn(|| { - // thread code - let thread_result = wal_service::thread_main(conf, pg_listener); - if let Err(e) = thread_result { - info!("wal_service thread terminated: {}", e); - } - }) - .unwrap(); - threads.push(wal_acceptor_thread); - - for t in threads { - t.join().unwrap() - } - Ok(()) + // NOTE: we still have to handle signals like SIGQUIT to prevent coredumps + signals.handle(|signal| { + // TODO: implement graceful shutdown with joining threads etc + info!( + "Got {}. Terminating in immediate shutdown mode", + signal.name() + ); + std::process::exit(111); + }) } diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index 159ec17a9b..dc1905a5a7 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -7,7 +7,8 @@ use std::fmt::Display; use std::sync::Arc; use zenith_utils::lsn::Lsn; -use crate::safekeeper::AcceptorState; +use crate::safekeeper::Term; +use crate::safekeeper::TermHistory; use crate::timeline::CreateControlFile; use crate::timeline::GlobalTimelines; use crate::SafeKeeperConf; @@ -29,6 +30,7 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { .as_ref() } +/// Serialize through Display trait. fn display_serialize(z: &F, s: S) -> Result where S: Serializer, @@ -37,6 +39,14 @@ where s.serialize_str(&format!("{}", z)) } +/// Augment AcceptorState with epoch for convenience +#[derive(Debug, Serialize)] +struct AcceptorStateStatus { + term: Term, + epoch: Term, + term_history: TermHistory, +} + /// Info about timeline on safekeeper ready for reporting. #[derive(Debug, Serialize)] struct TimelineStatus { @@ -44,7 +54,7 @@ struct TimelineStatus { tenant_id: ZTenantId, #[serde(serialize_with = "display_serialize")] timeline_id: ZTimelineId, - acceptor_state: AcceptorState, + acceptor_state: AcceptorStateStatus, #[serde(serialize_with = "display_serialize")] commit_lsn: Lsn, #[serde(serialize_with = "display_serialize")] @@ -68,10 +78,16 @@ async fn timeline_status_handler(request: Request) -> Result Result<()> { let response = swh.timeline.get().process_msg(&greeting_request)?; match response { - AcceptorProposerMessage::Greeting(_) => Ok(()), + Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), _ => anyhow::bail!("not GreetingResponse"), } } +fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> { + // add new term to existing history + let history = swh.timeline.get().get_info().acceptor_state.term_history; + let history = history.up_to(lsn.checked_sub(1u64).unwrap()); + let mut history_entries = history.0; + history_entries.push(TermSwitchEntry { term, lsn }); + let history = TermHistory(history_entries); + + let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { + term, + start_streaming_at: lsn, + term_history: history, + }); + + swh.timeline.get().process_msg(&proposer_elected_request)?; + Ok(()) +} + #[derive(Serialize, Deserialize)] struct InsertedWAL { begin_lsn: Lsn, @@ -150,7 +176,7 @@ fn append_logical_message( let response = swh.timeline.get().process_msg(&append_request)?; let append_response = match response { - AcceptorProposerMessage::AppendResponse(resp) => resp, + Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, _ => anyhow::bail!("not AppendResponse"), }; diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 9498980802..a653c41922 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result}; use bytes::Bytes; +use bytes::BytesMut; use log::*; use postgres::{Client, Config, NoTls}; @@ -98,7 +99,7 @@ impl<'pg> ReceiveWalConn<'pg> { // Send message to the postgres fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> { - let mut buf = Vec::new(); + let mut buf = BytesMut::with_capacity(128); msg.serialize(&mut buf)?; self.pg_backend.write_message(&BeMessage::CopyData(&buf))?; Ok(()) @@ -147,7 +148,9 @@ impl<'pg> ReceiveWalConn<'pg> { .get() .process_msg(&msg) .with_context(|| "failed to process ProposerAcceptorMessage")?; - self.write_msg(&reply)?; + if let Some(reply) = reply { + self.write_msg(&reply)?; + } msg = self.read_msg()?; } } diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 9f9e974336..f3486e2885 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -55,9 +55,9 @@ impl HotStandbyFeedback { /// Standby status update #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { - pub write_lsn: Lsn, // disk consistent lSN - pub flush_lsn: Lsn, // LSN committedby quorum - pub apply_lsn: Lsn, // not used + pub write_lsn: Lsn, // not used + pub flush_lsn: Lsn, // not used + pub apply_lsn: Lsn, // pageserver's disk consistent lSN pub reply_ts: TimestampTz, pub reply_requested: bool, } @@ -115,7 +115,7 @@ impl ReplicationConn { Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; - state.disk_consistent_lsn = reply.write_lsn; + state.disk_consistent_lsn = reply.apply_lsn; timeline.update_replica_state(replica, Some(state)); } _ => warn!("unexpected message {:?}", msg), diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 0b25241165..2a15bb3fc6 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -4,16 +4,16 @@ use anyhow::Context; use anyhow::{anyhow, bail, Result}; use byteorder::LittleEndian; use byteorder::ReadBytesExt; -use byteorder::WriteBytesExt; use bytes::Buf; +use bytes::BufMut; use bytes::Bytes; +use bytes::BytesMut; use log::*; use pageserver::waldecoder::WalStreamDecoder; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; -use std::cmp::max; use std::cmp::min; -use std::io; +use std::fmt; use std::io::Read; use lazy_static::lazy_static; @@ -37,6 +37,70 @@ const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. pub type Term = u64; +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct TermSwitchEntry { + pub term: Term, + pub lsn: Lsn, +} +#[derive(Clone, Serialize, Deserialize)] +pub struct TermHistory(pub Vec); + +impl TermHistory { + pub fn empty() -> TermHistory { + TermHistory(Vec::new()) + } + + // Parse TermHistory as n_entries followed by TermSwitchEntry pairs + pub fn from_bytes(mut bytes: Bytes) -> Result { + if bytes.remaining() < 4 { + bail!("TermHistory misses len"); + } + let n_entries = bytes.get_u32_le(); + let mut res = Vec::with_capacity(n_entries as usize); + for _ in 0..n_entries { + if bytes.remaining() < 16 { + bail!("TermHistory is incomplete"); + } + res.push(TermSwitchEntry { + term: bytes.get_u64_le(), + lsn: bytes.get_u64_le().into(), + }) + } + Ok(TermHistory(res)) + } + + /// Return copy of self with switches happening strictly after up_to + /// truncated. + pub fn up_to(&self, up_to: Lsn) -> TermHistory { + let mut res = Vec::with_capacity(self.0.len()); + for e in &self.0 { + if e.lsn > up_to { + break; + } + res.push(*e); + } + TermHistory(res) + } +} + +/// Display only latest entries for Debug. +impl fmt::Debug for TermHistory { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let n_printed = 20; + write!( + fmt, + "{}{:?}", + if self.0.len() > n_printed { "... " } else { "" }, + self.0 + .iter() + .rev() + .take(n_printed) + .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry + .collect::>() + ) + } +} + /// Unique id of proposer. Not needed for correctness, used for monitoring. type PgUuid = [u8; 16]; @@ -45,8 +109,21 @@ type PgUuid = [u8; 16]; pub struct AcceptorState { /// acceptor's last term it voted for (advanced in 1 phase) pub term: Term, - /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached). - pub epoch: Term, + /// History of term switches for safekeeper's WAL. + /// Actually it often goes *beyond* WAL contents as we adopt term history + /// from the proposer before recovery. + pub term_history: TermHistory, +} + +impl AcceptorState { + /// acceptor's epoch is the term of the highest entry in the log + pub fn get_epoch(&self, flush_lsn: Lsn) -> Term { + let th = self.term_history.up_to(flush_lsn); + match th.0.last() { + Some(e) => e.term, + None => 0, + } + } } /// Information about Postgres. Safekeeper gets it once and then verifies @@ -91,7 +168,10 @@ impl SafeKeeperState { SafeKeeperState { magic: SK_MAGIC, format_version: SK_FORMAT_VERSION, - acceptor_state: AcceptorState { term: 0, epoch: 0 }, + acceptor_state: AcceptorState { + term: 0, + term_history: TermHistory::empty(), + }, server: ServerInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ system_id: 0, /* Postgres system identifier */ @@ -147,16 +227,28 @@ pub struct VoteRequest { /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { - term: Term, // not really needed, just a sanity check + term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. vote_given: u64, // fixme u64 due to padding - /// Safekeeper's log position, to let proposer choose the most advanced one - epoch: Term, + // Safekeeper flush_lsn (end of WAL) + history of term switches allow + // proposer to choose the most advanced one. flush_lsn: Lsn, truncate_lsn: Lsn, + term_history: TermHistory, +} + +/* + * Proposer -> Acceptor message announcing proposer is elected and communicating + * term history to it. + */ +#[derive(Debug)] +pub struct ProposerElected { + pub term: Term, + pub start_streaming_at: Lsn, + pub term_history: TermHistory, } /// Request with WAL message sent from proposer to safekeeper. Along the way it -/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. +/// communicates commit_lsn. #[derive(Debug)] pub struct AppendRequest { pub h: AppendRequestHeader, @@ -164,6 +256,7 @@ pub struct AppendRequest { } #[derive(Debug, Clone, Deserialize)] pub struct AppendRequestHeader { + // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, // LSN since the proposer appends WAL; determines epoch switch point. pub epoch_start_lsn: Lsn, @@ -185,7 +278,6 @@ pub struct AppendResponse { // Current term of the safekeeper; if it is higher than proposer's, the // compute is out of date. pub term: Term, - pub epoch: Term, // NOTE: this is physical end of wal on safekeeper; currently it doesn't // make much sense without taking epoch into account, as history can be // diverged. @@ -198,19 +290,32 @@ pub struct AppendResponse { pub hs_feedback: HotStandbyFeedback, } +impl AppendResponse { + fn term_only(term: Term) -> AppendResponse { + AppendResponse { + term, + flush_lsn: Lsn(0), + commit_lsn: Lsn(0), + disk_consistent_lsn: Lsn(0), + hs_feedback: HotStandbyFeedback::empty(), + } + } +} + /// Proposer -> Acceptor messages #[derive(Debug)] pub enum ProposerAcceptorMessage { Greeting(ProposerGreeting), VoteRequest(VoteRequest), + Elected(ProposerElected), AppendRequest(AppendRequest), } impl ProposerAcceptorMessage { /// Parse proposer message. - pub fn parse(msg: Bytes) -> Result { + pub fn parse(msg_bytes: Bytes) -> Result { // xxx using Reader is inefficient but easy to work with bincode - let mut stream = msg.reader(); + let mut stream = msg_bytes.reader(); // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is let tag = stream.read_u64::()? as u8 as char; match tag { @@ -222,6 +327,21 @@ impl ProposerAcceptorMessage { let msg = VoteRequest::des_from(&mut stream)?; Ok(ProposerAcceptorMessage::VoteRequest(msg)) } + 'e' => { + let mut msg_bytes = stream.into_inner(); + if msg_bytes.remaining() < 16 { + bail!("ProposerElected message is not complete"); + } + let term = msg_bytes.get_u64_le(); + let start_streaming_at = msg_bytes.get_u64_le().into(); + let term_history = TermHistory::from_bytes(msg_bytes)?; + let msg = ProposerElected { + term, + start_streaming_at, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) + } 'a' => { // read header followed by wal data let hdr = AppendRequestHeader::des_from(&mut stream)?; @@ -259,19 +379,33 @@ pub enum AcceptorProposerMessage { impl AcceptorProposerMessage { /// Serialize acceptor -> proposer message. - pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> { + pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> { match self { AcceptorProposerMessage::Greeting(msg) => { - stream.write_u64::('g' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('g' as u64); + buf.put_u64_le(msg.term); } AcceptorProposerMessage::VoteResponse(msg) => { - stream.write_u64::('v' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('v' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.vote_given); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.truncate_lsn.into()); + buf.put_u32_le(msg.term_history.0.len() as u32); + for e in &msg.term_history.0 { + buf.put_u64_le(e.term); + buf.put_u64_le(e.lsn.into()); + } } AcceptorProposerMessage::AppendResponse(msg) => { - stream.write_u64::('a' as u64)?; - msg.ser_into(stream)?; + buf.put_u64_le('a' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.commit_lsn.into()); + buf.put_u64_le(msg.disk_consistent_lsn.into()); + buf.put_i64_le(msg.hs_feedback.ts); + buf.put_u64_le(msg.hs_feedback.xmin); + buf.put_u64_le(msg.hs_feedback.catalog_xmin); } } @@ -284,6 +418,8 @@ pub trait Storage { fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>; /// Write piece of wal in buf to disk and sync it. fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; + // Truncate WAL at specified LSN + fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>; } lazy_static! { @@ -357,8 +493,7 @@ pub struct SafeKeeper { pub commit_lsn: Lsn, pub truncate_lsn: Lsn, pub storage: ST, - pub s: SafeKeeperState, // persistent part - pub elected_proposer_term: Term, // for monitoring/debugging + pub s: SafeKeeperState, // persistent part decoder: WalStreamDecoder, } @@ -375,27 +510,40 @@ where truncate_lsn: state.truncate_lsn, storage, s: state, - elected_proposer_term: 0, decoder: WalStreamDecoder::new(Lsn(0)), } } + /// Get history of term switches for the available WAL + fn get_term_history(&self) -> TermHistory { + self.s.acceptor_state.term_history.up_to(self.flush_lsn) + } + + #[cfg(test)] + fn get_epoch(&self) -> Term { + self.s.acceptor_state.get_epoch(self.flush_lsn) + } + /// Process message from proposer and possibly form reply. Concurrent /// callers must exclude each other. pub fn process_msg( &mut self, msg: &ProposerAcceptorMessage, - ) -> Result { + ) -> Result> { match msg { ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), + ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg), ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg), } } /// Handle initial message from proposer: check its sanity and send my /// current term. - fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result { + fn handle_greeting( + &mut self, + msg: &ProposerGreeting, + ) -> Result> { /* Check protocol compatibility */ if msg.protocol_version != SK_PROTOCOL_VERSION { bail!( @@ -429,64 +577,106 @@ where "processed greeting from proposer {:?}, sending term {:?}", msg.proposer_id, self.s.acceptor_state.term ); - Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting { + Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { term: self.s.acceptor_state.term, - })) + }))) } /// Give vote for the given term, if we haven't done that previously. - fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result { + fn handle_vote_request( + &mut self, + msg: &VoteRequest, + ) -> Result> { // initialize with refusal let mut resp = VoteResponse { - term: msg.term, + term: self.s.acceptor_state.term, vote_given: false as u64, - epoch: 0, - flush_lsn: Lsn(0), - truncate_lsn: Lsn(0), + flush_lsn: self.flush_lsn, + truncate_lsn: self.s.truncate_lsn, + term_history: self.get_term_history(), }; if self.s.acceptor_state.term < msg.term { self.s.acceptor_state.term = msg.term; // persist vote before sending it out self.storage.persist(&self.s, true)?; + resp.term = self.s.acceptor_state.term; resp.vote_given = true as u64; - resp.epoch = self.s.acceptor_state.epoch; - resp.flush_lsn = self.flush_lsn; - resp.truncate_lsn = self.s.truncate_lsn; } info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); - Ok(AcceptorProposerMessage::VoteResponse(resp)) + Ok(Some(AcceptorProposerMessage::VoteResponse(resp))) + } + + /// Bump our term if received a note from elected proposer with higher one + fn bump_if_higher(&mut self, term: Term) -> Result<()> { + if self.s.acceptor_state.term < term { + self.s.acceptor_state.term = term; + self.storage.persist(&self.s, true)?; + } + Ok(()) + } + + /// Form AppendResponse from current state. + fn append_response(&self) -> AppendResponse { + AppendResponse { + term: self.s.acceptor_state.term, + flush_lsn: self.flush_lsn, + commit_lsn: self.s.commit_lsn, + disk_consistent_lsn: Lsn(0), + // will be filled by the upper code to avoid bothering safekeeper + hs_feedback: HotStandbyFeedback::empty(), + } + } + + fn handle_elected(&mut self, msg: &ProposerElected) -> Result> { + info!("received ProposerElected {:?}", msg); + self.bump_if_higher(msg.term)?; + // If our term is higher, ignore the message (next feedback will inform the compute) + if self.s.acceptor_state.term > msg.term { + return Ok(None); + } + + // TODO: cross check divergence point + + // streaming must not create a hole + assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at); + + // truncate obsolete part of WAL + if self.flush_lsn != Lsn(0) { + self.storage + .truncate_wal(&self.s.server, msg.start_streaming_at)?; + } + // update our end of WAL pointer + self.flush_lsn = msg.start_streaming_at; + // and now adopt term history from proposer + self.s.acceptor_state.term_history = msg.term_history.clone(); + self.storage.persist(&self.s, true)?; + + info!("start receiving WAL since {:?}", msg.start_streaming_at); + + Ok(None) } /// Handle request to append WAL. #[allow(clippy::comparison_chain)] - fn handle_append_request(&mut self, msg: &AppendRequest) -> Result { - // log first AppendRequest from this proposer - if self.elected_proposer_term < msg.h.term { - info!( - "start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}", - self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn, - ); - self.elected_proposer_term = msg.h.term; + fn handle_append_request( + &mut self, + msg: &AppendRequest, + ) -> Result> { + if self.s.acceptor_state.term < msg.h.term { + bail!("got AppendRequest before ProposerElected"); } - // If our term is lower than elected proposer one, bump it. - if self.s.acceptor_state.term < msg.h.term { - self.s.acceptor_state.term = msg.h.term; - self.storage.persist(&self.s, true)?; - } - // OTOH, if it is higher, immediately refuse the message. - else if self.s.acceptor_state.term > msg.h.term { - let resp = AppendResponse { - term: self.s.acceptor_state.term, - epoch: self.s.acceptor_state.epoch, - commit_lsn: Lsn(0), - flush_lsn: Lsn(0), - disk_consistent_lsn: Lsn(0), - hs_feedback: HotStandbyFeedback::empty(), - }; - return Ok(AcceptorProposerMessage::AppendResponse(resp)); + // If our term is higher, immediately refuse the message. + if self.s.acceptor_state.term > msg.h.term { + let resp = AppendResponse::term_only(self.s.acceptor_state.term); + return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } + // After ProposerElected, which performs truncation, we should get only + // indeed append requests (but flush_lsn is advanced only on record + // boundary, so might be less). + assert!(self.flush_lsn <= msg.h.begin_lsn); + self.s.proposer_uuid = msg.h.proposer_uuid; let mut sync_control_file = false; @@ -530,48 +720,21 @@ where } } - /* - * Epoch switch happen when written WAL record cross the boundary. - * The boundary is maximum of last WAL position at this node (FlushLSN) and global - * maximum (vcl) determined by WAL proposer during handshake. - * Switching epoch means that node completes recovery and start writing in the WAL new data. - * XXX: this is wrong, we must actively truncate not matching part of log. - * - * The non-strict inequality is important for us, as proposer in --sync mode doesn't - * generate new records, but to advance commit_lsn epoch switch must happen on majority. - * We can regard this as commit of empty entry in new epoch, this should be safe. - */ - if self.s.acceptor_state.epoch < msg.h.term - && msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn) - { - info!( - "switched to new epoch {} on receival of request end_lsn={:?}, len={:?}", - msg.h.term, - msg.h.end_lsn, - msg.wal_data.len(), - ); - self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ - sync_control_file = true; - } if last_rec_lsn > self.flush_lsn { self.flush_lsn = last_rec_lsn; self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); } - // Advance commit_lsn taking into account what we have locally. xxx this - // is wrapped into epoch check because we overwrite wal instead of - // truncating it, so without it commit_lsn might include wrong part. - // Anyway, nobody is much interested in our commit_lsn while epoch - // switch hasn't happened, right? - // + // Advance commit_lsn taking into account what we have locally. // commit_lsn can be 0, being unknown to new walproposer while he hasn't // collected majority of its epoch acks yet, ignore it in this case. - if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) { + if msg.h.commit_lsn != Lsn(0) { let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); - // If new commit_lsn reached epoch switch, force sync of control file: - // walproposer in sync mode is very interested when this happens. - sync_control_file |= - commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; + // If new commit_lsn reached epoch switch, force sync of control + // file: walproposer in sync mode is very interested when this + // happens. Note: this is for sync-safekeepers mode only, as + // otherwise commit_lsn might jump over epoch_start_lsn. + sync_control_file |= commit_lsn == msg.h.epoch_start_lsn; self.commit_lsn = commit_lsn; self.metrics .commit_lsn @@ -592,15 +755,7 @@ where } self.storage.persist(&self.s, sync_control_file)?; - let resp = AppendResponse { - term: self.s.acceptor_state.term, - epoch: self.s.acceptor_state.epoch, - flush_lsn: self.flush_lsn, - commit_lsn: self.s.commit_lsn, - disk_consistent_lsn: Lsn(0), - // will be filled by caller code to avoid bothering safekeeper - hs_feedback: HotStandbyFeedback::empty(), - }; + let resp = self.append_response(); info!( "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}", msg.wal_data.len(), @@ -609,7 +764,7 @@ where msg.h.truncate_lsn, &resp, ); - Ok(AcceptorProposerMessage::AppendResponse(resp)) + Ok(Some(AcceptorProposerMessage::AppendResponse(resp))) } } @@ -631,6 +786,10 @@ mod tests { fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> { Ok(()) } + + fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> { + Ok(()) + } } #[test] @@ -644,7 +803,7 @@ mod tests { let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); let mut vote_resp = sk.process_msg(&vote_request); match vote_resp.unwrap() { - AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), r => panic!("unexpected response: {:?}", r), } @@ -658,7 +817,7 @@ mod tests { // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); match vote_resp.unwrap() { - AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), r => panic!("unexpected response: {:?}", r), } } @@ -684,10 +843,21 @@ mod tests { wal_data: Bytes::from_static(b"b"), }; + let pem = ProposerElected { + term: 1, + start_streaming_at: Lsn(1), + term_history: TermHistory(vec![TermSwitchEntry { + term: 1, + lsn: Lsn(3), + }]), + }; + sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) + .unwrap(); + // check that AppendRequest before epochStartLsn doesn't switch epoch let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0); + assert_eq!(sk.get_epoch(), 0); // but record at epochStartLsn does the switch ar_hdr.begin_lsn = Lsn(2); @@ -698,6 +868,7 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1); + sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %) + assert_eq!(sk.get_epoch(), 1); } } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index b2698faa82..9e48a833d4 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -317,8 +317,11 @@ impl Timeline { } /// Pass arrived message to the safekeeper. - pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result { - let mut rmsg: AcceptorProposerMessage; + pub fn process_msg( + &self, + msg: &ProposerAcceptorMessage, + ) -> Result> { + let mut rmsg: Option; let commit_lsn: Lsn; { let mut shared_state = self.mutex.lock().unwrap(); @@ -328,7 +331,7 @@ impl Timeline { commit_lsn = shared_state.sk.commit_lsn; // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn - if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg { + if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; resp.disk_consistent_lsn = state.disk_consistent_lsn; @@ -596,6 +599,82 @@ impl Storage for FileStorage { } Ok(()) } + + fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> { + let partial; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + let wal_seg_size = server.wal_seg_size as usize; + let ztli = server.ztli; + + /* Extract WAL location for this block */ + let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; + + /* Open file */ + let mut segno = end_pos.segment_number(wal_seg_size); + // note: we basically don't support changing pg timeline + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name + ".partial"); + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else { + wal_file = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path)?; + partial = true; + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + while xlogoff < wal_seg_size { + let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff); + wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; + xlogoff += bytes_to_write; + } + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + if !partial { + // Make segment partial once again + fs::rename(&wal_file_path, &wal_file_partial_path)?; + } + // Remove all subsequent segments + loop { + segno += 1; + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone()); + let wal_file_partial_path = self + .conf + .workdir + .join(ztli.to_string()) + .join(wal_file_name.clone() + ".partial"); + // TODO: better use fs::try_exists which is currenty avaialble only in nightly build + if wal_file_path.exists() { + fs::remove_file(&wal_file_path)?; + } else if wal_file_partial_path.exists() { + fs::remove_file(&wal_file_partial_path)?; + } else { + break; + } + } + Ok(()) + } } #[cfg(test)] diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 6292971c21..3a81e9bd38 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -20,18 +20,17 @@ tokio = "1.11" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } nix = "0.23.0" - -zenith_metrics = { path = "../zenith_metrics" } -workspace_hack = { path = "../workspace_hack" } +signal-hook = "0.3.10" rand = "0.8.3" jsonwebtoken = "7" hex = { version = "0.4.3", features = ["serde"] } - rustls = "0.19.1" rustls-split = "0.2.1" - git-version = "0.3.5" +zenith_metrics = { path = "../zenith_metrics" } +workspace_hack = { path = "../workspace_hack" } + [dev-dependencies] hex-literal = "0.3" bytes = "1.0" diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index 9c35f77328..ffb798fe83 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -153,7 +153,7 @@ pub fn check_permission(req: &Request, tenantid: Option) -> Res } } -// Send shutdown signal +/// Initiate graceful shutdown of the http endpoint pub fn shutdown() { if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() { let _ = tx.send(()); diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 8730656eab..b0e5131a11 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -40,6 +40,7 @@ pub mod logging; // Misc pub mod accum; +pub mod shutdown; // Utility for binding TcpListeners with proper socket options. pub mod tcp_listener; @@ -47,6 +48,9 @@ pub mod tcp_listener; // Utility for putting a raw file descriptor into non-blocking mode pub mod nonblock; +// Default signal handling +pub mod signals; + // This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages // // we have several cases: @@ -72,5 +76,6 @@ pub mod nonblock; use git_version::git_version; pub const GIT_VERSION: &str = git_version!( prefix = "git:", - fallback = concat!("git-env:", env!("GIT_VERSION")) + fallback = concat!("git-env:", env!("GIT_VERSION")), + args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha ); diff --git a/zenith_utils/src/shutdown.rs b/zenith_utils/src/shutdown.rs new file mode 100644 index 0000000000..7eba905997 --- /dev/null +++ b/zenith_utils/src/shutdown.rs @@ -0,0 +1,6 @@ +/// Immediately terminate the calling process without calling +/// atexit callbacks, C runtime destructors etc. We mainly use +/// this to protect coverage data from concurrent writes. +pub fn exit_now(code: u8) { + unsafe { nix::libc::_exit(code as _) }; +} diff --git a/zenith_utils/src/signals.rs b/zenith_utils/src/signals.rs new file mode 100644 index 0000000000..6586da2339 --- /dev/null +++ b/zenith_utils/src/signals.rs @@ -0,0 +1,59 @@ +use signal_hook::flag; +use signal_hook::iterator::Signals; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +pub use signal_hook::consts::{signal::*, TERM_SIGNALS}; + +pub fn install_shutdown_handlers() -> anyhow::Result { + let term_now = Arc::new(AtomicBool::new(false)); + for sig in TERM_SIGNALS { + // When terminated by a second term signal, exit with exit code 1. + // This will do nothing the first time (because term_now is false). + flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; + // But this will "arm" the above for the second time, by setting it to true. + // The order of registering these is important, if you put this one first, it will + // first arm and then terminate ‒ all in the first round. + flag::register(*sig, Arc::clone(&term_now))?; + } + + Ok(ShutdownSignals) +} + +pub enum Signal { + Quit, + Interrupt, + Terminate, +} + +impl Signal { + pub fn name(&self) -> &'static str { + match self { + Signal::Quit => "SIGQUIT", + Signal::Interrupt => "SIGINT", + Signal::Terminate => "SIGTERM", + } + } +} + +pub struct ShutdownSignals; + +impl ShutdownSignals { + pub fn handle( + self, + mut handler: impl FnMut(Signal) -> anyhow::Result<()>, + ) -> anyhow::Result<()> { + for raw_signal in Signals::new(TERM_SIGNALS)?.into_iter() { + let signal = match raw_signal { + SIGINT => Signal::Interrupt, + SIGTERM => Signal::Terminate, + SIGQUIT => Signal::Quit, + other => panic!("unknown signal: {}", other), + }; + + handler(signal)?; + } + + Ok(()) + } +}