Compare commits

..

6 Commits

Author SHA1 Message Date
Arseny Sher
e5c0c7dc4d Add tokio-console tracing as optional feature. 2022-11-21 10:45:25 +04:00
Arseny Sher
7ffd740b5f Switch to broadcast channel and parking_lot rwlock. 2022-11-21 10:45:25 +04:00
Arseny Sher
7850fe4fa6 Add readme. 2022-11-21 10:45:25 +04:00
Arseny Sher
7bc953b66a Review round, tests. 2022-11-19 17:16:07 +04:00
Arseny Sher
3aad65eb3f Add neon_broker binary.
Which ought to replace etcd. This patch only adds the binary and adjusts
Dockerfile to include it; subsequent ones will add deploy of helm chart and the
actual replacement.

It is a simple and fast pub-sub message bus. In this patch only safekeeper
message is supported, but others can be easily added.

Compilation now requires protoc to be installed. Installing protobuf-compiler
package is fine for Debian/Ubuntu.

ref
https://github.com/neondatabase/neon/pull/2733
https://github.com/neondatabase/neon/issues/2394
2022-11-18 21:48:29 +04:00
Alexander Bayandin
03190a2161 GitHub Actions: Do not create Allure report for cancelled jobs (#2813)
If a workflow is cancelled, do not delay its finishing by creating an allure
report.
2022-11-15 10:27:59 +00:00
29 changed files with 1466 additions and 786 deletions

View File

@@ -5,6 +5,7 @@
!Cargo.lock
!Makefile
!broker/
!.cargo/
!.config/
!control_plane/

View File

@@ -190,7 +190,7 @@ runs:
prefix: latest
- name: Create Allure report
if: always()
if: success() || failure()
uses: ./.github/actions/allure-report
with:
action: store

View File

@@ -25,6 +25,7 @@ mkdir neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver_binutils neon_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
docker cp ${ID}:/usr/local/bin/neon_broker neon_install/bin/
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/
docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/

View File

@@ -265,7 +265,7 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
if: always()
if: success() || failure()
uses: ./.github/actions/allure-report
with:
action: generate

View File

@@ -305,7 +305,7 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ regress-tests, benchmarks ]
if: always()
if: success() || failure()
strategy:
fail-fast: false
matrix:

View File

@@ -48,11 +48,11 @@ jobs:
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt update
sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev
sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev protobuf-compiler
- name: Install macOS postgres dependencies
if: matrix.os == 'macos-latest'
run: brew install flex bison openssl
run: brew install flex bison openssl protobuf
- name: Set pg 14 revision for caching
id: pg_v14_rev

249
Cargo.lock generated
View File

@@ -457,11 +457,26 @@ checksum = "6bf8832993da70a4c6d13c581f4463c2bdda27b9bf1c5498dc4365543abe6d6f"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex 0.3.0",
"once_cell",
"strsim",
"termcolor",
]
[[package]]
name = "clap_derive"
version = "4.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
@@ -554,6 +569,42 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "console-api"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
dependencies = [
"prost 0.11.2",
"prost-types 0.11.2",
"tonic 0.8.2",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime",
"prost-types 0.11.2",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.8.2",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1005,11 +1056,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8664f6ea68aba5503d42dd1be786b0f1bd9b7972e7f40208c83ef74db91bf"
dependencies = [
"http",
"prost",
"prost 0.10.4",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tonic 0.7.2",
"tonic-build 0.7.2",
"tower",
"tower-service",
]
@@ -1087,6 +1138,16 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -1329,6 +1390,19 @@ dependencies = [
"ahash",
]
[[package]]
name = "hdrhistogram"
version = "7.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
dependencies = [
"base64",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "heapless"
version = "0.7.16"
@@ -1890,6 +1964,32 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae"
[[package]]
name = "neon_broker"
version = "0.1.0"
dependencies = [
"async-stream",
"bytes",
"clap 4.0.15",
"console-subscriber",
"futures-core",
"futures-util",
"git-version",
"humantime",
"hyper",
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost 0.11.2",
"tokio",
"tokio-stream",
"tonic 0.8.2",
"tonic-build 0.8.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "nix"
version = "0.23.1"
@@ -2464,6 +2564,30 @@ dependencies = [
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -2515,7 +2639,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.10.1",
]
[[package]]
name = "prost"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0841812012b2d4a6145fae9a6af1534873c32aa67fff26bd09f8fa42c83f95a"
dependencies = [
"bytes",
"prost-derive 0.11.2",
]
[[package]]
@@ -2533,13 +2667,35 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"prost 0.10.4",
"prost-types 0.10.1",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-build"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d8b442418ea0822409d9e7d047cbf1e7e9e1760b172bf9982cf29d517c93511"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost 0.11.2",
"prost-types 0.11.2",
"regex",
"syn",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.10.1"
@@ -2553,6 +2709,19 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-derive"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "164ae68b6587001ca506d3bf7f1000bfa248d0e1217b618108fba4ec1d0cc306"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.10.1"
@@ -2560,7 +2729,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
dependencies = [
"bytes",
"prost",
"prost 0.10.4",
]
[[package]]
name = "prost-types"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a"
dependencies = [
"bytes",
"prost 0.11.2",
]
[[package]]
@@ -3671,10 +3850,12 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"winapi",
]
@@ -3822,8 +4003,40 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"prost 0.10.4",
"prost-derive 0.10.1",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.11.2",
"prost-derive 0.11.2",
"tokio",
"tokio-stream",
"tokio-util",
@@ -3842,7 +4055,20 @@ checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-build 0.10.4",
"quote",
"syn",
]
[[package]]
name = "tonic-build"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build 0.11.2",
"quote",
"syn",
]
@@ -4401,7 +4627,8 @@ dependencies = [
"num-bigint",
"num-integer",
"num-traits",
"prost",
"prost 0.10.4",
"prost 0.11.2",
"rand",
"regex",
"regex-syntax",

View File

@@ -11,6 +11,7 @@ cargo-features = ["named-profiles"]
[workspace]
members = [
"broker",
"compute_tools",
"control_plane",
"pageserver",

View File

@@ -44,7 +44,7 @@ COPY . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin proxy --locked --release \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin neon_broker --bin proxy --locked --release \
&& cachepot -s
# Build final image
@@ -67,6 +67,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/

View File

@@ -35,12 +35,12 @@ Pageserver consists of:
* On Ubuntu or Debian, this set of packages should be sufficient to build the code:
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client protobuf-compiler
```
* On Fedora, these packages are needed:
```bash
dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib protobuf-compiler
```
2. [Install Rust](https://www.rust-lang.org/tools/install)

42
broker/Cargo.toml Normal file
View File

@@ -0,0 +1,42 @@
[package]
name = "neon_broker"
version = "0.1.0"
edition = "2021"
[features]
bench = []
# for exploring with tokio-console. Note that tokio_unstable cfg must be enabled, i.e.
# RUSTFLAGS="--cfg tokio_unstable" cargo build -r -p neon_broker --features console
console = ["dep:console-subscriber", "tokio/full", "tokio/tracing"]
[[bin]]
name = "neon_broker_bench"
path = "src/bin/bench.rs"
# build benchmarking binary only if explicitly requested with '--features bench'
required-features = ["bench"]
[dependencies]
async-stream = "0.3"
bytes = "1.0"
clap = { version = "4.0", features = ["derive"] }
futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3.5"
humantime = "2.1.0"
hyper = {version = "0.14.14", features = ["full"]}
once_cell = "1.13.0"
parking_lot = "0.12"
prost = "0.11"
tonic = "0.8"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
tracing = "0.1.27"
console-subscriber = {version = "0.1.8", optional = true }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[build-dependencies]
tonic-build = "0.8"

7
broker/build.rs Normal file
View File

@@ -0,0 +1,7 @@
/// Build script: compiles `proto/broker.proto` with tonic-build.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The directory with the .proto sources also receives the generated code:
    // a deterministic location makes the output easy to find.
    const PROTO_DIR: &str = "proto/";
    let protos = ["proto/broker.proto"];
    let includes = [PROTO_DIR];

    let builder = tonic_build::configure().out_dir(PROTO_DIR);
    builder.compile(&protos, &includes)?;
    Ok(())
}

2
broker/proto/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
# protobuf generated code
neon_broker.rs

44
broker/proto/broker.proto Normal file
View File

@@ -0,0 +1,44 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package neon_broker;
// Pub-sub service for safekeeper timeline updates: publishers stream updates
// in, subscribers get the matching updates streamed back.
service NeonBroker {
// Subscribe to safekeeper updates.
// Server-streaming call; the request selects either every timeline or a
// single one.
rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {};
// Publish safekeeper updates.
// Client-streaming call; a single Empty is returned as the response.
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};
}
// Request of SubscribeSafekeeperInfo: chooses what to subscribe to.
message SubscribeSafekeeperInfoRequest {
// Exactly one variant is expected to be set.
oneof subscription_key {
google.protobuf.Empty all = 1; // subscribe to everything
TenantTimelineId tenant_timeline_id = 2; // subscribe to specific timeline
}
}
// The message being published and delivered to subscribers: state of one
// timeline as reported by one safekeeper.
message SafekeeperTimelineInfo {
// Id of the reporting safekeeper.
uint64 safekeeper_id = 1;
// Timeline this update describes; also the key used to match per-timeline
// subscriptions.
TenantTimelineId tenant_timeline_id = 2;
// Term of the last entry.
uint64 last_log_term = 3;
// LSN of the last record.
uint64 flush_lsn = 4;
// Up to which LSN safekeeper regards its WAL as committed.
uint64 commit_lsn = 5;
// LSN up to which safekeeper has backed WAL.
uint64 backup_lsn = 6;
// LSN of last checkpoint uploaded by pageserver.
uint64 remote_consistent_lsn = 7;
// NOTE(review): the meaning of the two LSNs below is not documented here --
// confirm against the safekeeper code.
uint64 peer_horizon_lsn = 8;
uint64 local_start_lsn = 9;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
}
// Identifies a timeline by tenant id plus timeline id, both as raw bytes.
// NOTE(review): presumably each id must be exactly 16 bytes (that is what the
// benchmark and the tests send) -- confirm against parse_proto_ttid.
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}

174
broker/src/bin/bench.rs Normal file
View File

@@ -0,0 +1,174 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use clap::Parser;
use neon_broker::neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey;
use neon_broker::neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId;
use neon_broker::neon_broker_proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use neon_broker::NeonBrokerClientChannel;
use neon_broker::DEFAULT_LISTEN_ADDR;
use tokio::time::{self};
use tonic::Request;
/// Help text of the benchmarking binary, shown as the `about` section of
/// `--help`.
const ABOUT: &str = r#"
A simple benchmarking tool for neon_broker. Creates specified number of per
timeline publishers and subscribers; each publisher continuously sends
messages, subscribers read them. Each second the tool outputs number of
messages summed across all subscribers and min number of messages
received by single subscriber.
For example,
cargo build -r -p neon_broker --features bench && target/release/neon_broker
target/release/neon_broker_bench -s 1 -p 1
"#;
#[derive(Parser, Debug)]
#[clap(author, version, about = ABOUT)]
struct Args {
/// Number of publishers
#[clap(short = 'p', long, value_parser, default_value_t = 1)]
num_pubs: u64,
/// Number of subscribers
#[clap(short = 's', long, value_parser, default_value_t = 1)]
num_subs: u64,
}
/// Loads every counter exactly once and returns `(sum, min)` over the loaded
/// values; the minimum is 0 when there are no counters.
fn load_totals(counters: &[Arc<AtomicU64>]) -> (u64, u64) {
    let values: Vec<u64> = counters
        .iter()
        .map(|c| c.load(Ordering::Relaxed))
        .collect();
    let sum = values.iter().sum();
    let min = values.iter().copied().min().unwrap_or(0);
    (sum, min)
}

/// Prints the message rates observed by the subscribers once a second.
///
/// `counters` holds one counter per subscriber, incremented by `subscribe` for
/// every received message. Each report shows the per-interval deltas of the
/// sum and of the minimum across subscribers, their totals, and the time and
/// average rate since the first tick at which any message was seen.
///
/// Loops forever; it never returns on its own.
async fn progress_reporter(counters: Vec<Arc<AtomicU64>>) {
    let mut interval = time::interval(Duration::from_millis(1000));
    let (mut c_old, mut c_min_old) = load_totals(&counters);
    // Set at the first tick that sees a message, so that the average does not
    // include the warm-up period; `skipped` is what had arrived by then.
    let mut started_at: Option<Instant> = None;
    let mut skipped: u64 = 0;
    loop {
        interval.tick().await;
        let (c_new, c_min_new) = load_totals(&counters);
        if c_new > 0 && started_at.is_none() {
            started_at = Some(Instant::now());
            skipped = c_new;
        }
        let avg_rps = started_at.map(|s| {
            let dur = s.elapsed();
            // Full precision seconds: truncating to milliseconds made the very
            // first report divide 0 by 0 and print NaN.
            let dur_secs = dur.as_secs_f64();
            let avg_rps = if dur_secs > 0.0 {
                (c_new - skipped) as f64 / dur_secs
            } else {
                0.0
            };
            (dur, avg_rps)
        });
        println!(
            "sum rps {}, min rps {} total {}, total min {}, duration, avg sum rps {:?}",
            c_new - c_old,
            c_min_new - c_min_old,
            c_new,
            c_min_new,
            avg_rps
        );
        c_old = c_new;
        c_min_old = c_min_new;
    }
}
/// Builds a 16 byte timeline id for benchmark timeline number `i`: eight 0xFF
/// bytes followed by `i` in big-endian byte order.
fn tli_from_u64(i: u64) -> Vec<u8> {
    let prefix = [0xFF_u8; 8];
    let suffix = i.to_be_bytes();
    prefix.iter().chain(suffix.iter()).copied().collect()
}
/// Subscribes to the updates of benchmark timeline number `i` and increments
/// `counter` for every message received, until the stream ends.
///
/// Uses `client` when given, otherwise creates its own connection to
/// `DEFAULT_LISTEN_ADDR`. Any connection or stream error panics the task.
async fn subscribe(client: Option<NeonBrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
    let mut client = if let Some(existing) = client {
        existing
    } else {
        let endpoint = format!("http://{}", DEFAULT_LISTEN_ADDR);
        NeonBrokerClientChannel::connect_lazy(endpoint)
            .await
            .unwrap()
    };

    // To subscribe to everything instead, use SubscriptionKey::All(()).
    let ttid = ProtoTenantTimelineId {
        tenant_id: vec![0xFF; 16],
        timeline_id: tli_from_u64(i),
    };
    let request = SubscribeSafekeeperInfoRequest {
        subscription_key: Some(SubscriptionKey::TenantTimelineId(ttid)),
    };

    let response = client.subscribe_safekeeper_info(request).await.unwrap();
    let mut stream = response.into_inner();
    loop {
        match stream.message().await.unwrap() {
            Some(_msg) => {
                counter.fetch_add(1, Ordering::Relaxed);
            }
            // the server closed the stream
            None => break,
        }
    }
}
/// Publishes an endless stream of synthetic safekeeper updates, cycling over
/// the benchmark timelines `0..n_keys`, and prints the broker's response once
/// the call finishes.
///
/// Uses `client` when given, otherwise creates its own connection to
/// `DEFAULT_LISTEN_ADDR`; a connection error panics the task. `n_keys` of 0 is
/// treated as 1.
async fn publish(client: Option<NeonBrokerClientChannel>, n_keys: u64) {
    let mut client = match client {
        Some(c) => c,
        None => NeonBrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
            .await
            .unwrap(),
    };
    // `counter % n_keys` below panics on a zero divisor, which is what happened
    // when the benchmark was started without subscribers (`-s 0`).
    let n_keys = n_keys.max(1);
    let mut counter: u64 = 0;
    // create stream producing new values
    let outbound = async_stream::stream! {
        loop {
            let info = SafekeeperTimelineInfo {
                safekeeper_id: 1,
                tenant_timeline_id: Some(ProtoTenantTimelineId {
                    tenant_id: vec![0xFF; 16],
                    timeline_id: tli_from_u64(counter % n_keys),
                }),
                last_log_term: 0,
                flush_lsn: counter,
                commit_lsn: 2,
                backup_lsn: 3,
                remote_consistent_lsn: 4,
                peer_horizon_lsn: 5,
                safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
                local_start_lsn: 0,
            };
            counter += 1;
            yield info;
        }
    };
    let response = client.publish_safekeeper_info(Request::new(outbound)).await;
    println!("pub response is {:?}", response);
}
/// Benchmark entry point: spawns the progress reporter, `num_subs` subscribers
/// (one per benchmark timeline) and `num_pubs` publishers, then waits for the
/// reporter task, which runs until the process is stopped.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // One received-messages counter per subscriber.
    let counters: Vec<Arc<AtomicU64>> = (0..args.num_subs)
        .map(|_| Arc::new(AtomicU64::new(0)))
        .collect();
    let reporter = tokio::spawn(progress_reporter(counters.clone()));

    let shared_client =
        NeonBrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
            .await
            .unwrap();

    // Subscribers share one client; pass None instead to give each of them
    // its own connection.
    for (i, counter) in (0..args.num_subs).zip(counters.iter()) {
        tokio::spawn(subscribe(
            Some(shared_client.clone()),
            Arc::clone(counter),
            i,
        ));
    }

    // Publishers each create their own connection; pass
    // Some(shared_client.clone()) instead to share the one above.
    for _ in 0..args.num_pubs {
        tokio::spawn(publish(None, args.num_subs));
    }

    reporter.await?;
    Ok(())
}

View File

@@ -0,0 +1,510 @@
//! Simple pub-sub based on grpc (tonic) and Tokio broadcast channels for
//! storage nodes messaging. The main design goal is to avoid central
//! synchronization during normal flow, resorting to it only when pub/sub
//! change happens. Shared state, protected by an RwLock, holds one broadcast
//! channel per subscribed timeline plus one for subscribers to everything.
//! Publishers take the lock in read mode to send messages into these channels;
//! it is taken in write mode only to register or unregister a publisher or a
//! subscriber.
//!
//! Subscriptions to 1) single timeline 2) everything are possible. We could add
//! subscription to set of timelines to save grpc streams, but testing shows
//! many individual streams is also ok.
//!
//! Message is dropped if subscriber can't consume it, not affecting other
//! subscribers.
//!
//! Only safekeeper message is supported, but it is not hard to add something
//! else with generics.
use clap::{command, Parser};
use futures_core::Stream;
use futures_util::StreamExt;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, StatusCode};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tonic::codegen::Service;
use tonic::Code;
use tonic::{Request, Response, Status};
use tracing::*;
use metrics::{Encoder, TextEncoder};
use neon_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use neon_broker::neon_broker_proto::neon_broker_server::{NeonBroker, NeonBrokerServer};
use neon_broker::neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use neon_broker::neon_broker_proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use neon_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
use utils::id::TenantTimelineId;
use utils::project_git_version;
// Provides GIT_VERSION, which is reported by --version and logged at startup.
project_git_version!(GIT_VERSION);
// Default for --chan-size: capacity of each broadcast channel to subscribers.
const DEFAULT_CHAN_SIZE: usize = 256;
// Default for --http2-keepalive-interval; a string because it goes through the
// same humantime parser as a user supplied value.
const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms";
// Command line arguments of the broker. The `///` comments on the fields are
// not just documentation: clap turns them into the per-option help text.
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
struct Args {
/// Endpoint to listen on.
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
listen_addr: SocketAddr,
/// Size of the queue to the subscriber.
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
chan_size: usize,
/// HTTP/2 keepalive interval.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)]
http2_keepalive_interval: Duration,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
}
type PubId = u64; // id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps
// What a subscriber is subscribed to, in parsed form.
#[derive(Copy, Clone)]
enum SubscriptionKey {
    // every message
    All,
    // messages of the single given timeline
    Timeline(TenantTimelineId),
}

impl SubscriptionKey {
    // Convert the protobuf subscription key. Protobuf has no fixed size byte
    // arrays, ids arrive as vectors, so parsing the timeline variant can fail;
    // the error of parse_proto_ttid is returned as is.
    pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
        let parsed = match key {
            ProtoSubscriptionKey::All(_) => SubscriptionKey::All,
            ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
                let ttid = parse_proto_ttid(&proto_ttid)?;
                SubscriptionKey::Timeline(ttid)
            }
        };
        Ok(parsed)
    }
}
// Channel to the subscribers of a single timeline; the value type of
// SharedState::chans_to_timeline_subs.
struct ChanToTimelineSub {
// Sending side; every subscriber of the timeline holds a receiver obtained
// with subscribe().
chan: broadcast::Sender<SafekeeperTimelineInfo>,
// Number of registered subscribers, tracked separately to know when to
// delete the map entry. receiver_count() is unhandy for that as
// unregistering and dropping the receiver side happen at different moments.
num_subscribers: u64,
}
// State shared by all publishers and subscribers; lives behind the RwLock in
// Registry.
struct SharedState {
// Id to hand out to the next registered publisher.
next_pub_id: PubId,
// Number of currently registered publishers, mirrored to the NUM_PUBS metric.
num_pubs: i64,
// Id to hand out to the next registered subscriber.
next_sub_id: SubId,
// Number of current per-timeline subscribers, mirrored to NUM_SUBS_TIMELINE.
num_subs_to_timelines: i64,
// Channels to per-timeline subscribers; an entry exists only while the
// timeline has at least one subscriber.
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
// Number of current subscribers to everything, mirrored to NUM_SUBS_ALL.
num_subs_to_all: i64,
// Channel to the subscribers to everything; exists for the whole lifetime of
// the state, with or without subscribers.
chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
}
impl SharedState {
    // Create empty state: no publishers, no subscribers. chan_size is the
    // capacity of the channel to subscribers to everything.
    // NOTE(review): tokio documents broadcast::channel as panicking on zero
    // capacity -- confirm chan_size == 0 is rejected before getting here.
    pub fn new(chan_size: usize) -> Self {
        SharedState {
            next_pub_id: 0,
            num_pubs: 0,
            next_sub_id: 0,
            num_subs_to_timelines: 0,
            chans_to_timeline_subs: HashMap::new(),
            num_subs_to_all: 0,
            chan_to_all_subs: broadcast::channel(chan_size).0,
        }
    }

    // Register new publisher: assign it the next id and bump the publishers
    // count and metric. The publisher keeps `registry` to unregister itself.
    pub fn register_publisher(&mut self, registry: Registry) -> Publisher {
        let pub_id = self.next_pub_id;
        self.next_pub_id += 1;
        self.num_pubs += 1;
        NUM_PUBS.set(self.num_pubs);
        Publisher {
            id: pub_id,
            registry,
        }
    }

    // Unregister publisher.
    pub fn unregister_publisher(&mut self) {
        self.num_pubs -= 1;
        NUM_PUBS.set(self.num_pubs);
    }

    // Register new subscriber: assign it the next id, bump the matching count
    // and metric, and hand it the receiving side of the channel for `sub_key`.
    // `chan_size` is used only if a per-timeline channel has to be created.
    pub fn register_subscriber(
        &mut self,
        sub_key: SubscriptionKey,
        registry: Registry,
        chan_size: usize,
    ) -> Subscriber {
        let sub_id = self.next_sub_id;
        self.next_sub_id += 1;
        let sub_rx = match sub_key {
            SubscriptionKey::All => {
                self.num_subs_to_all += 1;
                NUM_SUBS_ALL.set(self.num_subs_to_all);
                self.chan_to_all_subs.subscribe()
            }
            SubscriptionKey::Timeline(ttid) => {
                self.num_subs_to_timelines += 1;
                NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
                // Subscribe to the existing broadcast channel for this key,
                // creating it first if we are the first subscriber. The
                // closure makes the creation lazy: building a channel just to
                // throw it away on every subscription to an already known
                // timeline would be pure waste under the write lock.
                let chan_to_timeline_sub = self
                    .chans_to_timeline_subs
                    .entry(ttid)
                    .or_insert_with(|| ChanToTimelineSub {
                        chan: broadcast::channel(chan_size).0,
                        num_subscribers: 0,
                    });
                chan_to_timeline_sub.num_subscribers += 1;
                chan_to_timeline_sub.chan.subscribe()
            }
        };
        Subscriber {
            id: sub_id,
            key: sub_key,
            sub_rx,
            registry,
        }
    }

    // Unregister the subscriber which was registered with `sub_key`.
    pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
        match sub_key {
            SubscriptionKey::All => {
                self.num_subs_to_all -= 1;
                NUM_SUBS_ALL.set(self.num_subs_to_all);
            }
            SubscriptionKey::Timeline(ttid) => {
                self.num_subs_to_timelines -= 1;
                NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
                // Remove from the map, destroying the channel, if we are the
                // last subscriber to this timeline.
                // Missing entry is a bug; we must have registered.
                let chan_to_timeline_sub = self
                    .chans_to_timeline_subs
                    .get_mut(&ttid)
                    .expect("unregistering subscriber of a timeline without channel");
                chan_to_timeline_sub.num_subscribers -= 1;
                if chan_to_timeline_sub.num_subscribers == 0 {
                    self.chans_to_timeline_subs.remove(&ttid);
                }
            }
        }
    }
}
// Cheaply cloneable handle to the SharedState, plus the channel size to use
// for newly created per-timeline channels.
#[derive(Clone)]
struct Registry {
    shared_state: Arc<RwLock<SharedState>>,
    chan_size: usize,
}

// Each method below takes the write lock only for the duration of the
// SharedState call; logging happens after the lock is released.
impl Registry {
    // Add a publisher to the shared state and return it.
    pub fn register_publisher(&self) -> Publisher {
        let publisher = {
            let mut state = self.shared_state.write();
            state.register_publisher(self.clone())
        };
        trace!("registered publisher {}", publisher.id);
        publisher
    }

    // Remove the publisher from the shared state.
    pub fn unregister_publisher(&self, publisher: &Publisher) {
        {
            let mut state = self.shared_state.write();
            state.unregister_publisher();
        }
        trace!("unregistered publisher {}", publisher.id);
    }

    // Add a subscriber with the given key to the shared state and return it.
    pub fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber {
        let subscriber = {
            let mut state = self.shared_state.write();
            state.register_subscriber(sub_key, self.clone(), self.chan_size)
        };
        trace!("registered subscriber {}", subscriber.id);
        subscriber
    }

    // Remove the subscriber from the shared state.
    pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
        {
            let mut state = self.shared_state.write();
            state.unregister_subscriber(subscriber.key);
        }
        trace!("unregistered subscriber {}", subscriber.id);
    }
}
// Private subscriber state. Created by Registry::register_subscriber;
// unregisters itself when dropped.
struct Subscriber {
// Id assigned at registration; used in trace messages.
id: SubId,
// What we are subscribed to; tells on unregistering which counter and
// channel entry to release.
key: SubscriptionKey,
// Subscriber receives messages from publishers here.
sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
// to unregister itself from shared state in Drop
registry: Registry,
}
impl Drop for Subscriber {
// Unregister from the shared state. This body runs before the fields are
// dropped, so sub_rx is still alive while the entry is being removed.
fn drop(&mut self) {
self.registry.unregister_subscriber(self);
}
}
// Private publisher state. Created by Registry::register_publisher;
// unregisters itself when dropped.
struct Publisher {
    // Id assigned at registration; used in trace messages.
    id: PubId,
    // Gives access to the channels and allows to unregister in Drop.
    registry: Registry,
}

impl Publisher {
    // Send msg to relevant subscribers: those subscribed to everything and
    // those subscribed to the timeline of the message.
    //
    // Returns InvalidArgument status if tenant_timeline_id is missing, or the
    // error of parse_proto_ttid if it can't be parsed. Such a message is not
    // delivered to anyone.
    pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
        // Validate the message before touching any channel, so that a message
        // we are about to reject doesn't reach subscribers to everything. It
        // also keeps parsing out of the locked section.
        let proto_ttid = msg
            .tenant_timeline_id
            .as_ref()
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
        let ttid = parse_proto_ttid(proto_ttid)?;

        let shared_state = self.registry.shared_state.read();
        // send message to subscribers for everything
        // Err means there is no subscribers, it is fine.
        shared_state.chan_to_all_subs.send(msg.clone()).ok();
        // send message to per timeline subscribers
        if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
            // Err can't happen here: the last subscriber removes the entry
            // from the map before its receiver is destroyed, and we hold the
            // lock.
            subs.chan
                .send(msg.clone())
                .expect("timeline channel has an entry but no receivers");
        }
        Ok(())
    }
}

impl Drop for Publisher {
    fn drop(&mut self) {
        self.registry.unregister_publisher(self);
    }
}
// Implementation of the grpc service on top of the Registry.
struct NeonBrokerImpl {
    registry: Registry,
}

#[tonic::async_trait]
impl NeonBroker for NeonBrokerImpl {
    type SubscribeSafekeeperInfoStream =
        Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;

    // Register a publisher and forward every message of the incoming stream to
    // the subscribers until the stream is closed. The first error, either of
    // the stream or of sending, ends the call with that status.
    async fn publish_safekeeper_info(
        &self,
        request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
    ) -> Result<Response<()>, Status> {
        let mut publisher = self.registry.register_publisher();
        let mut incoming = request.into_inner();
        // None from next() means the client closed the stream.
        while let Some(item) = incoming.next().await {
            let msg = item?; // grpc error from the stream
            publisher.send_msg(&msg)?;
        }
        Ok(Response::new(()))
    }

    // Register a subscriber for the requested key and stream to the client
    // everything it receives. Fails with InvalidArgument if the key is missing.
    async fn subscribe_safekeeper_info(
        &self,
        request: Request<SubscribeSafekeeperInfoRequest>,
    ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
        let sub_key = match request.into_inner().subscription_key {
            Some(proto_key) => SubscriptionKey::from_proto_subscription_key(proto_key)?,
            None => {
                return Err(Status::new(
                    Code::InvalidArgument,
                    "missing subscription key",
                ))
            }
        };
        let mut subscriber = self.registry.register_subscriber(sub_key);

        // Turn the receiver into a stream of Results, as the method signature
        // demands. The subscriber is moved into the stream, so it unregisters
        // when the stream is dropped.
        let updates = async_stream::try_stream! {
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(info) => yield info,
                    Err(RecvError::Lagged(_skipped_msg)) => {
                        // The channel was full and messages were dropped for
                        // this subscriber; carry on silently.
                    }
                    Err(RecvError::Closed) => {
                        // can't happen, we never drop the channel while there is a subscriber
                        Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
                    }
                }
            }
        };
        let boxed: Self::SubscribeSafekeeperInfoStream = Box::pin(updates);
        Ok(Response::new(boxed))
    }
}
// Handler of plain http requests. The only thing served is GET /metrics, which
// returns the gathered metrics in text format; anything else gets 404 with an
// empty body.
async fn http1_handler(
    req: hyper::Request<hyper::body::Body>,
) -> Result<hyper::Response<Body>, Infallible> {
    let is_metrics_request = *req.method() == Method::GET && req.uri().path() == "/metrics";
    if !is_metrics_request {
        let not_found = hyper::Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::empty())
            .unwrap();
        return Ok(not_found);
    }

    let families = metrics::gather();
    let encoder = TextEncoder::new();
    let mut payload: Vec<u8> = Vec::new();
    encoder.encode(&families, &mut payload).unwrap();

    let ok = hyper::Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, encoder.format_type())
        .body(Body::from(payload))
        .unwrap();
    Ok(ok)
}
/// Returns true if the raw `content-type` header value designates a gRPC
/// request.
///
/// The gRPC-over-HTTP/2 protocol allows `application/grpc` to be followed by
/// a `+proto` / `+json` / custom suffix, and generic HTTP allows `;`
/// parameters. Other types that merely share the prefix (e.g.
/// `application/grpc-web`) are deliberately not matched.
fn is_grpc_content_type(content_type: &[u8]) -> bool {
    const GRPC: &[u8] = b"application/grpc";
    match content_type.strip_prefix(GRPC) {
        Some(rest) => matches!(rest.first(), None | Some(b'+') | Some(b';')),
        None => false,
    }
}

/// Broker entry point: parses arguments, initializes logging, builds the
/// registry and serves gRPC together with the http1 metrics endpoint on a
/// single port until the server fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // Install the tracing subscriber (whichever variant is compiled in) before
    // the first log line, otherwise that line is emitted into the void.
    #[cfg(not(feature = "console"))] // tokio-console inits tracing_subscriber on its own
    utils::logging::init(utils::logging::LogFormat::from_config(&args.log_format)?)?;
    #[cfg(feature = "console")]
    console_subscriber::init();
    info!("version: {GIT_VERSION}");

    let registry = Registry {
        shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))),
        chan_size: args.chan_size,
    };
    let neon_broker_impl = NeonBrokerImpl {
        registry: registry.clone(),
    };
    let neon_broker_server = NeonBrokerServer::new(neon_broker_impl);

    info!("listening on {}", &args.listen_addr);

    // grpc is served along with http1 for metrics on a single port, hence we
    // don't use tonic's Server.
    hyper::Server::bind(&args.listen_addr)
        .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
        .serve(make_service_fn(move |_| {
            let neon_broker_server_cloned = neon_broker_server.clone();
            async move {
                Ok::<_, Infallible>(service_fn(move |req| {
                    // Technically this second clone is not needed, but consume
                    // by async block is apparently unavoidable. BTW, error
                    // message is enigmatic, see
                    // https://github.com/rust-lang/rust/issues/68119
                    //
                    // We could get away without async block at all, but then we
                    // need to resort to futures::Either to merge the result,
                    // which is not as pleasant to read.
                    let mut neon_broker_server_svc = neon_broker_server_cloned.clone();
                    async move {
                        let is_grpc = req
                            .headers()
                            .get("content-type")
                            .map_or(false, |value| is_grpc_content_type(value.as_bytes()));
                        if is_grpc {
                            let res_resp = neon_broker_server_svc.call(req).await;
                            // Grpc and http1 handlers have slightly different
                            // Response types: it is UnsyncBoxBody for the
                            // former one (not sure why) and plain hyper::Body
                            // for the latter. Both implement HttpBody though,
                            // and EitherBody is used to merge them.
                            res_resp.map(|resp| resp.map(EitherBody::Left))
                        } else {
                            let res_resp = http1_handler(req).await;
                            res_resp.map(|resp| resp.map(EitherBody::Right))
                        }
                    }
                }))
            }
        }))
        .await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use neon_broker::neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId;
    use tokio::sync::broadcast::error::TryRecvError;
    use utils::id::{TenantId, TimelineId};

    /// Builds a sample safekeeper message for the all-zero tenant and the
    /// given raw timeline id bytes.
    fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
        let tenant_timeline_id = ProtoTenantTimelineId {
            tenant_id: vec![0x00; 16],
            timeline_id,
        };
        SafekeeperTimelineInfo {
            safekeeper_id: 1,
            tenant_timeline_id: Some(tenant_timeline_id),
            last_log_term: 0,
            flush_lsn: 1,
            commit_lsn: 2,
            backup_lsn: 3,
            remote_consistent_lsn: 4,
            peer_horizon_lsn: 5,
            safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
            local_start_lsn: 0,
        }
    }

    /// Produces 16 bytes of timeline id: eight 0xFF bytes followed by the
    /// big-endian encoding of `i`.
    fn tli_from_u64(i: u64) -> Vec<u8> {
        let mut raw = Vec::with_capacity(16);
        raw.extend_from_slice(&[0xFF; 8]);
        raw.extend_from_slice(&i.to_be_bytes());
        raw
    }

    #[tokio::test]
    async fn test_registry() {
        let registry = Registry {
            shared_state: Arc::new(RwLock::new(SharedState::new(16))),
            chan_size: 16,
        };

        // One subscriber for timeline 2 only, another one for everything.
        let ttid_2 = TenantTimelineId {
            tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
            timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
        };
        let mut subscriber_2 = registry.register_subscriber(SubscriptionKey::Timeline(ttid_2));
        let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All);

        // Publish two messages carrying different timeline ids.
        let msg_1 = msg(tli_from_u64(1));
        let msg_2 = msg(tli_from_u64(2));
        let mut publisher = registry.register_publisher();
        publisher.send_msg(&msg_1).expect("failed to send msg");
        publisher.send_msg(&msg_2).expect("failed to send msg");

        // The timeline subscriber gets exactly the message for timeline 2 ...
        let first_for_2 = subscriber_2.sub_rx.try_recv();
        assert!(matches!(
            first_for_2,
            Ok(received) if matches!(
                received.tenant_timeline_id.as_ref(),
                Some(proto_ttid) if parse_proto_ttid(proto_ttid).unwrap() == ttid_2
            )
        ));
        // ... and nothing else.
        assert!(matches!(
            subscriber_2.sub_rx.try_recv(),
            Err(TryRecvError::Empty)
        ));

        // The catch-all subscriber gets both messages and then runs dry.
        assert!(matches!(subscriber_all.sub_rx.try_recv(), Ok(..)));
        assert!(matches!(subscriber_all.sub_rx.try_recv(), Ok(..)));
        assert!(matches!(
            subscriber_all.sub_rx.try_recv(),
            Err(TryRecvError::Empty)
        ));
    }
}

108
broker/src/lib.rs Normal file
View File

@@ -0,0 +1,108 @@
use hyper::body::HttpBody;
use std::pin::Pin;
use std::task::{Context, Poll};
use tonic::codegen::StdError;
use tonic::{transport::Channel, Code, Status};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use neon_broker_proto::{
neon_broker_client::NeonBrokerClient, TenantTimelineId as ProtoTenantTimelineId,
};
/// Protobuf/gRPC types and service stubs for the broker. The module body is
/// not written by hand: it is pulled in verbatim from the generated file
/// `../proto/neon_broker.rs`.
pub mod neon_broker_proto {
include!("../proto/neon_broker.rs");
}
/// Broker metrics.
pub mod metrics;
// Re-exports to avoid direct tonic dependency in user crates.
pub use tonic::Request;
pub use tonic::Streaming;
/// Default broker address in `host:port` form.
// NOTE(review): presumably the default for the server's listen address
// argument; the place where it is consumed is not visible here -- confirm.
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
/// `NeonBrokerClient` specialized with the tonic-provided `Channel` transport;
/// helps to avoid depending on tonic directly in user crates.
pub type NeonBrokerClientChannel = NeonBrokerClient<Channel>;
impl NeonBrokerClient<Channel> {
    /// Builds a client for `dst` without establishing the connection; the
    /// actual connect is deferred until the first request is issued.
    ///
    /// Fails only if `dst` cannot be turned into a valid endpoint.
    pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
    where
        D: std::convert::TryInto<tonic::transport::Endpoint>,
        D::Error: Into<StdError>,
    {
        tonic::transport::Endpoint::new(dst)
            .map(|endpoint| Self::new(endpoint.connect_lazy()))
    }
}
/// Converts the variable-length byte fields of a protobuf tenant/timeline id
/// into a typed `TenantTimelineId`.
///
/// Returns an `InvalidArgument` status naming the offending field if either
/// id cannot be built from its bytes. The tenant id is checked first.
pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
    let tenant_id = match TenantId::from_slice(&proto_ttid.tenant_id) {
        Ok(id) => id,
        Err(e) => {
            let details = format!("malformed tenant_id: {}", e);
            return Err(Status::new(Code::InvalidArgument, details));
        }
    };

    let timeline_id = match TimelineId::from_slice(&proto_ttid.timeline_id) {
        Ok(id) => id,
        Err(e) => {
            let details = format!("malformed timeline_id: {}", e);
            return Err(Status::new(Code::InvalidArgument, details));
        }
    };

    Ok(TenantTimelineId {
        tenant_id,
        timeline_id,
    })
}
// These several usages don't justify anyhow dependency, though it would work as
// well.
type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// A body that is one of two different body types.
///
/// Provides impl HttpBody for two different types implementing it, so that
/// responses carrying either body can share one response type. Inspired by
/// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
pub enum EitherBody<A, B> {
/// Body of the first type.
Left(A),
/// Body of the second type.
Right(B),
}
impl<A, B> HttpBody for EitherBody<A, B>
where
A: HttpBody + Send + Unpin,
B: HttpBody<Data = A::Data> + Send + Unpin,
A::Error: Into<AnyError>,
B::Error: Into<AnyError>,
{
type Data = A::Data;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
fn is_end_stream(&self) -> bool {
match self {
EitherBody::Left(b) => b.is_end_stream(),
EitherBody::Right(b) => b.is_end_stream(),
}
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
}
}
}
/// Converts the error of an optional result into `AnyError`, leaving `None`
/// and `Ok` values untouched.
fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
    match err {
        Some(Ok(value)) => Some(Ok(value)),
        Some(Err(cause)) => Some(Err(cause.into())),
        None => None,
    }
}

25
broker/src/metrics.rs Normal file
View File

@@ -0,0 +1,25 @@
//! Broker metrics.
use metrics::{register_int_gauge, IntGauge};
use once_cell::sync::Lazy;
/// Integer gauge exported as `broker_active_publishers`. It is registered on
/// first access; a registration failure panics.
// NOTE(review): presumably counts currently connected publishers -- the
// places that update it are not visible here, confirm.
pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!("broker_active_publishers", "Number of publications")
.expect("Failed to register metric")
});
/// Integer gauge exported as `broker_per_timeline_active_subscribers`. It is
/// registered on first access; a registration failure panics.
pub static NUM_SUBS_TIMELINE: Lazy<IntGauge> = Lazy::new(|| {
    // The help text is part of the exported metric, so keep it typo-free.
    register_int_gauge!(
        "broker_per_timeline_active_subscribers",
        "Number of subscriptions to particular tenant timeline id"
    )
    .expect("Failed to register metric")
});
/// Integer gauge exported as `broker_all_keys_active_subscribers`. It is
/// registered on first access; a registration failure panics.
pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
    // The help text is part of the exported metric, so keep it typo-free.
    register_int_gauge!(
        "broker_all_keys_active_subscribers",
        "Number of subscriptions to all keys"
    )
    .expect("Failed to register metric")
});

25
docs/broker.md Normal file
View File

@@ -0,0 +1,25 @@
# Neon broker
Neon storage broker targets two issues:
- Allowing safekeepers and pageservers to learn which nodes also hold their
timelines, and the timeline statuses there.
- Avoiding O(n^2) connections between storage nodes while doing so.
This is used
- By pageservers to determine the most advanced and alive safekeeper to pull WAL from.
- By safekeepers to synchronize on the timeline: advance
`remote_consistent_lsn`, `backup_lsn`, choose who offloads WAL to s3.
Technically, it is a simple stateless pub-sub message broker based on tonic
(gRPC), which makes multiplexing easy. Since it is stateless, fault tolerance
can be provided by k8s; there is no built-in replication support, though it is
not hard to add.
Currently, the only message is `SafekeeperTimelineInfo`. Each safekeeper, for
each active timeline, once in a while pushes timeline status to the broker.
Other nodes subscribe and receive this info, using it per above.
grpcurl can be used to check which values are currently being pushed:
```
grpcurl -proto broker/proto/broker.proto -d '{"all":{}}' -plaintext localhost:50051 neon_broker.NeonBroker/SubscribeSafekeeperInfo
```

View File

@@ -2,6 +2,11 @@
Below you will find a brief overview of each subdir in the source tree in alphabetical order.
`broker`:
Neon storage broker, providing messaging between safekeepers and pageservers.
[broker.md](./broker.md)
`/control_plane`:
Local control plane.

View File

@@ -3,6 +3,13 @@ use std::{fmt, str::FromStr};
use hex::FromHex;
use rand::Rng;
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Errors produced when constructing an id from raw input.
#[derive(Error, Debug)]
pub enum IdError {
/// The source byte slice had the wrong length; the payload is the length
/// that was actually supplied.
#[error("invalid id length {0}")]
SliceParseError(usize),
}
/// Neon ID is a 128-bit random ID.
/// Used to represent various identifiers. Provides handy utility methods and impls.
@@ -22,6 +29,15 @@ impl Id {
Id::from(arr)
}
pub fn from_slice(src: &[u8]) -> Result<Id, IdError> {
if src.len() != 16 {
return Err(IdError::SliceParseError(src.len()));
}
let mut id_array = [0u8; 16];
id_array.copy_from_slice(src);
Ok(id_array.into())
}
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
@@ -100,6 +116,10 @@ macro_rules! id_newtype {
$t(Id::get_from_buf(buf))
}
/// Builds the id from a byte slice by delegating to `Id::from_slice`
/// and wrapping the result; any `IdError` is propagated unchanged.
pub fn from_slice(src: &[u8]) -> Result<$t, IdError> {
Ok($t(Id::from_slice(src)?))
}
pub fn as_arr(&self) -> [u8; 16] {
self.0.as_arr()
}

View File

@@ -76,7 +76,3 @@ tempfile = "3.2"
[[bench]]
name = "bench_layer_map"
harness = false
[[bench]]
name = "bench_walredo"
harness = false

File diff suppressed because one or more lines are too long

View File

@@ -614,9 +614,8 @@ impl PageServerConf {
PathBuf::from(format!("../tmp_check/test_{test_name}"))
}
#[cfg(test)]
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
PageServerConf {
id: NodeId(0),
wait_lsn_timeout: Duration::from_secs(60),
@@ -627,7 +626,7 @@ impl PageServerConf {
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "cloud_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir,
pg_distrib_dir: PathBuf::new(),
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,

View File

@@ -50,7 +50,7 @@ use crate::storage_sync::index::RemoteIndex;
use crate::task_mgr;
use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::{PostgresRedoManager, WalRedoManager};
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
@@ -119,7 +119,7 @@ pub struct Tenant {
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: Mutex<()>,
walredo_mgrs: Mutex<HashMap<u32, Arc<dyn WalRedoManager + Send + Sync>>>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
// provides access to timeline data sitting in the remote storage
// supposed to be used for retrieval of remote consistent lsn in walreceiver
@@ -831,19 +831,6 @@ impl Tenant {
}
let pg_version = new_metadata.pg_version();
let walredo_mgr = self
.walredo_mgrs
.lock()
.unwrap()
.entry(pg_version)
.or_insert_with(|| {
Arc::new(PostgresRedoManager::new(
self.conf,
self.tenant_id,
pg_version,
))
})
.clone();
Ok(Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
@@ -851,7 +838,7 @@ impl Tenant {
ancestor,
new_timeline_id,
self.tenant_id,
walredo_mgr,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
pg_version,
))
@@ -860,6 +847,7 @@ impl Tenant {
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
remote_index: RemoteIndex,
upload_layers: bool,
@@ -869,9 +857,9 @@ impl Tenant {
tenant_id,
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
walredo_mgrs: Mutex::new(HashMap::new()),
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
remote_index,
upload_layers,
state,
@@ -1743,16 +1731,18 @@ pub mod harness {
}
pub fn try_load(&self) -> anyhow::Result<Tenant> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Tenant::new(
self.conf,
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
self.tenant_id,
RemoteIndex::default(),
false,
);
// populate tenant with locally available timelines
let mut timelines_to_load = HashMap::new();
let mut pg_version = 0u32;
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
.expect("should be able to read timelines dir")
{
@@ -1765,14 +1755,8 @@ pub mod harness {
.parse()?;
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
pg_version = timeline_metadata.pg_version();
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant
.walredo_mgrs
.lock()
.unwrap()
.insert(pg_version, Arc::new(TestRedoManager));
tenant.init_attach_timelines(timelines_to_load)?;
tenant.set_state(TenantState::Active {
background_jobs_running: false,

View File

@@ -216,6 +216,7 @@ impl TenantConf {
}
}
#[cfg(test)]
pub fn dummy_conf() -> Self {
TenantConf {
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,

View File

@@ -21,6 +21,7 @@ use crate::tenant::{
ephemeral_file::is_ephemeral_file, metadata::TimelineMetadata, Tenant, TenantState,
};
use crate::tenant_config::TenantConfOpt;
use crate::walredo::PostgresRedoManager;
use crate::TEMP_FILE_SUFFIX;
use utils::crashsafe::{self, path_with_suffix_extension};
@@ -153,6 +154,7 @@ pub fn attach_local_tenants(
let tenant = Arc::new(Tenant::new(
conf,
TenantConfOpt::default(),
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
tenant_id,
remote_index.clone(),
conf.remote_storage_config.is_some(),
@@ -382,10 +384,12 @@ pub fn create_tenant(
Ok(None)
}
hash_map::Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
create_tenant_files(conf, tenant_conf, tenant_id)?;
let tenant = Arc::new(Tenant::new(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_index,
conf.remote_storage_config.is_some(),

View File

@@ -22,7 +22,6 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
@@ -32,8 +31,7 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
@@ -43,6 +41,7 @@ use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
@@ -58,10 +57,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
const N_CHANNELS: usize = 16;
const CHANNEL_SIZE: usize = 1024 * 1024;
const ERR_BUF_SIZE: usize = 8192;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -97,18 +92,16 @@ pub trait WalRedoManager: Send + Sync {
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. It multiplexes requests from multiple threads
/// using `sender` channel and send them to the postgres wal-redo process
/// pipe by separate thread. Responses are returned through set of `receivers`
/// channels, used in round robin manner. Receiver thread is protected by mutex
/// to prevent it's usage by more than one thread
/// In the future, we might want to launch a pool of processes to allow concurrent
/// replay of multiple records.
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
pub struct PostgresRedoManager {
// mutiplexor pipe: use sync_channel to allow sharing sender by multiple threads
// and limit size of buffer
sender: SyncSender<(Sender<Bytes>, Vec<u8>)>,
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
}
/// Can this request be served by neon redo functions
@@ -156,7 +149,7 @@ impl WalRedoManager for PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
if records.is_empty() {
error!("invalid WAL redo request with no records");
@@ -173,7 +166,14 @@ impl WalRedoManager for PostgresRedoManager {
let result = if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
} else {
self.apply_batch_postgres(key, lsn, img, &records[batch_start..i])
self.apply_batch_postgres(
key,
lsn,
img,
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
)
};
img = Some(result?);
@@ -185,7 +185,14 @@ impl WalRedoManager for PostgresRedoManager {
if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
} else {
self.apply_batch_postgres(key, lsn, img, &records[batch_start..])
self.apply_batch_postgres(
key,
lsn,
img,
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
)
}
}
}
@@ -194,103 +201,85 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(
conf: &'static PageServerConf,
tenant_id: TenantId,
pg_version: u32,
) -> PostgresRedoManager {
#[allow(clippy::type_complexity)]
let (tx, rx): (
SyncSender<(Sender<Bytes>, Vec<u8>)>,
Receiver<(Sender<Bytes>, Vec<u8>)>,
) = mpsc::sync_channel(CHANNEL_SIZE);
let _proxy = std::thread::spawn(move || {
// This dirty hack is used for lazy spawning if walredo process.
// Otherwise initdb called in bootstrap will conflict with initdb called by main pageserver process
let mut input = Some(rx.recv().unwrap());
while let Ok(mut proc) = PostgresRedoProcess::launch(conf, tenant_id, pg_version) {
loop {
if let Some((sender, data)) = input.take() {
if proc.send(sender, data).is_err() {
break;
}
} else {
let (sender, data) = rx.recv().unwrap();
if proc.send(sender, data).is_err() {
break;
}
}
while let Ok((sender, data)) = rx.try_recv() {
if proc.send(sender, data).is_err() {
break;
}
}
if proc.receive().is_err() {
break;
}
}
}
panic!("Failed to launch wal-redo postgres");
});
PostgresRedoManager { sender: tx }
}
fn apply_wal_records(
&self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
) -> Result<Bytes, WalRedoError> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(WalRedoError::InvalidRecord);
}
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
WAL_REDO_RECORDS_HISTOGRAM.observe(records.len() as f64);
WAL_REDO_BYTES_HISTOGRAM.observe(writebuf.len() as f64);
let (tx, rx) = mpsc::channel();
self.sender.send((tx, writebuf)).unwrap();
Ok(rx.recv().unwrap())
}
///
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
/// Process one request for WAL redo using wal-redo postgres
///
fn apply_batch_postgres(
&self,
key: Key,
_lsn: Lsn,
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = self.apply_wal_records(buf_tag, base_img, records);
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
let duration = end_time.duration_since(lock_time);
let len = records.len();
let nbytes = records.iter().fold(0, |acumulator, record| {
acumulator
+ match &record.1 {
NeonWalRecord::Postgres { rec, .. } => rec.len(),
_ => unreachable!("Only PostgreSQL records are accepted in this batch"),
}
});
WAL_REDO_TIME.observe(duration.as_secs_f64());
WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
debug!(
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
len,
nbytes,
duration.as_micros(),
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
records.len(),
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
}
result
}
@@ -593,163 +582,17 @@ impl<C: CommandExt> CloseFileDescriptors for C {
///
struct PostgresRedoProcess {
tenant_id: TenantId,
_child: NoLeakChild,
child: NoLeakChild,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
wal_redo_timeout: Duration,
// Double ended queue for buffered response senders
resp_deque: VecDeque<Sender<Bytes>>,
// Reconstructed page
page: [u8; BLCKSZ as usize],
// Position in reconstructed page buufer
page_pos: usize,
// Pool file descriptors
poll_fds: [PollFd; 3],
}
impl PostgresRedoProcess {
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn receive(&mut self) -> Result<(), Error> {
while !self.resp_deque.is_empty() {
let n = loop {
match nix::poll::poll(
&mut self.poll_fds[0..2],
self.wal_redo_timeout.as_millis() as i32,
) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = self.poll_fds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; ERR_BUF_SIZE] = [0; ERR_BUF_SIZE];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = self.poll_fds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
if let Some(sender) = self.resp_deque.pop_front() {
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
} else {
return Err(Error::new(
ErrorKind::BrokenPipe,
"Malformed output of walredo process",
));
}
self.page_pos = 0;
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(())
}
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn send(&mut self, sender: Sender<Bytes>, data: Vec<u8>) -> Result<(), Error> {
let mut written = 0usize;
let data_len = data.len();
self.resp_deque.push_back(sender);
while written < data_len {
let n = loop {
match nix::poll::poll(
&mut self.poll_fds[0..3],
self.wal_redo_timeout.as_millis() as i32,
) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = self.poll_fds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; ERR_BUF_SIZE] = [0; ERR_BUF_SIZE];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
panic!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
let in_revents = self.poll_fds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
written += self.stdin.write(&data[written..data_len])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = self.poll_fds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
if let Some(sender) = self.resp_deque.pop_front() {
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
} else {
return Err(Error::new(
ErrorKind::BrokenPipe,
"Malformed output of walredo process",
));
}
self.page_pos = 0;
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(())
}
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%tenant_id))]
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
fn launch(
conf: &PageServerConf,
tenant_id: TenantId,
@@ -762,6 +605,7 @@ impl PostgresRedoProcess {
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
info!(
@@ -783,14 +627,9 @@ impl PostgresRedoProcess {
)
})?;
info!(
"running initdb in {} for pg_verson={}",
datadir.display(),
pg_version
);
info!("running initdb in {}", datadir.display());
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-n")
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
@@ -870,25 +709,152 @@ impl PostgresRedoProcess {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let poll_fds = [
PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
];
Ok(PostgresRedoProcess {
tenant_id,
_child: child,
child,
stdin,
stdout,
stderr,
wal_redo_timeout: conf.wal_redo_timeout,
resp_deque: VecDeque::with_capacity(N_CHANNELS),
page: [0u8; BLCKSZ as usize],
page_pos: 0,
poll_fds,
})
}
// Consume this process handle and hand the child over to `kill_and_wait()`.
// Taking `self` by value means the handle cannot be used for further
// requests afterwards.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn kill(self) {
self.child.kill_and_wait();
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
// The whole request is serialized into one buffer, written to the child's
// stdin, and exactly BLCKSZ bytes of response are read back from its stdout.
// Anything the child prints to stderr in the meantime is forwarded to our log.
//
// Errors (all as std::io::Error):
// - a record that is not NeonWalRecord::Postgres is passed in;
// - poll() reports nothing within 'wal_redo_timeout' ("WAL redo timed out");
// - the child closes one of its pipes unexpectedly (BrokenPipe);
// - any read/write/poll failure.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn apply_wal_records(
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(
ErrorKind::Other,
"tried to pass neon wal record to postgres WAL redo",
));
}
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
// The input is now in 'writebuf'. Do a blind write first, writing as much as
// we can, before calling poll(). That skips one call to poll() if the stdin is
// already available for writing, which it almost certainly is because the
// process is idle.
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
// The order matters: the loop below addresses the entries by index
// (0 = stdout, 1 = stderr, 2 = stdin), and stdin is last so that it can be
// left out of the polled slice once everything has been written.
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
while nresult < BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
// Retry poll() when it is interrupted by a signal (EINTR). Note that the
// timeout is passed to every poll() call, so it bounds each individual wait
// rather than the request as a whole, and that the millisecond count is
// narrowed to i32 with a plain cast.
let n = loop {
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(Bytes::from(resultbuf))
}
}
/// Wrapper type around `std::process::Child` which guarantees that the child

View File

@@ -17,13 +17,13 @@ ahash = { version = "0.7", features = ["std"] }
anyhow = { version = "1", features = ["backtrace", "std"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "iana-time-zone", "js-sys", "oldtime", "serde", "std", "time", "wasm-bindgen", "wasmbind", "winapi"] }
clap = { version = "4", features = ["color", "error-context", "help", "std", "string", "suggestions", "usage"] }
clap = { version = "4", features = ["color", "derive", "error-context", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
@@ -33,7 +33,8 @@ nom = { version = "7", features = ["alloc", "std"] }
num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] }
prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
@@ -58,7 +59,8 @@ libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] }
prost = { version = "0.10", features = ["prost-derive", "std"] }
prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] }
prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }