Merge branch 'main' into main_local

This commit is contained in:
Konstantin Knizhnik
2021-12-06 17:22:20 +03:00
30 changed files with 1508 additions and 333 deletions

View File

@@ -4,7 +4,7 @@ executors:
zenith-build-executor:
resource_class: xlarge
docker:
- image: cimg/rust:1.55.0
- image: cimg/rust:1.56.1
zenith-python-executor:
docker:
- image: cimg/python:3.7.10 # Oldest available 3.7 with Ubuntu 20.04 (for GLIBC and Rust) at CirlceCI
@@ -14,7 +14,6 @@ jobs:
executor: zenith-build-executor
steps:
- checkout
- run:
name: rustfmt
when: always
@@ -81,6 +80,8 @@ jobs:
build_type:
type: enum
enum: ["debug", "release"]
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
- run:
name: apt install dependencies
@@ -116,16 +117,17 @@ jobs:
- run:
name: Rust build << parameters.build_type >>
command: |
export CARGO_INCREMENTAL=0
BUILD_TYPE="<< parameters.build_type >>"
if [[ $BUILD_TYPE == "debug" ]]; then
echo "Build in debug mode"
cargo build --bins --tests
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
echo "Build in release mode"
cargo build --release --bins --tests
cov_prefix=()
CARGO_FLAGS=--release
fi
export CARGO_INCREMENTAL=0
"${cov_prefix[@]}" cargo build $CARGO_FLAGS --bins --tests
- save_cache:
name: Save rust cache
key: v04-rust-cache-deps-<< parameters.build_type >>-{{ checksum "Cargo.lock" }}
@@ -138,45 +140,77 @@ jobs:
# has to run separately from cargo fmt section
# since needs to run with dependencies
- run:
name: clippy
name: cargo clippy
command: |
./run_clippy.sh
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
"${cov_prefix[@]}" ./run_clippy.sh
# Run rust unit tests
- run: cargo test
- run:
name: cargo test
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
"${cov_prefix[@]}" cargo test
# Install the rust binaries, for use by test jobs
# `--locked` is required; otherwise, `cargo install` will ignore Cargo.lock.
# FIXME: this is a really silly way to install; maybe we should just output
# a tarball as an artifact? Or a .deb package?
- run:
name: cargo install
name: Install rust binaries
command: |
export CARGO_INCREMENTAL=0
BUILD_TYPE="<< parameters.build_type >>"
if [[ $BUILD_TYPE == "debug" ]]; then
echo "Install debug mode"
CARGO_FLAGS="--debug"
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
echo "Install release mode"
# The default is release mode; there is no --release flag.
CARGO_FLAGS=""
cov_prefix=()
fi
binaries=$(
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
test_exe_paths=$(
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
mkdir -p /tmp/zenith/bin
mkdir -p /tmp/zenith/test_bin
mkdir -p /tmp/zenith/etc
# Install target binaries
for bin in $binaries; do
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/zenith/bin/$bin
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
# Install test executables (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/zenith/test_bin/$(basename $bin)
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
fi
cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path pageserver
cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path walkeeper
cargo install $CARGO_FLAGS --locked --root /tmp/zenith --path zenith
# Install the postgres binaries, for use by test jobs
# FIXME: this is a silly way to do "install"; maybe just output a standard
# postgres package, whatever the favored form is (tarball? .deb package?)
# Note that pg_regress needs some build artifacts that probably aren't
# in the usual package...?
- run:
name: postgres install
name: Install postgres binaries
command: |
cp -a tmp_install /tmp/zenith/pg_install
# Save the rust output binaries for other jobs in this workflow.
# Save the rust binaries and coverage data for other jobs in this workflow.
- persist_to_workspace:
root: /tmp/zenith
paths:
@@ -228,6 +262,8 @@ jobs:
save_perf_report:
type: boolean
default: false
environment:
BUILD_TYPE: << parameters.build_type >>
steps:
- attach_workspace:
at: /tmp/zenith
@@ -241,21 +277,22 @@ jobs:
command: pipenv --python 3.7 install
- run:
name: Run pytest
working_directory: test_runner
# pytest doesn't output test logs in real time, so CI job may fail with
# `Too long with no output` error, if a test is running for a long time.
# In that case, tests should have internal timeouts that are less than
# In that case, tests should have internal timeouts that are less than
# no_output_timeout, specified here.
no_output_timeout: 10m
environment:
- ZENITH_BIN: /tmp/zenith/bin
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
- TEST_OUTPUT: /tmp/test_output
# this variable will be embedded in perf test report
# this variable will be embedded in perf test report
# and is needed to distinguish different environments
- PLATFORM: zenith-local-ci
command: |
TEST_SELECTION="<< parameters.test_selection >>"
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
TEST_SELECTION="test_runner/<< parameters.test_selection >>"
EXTRA_PARAMS="<< parameters.extra_params >>"
if [ -z "$TEST_SELECTION" ]; then
echo "test_selection must be set"
@@ -263,16 +300,22 @@ jobs:
fi
if << parameters.run_in_parallel >>; then
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi;
fi
if << parameters.save_perf_report >>; then
if [[ $CIRCLE_BRANCH == "main" ]]; then
mkdir -p perf-report-local
EXTRA_PARAMS="--out-dir perf-report-local $EXTRA_PARAMS"
fi;
fi;
mkdir -p "$PERF_REPORT_DIR"
EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS"
fi
fi
export GITHUB_SHA=$CIRCLE_SHA1
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
# Run the tests.
#
# The junit.xml file allows CircleCI to display more fine-grained test information
@@ -283,14 +326,21 @@ jobs:
# -n4 uses four processes to run tests via pytest-xdist
# -s is not used to prevent pytest from capturing output, because tests are running
# in parallel and logs are mixed between different tests
pipenv run pytest --junitxml=$TEST_OUTPUT/junit.xml --tb=short --verbose -m "not remote_cluster" -rA $TEST_SELECTION $EXTRA_PARAMS
"${cov_prefix[@]}" pipenv run pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
-m "not remote_cluster" \
-rA $TEST_SELECTION $EXTRA_PARAMS
if << parameters.save_perf_report >>; then
if [[ $CIRCLE_BRANCH == "main" ]]; then
REPORT_FROM=$(realpath perf-report-local) REPORT_TO=local ../scripts/generate_and_push_perf_report.sh
fi;
fi;
# TODO: reuse scripts/git-upload
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO=local
scripts/generate_and_push_perf_report.sh
fi
fi
- run:
# CircleCI artifacts are preserved one file at a time, so skipping
# this step isn't a good idea. If you want to extract the
@@ -306,6 +356,65 @@ jobs:
# The store_test_results step tells CircleCI where to find the junit.xml file.
- store_test_results:
path: /tmp/test_output
# Save coverage data (if any)
- persist_to_workspace:
root: /tmp/zenith
paths:
- "*"
coverage-report:
executor: zenith-build-executor
steps:
- attach_workspace:
at: /tmp/zenith
- checkout
- restore_cache:
name: Restore rust cache
keys:
# Require an exact match. While an out of date cache might speed up the build,
# there's no way to clean out old packages, so the cache grows every time something
# changes.
- v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
- run:
name: Install llvm-tools
command: |
# TODO: install a proper symbol demangler, e.g. rustfilt
# TODO: we should embed this into a docker image
rustup component add llvm-tools-preview
- run:
name: Build coverage report
command: |
COMMIT_URL=https://github.com/zenithdb/zenith/commit/$CIRCLE_SHA1
scripts/coverage \
--dir=/tmp/zenith/coverage report \
--input-objects=/tmp/zenith/etc/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
- run:
name: Upload coverage report
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
REPORT_URL=https://zenithdb.github.io/zenith-coverage-data/$CIRCLE_SHA1
COMMIT_URL=https://github.com/zenithdb/zenith/commit/$CIRCLE_SHA1
scripts/git-upload \
--repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/zenithdb/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"state\": \"success\",
\"context\": \"zenith-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
# Build zenithdb/zenith:latest image and push it to Docker hub
docker-image:
@@ -410,6 +519,12 @@ workflows:
save_perf_report: true
requires:
- build-zenith-release
- coverage-report:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN
requires:
# TODO: consider adding more
- other-tests-debug
- docker-image:
# Context gives an ability to login
context: Docker Hub

4
.gitignore vendored
View File

@@ -7,3 +7,7 @@ test_output/
.vscode
/.zenith
/integration_tests/.zenith
# Coverage
*.profraw
*.profdata

3
Cargo.lock generated
View File

@@ -1888,7 +1888,6 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1"
dependencies = [
"cc",
"libc",
"signal-hook-registry",
]
@@ -2360,6 +2359,7 @@ dependencies = [
"rust-s3",
"serde",
"serde_json",
"signal-hook",
"tempfile",
"tokio",
"tokio-stream",
@@ -2611,6 +2611,7 @@ dependencies = [
"rustls-split",
"serde",
"serde_json",
"signal-hook",
"tempfile",
"thiserror",
"tokio",

View File

@@ -290,8 +290,12 @@ impl PostgresNode {
conf.append("max_wal_size", "100GB");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_sender_timeout", "0");
conf.append("wal_level", "replica");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreciever will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("max_replication_flush_lag", "160MB");
conf.append("max_replication_apply_lag", "1500MB");
conf.append("listen_addresses", &self.address.ip().to_string());
conf.append("port", &self.address.port().to_string());

View File

@@ -130,9 +130,8 @@ impl SafekeeperNode {
let listen_pg = format!("localhost:{}", self.conf.pg_port);
let listen_http = format!("localhost:{}", self.conf.http_port);
let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?);
cmd = cmd
.args(&["-D", self.datadir_path().to_str().unwrap()])
let mut cmd = Command::new(self.env.safekeeper_bin()?);
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--pageserver", &pageserver_conn])
@@ -141,13 +140,18 @@ impl SafekeeperNode {
.env_clear()
.env("RUST_BACKTRACE", "1");
if !self.conf.sync {
cmd = cmd.arg("--no-sync");
cmd.arg("--no-sync");
}
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token);
}
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!(
"Safekeeper failed to start. See '{}' for details.",

View File

@@ -5,7 +5,7 @@ use std::process::Command;
use std::time::Duration;
use std::{io, result, thread};
use anyhow::{anyhow, bail};
use anyhow::bail;
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
@@ -97,7 +97,6 @@ impl PageServerNode {
}
pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port);
let listen_http = format!("localhost:{}", self.env.pageserver.http_port);
let mut args = vec![
@@ -122,18 +121,19 @@ impl PageServerNode {
args.extend(&["--create-tenant", tenantid])
}
let status = cmd
.args(args)
.env_clear()
.env("RUST_BACKTRACE", "1")
.status()
.expect("pageserver init failed");
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(args).env_clear().env("RUST_BACKTRACE", "1");
if status.success() {
Ok(())
} else {
Err(anyhow!("pageserver init failed"))
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!("pageserver init failed");
}
Ok(())
}
pub fn repo_path(&self) -> PathBuf {
@@ -158,6 +158,11 @@ impl PageServerNode {
.env_clear()
.env("RUST_BACKTRACE", "1");
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",

View File

@@ -35,7 +35,7 @@ scopeguard = "1.1.0"
async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
signal-hook = "0.3.10"
url = "2"
nix = "0.23"
once_cell = "1.8.0"

View File

@@ -14,14 +14,6 @@ use tracing::*;
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use signal_hook::consts::signal::*;
use signal_hook::consts::TERM_SIGNALS;
use signal_hook::flag;
use signal_hook::iterator::exfiltrator::WithOrigin;
use signal_hook::iterator::SignalsInfo;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use clap::{App, Arg, ArgMatches};
use daemonize::Daemonize;
@@ -32,6 +24,8 @@ use pageserver::{
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals::{self, Signal};
use const_format::formatcp;
@@ -524,17 +518,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
info!("version: {}", GIT_VERSION);
let term_now = Arc::new(AtomicBool::new(false));
for sig in TERM_SIGNALS {
// When terminated by a second term signal, exit with exit code 1.
// This will do nothing the first time (because term_now is false).
flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?;
// But this will "arm" the above for the second time, by setting it to true.
// The order of registering these is important, if you put this one first, it will
// first arm and then terminate all in the first round.
flag::register(*sig, Arc::clone(&term_now))?;
}
// TODO: Check that it looks like a valid repository before going further
// bind sockets before daemonizing so we report errors early and do not return until we are listening
@@ -550,6 +533,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
);
let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;
// XXX: Don't spawn any threads before daemonizing!
if conf.daemonize {
info!("daemonizing...");
@@ -564,18 +548,21 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
.stdout(stdout)
.stderr(stderr);
match daemonize.start() {
// XXX: The parent process should exit abruptly right after
// it has spawned a child to prevent coverage machinery from
// dumping stats into a `profraw` file now owned by the child.
// Otherwise, the coverage data will be damaged.
match daemonize.exit_action(|| exit_now(0)).start() {
Ok(_) => info!("Success, daemonized"),
Err(err) => error!(%err, "could not daemonize"),
}
}
// keep join handles for spawned threads
// don't spawn threads before daemonizing
let mut join_handles = Vec::new();
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? {
join_handles.push(handle);
threads.push(handle);
}
// Initialize tenant manager.
tenant_mgr::init(conf);
@@ -594,61 +581,55 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Spawn a new thread for the http endpoint
// bind before launching separate thread so the error reported before startup exits
let cloned = auth.clone();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
let router = http::make_router(conf, cloned);
endpoint::serve_thread_main(router, http_listener)
})?;
join_handles.push(http_endpoint_thread);
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
let router = http::make_router(conf, cloned);
endpoint::serve_thread_main(router, http_listener)
})?,
);
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
let page_service_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
threads.push(
thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?,
);
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
match info.signal {
SIGQUIT => {
info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode");
exit(111);
}
SIGINT | SIGTERM => {
info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode");
// Terminate postgres backends
postgres_backend::set_pgbackend_shutdown_requested();
// Stop all tenants and flush their data
tenant_mgr::shutdown_all_tenants()?;
// Wait for pageservice thread to complete the job
page_service_thread
signals.handle(|signal| match signal {
Signal::Quit => {
info!(
"Got {}. Terminating in immediate shutdown mode",
signal.name()
);
std::process::exit(111);
}
Signal::Interrupt | Signal::Terminate => {
info!(
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
postgres_backend::set_pgbackend_shutdown_requested();
tenant_mgr::shutdown_all_tenants()?;
endpoint::shutdown();
for handle in std::mem::take(&mut threads) {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error");
// Shut down http router
endpoint::shutdown();
// Wait for all threads
for handle in join_handles.into_iter() {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error");
}
info!("Pageserver shut down successfully completed");
exit(0);
}
unknown_signal => {
debug!("Unknown signal {}", unknown_signal);
}
info!("Shut down successfully completed");
std::process::exit(0);
}
}
Ok(())
})
}
#[cfg(test)]

View File

@@ -238,16 +238,22 @@ impl Repository for LayeredRepository {
}
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> {
{
let timelines = self.timelines.lock().unwrap();
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint: Vec<(ZTimelineId, Arc<LayeredTimeline>)> = timelines
.iter()
.map(|(timelineid, timeline)| (*timelineid, timeline.clone()))
.collect();
drop(timelines);
for (timelineid, timeline) in timelines.iter() {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid)
.entered();
for (timelineid, timeline) in timelines_to_checkpoint.iter() {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered();
timeline.checkpoint(cconf)?;
}
timeline.checkpoint(cconf)?;
}
Ok(())

View File

@@ -297,6 +297,11 @@ impl Layer for DeltaLayer {
inner.page_version_metas = VecMap::default();
inner.relsizes = VecMap::default();
inner.loaded = false;
// Note: we keep the Book open. Is that a good idea? The virtual file
// machinery has its own rules for closing the file descriptor if it's not
// needed, but the Book struct uses up some memory, too.
Ok(())
}
@@ -410,7 +415,7 @@ impl DeltaLayer {
end_lsn,
dropped,
inner: Mutex::new(DeltaLayerInner {
loaded: true,
loaded: false,
book: None,
page_version_metas: VecMap::default(),
relsizes,
@@ -495,8 +500,13 @@ impl DeltaLayer {
}
let path = self.path();
let file = VirtualFile::open(&path)?;
let book = Book::new(file)?;
// Open the file if it's not open already.
if inner.book.is_none() {
let file = VirtualFile::open(&path)?;
inner.book = Some(Book::new(file)?);
}
let book = inner.book.as_ref().unwrap();
match &self.path_or_conf {
PathOrConf::Conf(_) => {
@@ -531,12 +541,9 @@ impl DeltaLayer {
debug!("loaded from {}", &path.display());
*inner = DeltaLayerInner {
loaded: true,
book: Some(book),
page_version_metas,
relsizes,
};
inner.page_version_metas = page_version_metas;
inner.relsizes = relsizes;
inner.loaded = true;
Ok(inner)
}

View File

@@ -321,14 +321,24 @@ fn walreceiver_main(
};
if let Some(last_lsn) = status_update {
// TODO: More thought should go into what values are sent here.
let last_lsn = PgLsn::from(u64::from(last_lsn));
// We are using disk consistent LSN as `write_lsn`, i.e. LSN at which page server
// may guarantee persistence of all received data. Safekeeper is not free to remove
// WAL preceding `write_lsn`: it should not be requested by this page server.
let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
let flush_lsn = last_lsn;
let apply_lsn = PgLsn::from(0);
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = last_lsn;
// This value doesn't guarantee data durability, but it's ok.
// In setup with WAL service, pageserver durability is guaranteed by safekeepers.
// In setup without WAL service, we just don't care.
let flush_lsn = write_lsn;
// `disk_consistent_lsn` is the LSN at which page server guarantees persistence of all received data
// Depending on the setup we recieve WAL directly from Compute Node or
// from a WAL service.
//
// Senders use the feedback to determine if we are caught up:
// - Safekeepers are free to remove WAL preceding `apply_lsn`,
// as it will never be requested by this page server.
// - Compute Node uses 'apply_lsn' to calculate a lag for back pressure mechanism
// (delay WAL inserts to avoid lagging pageserver responses and WAL overflow).
let apply_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
let ts = SystemTime::now();
const NO_REPLY: u8 = 0;
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;

View File

@@ -187,8 +187,13 @@ fn find_end_of_wal_segment(
let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize;
if xl_tot_len == 0 {
info!(
"find_end_of_wal_segment reached zeros at {:?}",
Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size))
"find_end_of_wal_segment reached zeros at {:?}, last records ends at {:?}",
Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
Lsn(XLogSegNoOffsetToRecPtr(
segno,
last_valid_rec_pos as u32,
wal_seg_size
))
);
break; // zeros, reached the end
}
@@ -303,12 +308,17 @@ pub fn find_end_of_wal(
high_segno,
);
}
let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno {
start_lsn.segment_offset(wal_seg_size)
} else {
0
};
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
start_lsn.segment_offset(wal_seg_size),
start_offset,
)?;
}
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);

510
scripts/coverage Executable file
View File

@@ -0,0 +1,510 @@
#!/usr/bin/env python3
# Here'a good link in case you're interested in learning more
# about current deficiencies of rust code coverage story:
# https://github.com/rust-lang/rust/issues?q=is%3Aissue+is%3Aopen+instrument-coverage+label%3AA-code-coverage
#
# Also a couple of inspirational tools which I deliberately ended up not using:
# * https://github.com/mozilla/grcov
# * https://github.com/taiki-e/cargo-llvm-cov
# * https://github.com/llvm/llvm-project/tree/main/llvm/test/tools/llvm-cov
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Any, Iterable, List, Optional
import argparse
import json
import os
import shutil
import subprocess
import sys
def intersperse(sep: Any, iterable: Iterable[Any]):
fst = True
for item in iterable:
if not fst:
yield sep
fst = False
yield item
def find_demangler(demangler=None):
known_tools = ['c++filt', 'rustfilt', 'llvm-cxxfilt']
if demangler:
# Explicit argument has precedence over `known_tools`
demanglers = [demangler]
else:
demanglers = known_tools
for demangler in demanglers:
if shutil.which(demangler):
return demangler
raise Exception(' '.join([
'Failed to find symbol demangler.',
'Please install it or provide another tool',
f"(e.g. {', '.join(known_tools)})",
]))
class Cargo:
def __init__(self, cwd: Path):
self.cwd = cwd
self.target_dir = Path(os.environ.get('CARGO_TARGET_DIR', cwd / 'target')).resolve()
self._rustlib_dir = None
@property
def rustlib_dir(self):
if not self._rustlib_dir:
cmd = [
'cargo',
'-Zunstable-options',
'rustc',
'--print=target-libdir',
]
self._rustlib_dir = Path(subprocess.check_output(cmd, cwd=self.cwd, text=True)).parent
return self._rustlib_dir
def binaries(self, profile: str) -> List[str]:
executables = []
# This will emit json messages containing test binaries names
cmd = [
'cargo',
'test',
'--no-run',
'--message-format=json',
]
env = dict(os.environ, PROFILE=profile)
output = subprocess.check_output(cmd, cwd=self.cwd, env=env, text=True)
for line in output.splitlines(keepends=False):
meta = json.loads(line)
exe = meta.get('executable')
if exe:
executables.append(exe)
# Metadata contains crate names, which can be used
# to recover names of executables, e.g. `pageserver`
cmd = [
'cargo',
'metadata',
'--format-version=1',
'--no-deps',
]
meta = json.loads(subprocess.check_output(cmd, cwd=self.cwd))
for pkg in meta.get('packages', []):
for target in pkg.get('targets', []):
if 'bin' in target['kind']:
exe = self.target_dir / profile / target['name']
if exe.exists():
executables.append(str(exe))
return executables
@dataclass
class LLVM:
cargo: Cargo
def resolve_tool(self, name: str) -> str:
exe = self.cargo.rustlib_dir / 'bin' / name
if exe.exists():
return str(exe)
if not shutil.which(name):
# Show a user-friendly warning
raise Exception(' '.join([
f"It appears that you don't have `{name}` installed.",
"Please execute `rustup component add llvm-tools-preview`,",
"or install it via your package manager of choice.",
"LLVM tools should be the same version as LLVM in `rustc --version --verbose`.",
]))
return name
def profdata(self, input_dir: Path, output_profdata: Path):
profraws = [f for f in input_dir.iterdir() if f.suffix == '.profraw']
if not profraws:
raise Exception(f'No profraw files found at {input_dir}')
with open(input_dir / 'profraw.list', 'w') as input_files:
profraw_mtime = 0
for profraw in profraws:
profraw_mtime = max(profraw_mtime, profraw.stat().st_mtime_ns)
print(profraw, file=input_files)
input_files.flush()
try:
profdata_mtime = output_profdata.stat().st_mtime_ns
except FileNotFoundError:
profdata_mtime = 0
# An obvious make-ish optimization
if profraw_mtime >= profdata_mtime:
subprocess.check_call([
self.resolve_tool('llvm-profdata'),
'merge',
'-sparse',
f'-input-files={input_files.name}',
f'-output={output_profdata}',
])
def _cov(self,
*extras,
subcommand: str,
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[str] = None) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
extras = list(extras)
# For some reason `rustc` produces relative paths to src files,
# so we force it to cut the $PWD prefix.
# see: https://github.com/rust-lang/rust/issues/34701#issuecomment-739809584
if sources:
extras.append(f'-path-equivalence=.,{cwd.resolve()}')
if demangler:
extras.append(f'-Xdemangler={demangler}')
cmd = [
self.resolve_tool('llvm-cov'),
subcommand, # '-dump-collected-paths', # classified debug flag
'-instr-profile',
str(profdata),
*extras,
*objects,
*sources,
]
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, **kwargs) -> None:
extras = [f'-format={kind}']
self._cov(subcommand='export', *extras, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
if output_dir:
extras.append(f'-output-dir={output_dir}')
self._cov(subcommand='show', *extras, **kwargs)
@dataclass
class Report(ABC):
""" Common properties of a coverage report """
llvm: LLVM
demangler: str
profdata: Path
objects: List[str]
sources: List[str]
def _common_kwargs(self):
return dict(profdata=self.profdata,
objects=self.objects,
sources=self.sources,
demangler=self.demangler)
@abstractmethod
def generate(self):
pass
def open(self):
# Do nothing by default
pass
class SummaryReport(Report):
def generate(self):
self.llvm.cov_report(**self._common_kwargs())
class TextReport(Report):
def generate(self):
self.llvm.cov_show(kind='text', **self._common_kwargs())
class LcovReport(Report):
def generate(self):
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
@dataclass
class HtmlReport(Report):
output_dir: Path
def generate(self):
self.llvm.cov_show(kind='html', output_dir=self.output_dir, **self._common_kwargs())
print(f'HTML report is located at `{self.output_dir}`')
def open(self):
tool = dict(linux='xdg-open', darwin='open').get(sys.platform)
if not tool:
raise Exception(f'Unknown platform {sys.platform}')
subprocess.check_call([tool, self.output_dir / 'index.html'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
@dataclass
class GithubPagesReport(HtmlReport):
output_dir: Path
commit_url: str
def generate(self):
def index_path(path):
return path / 'index.html'
common = self._common_kwargs()
# Provide default sources if there's none
common.setdefault('sources', ['.'])
self.llvm.cov_show(kind='html', output_dir=self.output_dir, **common)
shutil.copy(index_path(self.output_dir), self.output_dir / 'local.html')
with TemporaryDirectory() as tmp:
output_dir = Path(tmp)
args = dict(common, sources=[])
self.llvm.cov_show(kind='html', output_dir=output_dir, **args)
shutil.copy(index_path(output_dir), self.output_dir / 'all.html')
with open(index_path(self.output_dir), 'w') as index:
commit_sha = self.commit_url.rsplit('/', maxsplit=1)[-1][:10]
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Coverage ({commit_sha})</title>
</head>
<body>
<h1>
Coverage report for commit
<a href="{self.commit_url}">
{commit_sha}
</a>
</h1>
<p>
<a href="./local.html">
<b>Show only local sources</b>
</a>
</p>
<p>
<a href="./all.html">
Show all sources (including dependencies)
</a>
</p>
</body>
</html>
"""
index.write(dedent(html))
print(f'HTML report is located at `{self.output_dir}`')
class State:
def __init__(self, cwd: Path, top_dir: Optional[Path], profraw_prefix: Optional[str]):
# Use hostname by default
profraw_prefix = profraw_prefix or '%h'
self.cwd = cwd
self.cargo = Cargo(self.cwd)
self.llvm = LLVM(self.cargo)
self.top_dir = top_dir or self.cargo.target_dir / 'coverage'
self.report_dir = self.top_dir / 'report'
# Directory for raw coverage data emitted by executables
self.profraw_dir = self.top_dir / 'profraw'
self.profraw_dir.mkdir(parents=True, exist_ok=True)
# Aggregated coverage data
self.profdata_file = self.top_dir / 'coverage.profdata'
# Dump all coverage data files into a dedicated directory.
# Each filename is parameterized by PID & executable's signature.
os.environ['LLVM_PROFILE_FILE'] = str(self.profraw_dir /
f'cov-{profraw_prefix}-%p-%m.profraw')
os.environ['RUSTFLAGS'] = ' '.join([
os.environ.get('RUSTFLAGS', ''),
# Enable LLVM's source-based coverage
# see: https://clang.llvm.org/docs/SourceBasedCodeCoverage.html
# see: https://blog.rust-lang.org/inside-rust/2020/11/12/source-based-code-coverage.html
'-Zinstrument-coverage',
# Link every bit of code to prevent "holes" in coverage report
# see: https://doc.rust-lang.org/rustc/codegen-options/index.html#link-dead-code
'-Clink-dead-code',
# Some of the paths that `rustc` embeds into binaries are absolute, others are relative.
# The point is, we can't have both, because depending on `-path-equivalence`, `llvm-cov`
# either will cripple absolute paths or won't be able to show relative paths at all.
# There's no way to turn relative paths into absolute, so we strip $PWD prefix.
# Only source files of deps (e.g. `$HOME/.cargo`) will keep their absolute paths,
# but we won't include them in report by default (but see `--all`).
f'--remap-path-prefix {self.cwd}=',
])
# XXX: God, have mercy on our souls...
# see: https://github.com/rust-lang/rust/pull/90132
os.environ['RUSTC_BOOTSTRAP'] = '1'
def do_run(self, args):
subprocess.check_call([*args.command, *args.args])
def do_report(self, args):
if args.all and args.sources:
raise Exception('--all should not be used with sources')
# see man for `llvm-cov show [sources]`
if args.all:
sources = []
elif not args.sources:
sources = ['.']
else:
sources = args.sources
print('* Merging profraw files')
self.llvm.profdata(self.profraw_dir, self.profdata_file)
objects = []
if args.input_objects:
print('* Collecting object files using --input-objects')
with open(args.input_objects) as f:
objects.extend(f.read().splitlines(keepends=False))
if args.cargo_objects == 'true' or (args.cargo_objects == 'auto'
and not args.input_objects):
print('* Collecting object files using cargo')
objects.extend(self.cargo.binaries(args.profile))
params = dict(llvm=self.llvm,
demangler=find_demangler(args.demangler),
profdata=self.profdata_file,
objects=objects,
sources=sources)
formats = {
'html':
lambda: HtmlReport(**params, output_dir=self.report_dir),
'text':
lambda: TextReport(**params),
'lcov':
lambda: LcovReport(**params),
'summary':
lambda: SummaryReport(**params),
'github':
lambda: GithubPagesReport(
**params, output_dir=self.report_dir, commit_url=args.commit_url),
}
report = formats.get(args.format)()
if not report:
raise Exception('Format `{args.format}` is not supported')
print(f'* Rendering coverage report ({args.format})')
report.generate()
if args.open:
print('* Opening the report')
report.open()
def do_clean(self, args):
# Wipe everything if no filters have been provided
if not (args.report or args.prof):
shutil.rmtree(self.top_dir, ignore_errors=True)
else:
if args.report:
shutil.rmtree(self.report_dir, ignore_errors=True)
if args.prof:
self.profdata_file.unlink(missing_ok=True)
def main():
app = sys.argv[0]
example = f"""
prerequisites:
# alternatively, install a system package for `llvm-tools`
rustup component add llvm-tools-preview
self-contained example:
{app} run make
{app} run pipenv run pytest test_runner
{app} run cargo test
{app} report --open
"""
parser = argparse.ArgumentParser(description='Coverage report builder',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=example)
parser.add_argument('--dir', type=Path, help='output directory')
parser.add_argument('--profraw-prefix', metavar='STRING', type=str)
commands = parser.add_subparsers(title='commands', dest='subparser_name')
p_run = commands.add_parser('run', help='run a command with magic env')
p_run.add_argument('command', nargs=1)
p_run.add_argument('args', nargs=argparse.REMAINDER)
p_report = commands.add_parser('report', help='generate a coverage report')
p_report.add_argument('--profile',
default='debug',
choices=('debug', 'release'),
help='cargo build profile')
p_report.add_argument('--format',
default='html',
choices=('html', 'text', 'summary', 'lcov', 'github'),
help='report format')
p_report.add_argument('--input-objects',
metavar='FILE',
type=Path,
help='file containing list of binaries')
p_report.add_argument('--cargo-objects',
default='auto',
choices=('auto', 'true', 'false'),
help='use cargo for auto discovery of binaries')
p_report.add_argument('--commit-url', type=str, help='required for --format=github')
p_report.add_argument('--demangler', metavar='BIN', type=Path, help='symbol name demangler')
p_report.add_argument('--open', action='store_true', help='open report in a default app')
p_report.add_argument('--all', action='store_true', help='show everything, e.g. deps')
p_report.add_argument('sources', nargs='*', type=Path, help='source file or directory')
p_clean = commands.add_parser('clean', help='wipe coverage artifacts')
p_clean.add_argument('--report', action='store_true', help='pick generated report')
p_clean.add_argument('--prof', action='store_true', help='pick *.profdata & *.profraw')
args = parser.parse_args()
state = State(cwd=Path.cwd(), top_dir=args.dir, profraw_prefix=args.profraw_prefix)
commands = {
'run': state.do_run,
'report': state.do_report,
'clean': state.do_clean,
}
action = commands.get(args.subparser_name)
if action:
action(args)
else:
parser.print_help()
if __name__ == '__main__':
main()

136
scripts/git-upload Executable file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
from contextlib import contextmanager
from tempfile import TemporaryDirectory
from pathlib import Path
import argparse
import os
import shutil
import subprocess
import sys
def absolute_path(path):
return Path(path).resolve()
def relative_path(path):
path = Path(path)
if path.is_absolute():
raise Exception(f'path `{path}` must be relative!')
return path
@contextmanager
def chdir(cwd: Path):
old = os.getcwd()
os.chdir(cwd)
try:
yield cwd
finally:
os.chdir(old)
def run(cmd, *args, **kwargs):
print('$', ' '.join(cmd))
subprocess.check_call(cmd, *args, **kwargs)
class GitRepo:
def __init__(self, url):
self.url = url
self.cwd = TemporaryDirectory()
subprocess.check_call([
'git',
'clone',
str(url),
self.cwd.name,
])
def is_dirty(self):
res = subprocess.check_output(['git', 'status', '--porcelain'], text=True).strip()
return bool(res)
def update(self, message, action, branch=None):
with chdir(self.cwd.name):
if not branch:
cmd = ['git', 'branch', '--show-current']
branch = subprocess.check_output(cmd, text=True).strip()
# Run action in repo's directory
action()
run(['git', 'add', '.'])
if not self.is_dirty():
print('No changes detected, quitting')
return
run([
'git',
'-c',
'user.name=vipvap',
'-c',
'user.email=vipvap@zenith.tech',
'commit',
'--author="vipvap <vipvap@zenith.tech>"',
f'--message={message}',
])
for _ in range(5):
try:
run(['git', 'fetch', 'origin', branch])
run(['git', 'rebase', f'origin/{branch}'])
run(['git', 'push', 'origin', branch])
return
except subprocess.CalledProcessError as e:
print(f'failed to update branch `{branch}`: {e}', file=sys.stderr)
raise Exception(f'failed to update branch `{branch}`')
def do_copy(args):
src = args.src
dst = args.dst
try:
if src.is_dir():
shutil.copytree(src, dst)
else:
shutil.copy(src, dst)
except FileExistsError:
if args.forbid_overwrite:
raise
def main():
parser = argparse.ArgumentParser(description='Git upload tool')
parser.add_argument('--repo', type=str, metavar='URL', required=True, help='git repo url')
parser.add_argument('--message', type=str, metavar='TEXT', help='commit message')
commands = parser.add_subparsers(title='commands', dest='subparser_name')
p_copy = commands.add_parser('copy', help='copy file into the repo')
p_copy.add_argument('src', type=absolute_path, help='source path')
p_copy.add_argument('dst', type=relative_path, help='relative dest path')
p_copy.add_argument('--forbid-overwrite', action='store_true', help='do not allow overwrites')
args = parser.parse_args()
commands = {
'copy': do_copy,
}
action = commands.get(args.subparser_name)
if action:
message = args.message or 'update'
GitRepo(args.repo).update(message, lambda: action(args))
else:
parser.print_usage()
if __name__ == '__main__':
main()

View File

@@ -392,6 +392,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin):
"lm_prefix": "prefix",
"lm_message": "message",
"set_commit_lsn": True,
"send_proposer_elected": True,
"term": 2,
"begin_lsn": begin_lsn,
"epoch_start_lsn": epoch_start_lsn,

View File

@@ -509,6 +509,12 @@ sync = false # Disable fsyncs to make the tests go faster
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
# Pass coverage settings
var = 'LLVM_PROFILE_FILE'
val = os.environ.get(var)
if val:
env_vars[var] = val
# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,

View File

@@ -28,6 +28,7 @@ anyhow = "1.0"
crc32c = "0.6.0"
humantime = "2.1.0"
walkdir = "2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3"
const_format = "0.2.21"

View File

@@ -8,14 +8,15 @@ use daemonize::Daemonize;
use log::*;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::SafeKeeperConf;
use zenith_utils::http::endpoint;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -131,6 +132,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
e
})?;
// XXX: Don't spawn any threads before daemonizing!
if conf.daemonize {
info!("daemonizing...");
@@ -145,51 +147,59 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.stdout(stdout)
.stderr(stderr);
match daemonize.start() {
// XXX: The parent process should exit abruptly right after
// it has spawned a child to prevent coverage machinery from
// dumping stats into a `profraw` file now owned by the child.
// Otherwise, the coverage data will be damaged.
match daemonize.exit_action(|| exit_now(0)).start() {
Ok(_) => info!("Success, daemonized"),
Err(e) => error!("Error, {}", e),
}
}
let mut threads = Vec::new();
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let conf_cloned = conf.clone();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
// TODO authentication
let router = http::make_router(conf_cloned);
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();
threads.push(http_endpoint_thread);
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
// TODO authentication
let router = http::make_router(conf_);
endpoint::serve_thread_main(router, http_listener).unwrap();
})?,
);
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()
.name("S3 offload thread".into())
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("S3 offload thread".into())
.spawn(|| {
s3_offload::thread_main(conf_);
})?,
);
}
threads.push(
thread::Builder::new()
.name("WAL acceptor thread".into())
.spawn(|| {
// thread code
s3_offload::thread_main(s3_conf);
})
.unwrap();
threads.push(s3_offload_thread);
}
let thread_result = wal_service::thread_main(conf, pg_listener);
if let Err(e) = thread_result {
info!("wal_service thread terminated: {}", e);
}
})?,
);
let wal_acceptor_thread = thread::Builder::new()
.name("WAL acceptor thread".into())
.spawn(|| {
// thread code
let thread_result = wal_service::thread_main(conf, pg_listener);
if let Err(e) = thread_result {
info!("wal_service thread terminated: {}", e);
}
})
.unwrap();
threads.push(wal_acceptor_thread);
for t in threads {
t.join().unwrap()
}
Ok(())
// NOTE: we still have to handle signals like SIGQUIT to prevent coredumps
signals.handle(|signal| {
// TODO: implement graceful shutdown with joining threads etc
info!(
"Got {}. Terminating in immediate shutdown mode",
signal.name()
);
std::process::exit(111);
})
}

View File

@@ -7,7 +7,8 @@ use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use crate::safekeeper::AcceptorState;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::CreateControlFile;
use crate::timeline::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -29,6 +30,7 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}
/// Serialize through Display trait.
fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
@@ -37,6 +39,14 @@ where
s.serialize_str(&format!("{}", z))
}
/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize)]
struct AcceptorStateStatus {
term: Term,
epoch: Term,
term_history: TermHistory,
}
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize)]
struct TimelineStatus {
@@ -44,7 +54,7 @@ struct TimelineStatus {
tenant_id: ZTenantId,
#[serde(serialize_with = "display_serialize")]
timeline_id: ZTimelineId,
acceptor_state: AcceptorState,
acceptor_state: AcceptorStateStatus,
#[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
@@ -68,10 +78,16 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
let sk_state = tli.get_info();
let flush_lsn = tli.get_end_of_wal();
let acc_state = AcceptorStateStatus {
term: sk_state.acceptor_state.term,
epoch: sk_state.acceptor_state.get_epoch(flush_lsn),
term_history: sk_state.acceptor_state.term_history,
};
let status = TimelineStatus {
tenant_id,
timeline_id,
acceptor_state: sk_state.acceptor_state,
acceptor_state: acc_state,
commit_lsn: sk_state.commit_lsn,
truncate_lsn: sk_state.truncate_lsn,
flush_lsn,

View File

@@ -14,9 +14,9 @@ use serde::{Deserialize, Serialize};
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerGreeting,
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
};
use crate::safekeeper::{SafeKeeperState, Term};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::send_wal::SendWalHandler;
use crate::timeline::TimelineTools;
use postgres_ffi::pg_constants;
@@ -35,6 +35,9 @@ struct AppendLogicalMessage {
// if true, commit_lsn will match flush_lsn after append
set_commit_lsn: bool,
// if true, ProposerElected will be sent before append
send_proposer_elected: bool,
// fields from AppendRequestHeader
term: Term,
epoch_start_lsn: Lsn,
@@ -70,6 +73,11 @@ pub fn handle_json_ctrl(
// need to init safekeeper state before AppendRequest
prepare_safekeeper(swh)?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
}
let inserted_wal = append_logical_message(swh, append_request)?;
let response = AppendResult {
state: swh.timeline.get().get_info(),
@@ -104,11 +112,29 @@ fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
let response = swh.timeline.get().process_msg(&greeting_request)?;
match response {
AcceptorProposerMessage::Greeting(_) => Ok(()),
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
_ => anyhow::bail!("not GreetingResponse"),
}
}
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
// add new term to existing history
let history = swh.timeline.get().get_info().acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
term,
start_streaming_at: lsn,
term_history: history,
});
swh.timeline.get().process_msg(&proposer_elected_request)?;
Ok(())
}
#[derive(Serialize, Deserialize)]
struct InsertedWAL {
begin_lsn: Lsn,
@@ -150,7 +176,7 @@ fn append_logical_message(
let response = swh.timeline.get().process_msg(&append_request)?;
let append_response = match response {
AcceptorProposerMessage::AppendResponse(resp) => resp,
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
_ => anyhow::bail!("not AppendResponse"),
};

View File

@@ -4,6 +4,7 @@
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use postgres::{Client, Config, NoTls};
@@ -98,7 +99,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Send message to the postgres
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
let mut buf = Vec::new();
let mut buf = BytesMut::with_capacity(128);
msg.serialize(&mut buf)?;
self.pg_backend.write_message(&BeMessage::CopyData(&buf))?;
Ok(())
@@ -147,7 +148,9 @@ impl<'pg> ReceiveWalConn<'pg> {
.get()
.process_msg(&msg)
.with_context(|| "failed to process ProposerAcceptorMessage")?;
self.write_msg(&reply)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
msg = self.read_msg()?;
}
}

View File

@@ -55,9 +55,9 @@ impl HotStandbyFeedback {
/// Standby status update
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // disk consistent lSN
pub flush_lsn: Lsn, // LSN committedby quorum
pub apply_lsn: Lsn, // not used
pub write_lsn: Lsn, // not used
pub flush_lsn: Lsn, // not used
pub apply_lsn: Lsn, // pageserver's disk consistent lSN
pub reply_ts: TimestampTz,
pub reply_requested: bool,
}
@@ -115,7 +115,7 @@ impl ReplicationConn {
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
.context("failed to deserialize StandbyReply")?;
state.disk_consistent_lsn = reply.write_lsn;
state.disk_consistent_lsn = reply.apply_lsn;
timeline.update_replica_state(replica, Some(state));
}
_ => warn!("unexpected message {:?}", msg),

View File

@@ -4,16 +4,16 @@ use anyhow::Context;
use anyhow::{anyhow, bail, Result};
use byteorder::LittleEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::TimeLineID;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
use std::io;
use std::fmt;
use std::io::Read;
use lazy_static::lazy_static;
@@ -37,6 +37,70 @@ const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
pub type Term = u64;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchEntry {
pub term: Term,
pub lsn: Lsn,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TermHistory(pub Vec<TermSwitchEntry>);
impl TermHistory {
pub fn empty() -> TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
pub fn from_bytes(mut bytes: Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
let n_entries = bytes.get_u32_le();
let mut res = Vec::with_capacity(n_entries as usize);
for _ in 0..n_entries {
if bytes.remaining() < 16 {
bail!("TermHistory is incomplete");
}
res.push(TermSwitchEntry {
term: bytes.get_u64_le(),
lsn: bytes.get_u64_le().into(),
})
}
Ok(TermHistory(res))
}
/// Return copy of self with switches happening strictly after up_to
/// truncated.
pub fn up_to(&self, up_to: Lsn) -> TermHistory {
let mut res = Vec::with_capacity(self.0.len());
for e in &self.0 {
if e.lsn > up_to {
break;
}
res.push(*e);
}
TermHistory(res)
}
}
/// Display only latest entries for Debug.
impl fmt::Debug for TermHistory {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let n_printed = 20;
write!(
fmt,
"{}{:?}",
if self.0.len() > n_printed { "... " } else { "" },
self.0
.iter()
.rev()
.take(n_printed)
.map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
.collect::<Vec<_>>()
)
}
}
/// Unique id of proposer. Not needed for correctness, used for monitoring.
type PgUuid = [u8; 16];
@@ -45,8 +109,21 @@ type PgUuid = [u8; 16];
pub struct AcceptorState {
/// acceptor's last term it voted for (advanced in 1 phase)
pub term: Term,
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
pub epoch: Term,
/// History of term switches for safekeeper's WAL.
/// Actually it often goes *beyond* WAL contents as we adopt term history
/// from the proposer before recovery.
pub term_history: TermHistory,
}
impl AcceptorState {
/// acceptor's epoch is the term of the highest entry in the log
pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
let th = self.term_history.up_to(flush_lsn);
match th.0.last() {
Some(e) => e.term,
None => 0,
}
}
}
/// Information about Postgres. Safekeeper gets it once and then verifies
@@ -91,7 +168,10 @@ impl SafeKeeperState {
SafeKeeperState {
magic: SK_MAGIC,
format_version: SK_FORMAT_VERSION,
acceptor_state: AcceptorState { term: 0, epoch: 0 },
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
},
server: ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
@@ -147,16 +227,28 @@ pub struct VoteRequest {
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
term: Term, // not really needed, just a sanity check
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
/// Safekeeper's log position, to let proposer choose the most advanced one
epoch: Term,
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
flush_lsn: Lsn,
truncate_lsn: Lsn,
term_history: TermHistory,
}
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
* term history to it.
*/
#[derive(Debug)]
pub struct ProposerElected {
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn.
/// communicates commit_lsn.
#[derive(Debug)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
@@ -164,6 +256,7 @@ pub struct AppendRequest {
}
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// LSN since the proposer appends WAL; determines epoch switch point.
pub epoch_start_lsn: Lsn,
@@ -185,7 +278,6 @@ pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
pub term: Term,
pub epoch: Term,
// NOTE: this is physical end of wal on safekeeper; currently it doesn't
// make much sense without taking epoch into account, as history can be
// diverged.
@@ -198,19 +290,32 @@ pub struct AppendResponse {
pub hs_feedback: HotStandbyFeedback,
}
impl AppendResponse {
fn term_only(term: Term) -> AppendResponse {
AppendResponse {
term,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// Proposer -> Acceptor messages
#[derive(Debug)]
pub enum ProposerAcceptorMessage {
Greeting(ProposerGreeting),
VoteRequest(VoteRequest),
Elected(ProposerElected),
AppendRequest(AppendRequest),
}
impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg: Bytes) -> Result<ProposerAcceptorMessage> {
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg.reader();
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
@@ -222,6 +327,21 @@ impl ProposerAcceptorMessage {
let msg = VoteRequest::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::VoteRequest(msg))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(msg_bytes)?;
let msg = ProposerElected {
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
// read header followed by wal data
let hdr = AppendRequestHeader::des_from(&mut stream)?;
@@ -259,19 +379,33 @@ pub enum AcceptorProposerMessage {
impl AcceptorProposerMessage {
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> {
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
match self {
AcceptorProposerMessage::Greeting(msg) => {
stream.write_u64::<LittleEndian>('g' as u64)?;
msg.ser_into(stream)?;
buf.put_u64_le('g' as u64);
buf.put_u64_le(msg.term);
}
AcceptorProposerMessage::VoteResponse(msg) => {
stream.write_u64::<LittleEndian>('v' as u64)?;
msg.ser_into(stream)?;
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
}
AcceptorProposerMessage::AppendResponse(msg) => {
stream.write_u64::<LittleEndian>('a' as u64)?;
msg.ser_into(stream)?;
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_u64_le(msg.disk_consistent_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
}
}
@@ -284,6 +418,8 @@ pub trait Storage {
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>;
/// Write piece of wal in buf to disk and sync it.
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
// Truncate WAL at specified LSN
fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>;
}
lazy_static! {
@@ -357,8 +493,7 @@ pub struct SafeKeeper<ST: Storage> {
pub commit_lsn: Lsn,
pub truncate_lsn: Lsn,
pub storage: ST,
pub s: SafeKeeperState, // persistent part
pub elected_proposer_term: Term, // for monitoring/debugging
pub s: SafeKeeperState, // persistent part
decoder: WalStreamDecoder,
}
@@ -375,27 +510,40 @@ where
truncate_lsn: state.truncate_lsn,
storage,
s: state,
elected_proposer_term: 0,
decoder: WalStreamDecoder::new(Lsn(0)),
}
}
/// Get history of term switches for the available WAL
fn get_term_history(&self) -> TermHistory {
self.s.acceptor_state.term_history.up_to(self.flush_lsn)
}
#[cfg(test)]
fn get_epoch(&self) -> Term {
self.s.acceptor_state.get_epoch(self.flush_lsn)
}
/// Process message from proposer and possibly form reply. Concurrent
/// callers must exclude each other.
pub fn process_msg(
&mut self,
msg: &ProposerAcceptorMessage,
) -> Result<AcceptorProposerMessage> {
) -> Result<Option<AcceptorProposerMessage>> {
match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
}
}
/// Handle initial message from proposer: check its sanity and send my
/// current term.
fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result<AcceptorProposerMessage> {
fn handle_greeting(
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
/* Check protocol compatibility */
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
@@ -429,64 +577,106 @@ where
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
);
Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting {
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.s.acceptor_state.term,
}))
})))
}
/// Give vote for the given term, if we haven't done that previously.
fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result<AcceptorProposerMessage> {
fn handle_vote_request(
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
// initialize with refusal
let mut resp = VoteResponse {
term: msg.term,
term: self.s.acceptor_state.term,
vote_given: false as u64,
epoch: 0,
flush_lsn: Lsn(0),
truncate_lsn: Lsn(0),
flush_lsn: self.flush_lsn,
truncate_lsn: self.s.truncate_lsn,
term_history: self.get_term_history(),
};
if self.s.acceptor_state.term < msg.term {
self.s.acceptor_state.term = msg.term;
// persist vote before sending it out
self.storage.persist(&self.s, true)?;
resp.term = self.s.acceptor_state.term;
resp.vote_given = true as u64;
resp.epoch = self.s.acceptor_state.epoch;
resp.flush_lsn = self.flush_lsn;
resp.truncate_lsn = self.s.truncate_lsn;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
Ok(AcceptorProposerMessage::VoteResponse(resp))
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.s.acceptor_state.term < term {
self.s.acceptor_state.term = term;
self.storage.persist(&self.s, true)?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
AppendResponse {
term: self.s.acceptor_state.term,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
}
}
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
self.bump_if_higher(msg.term)?;
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.s.acceptor_state.term > msg.term {
return Ok(None);
}
// TODO: cross check divergence point
// streaming must not create a hole
assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at);
// truncate obsolete part of WAL
if self.flush_lsn != Lsn(0) {
self.storage
.truncate_wal(&self.s.server, msg.start_streaming_at)?;
}
// update our end of WAL pointer
self.flush_lsn = msg.start_streaming_at;
// and now adopt term history from proposer
self.s.acceptor_state.term_history = msg.term_history.clone();
self.storage.persist(&self.s, true)?;
info!("start receiving WAL since {:?}", msg.start_streaming_at);
Ok(None)
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
fn handle_append_request(&mut self, msg: &AppendRequest) -> Result<AcceptorProposerMessage> {
// log first AppendRequest from this proposer
if self.elected_proposer_term < msg.h.term {
info!(
"start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}",
self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn,
);
self.elected_proposer_term = msg.h.term;
fn handle_append_request(
&mut self,
msg: &AppendRequest,
) -> Result<Option<AcceptorProposerMessage>> {
if self.s.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
}
// If our term is lower than elected proposer one, bump it.
if self.s.acceptor_state.term < msg.h.term {
self.s.acceptor_state.term = msg.h.term;
self.storage.persist(&self.s, true)?;
}
// OTOH, if it is higher, immediately refuse the message.
else if self.s.acceptor_state.term > msg.h.term {
let resp = AppendResponse {
term: self.s.acceptor_state.term,
epoch: self.s.acceptor_state.epoch,
commit_lsn: Lsn(0),
flush_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
};
return Ok(AcceptorProposerMessage::AppendResponse(resp));
// If our term is higher, immediately refuse the message.
if self.s.acceptor_state.term > msg.h.term {
let resp = AppendResponse::term_only(self.s.acceptor_state.term);
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
// After ProposerElected, which performs truncation, we should get only
// indeed append requests (but flush_lsn is advanced only on record
// boundary, so might be less).
assert!(self.flush_lsn <= msg.h.begin_lsn);
self.s.proposer_uuid = msg.h.proposer_uuid;
let mut sync_control_file = false;
@@ -530,48 +720,21 @@ where
}
}
/*
* Epoch switch happen when written WAL record cross the boundary.
* The boundary is maximum of last WAL position at this node (FlushLSN) and global
* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that node completes recovery and start writing in the WAL new data.
* XXX: this is wrong, we must actively truncate not matching part of log.
*
* The non-strict inequality is important for us, as proposer in --sync mode doesn't
* generate new records, but to advance commit_lsn epoch switch must happen on majority.
* We can regard this as commit of empty entry in new epoch, this should be safe.
*/
if self.s.acceptor_state.epoch < msg.h.term
&& msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn)
{
info!(
"switched to new epoch {} on receival of request end_lsn={:?}, len={:?}",
msg.h.term,
msg.h.end_lsn,
msg.wal_data.len(),
);
self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
sync_control_file = true;
}
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64);
}
// Advance commit_lsn taking into account what we have locally. xxx this
// is wrapped into epoch check because we overwrite wal instead of
// truncating it, so without it commit_lsn might include wrong part.
// Anyway, nobody is much interested in our commit_lsn while epoch
// switch hasn't happened, right?
//
// Advance commit_lsn taking into account what we have locally.
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case.
if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) {
if msg.h.commit_lsn != Lsn(0) {
let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn);
// If new commit_lsn reached epoch switch, force sync of control file:
// walproposer in sync mode is very interested when this happens.
sync_control_file |=
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
sync_control_file |= commit_lsn == msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.commit_lsn
@@ -592,15 +755,7 @@ where
}
self.storage.persist(&self.s, sync_control_file)?;
let resp = AppendResponse {
term: self.s.acceptor_state.term,
epoch: self.s.acceptor_state.epoch,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by caller code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
};
let resp = self.append_response();
info!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
msg.wal_data.len(),
@@ -609,7 +764,7 @@ where
msg.h.truncate_lsn,
&resp,
);
Ok(AcceptorProposerMessage::AppendResponse(resp))
Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
}
}
@@ -631,6 +786,10 @@ mod tests {
fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> {
Ok(())
}
fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> {
Ok(())
}
}
#[test]
@@ -644,7 +803,7 @@ mod tests {
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let mut vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
r => panic!("unexpected response: {:?}", r),
}
@@ -658,7 +817,7 @@ mod tests {
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
r => panic!("unexpected response: {:?}", r),
}
}
@@ -684,10 +843,21 @@ mod tests {
wal_data: Bytes::from_static(b"b"),
};
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermSwitchEntry {
term: 1,
lsn: Lsn(3),
}]),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.unwrap();
// check that AppendRequest before epochStartLsn doesn't switch epoch
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok());
assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0);
assert_eq!(sk.get_epoch(), 0);
// but record at epochStartLsn does the switch
ar_hdr.begin_lsn = Lsn(2);
@@ -698,6 +868,7 @@ mod tests {
};
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok());
assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1);
sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1);
}
}

View File

@@ -317,8 +317,11 @@ impl Timeline {
}
/// Pass arrived message to the safekeeper.
pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result<AcceptorProposerMessage> {
let mut rmsg: AcceptorProposerMessage;
pub fn process_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
@@ -328,7 +331,7 @@ impl Timeline {
commit_lsn = shared_state.sk.commit_lsn;
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
resp.disk_consistent_lsn = state.disk_consistent_lsn;
@@ -596,6 +599,82 @@ impl Storage for FileStorage {
}
Ok(())
}
fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.ztli;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
// note: we basically don't support changing pg timeline
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else {
wal_file = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)?;
partial = true;
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
while xlogoff < wal_seg_size {
let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff);
wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?;
xlogoff += bytes_to_write;
}
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
}
if !partial {
// Make segment partial once again
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
loop {
segno += 1;
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone() + ".partial");
// TODO: better use fs::try_exists which is currenty avaialble only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -20,18 +20,17 @@ tokio = "1.11"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
nix = "0.23.0"
zenith_metrics = { path = "../zenith_metrics" }
workspace_hack = { path = "../workspace_hack" }
signal-hook = "0.3.10"
rand = "0.8.3"
jsonwebtoken = "7"
hex = { version = "0.4.3", features = ["serde"] }
rustls = "0.19.1"
rustls-split = "0.2.1"
git-version = "0.3.5"
zenith_metrics = { path = "../zenith_metrics" }
workspace_hack = { path = "../workspace_hack" }
[dev-dependencies]
hex-literal = "0.3"
bytes = "1.0"

View File

@@ -153,7 +153,7 @@ pub fn check_permission(req: &Request<Body>, tenantid: Option<ZTenantId>) -> Res
}
}
// Send shutdown signal
/// Initiate graceful shutdown of the http endpoint
pub fn shutdown() {
if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() {
let _ = tx.send(());

View File

@@ -40,6 +40,7 @@ pub mod logging;
// Misc
pub mod accum;
pub mod shutdown;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;
@@ -47,6 +48,9 @@ pub mod tcp_listener;
// Utility for putting a raw file descriptor into non-blocking mode
pub mod nonblock;
// Default signal handling
pub mod signals;
// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
//
// we have several cases:
@@ -72,5 +76,6 @@ pub mod nonblock;
use git_version::git_version;
pub const GIT_VERSION: &str = git_version!(
prefix = "git:",
fallback = concat!("git-env:", env!("GIT_VERSION"))
fallback = concat!("git-env:", env!("GIT_VERSION")),
args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha
);

View File

@@ -0,0 +1,6 @@
/// Immediately terminate the calling process without calling
/// atexit callbacks, C runtime destructors etc. We mainly use
/// this to protect coverage data from concurrent writes.
pub fn exit_now(code: u8) {
unsafe { nix::libc::_exit(code as _) };
}

View File

@@ -0,0 +1,59 @@
use signal_hook::flag;
use signal_hook::iterator::Signals;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
pub use signal_hook::consts::{signal::*, TERM_SIGNALS};
pub fn install_shutdown_handlers() -> anyhow::Result<ShutdownSignals> {
let term_now = Arc::new(AtomicBool::new(false));
for sig in TERM_SIGNALS {
// When terminated by a second term signal, exit with exit code 1.
// This will do nothing the first time (because term_now is false).
flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?;
// But this will "arm" the above for the second time, by setting it to true.
// The order of registering these is important, if you put this one first, it will
// first arm and then terminate all in the first round.
flag::register(*sig, Arc::clone(&term_now))?;
}
Ok(ShutdownSignals)
}
pub enum Signal {
Quit,
Interrupt,
Terminate,
}
impl Signal {
pub fn name(&self) -> &'static str {
match self {
Signal::Quit => "SIGQUIT",
Signal::Interrupt => "SIGINT",
Signal::Terminate => "SIGTERM",
}
}
}
pub struct ShutdownSignals;
impl ShutdownSignals {
pub fn handle(
self,
mut handler: impl FnMut(Signal) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
for raw_signal in Signals::new(TERM_SIGNALS)?.into_iter() {
let signal = match raw_signal {
SIGINT => Signal::Interrupt,
SIGTERM => Signal::Terminate,
SIGQUIT => Signal::Quit,
other => panic!("unknown signal: {}", other),
};
handler(signal)?;
}
Ok(())
}
}