Compare commits

31 Commits

Author SHA1 Message Date
Bojan Serafimov
7342088fcc Add todo 2022-01-18 16:39:04 -05:00
Bojan Serafimov
2e4307cfe5 Merge branch 'main' into proxy-health-check 2022-01-18 15:31:16 -05:00
Bojan Serafimov
fceb428878 Fix logic 2022-01-18 15:30:33 -05:00
Bojan Serafimov
1e717b5414 Don't error on health check 2022-01-18 15:15:38 -05:00
Andrey Taranik
9d6ae06663 monitoring turn on for proxy (#1146) 2022-01-18 19:23:53 +03:00
Alexey Kondratov
06c28174c2 Integrate compute_tools into zenith workspace and improve logging (zenithdb/console#487) 2022-01-18 18:47:31 +03:00
bojanserafimov
8af1b43074 proxy: Add new metrics (#1132) 2022-01-14 19:12:43 -05:00
Heikki Linnakangas
17b7caddcb Update vendor/postgres: silence excessive logging from walproposer. 2022-01-14 20:51:02 +02:00
Heikki Linnakangas
dab30c27b6 Refactor thread management and shutdown
This introduces a new module to handle thread creation and shutdown.
All page server threads are now registered in a global hash map, and
there's a function to request individual threads to shut down gracefully.

Thread shutdown request is signalled to the thread with a flag, as well
as a Future that can be used to wake up async operations if shutdown is
requested. Use that facility to have the libpq listener thread respond
to pageserver shutdown, based on Kirill's earlier prototype
(https://github.com/zenithdb/zenith/pull/1088). That addresses
https://github.com/zenithdb/zenith/issues/1036, previously the libpq
listener thread would not exit until one more connection arrives.

This also eliminates a resource leak in the accept() loop. Previously,
we added the JoinHandle of each new thread to a vector but old handles
for threads that had already exited were never removed.
2022-01-14 18:36:10 +02:00
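
The thread registry described in this commit message can be pictured with a minimal sketch. It uses only the standard library; `ThreadRegistry`, `ThreadEntry`, `spawn` and `shutdown` are hypothetical names for illustration, not the actual `thread_mgr` API, and the registry is an explicit value here rather than the global map the commit describes. The Future-based wakeup for async operations is left out.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical registry entry: the shutdown flag plus the handle to join on.
struct ThreadEntry {
    shutdown_requested: Arc<AtomicBool>,
    handle: JoinHandle<()>,
}

/// Hypothetical registry (assumption: the real one is a global hash map).
#[derive(Default)]
pub struct ThreadRegistry {
    next_id: AtomicU64,
    threads: Mutex<HashMap<u64, ThreadEntry>>,
}

impl ThreadRegistry {
    /// Spawn a named thread and register it. The body receives its own
    /// shutdown flag and is expected to poll it.
    pub fn spawn<F>(&self, name: &str, body: F) -> std::io::Result<u64>
    where
        F: FnOnce(Arc<AtomicBool>) + Send + 'static,
    {
        let flag = Arc::new(AtomicBool::new(false));
        let flag_for_thread = Arc::clone(&flag);
        // Builder::spawn returns an error instead of panicking on failure.
        let handle = thread::Builder::new()
            .name(name.to_string())
            .spawn(move || body(flag_for_thread))?;
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.threads.lock().unwrap().insert(
            id,
            ThreadEntry {
                shutdown_requested: flag,
                handle,
            },
        );
        Ok(id)
    }

    /// Request shutdown of one thread, wait for it, and forget its entry.
    /// Returns false if the id is unknown or the thread panicked.
    pub fn shutdown(&self, id: u64) -> bool {
        // Remove the entry first so the map lock is not held while joining.
        let entry = self.threads.lock().unwrap().remove(&id);
        match entry {
            Some(entry) => {
                entry.shutdown_requested.store(true, Ordering::SeqCst);
                entry.handle.join().is_ok()
            }
            None => false,
        }
    }

    /// Number of threads currently registered.
    pub fn len(&self) -> usize {
        self.threads.lock().unwrap().len()
    }
}
```

Because an entry is removed from the map when its thread is shut down, the map does not accumulate handles of finished threads, which is the kind of leak the commit message mentions for the old vector of handles.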
Heikki Linnakangas
bad1dd9759 Don't panic if spawning a new WAL receiver thread fails.
The panic would kill the page service thread. That's not too bad, but
still let's try to handle it more gracefully.
2022-01-14 18:02:34 +02:00
Heikki Linnakangas
d29836d0d5 Don't panic if spawning a thread to handle a connection fails.
Log the error and continue. Hopefully it's a transient failure.

This might have been happening in staging earlier, when the safekeeper
had a problem where it opened connections very frequently to issue
"callmemaybe" commands. If you launch too many threads too fast, you might
run out of file descriptors or something. It's not totally clear what
happened, but with this commit, at least the page server will continue to run
and accept new connections, if a transient error happens.
2022-01-14 18:02:30 +02:00
Heikki Linnakangas
adb0b3dada Include backtrace in error messages in the log.
'anyhow' crate can include a backtrace in all errors, when the
'backtrace' feature is enabled. Enable it, and change the places that used
'{:#}' or '{}' to '{:?}', so that the backtrace is printed.
2022-01-14 10:10:17 +02:00
bojanserafimov
5e0f39cc9e Add proxy metrics (#1093) 2022-01-13 20:34:30 -05:00
Arthur Petukhovsky
0a34a592d5 Bump vendor/postgres (#1120) 2022-01-13 20:28:37 +03:00
Heikki Linnakangas
19aaa91f6d Timeline IDs are not globally unique, fix some code that assumed that.
A timeline ID is only guaranteed to be unique for a particular tenant,
so you need to use tenant ID + timeline ID as the key, rather than just
timeline ID.

The safekeeper currently makes the same assumption, and we should fix that
too, but this commit just addresses this one case in the page server.

In passing, reorder some function arguments to be more consistent.
2022-01-13 18:45:30 +02:00
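
As an illustration of the keying change this commit message describes, here is a minimal sketch of a map keyed by tenant ID plus timeline ID. `TenantId`, `TimelineId` and `TimelineMap` are hypothetical stand-ins for this example, not the types used in the page server.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for the real ID types.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TenantId(pub u128);

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TimelineId(pub u128);

/// Map keyed by (tenant, timeline), since a timeline ID alone is only
/// unique within one tenant.
pub struct TimelineMap<V> {
    entries: HashMap<(TenantId, TimelineId), V>,
}

impl<V> TimelineMap<V> {
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
        }
    }

    /// Insert a value; returns the previous value for the same pair, if any.
    pub fn insert(&mut self, tenant: TenantId, timeline: TimelineId, value: V) -> Option<V> {
        self.entries.insert((tenant, timeline), value)
    }

    /// Look up by the full (tenant, timeline) pair.
    pub fn get(&self, tenant: TenantId, timeline: TimelineId) -> Option<&V> {
        self.entries.get(&(tenant, timeline))
    }
}
```

With a key of timeline ID only, two tenants that happen to share a timeline ID would overwrite each other's entry; with the pair as key they stay separate.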
Konstantin Knizhnik
404aab9373 Use mutex to prevent concurrent checkpoints (#1115)
* Use mutex to prevent concurrent checkpoints

* Fix comment
2022-01-13 17:48:24 +03:00
Konstantin Knizhnik
bc6db2c10e Implement IO metrics in VirtualFile (#1112)
* Implement IO metrics in VirtualFile

* Do not group virtual file close statistics by tenantid/timelineid

* Add comments concerning close metrics
2022-01-13 17:36:53 +03:00
Heikki Linnakangas
772d853dcf Fix race condition leading to panic in walkeeper.
The walkeeper launches two threads for each connection, and uses a guard
object to remove the entry from the 'replicas' array when it finishes. But
only the background thread held onto the guard object, so if the background
thread finished before the other thread, the array entry would be
removed prematurely, which led to a panic in the check_stop_streaming()
call.

Fixes https://github.com/zenithdb/zenith/issues/1103
2022-01-13 11:21:11 +02:00
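
The commit message does not say how the guard is shared after the fix. One possible approach, sketched here purely as an assumption, is to wrap the guard in an `Arc` so the array entry is cleared only when the last of the two threads lets go of it. `ReplicaGuard`, `register` and `run_connection` are hypothetical names, not walkeeper code.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Shared 'replicas' array; a slot is None once its replica is gone.
pub type Replicas = Arc<Mutex<Vec<Option<String>>>>;

/// Hypothetical guard: clears its slot in the shared array when dropped.
pub struct ReplicaGuard {
    replicas: Replicas,
    slot: usize,
}

impl Drop for ReplicaGuard {
    fn drop(&mut self) {
        self.replicas.lock().unwrap()[self.slot] = None;
    }
}

/// Add an entry and return a shareable guard for it.
pub fn register(replicas: &Replicas, name: &str) -> Arc<ReplicaGuard> {
    let mut slots = replicas.lock().unwrap();
    slots.push(Some(name.to_string()));
    let slot = slots.len() - 1;
    Arc::new(ReplicaGuard {
        replicas: Arc::clone(replicas),
        slot,
    })
}

/// Simulate one connection: the background thread exits first, and the
/// entry must still be present while the foreground holds its clone.
pub fn run_connection(replicas: &Replicas) -> bool {
    let guard = register(replicas, "replica-1");
    let background_guard = Arc::clone(&guard);
    let background = thread::spawn(move || {
        // The background thread finishes early and drops its clone.
        drop(background_guard);
    });
    background.join().unwrap();
    // Bind the result first so the mutex is released before `guard` is
    // dropped at the end of this function (its Drop locks the same mutex).
    let still_present = replicas.lock().unwrap()[guard.slot].is_some();
    still_present
}
```

The entry is cleared only after `run_connection` returns, when the foreground's reference is dropped as well.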
Arseny Sher
ab4d272149 Add safekeeper --dump-control-file option.
Hexalize zids there for better output; since Serde doesn't support several
formats for one struct, on-disk representation is changed as well, make
upgrade.rs cope with it.
2022-01-12 19:47:24 +03:00
Konstantin Knizhnik
f70a5cad61 Fix releasing of timelines lock (#1100)
refer #1087
2022-01-12 15:05:08 +03:00
anastasia
7aba299dbd Use safekeeper in test_branch_behind (#1068)
to avoid a subtle race condition.

Without safekeeper, walreceiver reconnection can get stuck,
because of IO deadlock between walsender auth and regular backend.
2022-01-12 14:38:04 +03:00
Kirill Bulatov
4b3b19f444 Support prefixes when working with s3 buckets 2022-01-11 15:44:50 +02:00
Kirill Bulatov
8ab4c8a050 Code review fixes 2022-01-11 15:44:23 +02:00
Kirill Bulatov
7c4a653230 Propagate Zenith CLI's RUST_LOG env var to subprocesses 2022-01-11 15:44:23 +02:00
Kirill Bulatov
a3cd8f0e6d Add the remote storage test 2022-01-11 15:44:23 +02:00
Kirill Bulatov
65c851a451 Test pageserver's timeline http methods
z
2022-01-11 15:44:23 +02:00
Kirill Bulatov
23cf2fa984 Properly shutdown storage sync loop 2022-01-11 15:44:23 +02:00
Kirill Bulatov
ce8d6ae958 Allow using remote storage in tests 2022-01-11 15:44:23 +02:00
Kirill Bulatov
384b2a91fa Pass generic pageserver params through zenith cli 2022-01-11 15:44:23 +02:00
Arseny Sher
233c4811db Fix default safekeeper http port. 2022-01-11 10:13:27 +03:00
Konstantin Knizhnik
2fd4c390cb Do not hold timelines lock during GC (#1089)
* Do not hold timelines lock during GC
refer #1087

* Add gc_cs mutex for preventing creation of new timelines during GC

* Make clippy happy

* Use Mutex<()> instead of Mutex<i32> for GC critical section
2022-01-10 14:41:15 +03:00
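
The `Mutex<()>` mentioned in the last bullet is a common way to express a critical section that protects no data of its own. A minimal sketch follows, assuming a hypothetical `Repo` type; only the name `gc_cs` comes from the commit message, everything else is invented for illustration.

```rust
use std::sync::Mutex;

/// Hypothetical repository type for this sketch.
pub struct Repo {
    /// Critical-section lock: it guards no data, so the payload is `()`.
    gc_cs: Mutex<()>,
    timelines: Mutex<Vec<String>>,
}

impl Repo {
    pub fn new() -> Self {
        Self {
            gc_cs: Mutex::new(()),
            timelines: Mutex::new(Vec::new()),
        }
    }

    /// Creating a timeline takes the GC critical section first, so it
    /// cannot run while a GC pass is in progress.
    pub fn create_timeline(&self, name: &str) {
        let _gc_guard = self.gc_cs.lock().unwrap();
        self.timelines.lock().unwrap().push(name.to_string());
    }

    /// GC holds `gc_cs` for its whole duration, but holds the timelines
    /// lock only long enough to take a snapshot.
    pub fn gc(&self) -> usize {
        let _gc_guard = self.gc_cs.lock().unwrap();
        let snapshot = self.timelines.lock().unwrap().clone();
        // Long-running work would go here, without the timelines lock held.
        snapshot.len()
    }
}
```

Compared with `Mutex<i32>`, the unit payload makes it obvious to readers that the lock exists only for mutual exclusion.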
56 changed files with 1606 additions and 1951 deletions

View File

@@ -4,6 +4,7 @@ executors:
zenith-build-executor:
resource_class: xlarge
docker:
# NB: when changed, do not forget to update rust image tag in all Dockerfiles
- image: cimg/rust:1.56.1
zenith-python-executor:
docker:
@@ -443,27 +444,25 @@ jobs:
- checkout
- setup_remote_docker:
docker_layer_caching: true
- run:
name: Login to docker hub
command: echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
- run:
name: Setup buildx
command: docker run -it --rm --privileged tonistiigi/binfmt --install all
# Build zenithdb/compute-tools:latest image and push it to Docker hub
# TODO: this should probably also use versioned tag, not just :latest.
# XXX: but should it? We build and use it only locally now.
- run:
name: Build and push compute-tools Docker image
command: docker buildx build --platform linux/amd64,linux/arm64 --push -t zenithdb/compute-tools:latest compute_tools
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
docker build -t zenithdb/compute-tools:latest -f Dockerfile.compute-tools .
docker push zenithdb/compute-tools:latest
- run:
name: Init postgres submodule
command: git submodule update --init --depth 1
- run:
name: Build and push compute-node Docker image
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
DOCKER_TAG=$(git log --oneline|wc -l)
docker buildx build --platform linux/amd64,linux/arm64 --push -t zenithdb/compute-node:latest vendor/postgres
docker buildx build --platform linux/amd64,linux/arm64 --push -t zenithdb/compute-node:${DOCKER_TAG} vendor/postgres
docker build -t zenithdb/compute-node:latest vendor/postgres && docker push zenithdb/compute-node:latest
docker tag zenithdb/compute-node:latest zenithdb/compute-node:${DOCKER_TAG} && docker push zenithdb/compute-node:${DOCKER_TAG}
deploy-staging:
docker:
@@ -573,55 +572,6 @@ jobs:
}
}"
#
#
# compute-tools jobs
# TODO: unify with main build_and_test pipeline
#
#
compute-tools-test:
executor: zenith-build-executor
working_directory: ~/repo/compute_tools
steps:
- checkout:
path: ~/repo
- restore_cache:
name: Restore rust cache
keys:
# Require an exact match. While an out of date cache might speed up the build,
# there's no way to clean out old packages, so the cache grows every time something
# changes.
- v03-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
# Build the rust code, including test binaries
- run:
name: Rust build
environment:
CARGO_INCREMENTAL: 0
command: cargo build --bins --tests
- save_cache:
name: Save rust cache
key: v03-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
paths:
- ~/.cargo/registry
- ~/.cargo/git
- target
# Run Rust formatting checks
- run:
name: cargo fmt check
command: cargo fmt --all -- --check
# Run Rust linter (clippy)
- run:
name: cargo clippy check
command: cargo clippy --all --all-targets -- -Dwarnings -Drust-2018-idioms
# Run Rust integration and unittests
- run: cargo test
workflows:
build_and_test:
jobs:
@@ -670,7 +620,6 @@ workflows:
requires:
# TODO: consider adding more
- other-tests-debug
- compute-tools-test
- docker-image:
# Context gives an ability to login
context: Docker Hub
@@ -690,11 +639,9 @@ workflows:
branches:
only:
- main
- docker-multi-platform
# requires:
# - pg_regress-tests-release
# - other-tests-release
# - compute-tools-test
requires:
- pg_regress-tests-release
- other-tests-release
- deploy-staging:
# Context gives an ability to login
context: Docker Hub

View File

@@ -11,3 +11,10 @@ exposedService:
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: start.stage.zenith.tech
metrics:
enabled: true
serviceMonitor:
enabled: true
selector:
prometheus: zenith

View File

@@ -1,44 +0,0 @@
## Build docker image zenithdb/build:buster for linux/adm64 and linux/arm64 platforms
name: docker-builder
on:
push:
branches:
- 'docker-multi-platform'
schedule:
# * is a special character in YAML so you have to quote this string
# buil daily at 5:30am
- cron: '30 5 * * *'
jobs:
docker-builder-buster:
runs-on: ubuntu-latest
steps:
-
name: Checkout
uses: actions/checkout@v2
with:
submodules: false
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
-
name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
-
name: Build and push zenithdb/build:buster
uses: docker/build-push-action@v2
with:
push: true
file: Dockerfile.build
platforms: linux/amd64,linux/arm64
cache-from: type=registry,ref=zenithdb/build:buster
tags: zenithdb/build:buster

87
Cargo.lock generated
View File

@@ -2,6 +2,21 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.4.7"
@@ -40,6 +55,9 @@ name = "anyhow"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
dependencies = [
"backtrace",
]
[[package]]
name = "async-compression"
@@ -149,6 +167,21 @@ dependencies = [
"anyhow",
]
[[package]]
name = "backtrace"
version = "0.3.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "321629d8ba6513061f26707241fa9bc89524ff1cd7a915a97ef0c62c666ce1b6"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.12.3"
@@ -331,6 +364,25 @@ dependencies = [
"memchr",
]
[[package]]
name = "compute_tools"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"env_logger",
"hyper",
"libc",
"log",
"postgres",
"regex",
"serde",
"serde_json",
"tar",
"tokio",
]
[[package]]
name = "const_format"
version = "0.2.22"
@@ -655,6 +707,12 @@ dependencies = [
"wasi",
]
[[package]]
name = "gimli"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
[[package]]
name = "git-version"
version = "0.3.5"
@@ -982,6 +1040,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if",
"serde",
]
[[package]]
@@ -1048,6 +1107,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "miniz_oxide"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
]
[[package]]
name = "mio"
version = "0.7.13"
@@ -1144,6 +1213,15 @@ dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.8.0"
@@ -1424,16 +1502,19 @@ dependencies = [
"bytes",
"clap",
"hex",
"hyper",
"lazy_static",
"md5",
"parking_lot",
"rand",
"reqwest",
"routerify",
"rustls 0.19.1",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"zenith_metrics",
"zenith_utils",
]
@@ -1650,6 +1731,12 @@ dependencies = [
"url",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc-hash"
version = "1.1.0"

View File

@@ -1,5 +1,6 @@
[workspace]
members = [
"compute_tools",
"control_plane",
"pageserver",
"postgres_ffi",

View File

@@ -2,8 +2,9 @@
# Image with all the required dependencies to build https://github.com/zenithdb/zenith
# and Postgres from https://github.com/zenithdb/postgres
# Also includes some rust development and build tools.
# NB: keep in sync with rust image version in .circle/config.yml
#
FROM rust:slim-buster
FROM rust:1.56.1-slim-buster
WORKDIR /zenith
# Install postgres and zenith build dependencies

14
Dockerfile.compute-tools Normal file
View File

@@ -0,0 +1,14 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .circle/config.yml
FROM rust:1.56.1-slim-buster AS rust-build
WORKDIR /zenith
COPY . .
RUN cargo build -p compute_tools --release
# Final image that only has one binary
FROM debian:buster-slim
COPY --from=rust-build /zenith/target/release/zenith_ctl /usr/local/bin/zenith_ctl

1161
compute_tools/Cargo.lock generated

File diff suppressed because it is too large

View File

@@ -6,9 +6,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[workspace]
# TODO: make it a part of global zenith worksapce
[dependencies]
libc = "0.2"
anyhow = "1.0"
@@ -17,12 +14,9 @@ clap = "2.33"
env_logger = "0.8"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
postgres = "0.19"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json = "1"
tar = "0.4"
tokio = { version = "1", features = ["full"] }
[profile.release]
debug = true
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread"] }

View File

@@ -1,14 +0,0 @@
# First transient image to build compute_tools binaries
FROM rust:slim-buster AS rust-build
RUN mkdir /compute_tools
WORKDIR /compute_tools
COPY . /compute_tools/
RUN cargo build --release
# Final image that only has one binary
FROM debian:buster-slim
COPY --from=rust-build /compute_tools/target/release/zenith_ctl /usr/local/bin/zenith_ctl

View File

@@ -27,12 +27,12 @@
//! ```
//!
use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::{exit, Command, ExitStatus};
use std::sync::{Arc, RwLock};
use std::{env, panic};
use anyhow::Result;
use anyhow::{Context, Result};
use chrono::Utc;
use libc::{prctl, PR_SET_PDEATHSIG, SIGINT};
use log::info;
@@ -70,7 +70,7 @@ fn prepare_pgdata(state: &Arc<RwLock<ComputeState>>) -> Result<()> {
.expect("tenant id should be provided");
info!(
"applying spec for cluster #{}, operation #{}",
"starting cluster #{}, operation #{}",
spec.cluster.cluster_id,
spec.operation_uuid.as_ref().unwrap()
);
@@ -80,10 +80,23 @@ fn prepare_pgdata(state: &Arc<RwLock<ComputeState>>) -> Result<()> {
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
info!("starting safekeepers syncing");
let lsn = sync_safekeepers(&state.pgdata, &state.pgbin)?;
let lsn = sync_safekeepers(&state.pgdata, &state.pgbin)
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
get_basebackup(&state.pgdata, &pageserver_connstr, &tenant, &timeline, &lsn)?;
info!(
"getting basebackup@{} from pageserver {}",
lsn, pageserver_connstr
);
get_basebackup(&state.pgdata, &pageserver_connstr, &tenant, &timeline, &lsn).with_context(
|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, pageserver_connstr
)
},
)?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
@@ -149,6 +162,9 @@ fn main() -> Result<()> {
// This does not matter much for Docker, where `zenith_ctl` is an entrypoint,
// so the whole container will exit if it exits. But could be useful when
// `zenith_ctl` is used in e.g. systemd.
// XXX: this does not appear to work. When `main` exits, the child process
// `postgres` is re-assigned to a new parent (`/lib/systemd/systemd --user`
// in my case).
unsafe {
prctl(PR_SET_PDEATHSIG, SIGINT);
}
@@ -156,8 +172,10 @@ fn main() -> Result<()> {
// TODO: re-use `zenith_utils::logging` later
init_logger(DEFAULT_LOG_LEVEL)?;
// Env variable is set by `cargo`
let version: Option<&str> = option_env!("CARGO_PKG_VERSION");
let matches = clap::App::new("zenith_ctl")
.version("0.1.0")
.version(version.unwrap_or("unknown"))
.arg(
clap::Arg::with_name("connstr")
.short("C")
@@ -212,13 +230,7 @@ fn main() -> Result<()> {
let file = File::open(path)?;
serde_json::from_reader(file)?
} else {
// Finally, try to fetch it from the env
// XXX: not tested well and kept as a backup option for k8s, Docker, etc.
// TODO: remove later
match env::var("CLUSTER_SPEC") {
Ok(json) => serde_json::from_str(&json)?,
Err(_) => panic!("cluster spec should be provided via --spec, --spec-path or env variable CLUSTER_SPEC")
}
panic!("cluster spec should be provided via --spec or --spec-path argument");
}
}
};

View File

@@ -87,17 +87,19 @@ pub fn sync_safekeepers(pgdata: &str, pgbin: &str) -> Result<String> {
.args(&["--sync-safekeepers"])
.env("PGDATA", &pgdata) // we cannot use -D in this mode
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
// `postgres --sync-safekeepers` will print all log output to stderr and
// final LSN to stdout. So we pipe only stdout, while stderr will be automatically
// redirected to the caller output.
let sync_output = sync_handle
.wait_with_output()
.expect("postgres --sync-safekeepers failed");
if !sync_output.status.success() {
anyhow::bail!(
"postgres --sync-safekeepers failed: '{}'",
String::from_utf8_lossy(&sync_output.stderr)
"postgres --sync-safekeepers exited with non-zero status: {}",
sync_output.status,
);
}

View File

@@ -9,6 +9,7 @@
use anyhow::{anyhow, bail, Context, Result};
use std::fs;
use std::path::Path;
use std::process::Command;
pub mod compute;
pub mod local_env;
@@ -31,3 +32,19 @@ pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
}
Ok(pid)
}
fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
const RUST_LOG_KEY: &str = "RUST_LOG";
if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
cmd.env(RUST_LOG_KEY, rust_log_value)
} else {
cmd
}
}

View File

@@ -17,8 +17,8 @@ use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::read_pidfile;
use crate::storage::PageServerNode;
use crate::{fill_rust_env_vars, read_pidfile};
use zenith_utils::connstring::connection_address;
#[derive(Error, Debug)]
@@ -118,22 +118,17 @@ impl SafekeeperNode {
let listen_http = format!("localhost:{}", self.conf.http_port);
let mut cmd = Command::new(self.env.safekeeper_bin()?);
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");
fill_rust_env_vars(
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize"),
);
if !self.conf.sync {
cmd.arg("--no-sync");
}
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!(
"Safekeeper failed to start. See '{}' for details.",

View File

@@ -19,7 +19,7 @@ use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
use crate::local_env::LocalEnv;
use crate::read_pidfile;
use crate::{fill_rust_env_vars, read_pidfile};
use pageserver::branches::BranchInfo;
use pageserver::tenant_mgr::TenantInfo;
use zenith_utils::connstring::connection_address;
@@ -96,46 +96,49 @@ impl PageServerNode {
.unwrap()
}
pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
pub fn init(
&self,
create_tenant: Option<&str>,
config_overrides: &[&str],
) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc.
let mut args = vec![
"--init".to_string(),
"-D".to_string(),
self.env.base_data_dir.display().to_string(),
"-c".to_string(),
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display()),
"-c".to_string(),
format!("auth_type='{}'", self.env.pageserver.auth_type),
"-c".to_string(),
format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
),
"-c".to_string(),
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr),
];
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let mut args = Vec::with_capacity(20);
args.push("--init");
args.extend(["-D", &base_data_dir_param]);
args.extend(["-c", &pg_distrib_dir_param]);
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);
for config_override in config_overrides {
args.extend(["-c", config_override]);
}
if self.env.pageserver.auth_type != AuthType::Trust {
args.extend([
"-c".to_string(),
"auth_validation_public_key_path='auth_public_key.pem'".to_string(),
"-c",
"auth_validation_public_key_path='auth_public_key.pem'",
]);
}
if let Some(tenantid) = create_tenant {
args.extend(["--create-tenant".to_string(), tenantid.to_string()])
args.extend(["--create-tenant", tenantid])
}
let status = cmd
.args(args)
.env_clear()
.env("RUST_BACKTRACE", "1")
let status = fill_rust_env_vars(cmd.args(args))
.status()
.expect("pageserver init failed");
@@ -154,7 +157,7 @@ impl PageServerNode {
self.repo_path().join("pageserver.pid")
}
pub fn start(&self) -> anyhow::Result<()> {
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
print!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
@@ -163,16 +166,16 @@ impl PageServerNode {
io::stdout().flush().unwrap();
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-D", self.repo_path().to_str().unwrap()])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
let repo_path = self.repo_path();
let mut args = vec!["-D", repo_path.to_str().unwrap()];
for config_override in config_overrides {
args.extend(["-c", config_override]);
}
fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));
if !cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",

View File

@@ -147,6 +147,10 @@ bucket_name = 'some-sample-bucket'
# Name of the region where the bucket is located at
bucket_region = 'eu-north-1'
# A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
# Optional, pageserver uses entire bucket if the prefix is not specified.
prefix_in_bucket = '/some/prefix/'
# Access key to connect to the bucket ("login" part of the credentials)
access_key_id = 'SOMEKEYAAAAASADSAH*#'

View File

@@ -24,7 +24,7 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbf
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
tokio-stream = "0.1.8"
routerify = "2"
anyhow = "1.0"
anyhow = { version = "1.0", features = ["backtrace"] }
crc32c = "0.6.0"
thiserror = "1.0"
hex = { version = "0.4.3", features = ["serde"] }

View File

@@ -129,13 +129,13 @@ There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs.
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
The backup service is disabled by default and can be enabled to interact with a single remote storage.
CLI examples:
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note that bucket names do not contain any protocols when used on AWS.
For local S3 installations, refer to their documentation for name format and credentials.
@@ -154,6 +154,7 @@ or
[remote_storage]
bucket_name = 'some-sample-bucket'
bucket_region = 'eu-north-1'
prefix_in_bucket = '/test_prefix/'
access_key_id = 'SOMEKEYAAAAASADSAH*#'
secret_access_key = 'SOMEsEcReTsd292v'
```

View File

@@ -1,6 +1,6 @@
//! Main entry point for the Page Server executable.
use std::{env, path::Path, str::FromStr, thread};
use std::{env, path::Path, str::FromStr};
use tracing::*;
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION};
@@ -12,7 +12,9 @@ use daemonize::Daemonize;
use pageserver::{
branches,
config::{defaults::*, PageServerConf},
http, page_cache, page_service, remote_storage, tenant_mgr, virtual_file, LOG_FILE_NAME,
http, page_cache, page_service, remote_storage, tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
virtual_file, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
@@ -53,12 +55,12 @@ fn main() -> Result<()> {
)
// See `settings.md` for more details on the extra configuration parameters pageserver can process
.arg(
Arg::with_name("config-option")
Arg::with_name("config-override")
.short("c")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.help("Additional configuration options or overrides of the ones from the toml config file.
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
)
.get_matches();
@@ -105,7 +107,7 @@ fn main() -> Result<()> {
};
// Process any extra options given with -c
if let Some(values) = arg_matches.values_of("config-option") {
if let Some(values) = arg_matches.values_of("config-override") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
format!(
@@ -169,7 +171,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
);
let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;
// XXX: Don't spawn any threads before daemonizing!
// NB: Don't spawn any threads before daemonizing!
if daemonize {
info!("daemonizing...");
@@ -195,15 +197,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
}
let signals = signals::install_shutdown_handlers()?;
let mut threads = Vec::new();
let sync_startup = remote_storage::start_local_timeline_sync(conf)
.context("Failed to set up local files sync with external storage")?;
if let Some(handle) = sync_startup.sync_loop_handle {
threads.push(handle);
}
// Initialize tenant manager.
tenant_mgr::set_timeline_states(conf, sync_startup.initial_timeline_states);
@@ -220,25 +216,27 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// Spawn a new thread for the http endpoint
// bind before launching separate thread so the error reported before startup exits
let cloned = auth.clone();
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
let router = http::make_router(conf, cloned);
endpoint::serve_thread_main(router, http_listener)
})?,
);
let auth_cloned = auth.clone();
thread_mgr::spawn(
ThreadKind::HttpEndpointListener,
None,
None,
"http_endpoint_thread",
move || {
let router = http::make_router(conf, auth_cloned);
endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher())
},
)?;
// Spawn a thread to listen for connections. It will spawn further threads
// Spawn a thread to listen for libpq connections. It will spawn further threads
// for each connection.
threads.push(
thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?,
);
thread_mgr::spawn(
ThreadKind::LibpqEndpointListener,
None,
None,
"libpq endpoint thread",
move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type),
)?;
signals.handle(|signal| match signal {
Signal::Quit => {
@@ -254,20 +252,38 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
postgres_backend::set_pgbackend_shutdown_requested();
tenant_mgr::shutdown_all_tenants()?;
endpoint::shutdown();
for handle in std::mem::take(&mut threads) {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error");
}
info!("Shut down successfully completed");
std::process::exit(0);
shutdown_pageserver();
unreachable!()
}
})
}
fn shutdown_pageserver() {
// Shut down the libpq endpoint thread. This prevents new connections from
// being accepted.
thread_mgr::shutdown_threads(Some(ThreadKind::LibpqEndpointListener), None, None);
// Shut down any page service threads.
postgres_backend::set_pgbackend_shutdown_requested();
thread_mgr::shutdown_threads(Some(ThreadKind::PageRequestHandler), None, None);
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC threads.
tenant_mgr::shutdown_all_tenants();
// Stop syncing with remote storage.
//
// FIXME: Does this wait for the sync thread to finish syncing what's queued up?
// Should it?
thread_mgr::shutdown_threads(Some(ThreadKind::StorageSync), None, None);
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
thread_mgr::shutdown_threads(Some(ThreadKind::HttpEndpointListener), None, None);
// There should be nothing left, but let's be sure
thread_mgr::shutdown_threads(None, None, None);
info!("Shut down successfully completed");
std::process::exit(0);
}
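
The staged shutdown in `shutdown_pageserver` depends on a registry that can stop threads selectively by kind. Below is a minimal, std-only sketch of that idea. `Registry`, `Kind`, and `wait_for_shutdown` are hypothetical names chosen for illustration, not the actual `thread_mgr` API, which also accepts tenant and timeline filters and exposes an async shutdown watcher.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical thread kinds, loosely mirroring `ThreadKind` in the diff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Kind {
    Listener,
    Handler,
}

struct Entry {
    kind: Kind,
    shutdown: Arc<AtomicBool>,
    handle: JoinHandle<()>,
}

/// Hypothetical registry: every spawned thread is recorded with a shutdown flag.
#[derive(Default)]
pub struct Registry {
    threads: Mutex<Vec<Entry>>,
}

impl Registry {
    /// Spawn a named thread; the closure receives the flag it should poll.
    pub fn spawn<F>(&self, kind: Kind, name: &str, f: F) -> std::io::Result<()>
    where
        F: FnOnce(Arc<AtomicBool>) + Send + 'static,
    {
        let shutdown = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&shutdown);
        let handle = thread::Builder::new()
            .name(name.to_string())
            .spawn(move || f(flag))?;
        self.threads.lock().unwrap().push(Entry { kind, shutdown, handle });
        Ok(())
    }

    /// Request shutdown of all threads matching `kind` (or all, if `None`),
    /// wait for them to exit, and return how many were joined.
    pub fn shutdown_threads(&self, kind: Option<Kind>) -> usize {
        // Remove matching entries under the lock; join after releasing it.
        let victims: Vec<Entry> = {
            let mut threads = self.threads.lock().unwrap();
            let (stop, keep): (Vec<Entry>, Vec<Entry>) = threads
                .drain(..)
                .partition(|e| kind.map_or(true, |k| e.kind == k));
            *threads = keep;
            stop
        };
        for entry in &victims {
            entry.shutdown.store(true, Ordering::SeqCst);
        }
        let joined = victims.len();
        for entry in victims {
            let _ = entry.handle.join();
        }
        joined
    }
}

/// Example worker: idles until its shutdown flag is set.
pub fn wait_for_shutdown(flag: Arc<AtomicBool>) {
    while !flag.load(Ordering::SeqCst) {
        thread::sleep(std::time::Duration::from_millis(5));
    }
}
```

Calling `shutdown_threads(Some(Kind::Listener))` before `shutdown_threads(Some(Kind::Handler))` reproduces the ordering used above: stop accepting new work first, then stop the workers, and finish with `shutdown_threads(None)` as a catch-all.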

@@ -45,14 +45,16 @@ impl BranchInfo {
repo: &Arc<dyn Repository>,
include_non_incremental_logical_size: bool,
) -> Result<Self> {
let name = path
.as_ref()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
let path = path.as_ref();
let name = path.file_name().unwrap().to_string_lossy().to_string();
let timeline_id = std::fs::read_to_string(path)
.with_context(|| {
format!(
"Failed to read branch file contents at path '{}'",
path.display()
)
})?
.parse::<ZTimelineId>()?;
let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,

@@ -135,6 +135,8 @@ pub struct S3Config {
pub bucket_name: String,
/// The region where the bucket is located at.
pub bucket_region: String,
/// A "subfolder" in the bucket, so that multiple pageservers can share one bucket without interfering with each other.
pub prefix_in_bucket: Option<String>,
/// "Login" to use when connecting to bucket.
/// Can be empty for cases like AWS k8s IAM
/// where we can allow certain pods to connect
@@ -149,6 +151,7 @@ impl std::fmt::Debug for S3Config {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.finish()
}
}
@@ -332,18 +335,26 @@ impl PageServerConf {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: bucket_name.as_str().unwrap().to_string(),
bucket_region: bucket_region.as_str().unwrap().to_string(),
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
access_key_id: toml
.get("access_key_id")
.map(|x| x.as_str().unwrap().to_string()),
.map(|access_key_id| parse_toml_string("access_key_id", access_key_id))
.transpose()?,
secret_access_key: toml
.get("secret_access_key")
.map(|x| x.as_str().unwrap().to_string()),
.map(|secret_access_key| {
parse_toml_string("secret_access_key", secret_access_key)
})
.transpose()?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
}),
(Some(local_path), None, None) => {
RemoteStorageKind::LocalFs(PathBuf::from(local_path.as_str().unwrap()))
}
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
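
The hunk above replaces `unwrap()` on config values with fallible parsing: mandatory keys use `?` directly, optional keys use `.map(..).transpose()?`. The following std-only sketch shows why `transpose` is needed. `Value`, `parse_string`, `S3Settings`, and `parse_settings` are hypothetical stand-ins; the real code works on TOML items with `anyhow` errors, and the body of `parse_toml_string` is not shown in this diff.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed config value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Str(String),
    Int(i64),
}

/// Return the string behind `value`, or an error naming the offending key.
pub fn parse_string(name: &str, value: &Value) -> Result<String, String> {
    match value {
        Value::Str(s) => Ok(s.clone()),
        other => Err(format!("option '{}' is not a string: {:?}", name, other)),
    }
}

/// Hypothetical, reduced version of the S3 settings.
#[derive(Debug, PartialEq)]
pub struct S3Settings {
    pub bucket_name: String,
    pub prefix_in_bucket: Option<String>,
}

pub fn parse_settings(table: &HashMap<String, Value>) -> Result<S3Settings, String> {
    // Mandatory key: a missing value and a mistyped value are both errors.
    let bucket_name = table
        .get("bucket_name")
        .ok_or_else(|| "'bucket_name' is mandatory".to_string())
        .and_then(|v| parse_string("bucket_name", v))?;
    // Optional key: `map` yields Option<Result<..>>; `transpose` flips it to
    // Result<Option<..>>, so `?` rejects a mistyped value but accepts absence.
    let prefix_in_bucket = table
        .get("prefix_in_bucket")
        .map(|v| parse_string("prefix_in_bucket", v))
        .transpose()?;
    Ok(S3Settings {
        bucket_name,
        prefix_in_bucket,
    })
}
```

With this shape, a non-string `prefix_in_bucket` surfaces as a config error instead of a panic, while omitting the key simply yields `None`.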
@@ -585,6 +596,7 @@ pg_distrib_dir='{}'
let bucket_name = "some-sample-bucket".to_string();
let bucket_region = "eu-north-1".to_string();
let prefix_in_bucket = "test_prefix".to_string();
let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string();
let secret_access_key = "SOMEsEcReTsd292v".to_string();
let max_concurrent_sync = NonZeroUsize::new(111).unwrap();
@@ -597,13 +609,14 @@ max_concurrent_sync = {}
max_sync_errors = {}
bucket_name = '{}'
bucket_region = '{}'
prefix_in_bucket = '{}'
access_key_id = '{}'
secret_access_key = '{}'"#,
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
),
format!(
"remote_storage={{max_concurrent_sync = {}, max_sync_errors = {}, bucket_name='{}', bucket_region='{}', access_key_id='{}', secret_access_key='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
"remote_storage={{max_concurrent_sync={}, max_sync_errors={}, bucket_name='{}', bucket_region='{}', prefix_in_bucket='{}', access_key_id='{}', secret_access_key='{}'}}",
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
),
];
@@ -637,6 +650,7 @@ pg_distrib_dir='{}'
bucket_region: bucket_region.clone(),
access_key_id: Some(access_key_id.clone()),
secret_access_key: Some(secret_access_key.clone()),
prefix_in_bucket: Some(prefix_in_bucket.clone())
}),
},
"Remote storage config should correctly parse the S3 config"

@@ -1,6 +1,6 @@
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use anyhow::{Context, Result};
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
@@ -190,18 +190,27 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
}
#[derive(Debug, Serialize)]
struct TimelineInfo {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
#[serde(tag = "type")]
enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
},
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -215,9 +224,12 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)?.local_timeline() {
None => bail!("Timeline with id {} is not present locally", timeline_id),
Some(timeline) => Ok::<_, anyhow::Error>(TimelineInfo {
Ok::<_, anyhow::Error>(match repo.get_timeline(timeline_id)?.local_timeline() {
None => TimelineInfo::Remote {
timeline_id,
tenant_id,
},
Some(timeline) => TimelineInfo::Local {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
@@ -226,8 +238,8 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
prev_record_lsn: timeline.get_prev_record_lsn(),
start_lsn: timeline.get_start_lsn(),
timeline_state: repo.get_timeline_state(timeline_id),
}),
}
},
})
})
.await
.map_err(ApiError::from_err)??;

@@ -40,8 +40,8 @@ use crate::repository::{
BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncState,
TimelineWriter, ZenithWalRecord,
};
use crate::tenant_mgr;
use crate::walreceiver;
use crate::thread_mgr;
use crate::virtual_file::VirtualFile;
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
@@ -127,7 +127,13 @@ pub struct LayeredRepository {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration (especially with enforced checkpoint)
// may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
/// Makes every timeline to backup their files to remote storage.
upload_relishes: bool,
@@ -186,6 +192,8 @@ impl Repository for LayeredRepository {
// We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn
// about timelines, so otherwise a race condition is possible, where we create new timeline and GC
// concurrently removes data that is needed by the new timeline.
let _gc_cs = self.gc_cs.lock().unwrap();
let mut timelines = self.timelines.lock().unwrap();
let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
@@ -277,19 +285,7 @@ impl Repository for LayeredRepository {
Ok(())
}
// Wait for all threads to complete and persist repository data before pageserver shutdown.
fn shutdown(&self) -> Result<()> {
trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
let timelines = self.timelines.lock().unwrap();
for (timelineid, timeline) in timelines.iter() {
shutdown_timeline(self.tenantid, *timelineid, timeline)?;
}
Ok(())
}
// TODO this method currentlly does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update).
// TODO this method currently does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update).
// Sync task is enqueued and can error and be rescheduled, so some significant time may pass between the events.
//
/// Reacts on the timeline sync state change, changing pageserver's memory state for this timeline (unload or load of the timeline files).
@@ -300,7 +296,7 @@ impl Repository for LayeredRepository {
) -> Result<()> {
let mut timelines_accessor = self.timelines.lock().unwrap();
let timeline_to_shutdown = match new_state {
match new_state {
TimelineSyncState::Ready(_) => {
let reloaded_timeline =
self.init_local_timeline(timeline_id, &mut timelines_accessor)?;
@@ -320,10 +316,6 @@ impl Repository for LayeredRepository {
};
drop(timelines_accessor);
if let Some(timeline) = timeline_to_shutdown {
shutdown_timeline(self.tenantid, timeline_id, &timeline)?;
}
Ok(())
}
@@ -349,30 +341,6 @@ impl Repository for LayeredRepository {
}
}
fn shutdown_timeline(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
timeline: &LayeredTimelineEntry,
) -> Result<(), anyhow::Error> {
match timeline {
LayeredTimelineEntry::Local(timeline) => {
timeline
.upload_relishes
.store(false, atomic::Ordering::Relaxed);
walreceiver::stop_wal_receiver(timeline_id);
trace!("repo shutdown. checkpoint timeline {}", timeline_id);
// Do not reconstruct pages to reduce shutdown time
timeline.checkpoint(CheckpointConfig::Flush)?;
//TODO Wait for walredo process to shutdown too
}
LayeredTimelineEntry::Remote { .. } => warn!(
"Skipping shutdown of a remote timeline {} for tenant {}",
timeline_id, tenant_id
),
}
Ok(())
}
#[derive(Clone)]
enum LayeredTimelineEntry {
Local(Arc<LayeredTimeline>),
@@ -489,6 +457,7 @@ impl LayeredRepository {
tenantid,
conf,
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
upload_relishes,
}
@@ -505,10 +474,10 @@ impl LayeredRepository {
let _enter = info_span!("saving metadata").entered();
let path = metadata_path(conf, timelineid, tenantid);
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = OpenOptions::new()
.write(true)
.create_new(first_save)
.open(&path)?;
let mut file = VirtualFile::open_with_options(
&path,
OpenOptions::new().write(true).create_new(first_save),
)?;
let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;
@@ -575,7 +544,8 @@ impl LayeredRepository {
let now = Instant::now();
// grab mutex to prevent new timelines from being created here.
// TODO: We will hold it for a long time
let _gc_cs = self.gc_cs.lock().unwrap();
let mut timelines = self.timelines.lock().unwrap();
// Scan all timelines. For each timeline, remember the timeline ID and
@@ -641,8 +611,10 @@ impl LayeredRepository {
// Ok, we now know all the branch points.
// Perform GC for each timeline.
for timelineid in timelineids {
if tenant_mgr::shutdown_requested() {
return Ok(totals);
if thread_mgr::is_shutdown_requested() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
}
// We have already loaded all timelines above
@@ -663,6 +635,7 @@ impl LayeredRepository {
}
if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
drop(timelines);
let branchpoints: Vec<Lsn> = all_branchpoints
.range((
Included((timelineid, Lsn(0))),
@@ -682,6 +655,7 @@ impl LayeredRepository {
let result = timeline.gc_timeline(branchpoints, cutoff)?;
totals += result;
timelines = self.timelines.lock().unwrap();
}
}
@@ -759,6 +733,12 @@ pub struct LayeredTimeline {
/// to avoid deadlock.
write_lock: Mutex<()>,
// Prevent concurrent checkpoints.
// Checkpoints are normally performed by one thread. But checkpoint can also be manually requested by admin
// (that's used in tests), and shutdown also forces a checkpoint. These forced checkpoints run in a different thread
// and could be triggered at the same time as a normal checkpoint.
checkpoint_cs: Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
latest_gc_cutoff_lsn: AtomicLsn,
@@ -1118,6 +1098,7 @@ impl LayeredTimeline {
upload_relishes: AtomicBool::new(upload_relishes),
write_lock: Mutex::new(()),
checkpoint_cs: Mutex::new(()),
latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
@@ -1435,6 +1416,9 @@ impl LayeredTimeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: u64, reconstruct_pages: bool) -> Result<()> {
// Prevent concurrent checkpoints
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
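
The comments on `gc_cs` and `checkpoint_cs` describe a dedicated `Mutex<()>` that serializes a long-running operation, so that the mutex guarding the data itself is only held briefly. A minimal sketch of that pattern follows, assuming a simplified repository; `Repo`, `create_timeline`, `last_lsn`, and `gc` are hypothetical and do not reproduce the actual `LayeredRepository` logic.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical repository: `timelines` guards the map, while `gc_cs`
/// serializes GC against timeline creation.
#[derive(Default)]
pub struct Repo {
    timelines: Mutex<HashMap<u32, u64>>, // timeline id -> last record LSN
    gc_cs: Mutex<()>,
}

impl Repo {
    /// Creation takes `gc_cs` first, then the map lock: the same lock
    /// order as `gc` below, so the two cannot deadlock.
    pub fn create_timeline(&self, id: u32, lsn: u64) {
        let _gc_cs = self.gc_cs.lock().unwrap();
        self.timelines.lock().unwrap().insert(id, lsn);
    }

    /// Readers need only the short-lived map lock, so a GC run does not
    /// block them for its whole duration.
    pub fn last_lsn(&self, id: u32) -> Option<u64> {
        self.timelines.lock().unwrap().get(&id).copied()
    }

    /// GC holds `gc_cs` for the whole run but takes the map lock only
    /// briefly per step. Returns the number of timelines processed.
    pub fn gc(&self, horizon: u64) -> usize {
        let _gc_cs = self.gc_cs.lock().unwrap();
        let ids: Vec<u32> = self.timelines.lock().unwrap().keys().copied().collect();
        let mut processed = 0;
        for id in ids {
            // The map lock is taken and released inside `last_lsn`.
            if let Some(cutoff) = self.last_lsn(id).and_then(|lsn| lsn.checked_sub(horizon)) {
                // Placeholder for slow per-timeline work, done without the map lock.
                let _ = cutoff;
                processed += 1;
            }
        }
        processed
    }
}
```

The design trade-off is one extra lock in exchange for keeping readers responsive: only operations that must not overlap with GC contend on `gc_cs`.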

@@ -11,6 +11,7 @@ pub mod remote_storage;
pub mod repository;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;

@@ -14,12 +14,11 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use regex::Regex;
use std::io;
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::{io, net::TcpStream};
use tracing::*;
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::auth::{self, JwtAuth};
@@ -39,6 +38,8 @@ use crate::config::PageServerConf;
use crate::relish::*;
use crate::repository::Timeline;
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::walreceiver;
use crate::CheckpointConfig;
@@ -189,30 +190,61 @@ pub fn thread_main(
listener: TcpListener,
auth_type: AuthType,
) -> anyhow::Result<()> {
let mut join_handles = Vec::new();
listener.set_nonblocking(true)?;
let basic_rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
while !tenant_mgr::shutdown_requested() {
let (socket, peer_addr) = listener.accept()?;
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let local_auth = auth.clone();
let tokio_listener = {
let _guard = basic_rt.enter();
tokio::net::TcpListener::from_std(listener)
}?;
let handle = thread::Builder::new()
.name("serving Page Service thread".into())
.spawn(move || {
if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) {
error!(%err, "page server thread exited with error");
// Wait for a new connection to arrive, or for server shutdown.
while let Some(res) = basic_rt.block_on(async {
let shutdown_watcher = thread_mgr::shutdown_watcher();
tokio::select! {
biased;
_ = shutdown_watcher => {
// We were requested to shut down.
None
}
res = tokio_listener.accept() => {
Some(res)
}
}
}) {
match res {
Ok((socket, peer_addr)) => {
// Connection established. Spawn a new thread to handle it.
debug!("accepted connection from {}", peer_addr);
let local_auth = auth.clone();
// PageRequestHandler threads are not associated with any particular
// timeline in the thread manager. In practice most connections will
// only deal with a particular timeline, but we don't know which one
// yet.
if let Err(err) = thread_mgr::spawn(
ThreadKind::PageRequestHandler,
None,
None,
"serving Page Service thread",
move || page_service_conn_main(conf, local_auth, socket, auth_type),
) {
// Thread creation failed. Log the error and continue.
error!("could not spawn page service thread: {:?}", err);
}
})
.unwrap();
join_handles.push(handle);
}
Err(err) => {
// accept() failed. Log the error, and loop back to retry on next connection.
error!("accept() failed: {:?}", err);
}
}
}
debug!("page_service loop terminated. wait for connections to cancel");
for handle in join_handles.into_iter() {
handle.join().unwrap();
}
debug!("page_service loop terminated");
Ok(())
}
@@ -220,10 +252,10 @@ pub fn thread_main(
fn page_service_conn_main(
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
socket: TcpStream,
socket: tokio::net::TcpStream,
auth_type: AuthType,
) -> anyhow::Result<()> {
// Immediatsely increment the gauge, then create a job to decrement it on thread exit.
// Immediately increment the gauge, then create a job to decrement it on thread exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
@@ -232,6 +264,19 @@ fn page_service_conn_main(
gauge.dec();
}
// We use Tokio to accept the connection, but the rest of the code works with a
// regular socket. Convert.
let socket = socket
.into_std()
.context("could not convert tokio::net::TcpStream to std::net::TcpStream")?;
socket
.set_nonblocking(false)
.context("could not put socket to blocking mode")?;
socket
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let mut conn_handler = PageServerHandler::new(conf, auth);
let pgbackend = PostgresBackend::new(socket, auth_type, None, true)?;
pgbackend.run(&mut conn_handler)
@@ -286,7 +331,7 @@ impl PageServerHandler {
/* switch client to COPYBOTH */
pgb.write_message(&BeMessage::CopyBothResponse)?;
while !tenant_mgr::shutdown_requested() {
while !thread_mgr::is_shutdown_requested() {
match pgb.read_message() {
Ok(message) => {
if let Some(message) = message {
@@ -320,7 +365,7 @@ impl PageServerHandler {
let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:?}, but for the client the
// error message is enough
error!("error reading relation or page version: {:#}", e);
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
@@ -594,7 +639,7 @@ impl postgres_backend::Handler for PageServerHandler {
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for callmemaybe requests")?;
walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("branch_create ") {

@@ -5,7 +5,7 @@
//! There are a few components the storage machinery consists of:
//! * [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations:
//! * [`local_fs`] allows to use local file system as an external storage
//! * [`rust_s3`] uses AWS S3 bucket entirely as an external storage
//! * [`rust_s3`] uses AWS S3 bucket as an external storage
//!
//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync.
//! Synchronization internals are split into submodules
@@ -89,7 +89,6 @@ use std::{
collections::HashMap,
ffi, fs,
path::{Path, PathBuf},
thread,
};
use anyhow::{bail, Context};
@@ -125,8 +124,6 @@ pub struct SyncStartupData {
/// To reuse the local file scan logic, the timeline states are returned even if no sync loop get started during init:
/// in this case, no remote files exist and all local timelines with correct metadata files are considered ready.
pub initial_timeline_states: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>,
/// A handle to the sync loop, if it was started from the configuration provided.
pub sync_loop_handle: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
/// Based on the config, initiates the remote storage connection and starts a separate thread
@@ -176,7 +173,6 @@ pub fn start_local_timeline_sync(
}
Ok(SyncStartupData {
initial_timeline_states,
sync_loop_handle: None,
})
}
}
@@ -205,7 +201,7 @@ fn local_tenant_timeline_files(
}
}
Err(e) => error!(
"Failed to list tenants dir entry {:?} in directory {}, reason: {:#}",
"Failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
tenants_dir_entry,
tenants_dir.display(),
e
@@ -246,14 +242,14 @@ fn collect_timelines_for_tenant(
);
}
Err(e) => error!(
"Failed to process timeline dir contents at '{}', reason: {:#}",
"Failed to process timeline dir contents at '{}', reason: {:?}",
timeline_path.display(),
e
),
}
}
Err(e) => error!(
"Failed to list timelines for entry tenant {}, reason: {:#}",
"Failed to list timelines for entry tenant {}, reason: {:?}",
tenant_id, e
),
}

@@ -1,6 +1,8 @@
//! AWS S3 storage wrapper around `rust_s3` library.
//! Currently does not allow multiple pageservers to use the same bucket concurrently: objects are
//! placed in the root of the bucket.
//!
//! Respects `prefix_in_bucket` property from [`S3Config`],
//! allowing multiple pageservers to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::path::{Path, PathBuf};
@@ -23,8 +25,26 @@ impl S3ObjectKey {
&self.0
}
fn download_destination(&self, pageserver_workdir: &Path) -> PathBuf {
pageserver_workdir.join(self.0.split(S3_FILE_SEPARATOR).collect::<PathBuf>())
fn download_destination(
&self,
pageserver_workdir: &Path,
prefix_to_strip: Option<&str>,
) -> PathBuf {
let path_without_prefix = match prefix_to_strip {
Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| {
panic!(
"Could not strip prefix '{}' from S3 object key '{}'",
prefix, self.0
)
}),
None => &self.0,
};
pageserver_workdir.join(
path_without_prefix
.split(S3_FILE_SEPARATOR)
.collect::<PathBuf>(),
)
}
}
@@ -32,6 +52,7 @@ impl S3ObjectKey {
pub struct S3 {
pageserver_workdir: &'static Path,
bucket: Bucket,
prefix_in_bucket: Option<String>,
}
impl S3 {
@@ -49,6 +70,20 @@ impl S3 {
None,
)
.context("Failed to create the s3 credentials")?;
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
let mut prefix = prefix;
while prefix.starts_with(S3_FILE_SEPARATOR) {
prefix = &prefix[1..]
}
let mut prefix = prefix.to_string();
while prefix.ends_with(S3_FILE_SEPARATOR) {
prefix.pop();
}
prefix
});
Ok(Self {
bucket: Bucket::new_with_path_style(
aws_config.bucket_name.as_str(),
@@ -57,6 +92,7 @@ impl S3 {
)
.context("Failed to create the s3 bucket")?,
pageserver_workdir,
prefix_in_bucket,
})
}
}
@@ -67,7 +103,7 @@ impl RemoteStorage for S3 {
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
let relative_path = strip_path_prefix(self.pageserver_workdir, local_path)?;
let mut key = String::new();
let mut key = self.prefix_in_bucket.clone().unwrap_or_default();
for segment in relative_path {
key.push(S3_FILE_SEPARATOR);
key.push_str(&segment.to_string_lossy());
@@ -76,13 +112,14 @@ impl RemoteStorage for S3 {
}
fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
Ok(storage_path.download_destination(self.pageserver_workdir))
Ok(storage_path
.download_destination(self.pageserver_workdir, self.prefix_in_bucket.as_deref()))
}
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
let list_response = self
.bucket
.list(String::new(), None)
.list(self.prefix_in_bucket.clone().unwrap_or_default(), None)
.await
.context("Failed to list s3 objects")?;
@@ -225,7 +262,7 @@ mod tests {
assert_eq!(
local_path,
key.download_destination(&repo_harness.conf.workdir),
key.download_destination(&repo_harness.conf.workdir, None),
"Download destination should consist of s3 path joined with the pageserver workdir prefix"
);
@@ -239,14 +276,18 @@ mod tests {
let segment_1 = "matching";
let segment_2 = "file";
let local_path = &repo_harness.conf.workdir.join(segment_1).join(segment_2);
let storage = dummy_storage(&repo_harness.conf.workdir);
let expected_key = S3ObjectKey(format!(
"{SEPARATOR}{}{SEPARATOR}{}",
"{}{SEPARATOR}{}{SEPARATOR}{}",
storage.prefix_in_bucket.as_deref().unwrap_or_default(),
segment_1,
segment_2,
SEPARATOR = S3_FILE_SEPARATOR,
));
let actual_key = dummy_storage(&repo_harness.conf.workdir)
let actual_key = storage
.storage_path(local_path)
.expect("Matching path should map to S3 path normally");
assert_eq!(
@@ -308,18 +349,30 @@ mod tests {
let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
let relative_timeline_path = timeline_dir.strip_prefix(&repo_harness.conf.workdir)?;
let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata"));
let s3_key = create_s3_key(
&relative_timeline_path.join("not a metadata"),
storage.prefix_in_bucket.as_deref(),
);
assert_eq!(
s3_key.download_destination(&repo_harness.conf.workdir),
s3_key.download_destination(
&repo_harness.conf.workdir,
storage.prefix_in_bucket.as_deref()
),
storage
.local_path(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
"Should be able to parse metadata out of the correctly named remote delta file"
);
let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME));
let s3_key = create_s3_key(
&relative_timeline_path.join(METADATA_FILE_NAME),
storage.prefix_in_bucket.as_deref(),
);
assert_eq!(
s3_key.download_destination(&repo_harness.conf.workdir),
s3_key.download_destination(
&repo_harness.conf.workdir,
storage.prefix_in_bucket.as_deref()
),
storage
.local_path(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
@@ -356,18 +409,18 @@ mod tests {
Credentials::anonymous().unwrap(),
)
.unwrap(),
prefix_in_bucket: Some("dummy_prefix/".to_string()),
}
}
fn create_s3_key(relative_file_path: &Path) -> S3ObjectKey {
S3ObjectKey(
relative_file_path
.iter()
.fold(String::new(), |mut path_string, segment| {
path_string.push(S3_FILE_SEPARATOR);
path_string.push_str(segment.to_str().unwrap());
path_string
}),
)
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> S3ObjectKey {
S3ObjectKey(relative_file_path.iter().fold(
prefix.unwrap_or_default().to_string(),
|mut path_string, segment| {
path_string.push(S3_FILE_SEPARATOR);
path_string.push_str(segment.to_str().unwrap());
path_string
},
))
}
}
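
The prefix handling in this file has three parts: trimming separators from the configured prefix, prepending the prefix when building an object key, and stripping it again when mapping a key back to a local path. The sketch below restates that round trip with std only. The free functions `normalize_prefix`, `storage_key`, and `download_destination` are hypothetical simplifications of the methods in the diff; in particular, this version returns `None` on a prefix mismatch where the diff panics, and uses `trim_matches` in place of the explicit loops.

```rust
use std::path::{Path, PathBuf};

const SEPARATOR: char = '/';

/// Trim leading and trailing separators from a configured prefix.
pub fn normalize_prefix(prefix: &str) -> String {
    prefix.trim_matches(SEPARATOR).to_string()
}

/// Build an object key: the prefix followed by `/segment` for each path segment.
pub fn storage_key(prefix: Option<&str>, relative_path: &Path) -> String {
    let mut key = prefix.unwrap_or_default().to_string();
    for segment in relative_path {
        key.push(SEPARATOR);
        key.push_str(&segment.to_string_lossy());
    }
    key
}

/// Map a key back to a local path; `None` if the key lacks the expected prefix.
pub fn download_destination(workdir: &Path, key: &str, prefix: Option<&str>) -> Option<PathBuf> {
    let without_prefix = match prefix {
        Some(prefix) => key.strip_prefix(prefix)?,
        None => key,
    };
    // The leading empty segment produced by the first separator is a no-op
    // when collected into a PathBuf, so the result stays relative.
    Some(workdir.join(without_prefix.split(SEPARATOR).collect::<PathBuf>()))
}
```

Two pageservers configured with different normalized prefixes produce disjoint key sets, which is what allows them to share one bucket.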

@@ -80,16 +80,19 @@ use std::{
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
sync::Arc,
thread,
};
use anyhow::{bail, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use tokio::{fs, sync::RwLock};
use tokio::{
sync::mpsc::{self, UnboundedReceiver},
time::Instant,
fs,
runtime::Runtime,
sync::{
mpsc::{self, UnboundedReceiver},
RwLock,
},
time::{Duration, Instant},
};
use tracing::*;
@@ -106,7 +109,7 @@ use super::{RemoteStorage, SyncStartupData, TimelineSyncId};
use crate::{
config::PageServerConf, layered_repository::metadata::TimelineMetadata,
remote_storage::storage_sync::compression::read_archive_header, repository::TimelineSyncState,
tenant_mgr::set_timeline_states,
tenant_mgr::set_timeline_states, thread_mgr, thread_mgr::ThreadKind,
};
use zenith_metrics::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
@@ -369,7 +372,7 @@ pub(super) fn spawn_storage_sync_thread<
Ok(local_path) => Some(local_path),
Err(e) => {
error!(
"Failed to find local path for remote path {:?}: {:#}",
"Failed to find local path for remote path {:?}: {:?}",
remote_path, e
);
None
@@ -379,9 +382,12 @@ pub(super) fn spawn_storage_sync_thread<
let initial_timeline_states = schedule_first_sync_tasks(&remote_index, local_timeline_files);
let handle = thread::Builder::new()
.name("Remote storage sync thread".to_string())
.spawn(move || {
thread_mgr::spawn(
ThreadKind::StorageSync,
None,
None,
"Remote storage sync thread",
move || {
storage_sync_loop(
runtime,
conf,
@@ -391,19 +397,25 @@ pub(super) fn spawn_storage_sync_thread<
max_concurrent_sync,
max_sync_errors,
)
})
.context("Failed to spawn remote storage sync thread")?;
},
)
.context("Failed to spawn remote storage sync thread")?;
Ok(SyncStartupData {
initial_timeline_states,
sync_loop_handle: Some(handle),
})
}
enum LoopStep {
NewStates(HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>),
Shutdown,
}
#[allow(clippy::too_many_arguments)]
fn storage_sync_loop<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
runtime: tokio::runtime::Runtime,
runtime: Runtime,
conf: &'static PageServerConf,
mut receiver: UnboundedReceiver<SyncTask>,
index: RemoteTimelineIndex,
@@ -412,23 +424,34 @@ fn storage_sync_loop<
max_sync_errors: NonZeroU32,
) -> anyhow::Result<()> {
let remote_assets = Arc::new((storage, RwLock::new(index)));
while !crate::tenant_mgr::shutdown_requested() {
let new_timeline_states = runtime.block_on(
loop_step(
conf,
&mut receiver,
Arc::clone(&remote_assets),
max_concurrent_sync,
max_sync_errors,
)
.instrument(debug_span!("storage_sync_loop_step")),
);
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
set_timeline_states(conf, new_timeline_states);
debug!("Sync loop step completed");
loop {
let loop_step = runtime.block_on(async {
tokio::select! {
new_timeline_states = loop_step(
conf,
&mut receiver,
Arc::clone(&remote_assets),
max_concurrent_sync,
max_sync_errors,
)
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states),
_ = thread_mgr::shutdown_watcher() => LoopStep::Shutdown,
}
});
match loop_step {
LoopStep::NewStates(new_timeline_states) => {
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
set_timeline_states(conf, new_timeline_states);
debug!("Sync loop step completed");
}
LoopStep::Shutdown => {
debug!("Shutdown requested, stopping");
break;
}
}
}
debug!("Shutdown requested, stopping");
Ok(())
}
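
The reworked `storage_sync_loop` turns each iteration into an explicit outcome (`LoopStep::NewStates` or `LoopStep::Shutdown`) and breaks on the latter. The sketch below shows the same control flow with std only. It is a deliberately swapped-in technique: it polls an `AtomicBool` and uses `recv_timeout`, whereas the diff races the step against `thread_mgr::shutdown_watcher()` with `tokio::select!`, which can also interrupt a step in progress. `Step`, `next_step`, and `sync_loop` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Outcome of one loop iteration, analogous to `LoopStep` in the diff.
enum Step {
    Task(u32),
    Idle,
    Shutdown,
}

/// Check the shutdown flag first, then wait briefly for the next task.
fn next_step(receiver: &Receiver<u32>, shutdown: &AtomicBool) -> Step {
    if shutdown.load(Ordering::SeqCst) {
        return Step::Shutdown;
    }
    match receiver.recv_timeout(Duration::from_millis(20)) {
        Ok(task) => Step::Task(task),
        Err(RecvTimeoutError::Timeout) => Step::Idle,
        // All senders are gone: nothing more can arrive, so stop as well.
        Err(RecvTimeoutError::Disconnected) => Step::Shutdown,
    }
}

/// Process tasks until shutdown is requested; returns the tasks handled.
pub fn sync_loop(receiver: Receiver<u32>, shutdown: &AtomicBool) -> Vec<u32> {
    let mut handled = Vec::new();
    loop {
        match next_step(&receiver, shutdown) {
            Step::Task(task) => handled.push(task),
            Step::Idle => continue,
            Step::Shutdown => break,
        }
    }
    handled
}
```

Because the flag is checked before the queue, a shutdown request wins over pending tasks; whether queued work should be drained first is the open question raised by the FIXME in `shutdown_pageserver`.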
@@ -482,7 +505,7 @@ async fn loop_step<
Ok(extra_step) => extra_step,
Err(e) => {
error!(
"Failed to process storage sync task for tenant {}, timeline {}: {:#}",
"Failed to process storage sync task for tenant {}, timeline {}: {:?}",
sync_id.0, sync_id.1, e
);
None
@@ -539,7 +562,7 @@ async fn process_task<
"Waiting {} seconds before starting the task",
seconds_to_wait
);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(seconds_to_wait)).await;
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
}
let sync_start = Instant::now();

@@ -80,7 +80,7 @@ pub(super) async fn download_timeline<
{
Ok(remote_timeline) => Cow::Owned(remote_timeline),
Err(e) => {
error!("Failed to download full timeline index: {:#}", e);
error!("Failed to download full timeline index: {:?}", e);
return match remote_disk_consistent_lsn {
Some(disk_consistent_lsn) => {
sync_queue::push(SyncTask::new(
@@ -112,7 +112,7 @@ pub(super) async fn download_timeline<
if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.0).await {
error!(
"Failed to download missing branches for sync id {}: {:#}",
"Failed to download missing branches for sync id {}: {:?}",
sync_id, e
);
sync_queue::push(SyncTask::new(
@@ -150,7 +150,7 @@ pub(super) async fn download_timeline<
Err(e) => {
let archives_left = archives_total - archives_downloaded;
error!(
"Failed to download archive {:?} (archives downloaded: {}; archives left: {}) for tenant {} timeline {}, requeueing the download: {:#}",
"Failed to download archive {:?} (archives downloaded: {}; archives left: {}) for tenant {} timeline {}, requeueing the download: {:?}",
archive_id, archives_downloaded, archives_left, tenant_id, timeline_id, e
);
sync_queue::push(SyncTask::new(
@@ -202,7 +202,7 @@ async fn try_download_archive<
archive_to_download.disk_consistent_lsn(),
local_metadata.disk_consistent_lsn()
),
Err(e) => warn!("Failed to read local metadata file, assuing it's safe to override its with the download. Read: {:#}", e),
Err(e) => warn!("Failed to read local metadata file, assuming it's safe to override its with the download. Read: {:#}", e),
}
compression::uncompress_file_stream_with_index(
conf.timeline_path(&timeline_id, &tenant_id),
@@ -307,7 +307,7 @@ async fn download_missing_branches<
while let Some(download_result) = remote_only_branches_downloads.next().await {
if let Err(e) = download_result {
branch_downloads_failed = true;
error!("Failed to download a branch file: {:#}", e);
error!("Failed to download a branch file: {:?}", e);
}
}
ensure!(

@@ -43,7 +43,7 @@ pub(super) async fn upload_timeline_checkpoint<
debug!("Uploading checkpoint for sync id {}", sync_id);
if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.0).await {
error!(
"Failed to upload missing branches for sync id {}: {:#}",
"Failed to upload missing branches for sync id {}: {:?}",
sync_id, e
);
sync_queue::push(SyncTask::new(
@@ -69,7 +69,7 @@ pub(super) async fn upload_timeline_checkpoint<
match update_index_description(remote_assets.as_ref(), &timeline_dir, sync_id).await {
Ok(remote_timeline) => Some(Cow::Owned(remote_timeline)),
Err(e) => {
error!("Failed to download full timeline index: {:#}", e);
error!("Failed to download full timeline index: {:?}", e);
sync_queue::push(SyncTask::new(
sync_id,
retries,
@@ -132,7 +132,7 @@ pub(super) async fn upload_timeline_checkpoint<
}
Err(e) => {
error!(
"Failed to upload checkpoint: {:#}, requeueing the upload",
"Failed to upload checkpoint: {:?}, requeueing the upload",
e
);
sync_queue::push(SyncTask::new(
@@ -253,7 +253,7 @@ async fn upload_missing_branches<
.await
.add_branch_file(tenant_id, local_only_branch.clone()),
Err(e) => {
error!("Failed to upload branch file: {:#}", e);
error!("Failed to upload branch file: {:?}", e);
branch_uploads_failed = true;
}
}

@@ -19,8 +19,6 @@ pub type BlockNumber = u32;
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
fn shutdown(&self) -> Result<()>;
/// Updates timeline based on the new sync state, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn set_timeline_state(

@@ -5,15 +5,16 @@ use crate::branches;
use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository;
use crate::repository::{Repository, Timeline, TimelineSyncState};
use crate::tenant_threads;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::walredo::PostgresRedoManager;
use crate::CheckpointConfig;
use anyhow::{anyhow, bail, Context, Result};
use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap};
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
@@ -23,7 +24,7 @@ lazy_static! {
struct Tenant {
state: TenantState,
repo: Option<Arc<dyn Repository>>,
repo: Arc<dyn Repository>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -56,8 +57,6 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
TENANTS.lock().unwrap()
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Updates tenants' repositories, changing their timelines state in memory.
pub fn set_timeline_states(
conf: &'static PageServerConf,
@@ -73,28 +72,7 @@ pub fn set_timeline_states(
let mut m = access_tenants();
for (tenant_id, timeline_states) in timeline_states {
let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
state: TenantState::Idle,
repo: None,
});
if let Err(e) = put_timelines_into_tenant(conf, tenant, tenant_id, timeline_states) {
error!(
"Failed to update timeline states for tenant {}: {:#}",
tenant_id, e
);
}
}
}
fn put_timelines_into_tenant(
conf: &'static PageServerConf,
tenant: &mut Tenant,
tenant_id: ZTenantId,
timeline_states: HashMap<ZTimelineId, TimelineSyncState>,
) -> anyhow::Result<()> {
let repo = match tenant.repo.as_ref() {
Some(repo) => Arc::clone(repo),
None => {
let tenant = m.entry(tenant_id).or_insert_with(|| {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
@@ -105,13 +83,43 @@ fn put_timelines_into_tenant(
tenant_id,
conf.remote_storage_config.is_some(),
));
tenant.repo = Some(Arc::clone(&repo));
repo
Tenant {
state: TenantState::Idle,
repo,
}
});
if let Err(e) = put_timelines_into_tenant(tenant, tenant_id, timeline_states) {
error!(
"Failed to update timeline states for tenant {}: {:?}",
tenant_id, e
);
}
};
}
}
fn put_timelines_into_tenant(
tenant: &mut Tenant,
tenant_id: ZTenantId,
timeline_states: HashMap<ZTimelineId, TimelineSyncState>,
) -> anyhow::Result<()> {
for (timeline_id, timeline_state) in timeline_states {
repo.set_timeline_state(timeline_id, timeline_state)
// If the timeline is being put into any other state than Ready,
// stop any threads operating on it.
//
// FIXME: This is racy. A page service thread could get a
// handle on the Timeline just before we call set_timeline_state().
if !matches!(timeline_state, TimelineSyncState::Ready(_)) {
thread_mgr::shutdown_threads(None, Some(tenant_id), Some(timeline_id));
// Should we run a final checkpoint to flush all the data to
// disk? Doesn't seem necessary; all of the states other than
// Ready imply that the data on local disk is corrupt or incomplete,
// and we don't want to flush that to disk.
}
tenant
.repo
.set_timeline_state(timeline_id, timeline_state)
.with_context(|| {
format!(
"Failed to update timeline {} state to {:?}",
@@ -123,29 +131,49 @@ fn put_timelines_into_tenant(
Ok(())
}
// Check this flag in the thread loops to know when to exit
pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::Relaxed)
}
pub fn shutdown_all_tenants() -> Result<()> {
SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
let tenantids = list_tenantids()?;
for tenantid in &tenantids {
set_tenant_state(*tenantid, TenantState::Stopping)?;
///
/// Shut down all tenants. This runs as part of pageserver shutdown.
///
pub fn shutdown_all_tenants() {
let mut m = access_tenants();
let mut tenantids = Vec::new();
for (tenantid, tenant) in m.iter_mut() {
tenant.state = TenantState::Stopping;
tenantids.push(*tenantid)
}
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Checkpointer), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
//
// We assume that any incoming connections that might request pages from
// the repository have already been terminated by the caller, so there
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
for tenantid in tenantids {
// Wait for checkpointer and GC to finish their job
tenant_threads::wait_for_tenant_threads_to_stop(tenantid);
let repo = get_repository_for_tenant(tenantid)?;
debug!("shutdown tenant {}", tenantid);
repo.shutdown()?;
match get_repository_for_tenant(tenantid) {
Ok(repo) => {
if let Err(err) = repo.checkpoint_iteration(CheckpointConfig::Flush) {
error!(
"Could not checkpoint tenant {} during shutdown: {:?}",
tenantid, err
);
}
}
Err(err) => {
error!(
"Could not get repository for tenant {} during shutdown: {:?}",
tenantid, err
);
}
}
}
Ok(())
}
pub fn create_repository_for_tenant(
@@ -153,7 +181,7 @@ pub fn create_repository_for_tenant(
tenantid: ZTenantId,
) -> Result<()> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = Some(branches::create_repo(conf, tenantid, wal_redo_manager)?);
let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?;
match access_tenants().entry(tenantid) {
hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid),
@@ -172,22 +200,51 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
Some(access_tenants().get(&tenantid)?.state)
}
pub fn set_tenant_state(tenantid: ZTenantId, newstate: TenantState) -> Result<TenantState> {
///
/// Change the state of a tenant to Active and launch its checkpointer and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> {
let mut m = access_tenants();
let tenant = m.get_mut(&tenantid);
let tenant = m
.get_mut(&tenantid)
.ok_or_else(|| anyhow!("Tenant not found for id {}", tenantid))?;
match tenant {
Some(tenant) => {
if newstate == TenantState::Idle && tenant.state != TenantState::Active {
// Only Active tenant can become Idle
return Ok(tenant.state);
}
info!("set_tenant_state: {} -> {}", tenant.state, newstate);
tenant.state = newstate;
Ok(tenant.state)
info!("activating tenant {}", tenantid);
match tenant.state {
// If the tenant is already active, nothing to do.
TenantState::Active => {}
// If it's Idle, launch the checkpointer and GC threads
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Checkpointer,
Some(tenantid),
None,
"Checkpointer thread",
move || crate::tenant_threads::checkpoint_loop(tenantid, conf),
)?;
// FIXME: if we fail to launch the GC thread, but already launched the
// checkpointer, we're in a strange state.
thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenantid),
None,
"GC thread",
move || crate::tenant_threads::gc_loop(tenantid, conf),
)?;
tenant.state = TenantState::Active;
}
TenantState::Stopping => {
// don't re-activate it if it's being stopped
}
None => bail!("Tenant not found for id {}", tenantid),
}
Ok(())
}
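
The state handling in `activate_tenant` can be read as a small transition table: only an Idle tenant gets its checkpointer and GC threads launched, while Active and Stopping are left alone. A minimal sketch of just that decision, with the thread spawning reduced to a boolean, might look like the following. The function name `activate` and the `(state, launch)` return shape are assumptions made for illustration, not part of the diff.

```rust
/// The three tenant states that the match in the diff handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TenantState {
    Idle,
    Active,
    Stopping,
}

/// Hypothetical pure version of the activation decision. Returns the
/// resulting state and whether background threads should be launched.
pub fn activate(state: TenantState) -> (TenantState, bool) {
    match state {
        // Idle tenants become Active and need their threads started.
        TenantState::Idle => (TenantState::Active, true),
        // Already active: nothing to do.
        TenantState::Active => (TenantState::Active, false),
        // Never re-activate a tenant that is being stopped.
        TenantState::Stopping => (TenantState::Stopping, false),
    }
}
```

Keeping the decision separate from the spawning makes the "already launched the checkpointer but GC failed" case from the FIXME easier to reason about, since the state is only advanced once both spawns succeed.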
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
@@ -196,10 +253,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Reposito
.get(&tenantid)
.ok_or_else(|| anyhow!("Tenant not found for tenant {}", tenantid))?;
match &tenant.repo {
Some(repo) => Ok(Arc::clone(repo)),
None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid),
}
Ok(Arc::clone(&tenant.repo))
}
pub fn get_timeline_for_tenant(
@@ -212,16 +266,6 @@ pub fn get_timeline_for_tenant(
.ok_or_else(|| anyhow!("cannot fetch timeline {}", timelineid))
}
fn list_tenantids() -> Result<Vec<ZTenantId>> {
access_tenants()
.iter()
.map(|v| {
let (tenantid, _) = v;
Ok(*tenantid)
})
.collect()
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde(with = "hex")]

@@ -5,88 +5,14 @@ use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::CheckpointConfig;
use anyhow::Result;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::time::Duration;
use tracing::*;
use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
use zenith_utils::zid::ZTenantId;
struct TenantHandleEntry {
checkpointer_handle: Option<JoinHandle<()>>,
gc_handle: Option<JoinHandle<()>>,
}
// Preserve handles to wait for thread completion
// at shutdown
lazy_static! {
static ref TENANT_HANDLES: Mutex<HashMap<ZTenantId, TenantHandleEntry>> =
Mutex::new(HashMap::new());
}
lazy_static! {
static ref TENANT_THREADS_COUNT: IntGaugeVec = register_int_gauge_vec!(
"tenant_threads_count",
"Number of live tenant threads",
&["tenant_thread_type"]
)
.expect("failed to define a metric");
}
// Launch checkpointer and GC for the tenant.
// It's possible that the threads are running already,
// if so, just don't spawn new ones.
pub fn start_tenant_threads(conf: &'static PageServerConf, tenantid: ZTenantId) {
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = handles
.entry(tenantid)
.or_insert_with(|| TenantHandleEntry {
checkpointer_handle: None,
gc_handle: None,
});
if h.checkpointer_handle.is_none() {
h.checkpointer_handle = std::thread::Builder::new()
.name("Checkpointer thread".into())
.spawn(move || {
checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
})
.ok();
}
if h.gc_handle.is_none() {
h.gc_handle = std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
gc_loop(tenantid, conf).expect("GC thread died");
})
.ok();
}
}
pub fn wait_for_tenant_threads_to_stop(tenantid: ZTenantId) {
let mut handles = TENANT_HANDLES.lock().unwrap();
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
trace!("checkpointer for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
trace!("gc for tenant {} has stopped", tenantid);
}
handles.remove(&tenantid);
}
///
/// Checkpointer thread's main loop
///
fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
let gauge = TENANT_THREADS_COUNT.with_label_values(&["checkpointer"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
pub fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
@@ -112,13 +38,7 @@ fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result
///
/// GC thread's main loop
///
fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
let gauge = TENANT_THREADS_COUNT.with_label_values(&["gc"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
pub fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;

@@ -0,0 +1,284 @@
//!
//! This module provides centralized handling of threads in the Page Server.
//!
//! We provide a few basic facilities:
//! - A global registry of threads that lists what kind of threads they are, and
//! which tenant or timeline they are working on
//!
//! - The ability to request a thread to shut down.
//!
//!
//! # How it works
//!
//! There is a global hashmap of all the threads (`THREADS`). Whenever a new
//! thread is spawned, a PageServerThread entry is added there, and when a
//! thread dies, it removes itself from the hashmap. If you want to kill a
//! thread, you can scan the hashmap to find it.
//!
//! # Thread shutdown
//!
//! To kill a thread, we rely on co-operation from the victim. Each thread is
//! expected to periodically call the `is_shutdown_requested()` function, and
//! if it returns true, exit gracefully. In addition to that, when waiting for
//! the network or other long-running operation, you can use
//! `shutdown_watcher()` function to get a Future that will become ready if
//! the current thread has been requested to shut down. You can use that with
//! Tokio select!(), but note that it relies on thread-local storage, so it
//! will only work with the "current-thread" Tokio runtime!
//!
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what thread panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//!
use std::cell::RefCell;
use std::collections::HashMap;
use std::panic;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use tokio::sync::watch;
use tracing::{info, warn};
use lazy_static::lazy_static;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
/// Each thread that we track is associated with a "thread ID". It's just
/// an increasing number that we assign, not related to any system thread
/// id.
static ref NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);
/// Global registry of threads
static ref THREADS: Mutex<HashMap<u64, Arc<PageServerThread>>> = Mutex::new(HashMap::new());
}
// There is a Tokio watch channel for each thread, which can be used to signal the
// thread that it needs to shut down. This thread local variable holds the receiving
// end of the channel. The sender is kept in the global registry, so that anyone
// can send the signal to request thread shutdown.
thread_local!(static SHUTDOWN_RX: RefCell<Option<watch::Receiver<()>>> = RefCell::new(None));
// Each thread holds a reference to its own PageServerThread here.
thread_local!(static CURRENT_THREAD: RefCell<Option<Arc<PageServerThread>>> = RefCell::new(None));
///
/// There are many kinds of threads in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many threads of a certain kind can be running
/// at the same time.
///
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ThreadKind {
// libpq listener thread. It just accepts connections and spawns a
// PageRequestHandler thread for each connection.
LibpqEndpointListener,
// HTTP endpoint listener.
HttpEndpointListener,
// Thread that handles a single connection. A PageRequestHandler thread
// starts detached from any particular tenant or timeline, but it can be
// associated with one later, after receiving a command from the client.
PageRequestHandler,
// Thread that connects to a safekeeper to fetch WAL for one timeline.
WalReceiver,
// Thread that handles checkpointing of all timelines for a tenant.
Checkpointer,
// Thread that handles GC of a tenant
GarbageCollector,
// Thread for synchronizing pageserver relish data with the remote storage.
// Shared by all tenants.
StorageSync,
}
struct PageServerThread {
_thread_id: u64,
kind: ThreadKind,
/// Tenant and timeline that this thread is associated with.
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
name: String,
// To request thread shutdown, set the flag, and send a dummy message to the
// channel to notify it.
shutdown_requested: AtomicBool,
shutdown_tx: watch::Sender<()>,
/// Handle for waiting for the thread to exit. It can be None if the
/// thread has already exited.
join_handle: Mutex<Option<JoinHandle<()>>>,
}
/// Launch a new thread
pub fn spawn<F, E>(
kind: ThreadKind,
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
name: &str,
f: F,
) -> std::io::Result<()>
where
F: FnOnce() -> Result<(), E> + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = watch::channel(());
let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
let thread = PageServerThread {
_thread_id: thread_id,
kind,
tenant_id,
timeline_id,
name: name.to_string(),
shutdown_requested: AtomicBool::new(false),
shutdown_tx,
join_handle: Mutex::new(None),
};
let thread_rc = Arc::new(thread);
let mut jh_guard = thread_rc.join_handle.lock().unwrap();
THREADS
.lock()
.unwrap()
.insert(thread_id, Arc::clone(&thread_rc));
let thread_rc2 = Arc::clone(&thread_rc);
let join_handle = match thread::Builder::new()
.name(name.to_string())
.spawn(move || thread_wrapper(thread_id, thread_rc2, shutdown_rx, f))
{
Ok(handle) => handle,
Err(err) => {
// Could not spawn the thread. Remove the entry
THREADS.lock().unwrap().remove(&thread_id);
return Err(err);
}
};
*jh_guard = Some(join_handle);
drop(jh_guard);
// The thread is now running. Nothing more to do here
Ok(())
}
/// This wrapper function runs in a newly-spawned thread. It initializes the
/// thread-local variables and calls the payload function
fn thread_wrapper<F, E>(
thread_id: u64,
thread: Arc<PageServerThread>,
shutdown_rx: watch::Receiver<()>,
f: F,
) where
F: FnOnce() -> Result<(), E> + Send + 'static,
{
SHUTDOWN_RX.with(|rx| {
*rx.borrow_mut() = Some(shutdown_rx);
});
CURRENT_THREAD.with(|ct| {
*ct.borrow_mut() = Some(thread);
});
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
let result = panic::catch_unwind(AssertUnwindSafe(f));
// Remove our entry from the global hashmap.
THREADS.lock().unwrap().remove(&thread_id);
// If the thread payload panic'd, exit with the panic.
if let Err(err) = result {
panic::resume_unwind(err);
}
}
/// Signal and wait for threads to shut down.
///
/// The arguments are used to select the threads to kill. Any None arguments are
/// ignored. For example, to shut down all WalReceiver threads:
///
/// shutdown_threads(Some(ThreadKind::WalReceiver), None, None)
///
/// Or to shut down all threads for a given timeline:
///
/// shutdown_threads(None, None, Some(timelineid))
///
pub fn shutdown_threads(
kind: Option<ThreadKind>,
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
) {
let mut victim_threads = Vec::new();
let threads = THREADS.lock().unwrap();
for thread in threads.values() {
if (kind.is_none() || Some(thread.kind) == kind)
&& (tenant_id.is_none() || thread.tenant_id == tenant_id)
&& (timeline_id.is_none() || thread.timeline_id == timeline_id)
{
thread.shutdown_requested.store(true, Ordering::Relaxed);
// FIXME: handle error?
let _ = thread.shutdown_tx.send(());
victim_threads.push(Arc::clone(thread));
}
}
drop(threads);
for thread in victim_threads {
info!("waiting for {} to shut down", thread.name);
if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
let _ = join_handle.join();
} else {
// The thread had not even fully started yet. Or it was shut down
// concurrently and already exited
}
}
}
/// A Future that can be used to check if the current thread has been requested to
/// shut down.
pub async fn shutdown_watcher() {
let _ = SHUTDOWN_RX
.with(|rx| {
rx.borrow()
.as_ref()
.expect("shutdown_watcher() called in an unexpected thread")
.clone()
})
.changed()
.await;
}
/// Has the current thread been requested to shut down?
pub fn is_shutdown_requested() -> bool {
CURRENT_THREAD.with(|ct| {
if let Some(ct) = ct.borrow().as_ref() {
ct.shutdown_requested.load(Ordering::Relaxed)
} else {
if !cfg!(test) {
warn!("is_shutdown_requested() called in an unexpected thread");
}
false
}
})
}
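
The registry and cooperative shutdown described in the module comment can be sketched with the standard library alone. The version below is a simplified illustration, not the module from the diff: it assumes the shutdown flag is handed to the worker as an argument instead of living in thread-local storage, it drops the Tokio watch channel and the tenant/timeline filters, and the names `Kind`, `Entry`, `registry` and `example_worker` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
    Worker,
    Listener,
}

struct Entry {
    kind: Kind,
    shutdown_requested: Arc<AtomicBool>,
    join_handle: Mutex<Option<JoinHandle<()>>>,
}

static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Global registry of live threads, keyed by our own increasing id.
fn registry() -> &'static Mutex<HashMap<u64, Arc<Entry>>> {
    static REGISTRY: OnceLock<Mutex<HashMap<u64, Arc<Entry>>>> = OnceLock::new();
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Register and launch a thread. The payload receives its shutdown flag.
pub fn spawn<F>(kind: Kind, f: F) -> std::io::Result<u64>
where
    F: FnOnce(Arc<AtomicBool>) + Send + 'static,
{
    let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    let flag = Arc::new(AtomicBool::new(false));
    let entry = Arc::new(Entry {
        kind,
        shutdown_requested: Arc::clone(&flag),
        join_handle: Mutex::new(None),
    });
    // Hold the handle lock until the JoinHandle is stored, so that a
    // concurrent shutdown blocks here instead of seeing None.
    let mut handle_slot = entry.join_handle.lock().unwrap();
    registry().lock().unwrap().insert(id, Arc::clone(&entry));
    let spawned = thread::Builder::new().spawn(move || {
        f(flag);
        // A finished thread removes itself from the registry.
        registry().lock().unwrap().remove(&id);
    });
    match spawned {
        Ok(handle) => {
            *handle_slot = Some(handle);
            Ok(id)
        }
        Err(err) => {
            registry().lock().unwrap().remove(&id);
            Err(err)
        }
    }
}

/// Signal matching threads and wait for them; returns how many were joined.
pub fn shutdown_threads(kind: Option<Kind>) -> usize {
    let reg = registry().lock().unwrap();
    let victims: Vec<Arc<Entry>> = reg
        .values()
        .filter(|e| kind.is_none() || Some(e.kind) == kind)
        .cloned()
        .collect();
    // Release the registry lock before joining: exiting threads need it.
    drop(reg);
    for victim in &victims {
        victim.shutdown_requested.store(true, Ordering::Relaxed);
    }
    let mut joined = 0;
    for victim in victims {
        if let Some(handle) = victim.join_handle.lock().unwrap().take() {
            let _ = handle.join();
            joined += 1;
        }
    }
    joined
}

/// Example payload: polls the flag periodically and exits when it is set.
pub fn example_worker(stop: Arc<AtomicBool>) {
    while !stop.load(Ordering::Relaxed) {
        thread::sleep(Duration::from_millis(5));
    }
}
```

A caller would start a thread with `spawn(Kind::Worker, example_worker)` and later stop it with `shutdown_threads(Some(Kind::Worker))`. Because the sketch only polls a flag, a worker blocked in a long wait would not notice the request promptly, which is the gap the watch channel and `shutdown_watcher()` cover in the diff.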

@@ -10,15 +10,46 @@
//! This is similar to PostgreSQL's virtual file descriptor facility in
//! src/backend/storage/file/fd.c
//!
use lazy_static::lazy_static;
use std::fs::{File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use zenith_metrics::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec};
use once_cell::sync::OnceCell;
// Metrics collected on disk IO operations
const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
0.000001, // 1 usec
0.00001, // 10 usec
0.0001, // 100 usec
0.001, // 1 msec
0.01, // 10 msec
0.1, // 100 msec
1.0, // 1 sec
];
lazy_static! {
static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!(
"pageserver_io_time",
"Time spent in IO operations",
&["operation", "tenant_id", "timeline_id"],
STORAGE_IO_TIME_BUCKETS.into()
)
.expect("failed to define a metric");
}
lazy_static! {
static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!(
"pageserver_io_size",
"Amount of bytes",
&["operation", "tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -51,6 +82,10 @@ pub struct VirtualFile {
/// storing it here.
pub path: PathBuf,
open_options: OpenOptions,
/// For metrics
tenantid: String,
timelineid: String,
}
#[derive(PartialEq, Clone, Copy)]
@@ -145,7 +180,13 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
drop(old_file);
// We do not have the tenantid/timelineid of the evicted file.
// It would be possible to store the path together with the file or use the filepath crate,
// but since close() is expected to be fast, gathering precise
// per-tenant statistics here is not critical.
STORAGE_IO_TIME
.with_label_values(&["close", "-", "-"])
.observe_closure_duration(|| drop(old_file));
}
// Prepare the slot for reuse and return it
@@ -185,9 +226,20 @@ impl VirtualFile {
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let parts = path.to_str().unwrap().split('/').collect::<Vec<&str>>();
let tenantid;
let timelineid;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenantid = parts[parts.len() - 4].to_string();
timelineid = parts[parts.len() - 2].to_string();
} else {
tenantid = "*".to_string();
timelineid = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = open_options.open(path)?;
let file = STORAGE_IO_TIME
.with_label_values(&["open", &tenantid, &timelineid])
.observe_closure_duration(|| open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -204,6 +256,8 @@ impl VirtualFile {
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenantid,
timelineid,
};
slot_guard.file.replace(file);
@@ -213,13 +267,13 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub fn sync_all(&self) -> Result<(), Error> {
self.with_file(|file| file.sync_all())?
self.with_file("fsync", |file| file.sync_all())?
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
fn with_file<F, R>(&self, mut func: F) -> Result<R, Error>
fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
@@ -242,7 +296,9 @@ impl VirtualFile {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(func(file));
return Ok(STORAGE_IO_TIME
.with_label_values(&[op, &self.tenantid, &self.timelineid])
.observe_closure_duration(|| func(file)));
}
}
}
@@ -267,7 +323,9 @@ impl VirtualFile {
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = self.open_options.open(&self.path)?;
let file = STORAGE_IO_TIME
.with_label_values(&["open", &self.tenantid, &self.timelineid])
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
//
@@ -276,7 +334,9 @@ impl VirtualFile {
// library RwLock doesn't allow downgrading without releasing the lock,
// and that doesn't seem worth the trouble. (parking_lot RwLock would
// allow it)
let result = func(&file);
let result = STORAGE_IO_TIME
.with_label_values(&[op, &self.tenantid, &self.timelineid])
.observe_closure_duration(|| func(&file));
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -299,7 +359,13 @@ impl Drop for VirtualFile {
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
slot_guard.file.take();
// Unlike files evicted by the replacement algorithm, here
// we group close time by tenantid/timelineid.
// This allows comparing the number/time of "normal" file closes
// with file evictions.
STORAGE_IO_TIME
.with_label_values(&["close", &self.tenantid, &self.timelineid])
.observe_closure_duration(|| slot_guard.file.take());
}
}
}
@@ -335,7 +401,7 @@ impl Seek for VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self.with_file(|mut file| file.seek(SeekFrom::End(offset)))??
self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -357,11 +423,23 @@ impl Seek for VirtualFile {
impl FileExt for VirtualFile {
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
self.with_file(|file| file.read_at(buf, offset))?
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenantid, &self.timelineid])
.add(size as i64);
}
result
}
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
self.with_file(|file| file.write_at(buf, offset))?
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenantid, &self.timelineid])
.add(size as i64);
}
result
}
}
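
Two ideas carry the metrics changes in this file: labels are derived from the file path by looking for a `tenants/<tenant>/timelines/<timeline>/<file>` tail, and each operation is wrapped in a closure so its duration can be observed. The sketch below illustrates both with the standard library only. `metric_labels` and `timed` are hypothetical helpers, `timed` merely stands in for the histogram's `observe_closure_duration`, and `to_string_lossy` is used here instead of the diff's `to_str().unwrap()` so that a non-UTF-8 path does not panic.

```rust
use std::path::Path;
use std::time::{Duration, Instant};

/// Derive (tenant_id, timeline_id) labels from a path whose last five
/// components look like `tenants/<tenant>/timelines/<timeline>/<file>`.
/// Falls back to ("*", "*"), as the diff does, when the shape differs.
pub fn metric_labels(path: &Path) -> (String, String) {
    let text = path.to_string_lossy();
    let parts: Vec<&str> = text.split('/').collect();
    let n = parts.len();
    // Same check as the diff: more than five parts, "tenants" fifth from the end.
    if n > 5 && parts[n - 5] == "tenants" {
        (parts[n - 4].to_string(), parts[n - 2].to_string())
    } else {
        ("*".to_string(), "*".to_string())
    }
}

/// Run a closure and report how long it took, alongside its result.
pub fn timed<R>(f: impl FnOnce() -> R) -> (R, Duration) {
    let start = Instant::now();
    let result = f();
    (result, start.elapsed())
}
```

Note that the `n > 5` condition means a relative path consisting of exactly the five components `tenants/<tenant>/timelines/<timeline>/<file>` falls into the `"*"` bucket; at least one component must precede `tenants`.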

@@ -7,8 +7,8 @@
use crate::config::PageServerConf;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::tenant_threads;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::walingest::WalIngest;
use anyhow::{bail, Context, Error, Result};
use lazy_static::lazy_static;
@@ -19,12 +19,9 @@ use postgres_types::PgLsn;
use std::cell::Cell;
use std::collections::HashMap;
use std::str::FromStr;
use std::thread;
use std::thread::JoinHandle;
use std::thread_local;
use std::time::SystemTime;
use tokio::pin;
use tokio::sync::oneshot;
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
@@ -38,13 +35,10 @@ use zenith_utils::zid::ZTimelineId;
//
struct WalReceiverEntry {
wal_producer_connstr: String,
wal_receiver_handle: Option<JoinHandle<()>>,
wal_receiver_interrupt_sender: Option<oneshot::Sender<()>>,
tenantid: ZTenantId,
}
lazy_static! {
static ref WAL_RECEIVERS: Mutex<HashMap<ZTimelineId, WalReceiverEntry>> =
static ref WAL_RECEIVERS: Mutex<HashMap<(ZTenantId, ZTimelineId), WalReceiverEntry>> =
Mutex::new(HashMap::new());
}
@@ -55,97 +49,54 @@ thread_local! {
pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
}
// Wait for walreceiver to stop
// Now it stops when pageserver shutdown is requested.
// In future we can make this more granular and send shutdown signals
// per tenant/timeline to cancel inactive walreceivers.
// TODO deal with blocking pg connections
pub fn stop_wal_receiver(timelineid: ZTimelineId) {
fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
let mut receivers = WAL_RECEIVERS.lock();
if let Some(r) = receivers.get_mut(&timelineid) {
match r.wal_receiver_interrupt_sender.take() {
Some(s) => {
if s.send(()).is_err() {
warn!("wal receiver interrupt signal already sent");
}
}
None => {
warn!("wal_receiver_interrupt_sender is missing, wal recever shouldn't be running")
}
}
info!("waiting for wal receiver to stop");
let handle = r.wal_receiver_handle.take();
// do not hold the lock while joining the handle (deadlock is possible otherwise)
drop(receivers);
// there is no timeout or try_join option available so in case of a bug this can hang forever
handle.map(JoinHandle::join);
}
}
pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
let mut receivers = WAL_RECEIVERS.lock();
receivers.remove(&timelineid);
// Check if it was the last walreceiver of the tenant.
// TODO now we store one WalReceiverEntry per timeline,
// so this iterator looks a bit strange.
for (_timelineid, entry) in receivers.iter() {
if entry.tenantid == tenantid {
return;
}
}
// When last walreceiver of the tenant is gone, change state to Idle
tenant_mgr::set_tenant_state(tenantid, TenantState::Idle).unwrap();
receivers.remove(&(tenantid, timelineid));
}
// Launch a new WAL receiver, or tell one that's running about change in connection string
pub fn launch_wal_receiver(
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
) {
) -> Result<()> {
let mut receivers = WAL_RECEIVERS.lock();
match receivers.get_mut(&timelineid) {
match receivers.get_mut(&(tenantid, timelineid)) {
Some(receiver) => {
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let wal_receiver_handle = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
thread_mgr::spawn(
ThreadKind::WalReceiver,
Some(tenantid),
Some(timelineid),
"WAL receiver thread",
move || {
IS_WAL_RECEIVER.with(|c| c.set(true));
thread_main(conf, timelineid, tenantid, rx);
})
.unwrap();
thread_main(conf, tenantid, timelineid)
},
)?;
let receiver = WalReceiverEntry {
wal_producer_connstr: wal_producer_connstr.into(),
wal_receiver_handle: Some(wal_receiver_handle),
wal_receiver_interrupt_sender: Some(tx),
tenantid,
};
receivers.insert(timelineid, receiver);
receivers.insert((tenantid, timelineid), receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap();
tenant_threads::start_tenant_threads(conf, tenantid);
tenant_mgr::activate_tenant(conf, tenantid)?;
}
};
Ok(())
}
// Look up current WAL producer connection string in the hash table
fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String {
let receivers = WAL_RECEIVERS.lock();
receivers
.get(&timelineid)
.get(&(tenantid, timelineid))
.unwrap()
.wal_producer_connstr
.clone()
@@ -156,25 +107,18 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
//
fn thread_main(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
interrupt_receiver: oneshot::Receiver<()>,
) {
timelineid: ZTimelineId,
) -> Result<()> {
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
info!("WAL receiver thread started");
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid);
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it.
let res = walreceiver_main(
conf,
tenantid,
timelineid,
&wal_producer_connstr,
interrupt_receiver,
);
let res = walreceiver_main(conf, tenantid, timelineid, &wal_producer_connstr);
// TODO cleanup info messages
if let Err(e) = res {
@@ -188,7 +132,8 @@ fn thread_main(
// Drop it from list of active WAL_RECEIVERS
// so that the next callmemaybe request launches a new thread
drop_wal_receiver(timelineid, tenantid);
drop_wal_receiver(tenantid, timelineid);
Ok(())
}
fn walreceiver_main(
@@ -196,7 +141,6 @@ fn walreceiver_main(
tenantid: ZTenantId,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
mut interrupt_receiver: oneshot::Receiver<()>,
) -> Result<(), Error> {
// Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr);
@@ -273,12 +217,15 @@ fn walreceiver_main(
let mut walingest = WalIngest::new(&*timeline, startpoint)?;
while let Some(replication_message) = runtime.block_on(async {
let shutdown_watcher = thread_mgr::shutdown_watcher();
tokio::select! {
replication_message = physical_stream.next() => replication_message,
_ = &mut interrupt_receiver => {
// check for shutdown first
biased;
_ = shutdown_watcher => {
info!("walreceiver interrupted");
None
}
replication_message = physical_stream.next() => replication_message,
}
}) {
let replication_message = replication_message?;

View File

@@ -13,6 +13,8 @@ lazy_static = "1.4.0"
md5 = "0.7.0"
rand = "0.8.3"
hex = "0.4.3"
hyper = "0.14"
routerify = "2"
parking_lot = "0.11.2"
serde = "1"
serde_json = "1"
@@ -23,3 +25,4 @@ rustls = "0.19.1"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
zenith_utils = { path = "../zenith_utils" }
zenith_metrics = { path = "../zenith_metrics" }

proxy/src/http.rs (new file, 15 lines)
View File

@@ -0,0 +1,15 @@
use hyper::{Body, Request, Response, StatusCode};
use routerify::RouterBuilder;
use zenith_utils::http::endpoint;
use zenith_utils::http::error::ApiError;
use zenith_utils::http::json::json_response;
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
}
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
router.get("/v1/status", status_handler)
}

View File

@@ -9,15 +9,18 @@ use anyhow::bail;
use clap::{App, Arg};
use state::{ProxyConfig, ProxyState};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::{tcp_listener, GIT_VERSION};
mod cplane_api;
mod http;
mod mgmt;
mod proxy;
mod state;
mod waiters;
fn main() -> anyhow::Result<()> {
zenith_metrics::set_common_metrics_prefix("zenith_proxy");
let arg_matches = App::new("Zenith proxy/router")
.version(GIT_VERSION)
.arg(
@@ -36,6 +39,14 @@ fn main() -> anyhow::Result<()> {
.help("listen for management callback connection on ip:port")
.default_value("127.0.0.1:7000"),
)
.arg(
Arg::with_name("http")
.short("h")
.long("http")
.takes_value(true)
.help("listen for incoming http connections (metrics, etc) on ip:port")
.default_value("127.0.0.1:7001"),
)
.arg(
Arg::with_name("uri")
.short("u")
@@ -82,6 +93,7 @@ fn main() -> anyhow::Result<()> {
let config = ProxyConfig {
proxy_address: arg_matches.value_of("proxy").unwrap().parse()?,
mgmt_address: arg_matches.value_of("mgmt").unwrap().parse()?,
http_address: arg_matches.value_of("http").unwrap().parse()?,
redirect_uri: arg_matches.value_of("uri").unwrap().parse()?,
auth_endpoint: arg_matches.value_of("auth-endpoint").unwrap().parse()?,
ssl_config,
@@ -91,6 +103,9 @@ fn main() -> anyhow::Result<()> {
println!("Version: {}", GIT_VERSION);
// Check that we can bind to address before further initialization
println!("Starting http on {}", state.conf.http_address);
let http_listener = tcp_listener::bind(state.conf.http_address)?;
println!("Starting proxy on {}", state.conf.proxy_address);
let pageserver_listener = tcp_listener::bind(state.conf.proxy_address)?;
@@ -98,6 +113,16 @@ fn main() -> anyhow::Result<()> {
let mgmt_listener = tcp_listener::bind(state.conf.mgmt_address)?;
let threads = [
thread::Builder::new()
.name("Http thread".into())
.spawn(move || {
let router = http::make_router();
endpoint::serve_thread_main(
router,
http_listener,
std::future::pending(), // never shut down
)
})?,
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
thread::Builder::new()

View File

@@ -10,6 +10,7 @@ use std::collections::HashMap;
use std::net::{SocketAddr, TcpStream};
use std::{io, thread};
use tokio_postgres::NoTls;
use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter};
use zenith_utils::postgres_backend::{self, PostgresBackend, ProtoState, Stream};
use zenith_utils::pq_proto::{BeMessage as Be, FeMessage as Fe, *};
use zenith_utils::sock_split::{ReadStream, WriteStream};
@@ -33,6 +34,24 @@ impl CancelClosure {
lazy_static! {
// Enables serving CancelRequests
static ref CANCEL_MAP: Mutex<HashMap<CancelKeyData, CancelClosure>> = Mutex::new(HashMap::new());
// Metrics
static ref NUM_CONNECTIONS_ACCEPTED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_accepted"),
"Number of TCP client connections accepted."
).unwrap();
static ref NUM_CONNECTIONS_CLOSED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_closed"),
"Number of TCP client connections closed."
).unwrap();
static ref NUM_CONNECTIONS_FAILED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_failed"),
"Number of TCP client connections that closed due to error."
).unwrap();
static ref NUM_BYTES_PROXIED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_bytes_proxied"),
"Number of bytes sent/received between any client and backend."
).unwrap();
}
thread_local! {
@@ -52,6 +71,7 @@ pub fn thread_main(
loop {
let (socket, peer_addr) = listener.accept()?;
println!("accepted connection from {}", peer_addr);
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
socket.set_nodelay(true).unwrap();
// TODO Use a threadpool instead. Maybe use tokio's threadpool by
@@ -61,10 +81,12 @@ pub fn thread_main(
.name("Proxy thread".into())
.spawn(move || {
if let Err(err) = proxy_conn_main(state, socket) {
NUM_CONNECTIONS_FAILED_COUNTER.inc();
println!("error: {}", err);
}
// Clean up CANCEL_MAP.
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
THREAD_CANCEL_KEY_DATA.with(|cell| {
if let Some(cancel_key_data) = cell.get() {
CANCEL_MAP.lock().remove(&cancel_key_data);
@@ -164,14 +186,24 @@ impl ProxyConnection {
fn handle_startup(&mut self) -> anyhow::Result<Option<(String, String)>> {
let have_tls = self.pgb.tls_config.is_some();
let mut encrypted = false;
let mut received_something = false;
loop {
let msg = match self.pgb.read_message()? {
Some(Fe::StartupPacket(msg)) => msg,
None => bail!("connection is lost"),
None => {
if received_something {
bail!("connection is lost");
} else {
// Probably load balancer health check
// TODO check peer_addr to make sure.
return Ok(None);
}
}
bad => bail!("unexpected message type: {:?}", bad),
};
println!("got message: {:?}", msg);
received_something = true;
match msg {
FeStartupPacket::GssEncRequest => {
@@ -339,6 +371,7 @@ fn proxy(
// so we can afford to lose `res` in case `flush` fails
let res = self.0.write(buf);
if res.is_ok() {
NUM_BYTES_PROXIED_COUNTER.inc_by(buf.len() as u64);
self.flush()?;
}
res

View File

@@ -10,8 +10,12 @@ pub struct ProxyConfig {
/// main entrypoint for users to connect to
pub proxy_address: SocketAddr,
/// http management endpoint. Upon user account creation control plane
/// internally used for status and prometheus metrics
pub http_address: SocketAddr,
/// management endpoint. Upon user account creation control plane
/// will notify us here, so that we can 'unfreeze' user session.
/// TODO It uses postgres protocol over TCP but should be migrated to http.
pub mgmt_address: SocketAddr,
/// send unauthenticated users to this URI

View File

@@ -47,6 +47,9 @@ Useful environment variables:
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
`ZENITH_PAGESERVER_OVERRIDES`: A `;`-separated list of config overrides; each one is passed as a
`--pageserver-config-override=${value}` parameter when the zenith CLI is invoked
`RUST_LOG`: logging configuration to pass into Zenith CLI
Let stdout, stderr and `INFO` log messages go to the terminal instead of capturing them:
`pytest -s --log-cli-level=INFO ...`
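
As a rough illustration of how a `;`-separated `ZENITH_PAGESERVER_OVERRIDES` value could turn into repeated `--pageserver-config-override=` flags, here is a minimal sketch. The helper name `overrides_to_args` and the sample config keys are hypothetical; only the split-and-strip step mirrors what the test fixtures in this change do.

```python
from typing import List, Optional


def overrides_to_args(env_value: Optional[str]) -> List[str]:
    """Turn a ';'-separated override string into CLI flags (hypothetical helper)."""
    if env_value is None:
        return []
    # Each ';'-separated item becomes its own flag; surrounding whitespace is dropped.
    return [f"--pageserver-config-override={o.strip()}" for o in env_value.split(";")]


# Sample value; the config keys here are placeholders, not real pageserver settings.
args = overrides_to_args("key_a='1'; key_b={nested='x'}")
print(args)
```

Note that a plain split on `;` assumes no override value contains a `;` itself.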

View File

@@ -5,7 +5,7 @@ import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.zenith_fixtures import ZenithEnvBuilder
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -13,10 +13,18 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Create a couple of branches off the main branch, at a historical point in time.
#
def test_branch_behind(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
# Use safekeeper in this test to avoid a subtle race condition.
# Without safekeeper, walreceiver reconnection can get stuck
# because of an IO deadlock.
#
# See https://github.com/zenithdb/zenith/issues/1068
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
# Branch at the point where only 100 rows were inserted
env.zenith_cli(["branch", "test_branch_behind", "empty"])
env.zenith_cli(["branch", "test_branch_behind", "main"])
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")

View File

@@ -1,5 +1,5 @@
import json
from uuid import uuid4
from uuid import uuid4, UUID
import pytest
import psycopg2
import requests
@@ -96,6 +96,15 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: str):
client.tenant_create(tenant_id)
assert tenant_id.hex in {t['id'] for t in client.tenant_list()}
# check its timelines
timelines = client.timeline_list(tenant_id)
assert len(timelines) > 0
for timeline_id_str in timelines:
timeline_details = client.timeline_details(tenant_id.hex, timeline_id_str)
assert timeline_details['type'] == 'Local'
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str
# create branch
branch_name = uuid4().hex
client.branch_create(tenant_id, branch_name, "main")

View File

@@ -0,0 +1,88 @@
# It's possible to run any regular test with the local fs remote storage via
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" pipenv ......
import tempfile, time, shutil, os
from contextlib import closing
from pathlib import Path
from fixtures.zenith_fixtures import ZenithEnvBuilder, LocalFsStorage, check_restored_datadir_content
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Tests that a piece of data is backed up and restored correctly:
#
# 1. Initial pageserver
# * starts a pageserver with remote storage, stores specific data in its tables
# * triggers a checkpoint (which produces local data scheduled for backup), gets the corresponding timeline id
# * polls the timeline status to ensure it's copied remotely
# * stops the pageserver, clears all local directories
#
# 2. Second pageserver
# * starts another pageserver, connected to the same remote storage
# * same timeline id is queried for status, triggering timeline's download
# * timeline status is polled until it's downloaded
# * queries the specific data, ensuring that it matches the one stored before
#
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.rust_log_override = 'debug'
zenith_env_builder.num_safekeepers = 1
zenith_env_builder.enable_local_fs_remote_storage()
data_id = 1
data_secret = 'very secret secret'
##### First start, insert secret data and upload it to the remote storage
env = zenith_env_builder.init()
pg = env.postgres.create_start()
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'''
CREATE TABLE t1(id int primary key, secret text);
INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
''')
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant_id} {timeline_id}")
log.info("waiting for upload") # TODO api to check if upload is done
time.sleep(2)
##### Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / 'tenants'
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
log.info("waiting for timeline redownload")
client = env.pageserver.http_client()
attempts = 0
while True:
timeline_details = client.timeline_details(tenant_id, timeline_id)
assert timeline_details['timeline_id'] == timeline_id
assert timeline_details['tenant_id'] == tenant_id
if timeline_details['type'] == 'Local':
log.info("timeline downloaded, checking its data")
break
attempts += 1
if attempts > 10:
raise Exception("timeline redownload failed")
log.debug("still waiting")
time.sleep(1)
pg = env.postgres.create_start()
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')
assert cur.fetchone() == (data_secret, )
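
The bounded polling loop used above to wait for the timeline redownload can be factored into a small helper. This is only a sketch under assumptions: `wait_until` and its parameters are hypothetical names, not part of the fixtures, and the `states` iterator stands in for repeated `timeline_details(...)['type']` lookups.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], attempts: int = 10, delay_s: float = 1.0) -> int:
    """Poll `check` until it returns True; return how many polls failed first.

    Raises RuntimeError once more than `attempts` polls have failed.
    """
    failed = 0
    while True:
        if check():
            return failed
        failed += 1
        if failed > attempts:
            raise RuntimeError("condition not met in time")
        time.sleep(delay_s)


# Stand-in for polling until the timeline type becomes 'Local'.
states = iter(["Remote", "Remote", "Local"])
polls = wait_until(lambda: next(states) == "Local", delay_s=0.0)
print(polls)
```

A helper like this keeps the retry limit and delay in one place, at the cost of hiding the per-iteration assertions the test currently makes on `tenant_id` and `timeline_id`.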

View File

@@ -25,7 +25,7 @@ from dataclasses import dataclass
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union
from typing_extensions import Literal
import pytest
@@ -342,10 +342,14 @@ class ZenithEnvBuilder:
def __init__(self,
repo_dir: Path,
port_distributor: PortDistributor,
pageserver_remote_storage: Optional[RemoteStorage] = None,
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False):
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.pageserver_remote_storage = pageserver_remote_storage
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.env: Optional[ZenithEnv] = None
@@ -356,6 +360,11 @@ class ZenithEnvBuilder:
self.env = ZenithEnv(self)
return self.env
def enable_local_fs_remote_storage(self):
assert self.pageserver_remote_storage is None, "remote storage is enabled already"
self.pageserver_remote_storage = LocalFsStorage(
Path(self.repo_dir / 'local_fs_remote_storage'))
def __enter__(self):
return self
@@ -404,6 +413,7 @@ class ZenithEnv:
"""
def __init__(self, config: ZenithEnvBuilder):
self.repo_dir = config.repo_dir
self.rust_log_override = config.rust_log_override
self.port_distributor = config.port_distributor
self.postgres = PostgresFactory(self)
@@ -434,7 +444,9 @@ auth_type = '{pageserver_auth_type}'
"""
# Create a corresponding ZenithPageserver object
self.pageserver = ZenithPageserver(self, port=pageserver_port)
self.pageserver = ZenithPageserver(self,
port=pageserver_port,
remote_storage=config.pageserver_remote_storage)
# Create config and a Safekeeper object for each safekeeper
for i in range(1, config.num_safekeepers + 1):
@@ -465,6 +477,8 @@ sync = false # Disable fsyncs to make the tests go faster
tmp.flush()
cmd = ['init', f'--config={tmp.name}']
append_pageserver_param_overrides(cmd, config.pageserver_remote_storage)
self.zenith_cli(cmd)
# Start up the page server and all the safekeepers
@@ -509,6 +523,9 @@ sync = false # Disable fsyncs to make the tests go faster
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
if self.rust_log_override is not None:
env_vars['RUST_LOG'] = self.rust_log_override
# Pass coverage settings
var = 'LLVM_PROFILE_FILE'
val = os.environ.get(var)
@@ -665,6 +682,20 @@ class ZenithPageserverHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def timeline_list(self, tenant_id: uuid.UUID) -> List[str]:
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_details(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
@@ -677,17 +708,38 @@ class PageserverPort:
http: int
@dataclass
class LocalFsStorage:
root: Path
@dataclass
class S3Storage:
bucket: str
region: str
access_key: str
secret_key: str
RemoteStorage = Union[LocalFsStorage, S3Storage]
class ZenithPageserver(PgProtocol):
"""
An object representing a running pageserver.
Initializes the repository via `zenith init`.
"""
def __init__(self, env: ZenithEnv, port: PageserverPort, enable_auth=False):
def __init__(self,
env: ZenithEnv,
port: PageserverPort,
remote_storage: Optional[RemoteStorage] = None,
enable_auth=False):
super().__init__(host='localhost', port=port.pg)
self.env = env
self.running = False
self.service_port = port # do not shadow PgProtocol.port which is just int
self.remote_storage = remote_storage
def start(self) -> 'ZenithPageserver':
"""
@@ -696,7 +748,10 @@ class ZenithPageserver(PgProtocol):
"""
assert self.running == False
self.env.zenith_cli(['pageserver', 'start'])
start_args = ['pageserver', 'start']
append_pageserver_param_overrides(start_args, self.remote_storage)
self.env.zenith_cli(start_args)
self.running = True
return self
@@ -729,6 +784,28 @@ class ZenithPageserver(PgProtocol):
)
def append_pageserver_param_overrides(params_to_update: List[str],
pageserver_remote_storage: Optional[RemoteStorage]):
if pageserver_remote_storage is not None:
if isinstance(pageserver_remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
elif isinstance(pageserver_remote_storage, S3Storage):
pageserver_storage_override = f"bucket_name='{pageserver_remote_storage.bucket}',\
bucket_region='{pageserver_remote_storage.region}',access_key_id='{pageserver_remote_storage.access_key}',\
secret_access_key='{pageserver_remote_storage.secret_key}'"
else:
raise Exception(f'Unknown storage configuration {pageserver_remote_storage}')
params_to_update.append(
f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}')
env_overrides = os.getenv('ZENITH_PAGESERVER_OVERRIDES')
if env_overrides is not None:
params_to_update += [
f'--pageserver-config-override={o.strip()}' for o in env_overrides.split(';')
]
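
The triple braces in the `remote_storage={{{...}}}` f-string above are easy to misread, so here is a small standalone sketch of what they produce. `LocalFs` and `remote_storage_flag` are hypothetical stand-ins for illustration, not the fixture's own classes.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass
class LocalFs:  # hypothetical stand-in for the fixture's LocalFsStorage
    root: PurePosixPath


def remote_storage_flag(storage: LocalFs) -> str:
    # In an f-string, '{{' and '}}' emit literal braces, so '{{{inner}}}'
    # renders as '{' + inner + '}'.
    inner = f"local_path='{storage.root}'"
    return f"--pageserver-config-override=remote_storage={{{inner}}}"


flag = remote_storage_flag(LocalFs(PurePosixPath("/tmp/zenith_zzz")))
print(flag)
```

The resulting flag wraps the storage settings in a single pair of literal braces, which matches the `remote_storage={local_path='...'}` form shown in the comment at the top of the new remote storage test.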
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str):

View File

@@ -10,6 +10,7 @@ use std::fs::File;
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use walkeeper::timeline::{CreateControlFile, FileStorage};
use zenith_utils::http::endpoint;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
@@ -86,8 +87,21 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Do not wait for changes to be written safely to disk"),
)
.arg(
Arg::with_name("dump-control-file")
.long("dump-control-file")
.takes_value(true)
.help("Dump control file at path specified by this argument and exit"),
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?;
let json = serde_json::to_string(&state)?;
print!("{}", json);
return Ok(());
}
let mut conf: SafeKeeperConf = Default::default();
if let Some(dir) = arg_matches.value_of("datadir") {
@@ -183,7 +197,12 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.spawn(|| {
// TODO authentication
let router = http::make_router(conf_);
endpoint::serve_thread_main(router, http_listener).unwrap();
endpoint::serve_thread_main(
router,
http_listener,
std::future::pending(), // never shut down
)
.unwrap();
})?,
);

View File

@@ -62,7 +62,7 @@ impl Default for SafeKeeperConf {
daemonize: false,
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
}

View File

@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 2;
pub const SK_FORMAT_VERSION: u32 = 3;
const SK_PROTOCOL_VERSION: u32 = 1;
const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -133,9 +133,11 @@ pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
/// Zenith timelineid
pub ztli: ZTimelineId,
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub wal_seg_size: u32,
}
@@ -149,6 +151,7 @@ pub struct SafeKeeperState {
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
pub commit_lsn: Lsn,
@@ -171,7 +174,7 @@ impl SafeKeeperState {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
tenant_id: ZTenantId::from([0u8; 16]),
ztli: ZTimelineId::from([0u8; 16]),
timeline_id: ZTimelineId::from([0u8; 16]),
wal_seg_size: 0,
},
proposer_uuid: [0; 16],
@@ -560,13 +563,13 @@ where
// set basic info about server, if not yet
self.s.server.system_id = msg.system_id;
self.s.server.tenant_id = msg.tenant_id;
self.s.server.ztli = msg.ztli;
self.s.server.timeline_id = msg.ztli;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage
.persist(&self.s)
.with_context(|| "failed to persist shared state")?;
self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);
self.metrics = SafeKeeperMetrics::new(self.s.server.timeline_id);
info!(
"processed greeting from proposer {:?}, sending term {:?}",

View File

@@ -79,7 +79,7 @@ struct ReplicationConnGuard {
impl Drop for ReplicationConnGuard {
fn drop(&mut self) {
self.timeline.update_replica_state(self.replica, None);
self.timeline.remove_replica(self.replica);
}
}
@@ -120,14 +120,12 @@ impl ReplicationConn {
/// This is spawned into the background by `handle_start_replication`.
fn background_thread(
mut stream_in: ReadStream,
timeline: Arc<Timeline>,
replica_id: usize,
replica_guard: Arc<ReplicationConnGuard>,
) -> Result<()> {
let replica_id = replica_guard.replica;
let timeline = &replica_guard.timeline;
let mut state = ReplicaState::new();
let _guard = ReplicationConnGuard {
replica: replica_id,
timeline: timeline.clone(),
};
// Wait for replica's feedback.
while let Some(msg) = FeMessage::read(&mut stream_in)? {
match &msg {
@@ -140,7 +138,7 @@ impl ReplicationConn {
// Note: deserializing is on m[1..] because we skip the tag byte.
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
timeline.update_replica_state(replica_id, Some(state));
timeline.update_replica_state(replica_id, state);
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
@@ -148,7 +146,7 @@ impl ReplicationConn {
state.last_received_lsn = reply.write_lsn;
state.disk_consistent_lsn = reply.flush_lsn;
state.remote_consistent_lsn = reply.apply_lsn;
timeline.update_replica_state(replica_id, Some(state));
timeline.update_replica_state(replica_id, state);
}
_ => warn!("unexpected message {:?}", msg),
}
@@ -207,16 +205,23 @@ impl ReplicationConn {
// This replica_id is used below to check if it's time to stop replication.
let replica_id = bg_timeline.add_replica(state);
// Use a guard object to remove our entry from the timeline, when the background
// thread and us have both finished using it.
let replica_guard = Arc::new(ReplicationConnGuard {
replica: replica_id,
timeline: bg_timeline,
});
let bg_replica_guard = Arc::clone(&replica_guard);
// TODO: here we have two threads, one for writing WAL and one for receiving
// feedback. If one of them fails, we should shut down the other one too.
let _ = thread::Builder::new()
.name("HotStandbyFeedback thread".into())
.spawn(move || {
if let Err(err) = Self::background_thread(bg_stream_in, bg_timeline, replica_id) {
if let Err(err) = Self::background_thread(bg_stream_in, bg_replica_guard) {
error!("Replication background thread failed: {}", err);
}
})
.unwrap();
})?;
let mut wal_seg_size: usize;
loop {

View File

@@ -9,7 +9,7 @@ use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use tracing::*;
@@ -121,7 +121,7 @@ impl SharedState {
}
/// Assign new replica ID. We choose first empty cell in the replicas vector
/// or extend the vector if there are not free items.
/// or extend the vector if there are no free slots.
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
self.replicas[pos] = Some(state);
@@ -136,13 +136,14 @@ impl SharedState {
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
let state = FileStorage::load_control_file_conf(conf, timeline_id, create)
.with_context(|| "failed to load from control file")?;
let file_storage = FileStorage::new(timeline_id, conf);
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(&timelineid);
let wal_dir = conf.timeline_dir(&timeline_id);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
@@ -297,9 +298,15 @@ impl Timeline {
shared_state.add_replica(state)
}
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.replicas[id] = state;
shared_state.replicas[id] = Some(state);
}
pub fn remove_replica(&self, id: usize) {
let mut shared_state = self.mutex.lock().unwrap();
assert!(shared_state.replicas[id].is_some());
shared_state.replicas[id] = None;
}
pub fn get_end_of_wal(&self) -> Lsn {
@@ -381,7 +388,7 @@ impl GlobalTimelines {
}
#[derive(Debug)]
struct FileStorage {
pub struct FileStorage {
// save timeline dir to avoid reconstructing it every time
timeline_dir: PathBuf,
conf: SafeKeeperConf,
@@ -389,6 +396,17 @@ struct FileStorage {
}
impl FileStorage {
fn new(timeline_id: ZTimelineId, conf: &SafeKeeperConf) -> FileStorage {
let timeline_dir = conf.timeline_dir(&timeline_id);
let timelineid_str = format!("{}", timeline_id);
FileStorage {
timeline_dir,
conf: conf.clone(),
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
}
}
// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part
@@ -409,20 +427,24 @@ impl FileStorage {
upgrade_control_file(buf, version)
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
fn load_from_control_file(
fn load_control_file_conf(
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
let timeline_dir = conf.timeline_dir(&timelineid);
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
) -> Result<SafeKeeperState> {
let path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
Self::load_control_file(path, create)
}
/// Read in the control file.
/// If create=false and file doesn't exist, bails out.
pub fn load_control_file<P: AsRef<Path>>(
control_file_path: P,
create: CreateControlFile,
) -> Result<SafeKeeperState> {
info!(
"loading control file {}, create={:?}",
control_file_path.display(),
control_file_path.as_ref().display(),
create,
);
@@ -434,7 +456,7 @@ impl FileStorage {
.with_context(|| {
format!(
"failed to open control file at {}",
control_file_path.display(),
control_file_path.as_ref().display(),
)
})?;
@@ -465,21 +487,15 @@ impl FileStorage {
);
FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
|| format!("while reading control file {}", control_file_path.display(),),
|| {
format!(
"while reading control file {}",
control_file_path.as_ref().display(),
)
},
)?
};
let timelineid_str = format!("{}", timelineid);
Ok((
FileStorage {
timeline_dir,
conf: conf.clone(),
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
},
state,
))
Ok(state)
}
}
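
Review note: the new `load_control_file` takes `P: AsRef<Path>`, which is why every `control_file_path.display()` call in this hunk became `control_file_path.as_ref().display()`. A minimal std-only sketch of that pattern follows; the function name `describe_control_file` and the example path are hypothetical and not part of this change.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper (not from the diff): builds the kind of log line
/// that `load_control_file` emits, for any path-like argument.
fn describe_control_file<P: AsRef<Path>>(control_file_path: P) -> String {
    // `display()` is a method on `Path`, so a generic `P` has to be
    // borrowed as `&Path` through `as_ref()` first.
    format!(
        "loading control file {}",
        control_file_path.as_ref().display()
    )
}

fn main() {
    // The same function accepts &str, PathBuf and &Path callers.
    let from_str = describe_control_file("example/control");
    let from_buf = describe_control_file(PathBuf::from("example/control"));
    let from_path = describe_control_file(Path::new("example/control"));
    assert_eq!(from_str, from_buf);
    assert_eq!(from_buf, from_path);
    println!("{}", from_str);
}
```

Accepting `AsRef<Path>` lets the test code and `load_control_file_conf` share one loader without forcing either side to allocate a `PathBuf`.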
@@ -549,7 +565,7 @@ impl Storage for FileStorage {
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.ztli;
let ztli = server.timeline_id;
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
@@ -637,7 +653,7 @@ impl Storage for FileStorage {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.ztli;
let ztli = server.timeline_id;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
@@ -737,7 +753,10 @@ mod test {
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
.expect("failed to create timeline dir");
FileStorage::load_from_control_file(conf, timeline_id, create)
Ok((
FileStorage::new(timeline_id, conf),
FileStorage::load_control_file_conf(conf, timeline_id, create)?,
))
}
#[test]


@@ -5,7 +5,12 @@ use crate::safekeeper::{
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use tracing::*;
use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
use zenith_utils::{
bin_ser::LeSer,
lsn::Lsn,
pq_proto::SystemId,
zid::{ZTenantId, ZTimelineId},
};
/// Persistent consensus state of the acceptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -35,6 +40,36 @@ struct SafeKeeperStateV1 {
wal_start_lsn: Lsn,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ServerInfoV2 {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub tenant_id: ZTenantId,
/// Zenith timelineid
pub ztli: ZTimelineId,
pub wal_seg_size: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperStateV2 {
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfoV2,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
pub proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
pub commit_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone)
pub truncate_lsn: Lsn,
// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
// be skipped during decoding.
pub wal_start_lsn: Lsn,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
// migrate to storing full term history
if version == 1 {
@@ -55,6 +90,25 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
truncate_lsn: oldstate.truncate_lsn,
wal_start_lsn: oldstate.wal_start_lsn,
});
// migrate to hexing some zids
} else if version == 2 {
info!("reading safekeeper control file version {}", version);
let oldstate = SafeKeeperStateV2::des(&buf[..buf.len()])?;
let server = ServerInfo {
pg_version: oldstate.server.pg_version,
system_id: oldstate.server.system_id,
tenant_id: oldstate.server.tenant_id,
timeline_id: oldstate.server.ztli,
wal_seg_size: oldstate.server.wal_seg_size,
};
return Ok(SafeKeeperState {
acceptor_state: oldstate.acceptor_state,
server,
proposer_uuid: oldstate.proposer_uuid,
commit_lsn: oldstate.commit_lsn,
truncate_lsn: oldstate.truncate_lsn,
wal_start_lsn: oldstate.wal_start_lsn,
});
}
bail!("unsupported safekeeper control file version {}", version)
}
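
Review note: the version 2 branch above follows a common shape: keep a frozen copy of the old struct, decode the buffer with it, then map it field by field into the current struct (here, `ztli` becomes `timeline_id`). The sketch below shows only that shape, under loud assumptions: the structs are cut down to three fields, the byte layout is a hand-rolled little-endian encoding invented for this example, and `decode_v2`/`upgrade` are hypothetical names. The real code relies on `LeSer::des` instead.

```rust
/// Frozen copy of the old layout (illustrative subset of fields).
#[derive(Debug, Clone, PartialEq)]
struct ServerInfoV2 {
    pg_version: u32,
    ztli: u64,
    wal_seg_size: u32,
}

/// Current layout: same data, with the field renamed to `timeline_id`.
#[derive(Debug, Clone, PartialEq)]
struct ServerInfo {
    pg_version: u32,
    timeline_id: u64,
    wal_seg_size: u32,
}

/// Decode the assumed 16-byte layout: u32 | u64 | u32, all little-endian.
fn decode_v2(buf: &[u8]) -> Result<ServerInfoV2, String> {
    if buf.len() != 16 {
        return Err(format!("expected 16 bytes, got {}", buf.len()));
    }
    let mut a = [0u8; 4];
    let mut b = [0u8; 8];
    let mut c = [0u8; 4];
    a.copy_from_slice(&buf[0..4]);
    b.copy_from_slice(&buf[4..12]);
    c.copy_from_slice(&buf[12..16]);
    Ok(ServerInfoV2 {
        pg_version: u32::from_le_bytes(a),
        ztli: u64::from_le_bytes(b),
        wal_seg_size: u32::from_le_bytes(c),
    })
}

/// Dispatch on the stored version and convert old layouts to the current one.
fn upgrade(buf: &[u8], version: u32) -> Result<ServerInfo, String> {
    match version {
        2 => {
            let old = decode_v2(buf)?;
            Ok(ServerInfo {
                pg_version: old.pg_version,
                timeline_id: old.ztli,
                wal_seg_size: old.wal_seg_size,
            })
        }
        other => Err(format!("unsupported control file version {}", other)),
    }
}

fn main() {
    let mut buf = Vec::new();
    buf.extend_from_slice(&14u32.to_le_bytes());
    buf.extend_from_slice(&42u64.to_le_bytes());
    buf.extend_from_slice(&16u32.to_le_bytes());
    println!("{:?}", upgrade(&buf, 2));
    println!("{:?}", upgrade(&buf, 9));
}
```

Keeping the old struct around unchanged is what makes the migration safe: the decoder for version 2 never has to track later renames.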


@@ -102,12 +102,21 @@ fn main() -> Result<()> {
.required(false)
.value_name("stop-mode");
let pageserver_config_args = Arg::with_name("pageserver-config-override")
.long("pageserver-config-override")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.help("Additional pageserver configuration options or overrides; refer to the docs for the pageserver's 'config-override' CLI parameter for details")
.required(false);
let matches = App::new("Zenith CLI")
.setting(AppSettings::ArgRequiredElseHelp)
.version(GIT_VERSION)
.subcommand(
SubCommand::with_name("init")
.about("Initialize a new Zenith repository")
.arg(pageserver_config_args.clone())
.arg(
Arg::with_name("config")
.long("config")
@@ -133,10 +142,10 @@ fn main() -> Result<()> {
.setting(AppSettings::ArgRequiredElseHelp)
.about("Manage pageserver")
.subcommand(SubCommand::with_name("status"))
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
.subcommand(SubCommand::with_name("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
.arg(stop_mode_arg.clone()))
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver").arg(pageserver_config_args))
)
.subcommand(
SubCommand::with_name("safekeeper")
@@ -403,6 +412,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
&pageserver_config_overrides(init_match),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);
@@ -411,6 +421,14 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
Ok(())
}
fn pageserver_config_overrides<'a>(init_match: &'a ArgMatches) -> Vec<&'a str> {
init_match
.values_of("pageserver-config-override")
.into_iter()
.flatten()
.collect()
}
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
match tenant_match.subcommand() {
@@ -572,8 +590,8 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
let pageserver = PageServerNode::from_env(env);
match sub_match.subcommand() {
("start", Some(_sub_m)) => {
if let Err(e) = pageserver.start() {
("start", Some(start_match)) => {
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
eprintln!("pageserver start failed: {}", e);
exit(1);
}
@@ -588,22 +606,20 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}
("restart", Some(_sub_m)) => {
("restart", Some(restart_match)) => {
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver.start() {
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
eprintln!("pageserver start failed: {}", e);
exit(1);
}
}
(sub_name, _) => {
bail!("Unexpected pageserver subcommand '{}'", sub_name)
}
(sub_name, _) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
}
Ok(())
}
@@ -662,12 +678,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn handle_start_all(_sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
// Postgres nodes are not started automatically
if let Err(e) = pageserver.start() {
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
eprintln!("pageserver start failed: {}", e);
exit(1);
}
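
Review note: `pageserver_config_overrides` relies on `Option::into_iter().flatten()` to turn "flag absent" into an empty list without an explicit `match`. A minimal std-only sketch of that idiom follows. `collect_overrides` is a hypothetical stand-in that takes the `Option<iterator>` directly instead of calling clap's `values_of`, and the `key=value` strings are made up.

```rust
/// Hypothetical stand-in for `pageserver_config_overrides`: flattens an
/// optional iterator of values into a Vec, yielding an empty Vec for `None`.
fn collect_overrides<'a, I>(values: Option<I>) -> Vec<&'a str>
where
    I: Iterator<Item = &'a str>,
{
    values
        .into_iter() // iterates over zero or one inner iterators
        .flatten() // then over the items of that inner iterator, if any
        .collect()
}

fn main() {
    // Flag given twice: both values are kept, in order.
    let given = collect_overrides(Some(vec!["key_a=1", "key_b=2"].into_iter()));
    println!("{:?}", given);

    // Flag not given: the result is an empty Vec rather than an error.
    let absent = collect_overrides(None::<std::vec::IntoIter<&str>>);
    println!("{:?}", absent);
}
```

Because the helper always returns a `Vec`, callers such as `pageserver.start(...)` can take a slice and do not need to distinguish "no overrides" from "flag missing".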


@@ -8,22 +8,15 @@ use lazy_static::lazy_static;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use std::net::TcpListener;
use tracing::info;
use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter};
use zenith_metrics::{Encoder, TextEncoder};
use std::sync::Mutex;
use tokio::sync::oneshot::Sender;
use std::future::Future;
use std::net::TcpListener;
use super::error::ApiError;
lazy_static! {
/// Channel used to send shutdown signal - wrapped in an Option to allow
/// it to be taken by value (since oneshot channels consume themselves on send)
static ref SHUTDOWN_SENDER: Mutex<Option<Sender<()>>> = Mutex::new(None);
}
lazy_static! {
static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!(
new_common_metric_name("serve_metrics_count"),
@@ -153,17 +146,20 @@ pub fn check_permission(req: &Request<Body>, tenantid: Option<ZTenantId>) -> Res
}
}
/// Initiate graceful shutdown of the http endpoint
pub fn shutdown() {
if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() {
let _ = tx.send(());
}
}
pub fn serve_thread_main(
///
/// Start listening for HTTP requests on given socket.
///
/// 'shutdown_future' can be used to stop the server. When the Future
/// becomes ready, we stop listening for new requests, and the function returns.
///
pub fn serve_thread_main<S>(
router_builder: RouterBuilder<hyper::Body, ApiError>,
listener: TcpListener,
) -> anyhow::Result<()> {
shutdown_future: S,
) -> anyhow::Result<()>
where
S: Future<Output = ()> + Send + Sync,
{
info!("Starting a http endpoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
@@ -176,14 +172,9 @@ pub fn serve_thread_main(
let _guard = runtime.enter();
let (send, recv) = tokio::sync::oneshot::channel::<()>();
*SHUTDOWN_SENDER.lock().unwrap() = Some(send);
let server = Server::from_tcp(listener)?
.serve(service)
.with_graceful_shutdown(async {
recv.await.ok();
});
.with_graceful_shutdown(shutdown_future);
runtime.block_on(server)?;
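
Review note: this hunk replaces the global `SHUTDOWN_SENDER` with a shutdown signal that the caller passes in. The sketch below illustrates only that ownership change and is a std-only analogue, not the hyper/tokio code from this change: a `Receiver<()>` plays the role of `shutdown_future`, a channel of integers stands in for incoming requests, and `serve_until_shutdown` is a hypothetical name.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, TryRecvError};
use std::time::Duration;

/// Handles "requests" until the caller-supplied shutdown signal fires or
/// the request channel closes. Returns the number of requests handled.
fn serve_until_shutdown(requests: Receiver<u32>, shutdown: Receiver<()>) -> usize {
    let mut handled = 0;
    loop {
        // The shutdown signal is checked before accepting more work.
        match shutdown.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => return handled,
            Err(TryRecvError::Empty) => {}
        }
        // Wait briefly for a request so the shutdown check runs regularly.
        match requests.recv_timeout(Duration::from_millis(10)) {
            Ok(_request) => handled += 1,
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => return handled,
        }
    }
}

fn main() {
    let (req_tx, req_rx) = mpsc::channel();
    let (stop_tx, stop_rx) = mpsc::channel();

    // A request is queued, but shutdown was requested first, so the loop
    // returns without handling it.
    req_tx.send(1).unwrap();
    stop_tx.send(()).unwrap();
    let handled = serve_until_shutdown(req_rx, stop_rx);
    println!("handled {} requests before shutdown", handled);
}
```

With the signal passed as an argument, whoever spawns the thread decides how shutdown is triggered, and no process-wide static has to be initialised before the server starts.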


@@ -69,7 +69,7 @@ impl HttpErrorBody {
}
pub async fn handler(err: routerify::RouteError) -> Response<Body> {
tracing::error!("{}", err);
tracing::error!("Error processing HTTP request: {:?}", err);
err.downcast::<ApiError>()
.expect("handler should always return api error")
.into_response()