Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-10 14:10:37 +00:00

Compare commits (34 commits)
SHA1s of the compared commits:

0d7bce5f72, 42a0f5be00, d4065f2e85, 5315614974, b99bed510d, 580584c8fc, d823e84ed5,
231dfbaed6, 5cf53786f9, 9b9bbad462, 537b2c1ae6, 31123d1fa8, 4f2ac51bdd, 7b2f9dc908,
293bc69e4f, e9b158e8f5, 8c09ff1764, a66a3b4ed8, 6913bedc09, 8a43dfd573, 065adcf75b,
a180273dc5, 6c6aa04ce4, cd8c96233f, ecbda94790, 0d6d8fefd3, d296a76e3e, 646e0f3581,
0bfc422eb3, 1209572cec, 42f603cb13, 6bcfd6441f, dedb03bb5a, abb07df028
.github/workflows/build_and_test.yml (vendored, 10 changed lines)

@@ -127,8 +127,8 @@ jobs:
             target/
           # Fall back to older versions of the key, if no cache for current Cargo.lock was found
           key: |
-            v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
-            v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-
+            v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
+            v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-

       - name: Cache postgres v14 build
         id: cache_pg_14
@@ -389,7 +389,7 @@ jobs:
             !~/.cargo/registry/src
             ~/.cargo/git/
             target/
-          key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
+          key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}

       - name: Get Neon artifact
         uses: ./.github/actions/download
@@ -768,5 +768,5 @@ jobs:
       - name: Re-deploy proxy
         run: |
           DOCKER_TAG=${{needs.tag.outputs.build-tag}}
-          helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
-          helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
+          helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
+          helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
.github/workflows/codestyle.yml (vendored, 2 changed lines)

@@ -106,7 +106,7 @@ jobs:
             !~/.cargo/registry/src
             ~/.cargo/git
             target
-          key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
+          key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust

       - name: Run cargo clippy
         run: ./run_clippy.sh
Cargo.lock (generated, 105 changed lines)

@@ -497,8 +497,10 @@ dependencies = [
 "chrono",
 "clap 3.2.16",
 "env_logger",
+"futures",
 "hyper",
 "log",
+"notify",
 "postgres",
 "regex",
 "serde",
@@ -540,11 +542,11 @@ dependencies = [
 "git-version",
 "nix",
 "once_cell",
-"pageserver",
+"pageserver_api",
 "postgres",
 "regex",
 "reqwest",
-"safekeeper",
+"safekeeper_api",
 "serde",
 "serde_with",
 "tar",
@@ -1072,6 +1074,15 @@ dependencies = [
 "winapi",
 ]
 
+[[package]]
+name = "fsevent-sys"
+version = "4.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "futures"
 version = "0.3.21"
@@ -1493,6 +1504,26 @@ dependencies = [
 "str_stack",
 ]
 
+[[package]]
+name = "inotify"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
+dependencies = [
+ "bitflags",
+ "inotify-sys",
+ "libc",
+]
+
+[[package]]
+name = "inotify-sys"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "instant"
 version = "0.1.12"
@@ -1552,6 +1583,26 @@ dependencies = [
 "simple_asn1",
 ]
 
+[[package]]
+name = "kqueue"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
+dependencies = [
+ "kqueue-sys",
+ "libc",
+]
+
+[[package]]
+name = "kqueue-sys"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
+dependencies = [
+ "bitflags",
+ "libc",
+]
+
 [[package]]
 name = "kstring"
 version = "1.0.6"
@@ -1797,6 +1848,24 @@ dependencies = [
 "minimal-lexical",
 ]
 
+[[package]]
+name = "notify"
+version = "5.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
+dependencies = [
+ "bitflags",
+ "crossbeam-channel",
+ "filetime",
+ "fsevent-sys",
+ "inotify",
+ "kqueue",
+ "libc",
+ "mio",
+ "walkdir",
+ "winapi",
+]
+
 [[package]]
 name = "num-bigint"
 version = "0.4.3"
@@ -1975,6 +2044,7 @@ dependencies = [
 "nix",
 "num-traits",
 "once_cell",
+"pageserver_api",
 "postgres",
 "postgres-protocol",
 "postgres-types",
@@ -1989,6 +2059,7 @@ dependencies = [
 "serde_json",
 "serde_with",
 "signal-hook",
+"svg_fmt",
 "tar",
 "tempfile",
 "thiserror",
@@ -2003,6 +2074,17 @@ dependencies = [
 "workspace_hack",
 ]
 
+[[package]]
+name = "pageserver_api"
+version = "0.1.0"
+dependencies = [
+ "const_format",
+ "serde",
+ "serde_with",
+ "utils",
+ "workspace_hack",
+]
+
 [[package]]
 name = "parking_lot"
 version = "0.11.2"
@@ -2891,6 +2973,7 @@ dependencies = [
 "postgres_ffi",
 "regex",
 "remote_storage",
+"safekeeper_api",
 "serde",
 "serde_json",
 "serde_with",
@@ -2906,6 +2989,17 @@ dependencies = [
 "workspace_hack",
 ]
 
+[[package]]
+name = "safekeeper_api"
+version = "0.1.0"
+dependencies = [
+ "const_format",
+ "serde",
+ "serde_with",
+ "utils",
+ "workspace_hack",
+]
+
 [[package]]
 name = "same-file"
 version = "1.0.6"
@@ -3240,6 +3334,12 @@ version = "2.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
 
+[[package]]
+name = "svg_fmt"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
+
 [[package]]
 name = "symbolic-common"
 version = "8.8.0"
@@ -4142,6 +4242,7 @@ dependencies = [
 "bstr",
 "bytes",
 "chrono",
+"crossbeam-utils",
 "either",
 "fail",
 "hashbrown",
@@ -8,8 +8,10 @@ anyhow = "1.0"
 chrono = "0.4"
 clap = "3.0"
 env_logger = "0.9"
+futures = "0.3.13"
 hyper = { version = "0.14", features = ["full"] }
 log = { version = "0.4", features = ["std", "serde"] }
+notify = "5.0.0"
 postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
 regex = "1"
 serde = { version = "1.0", features = ["derive"] }
@@ -258,14 +258,7 @@ impl ComputeNode {
             .spawn()
             .expect("cannot start postgres process");
 
-        // Try default Postgres port if it is not provided
-        let port = self
-            .spec
-            .cluster
-            .settings
-            .find("port")
-            .unwrap_or_else(|| "5432".to_string());
-        wait_for_postgres(&mut pg, &port, pgdata_path)?;
+        wait_for_postgres(&mut pg, pgdata_path)?;
 
         // If connection fails,
         // it may be the old node with `zenith_admin` superuser.
@@ -1,18 +1,19 @@
 use std::fmt::Write;
+use std::fs;
 use std::fs::File;
 use std::io::{BufRead, BufReader};
-use std::net::{SocketAddr, TcpStream};
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
 use std::process::Child;
-use std::str::FromStr;
-use std::{fs, thread, time};
+use std::time::{Duration, Instant};
 
 use anyhow::{bail, Result};
 use postgres::{Client, Transaction};
 use serde::Deserialize;
+use notify::{RecursiveMode, Watcher};
 
-const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
+const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
 
 /// Rust representation of Postgres role info with only those fields
 /// that matter for us.
@@ -230,52 +231,85 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
     Ok(postgres_dbs)
 }
 
-/// Wait for Postgres to become ready to accept connections:
-/// - state should be `ready` in the `pgdata/postmaster.pid`
-/// - and we should be able to connect to 127.0.0.1:5432
-pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
+/// Wait for Postgres to become ready to accept connections. It's ready to
+/// accept connections when the state-field in `pgdata/postmaster.pid` says
+/// 'ready'.
+pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
     let pid_path = pgdata.join("postmaster.pid");
-    let mut slept: u64 = 0; // ms
-    let pause = time::Duration::from_millis(100);
 
-    let timeout = time::Duration::from_millis(10);
-    let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
+    // PostgreSQL writes line "ready" to the postmaster.pid file, when it has
+    // completed initialization and is ready to accept connections. We want to
+    // react quickly and perform the rest of our initialization as soon as
+    // PostgreSQL starts accepting connections. Use 'notify' to be notified
+    // whenever the PID file is changed, and whenever it changes, read it to
+    // check if it's now "ready".
+    //
+    // You cannot actually watch a file before it exists, so we first watch the
+    // data directory, and once the postmaster.pid file appears, we switch to
+    // watch the file instead. We also wake up every 100 ms to poll, just in
+    // case we miss some events for some reason. Not strictly necessary, but
+    // better safe than sorry.
+    let (tx, rx) = std::sync::mpsc::channel();
+    let mut watcher = notify::recommended_watcher(move |res| {
+        let _ = tx.send(res);
+    })?;
+    watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
+
+    let started_at = Instant::now();
+    let mut postmaster_pid_seen = false;
     loop {
-        // Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout,
-        // but postgres starts listening almost immediately, even if it is not really
-        // ready to accept connections).
-        if slept >= POSTGRES_WAIT_TIMEOUT {
-            bail!("timed out while waiting for Postgres to start");
-        }
-
         if let Ok(Some(status)) = pg.try_wait() {
             // Postgres exited, that is not what we expected, bail out earlier.
             let code = status.code().unwrap_or(-1);
             bail!("Postgres exited unexpectedly with code {}", code);
         }
 
+        let res = rx.recv_timeout(Duration::from_millis(100));
+        log::debug!("woken up by notify: {res:?}");
+        // If there are multiple events in the channel already, we only need to be
+        // check once. Swallow the extra events before we go ahead to check the
+        // pid file.
+        while let Ok(res) = rx.try_recv() {
+            log::debug!("swallowing extra event: {res:?}");
+        }
+
         // Check that we can open pid file first.
         if let Ok(file) = File::open(&pid_path) {
+            if !postmaster_pid_seen {
+                log::debug!("postmaster.pid appeared");
+                watcher
+                    .unwatch(pgdata)
+                    .expect("Failed to remove pgdata dir watch");
+                watcher
+                    .watch(&pid_path, RecursiveMode::NonRecursive)
+                    .expect("Failed to add postmaster.pid file watch");
+                postmaster_pid_seen = true;
+            }
+
             let file = BufReader::new(file);
             let last_line = file.lines().last();
 
             // Pid file could be there and we could read it, but it could be empty, for example.
             if let Some(Ok(line)) = last_line {
                 let status = line.trim();
-                let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
+                log::debug!("last line of postmaster.pid: {status:?}");
 
                 // Now Postgres is ready to accept connections
-                if status == "ready" && can_connect {
+                if status == "ready" {
                     break;
                 }
             }
         }
 
-        thread::sleep(pause);
-        slept += 100;
+        // Give up after POSTGRES_WAIT_TIMEOUT.
+        let duration = started_at.elapsed();
+        if duration >= POSTGRES_WAIT_TIMEOUT {
+            bail!("timed out while waiting for Postgres to start");
+        }
     }
 
+    log::info!("PostgreSQL is now running, continuing to configure it");
+
     Ok(())
 }
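For context, here is a minimal standalone sketch of the same notify-based wait pattern used above: watch the directory until the status file appears, wake on events or every 100 ms, and re-read the file. It assumes the notify 5.0 and anyhow crates; the function and file names (`wait_until_ready`, `status_file`) are illustrative and not part of the commit.

```rust
use std::fs;
use std::path::Path;
use std::sync::mpsc;
use std::time::{Duration, Instant};

use notify::{RecursiveMode, Watcher};

// Sketch only: poll-plus-notify wait for a text file whose last line becomes "ready".
fn wait_until_ready(dir: &Path, status_file: &Path, timeout: Duration) -> anyhow::Result<()> {
    let (tx, rx) = mpsc::channel();
    let mut watcher = notify::recommended_watcher(move |event| {
        // Forward every filesystem event to the waiting thread.
        let _ = tx.send(event);
    })?;
    // The file may not exist yet, so watch its parent directory first.
    watcher.watch(dir, RecursiveMode::NonRecursive)?;

    let started_at = Instant::now();
    loop {
        // Wake up on an event or after 100 ms, whichever comes first.
        let _ = rx.recv_timeout(Duration::from_millis(100));

        if let Ok(contents) = fs::read_to_string(status_file) {
            if contents.lines().last() == Some("ready") {
                return Ok(());
            }
        }
        if started_at.elapsed() >= timeout {
            anyhow::bail!("timed out waiting for readiness");
        }
    }
}
```

The real implementation above additionally re-targets the watch from the directory to the pid file once it exists and checks whether the child process has exited.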
@@ -19,7 +19,9 @@ thiserror = "1"
 nix = "0.23"
 reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
 
-pageserver = { path = "../pageserver" }
-safekeeper = { path = "../safekeeper" }
+# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
+# instead, so that recompile times are better.
+pageserver_api = { path = "../libs/pageserver_api" }
+safekeeper_api = { path = "../libs/safekeeper_api" }
 utils = { path = "../libs/utils" }
 workspace_hack = { version = "0.1", path = "../workspace_hack" }
@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
 use control_plane::safekeeper::SafekeeperNode;
 use control_plane::storage::PageServerNode;
 use control_plane::{etcd, local_env};
-use pageserver::config::defaults::{
+use pageserver_api::models::TimelineInfo;
+use pageserver_api::{
     DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
     DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
 };
-use pageserver::http::models::TimelineInfo;
-use safekeeper::defaults::{
+use safekeeper_api::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
 };
@@ -285,7 +285,7 @@ impl LocalEnv {
         branch_name: &str,
         tenant_id: TenantId,
     ) -> Option<TimelineId> {
-        dbg!(&self.branch_name_mappings)
+        self.branch_name_mappings
             .get(branch_name)?
             .iter()
             .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
@@ -12,7 +12,7 @@ use nix::unistd::Pid;
 use postgres::Config;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
-use safekeeper::http::models::TimelineCreateRequest;
+use safekeeper_api::models::TimelineCreateRequest;
 use thiserror::Error;
 use utils::{
     connstring::connection_address,
@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
 use nix::errno::Errno;
 use nix::sys::signal::{kill, Signal};
 use nix::unistd::Pid;
-use pageserver::http::models::{
+use pageserver_api::models::{
     TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
 };
 use postgres::{Config, NoTls};
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
         let url = self.url().to_owned();
         Err(PageserverHttpError::Response(
             match self.json::<HttpErrorBody>() {
-                Ok(err_body) => format!("Error: {}", err_body.msg),
+                Ok(err_body) => format!("Response error: {}", err_body.msg),
                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
             },
         ))
@@ -181,14 +181,15 @@ impl PageServerNode {
         new_timeline_id: Option<TimelineId>,
         pg_version: u32,
     ) -> anyhow::Result<TimelineId> {
-        let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
+        let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
+            .context("Failed to create tenant")?;
         let initial_timeline_info = self.timeline_create(
             initial_tenant_id,
             new_timeline_id,
             None,
             None,
             Some(pg_version),
-        )?;
+        ).context("Failed to create timeline")?;
         Ok(initial_timeline_info.timeline_id)
     }
 
@@ -419,6 +420,11 @@ impl PageServerNode {
                 .map(|x| x.parse::<NonZeroU64>())
                 .transpose()
                 .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
+            trace_read_requests: settings
+                .remove("trace_read_requests")
+                .map(|x| x.parse::<bool>())
+                .transpose()
+                .context("Failed to parse 'trace_read_requests' as bool")?,
         };
         if !settings.is_empty() {
             bail!("Unrecognized tenant settings: {settings:?}")
@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
   sudo apt install python3.9
   ```
 - Install `poetry`
-    - Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`.
+    - Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
 - Install dependencies via `./scripts/pysync`.
   - Note that CI uses specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile))
     so if you have different version some linting tools can yield different result locally vs in the CI.
hack/demo.py (111 lines, deleted in this comparison):

#!/usr/bin/env python3

import argparse
import json
import os
import subprocess
import sys
import textwrap
import uuid
from pathlib import Path

import testgres


def run_command(args):
    print('> Cmd:', ' '.join(args))
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    ret = p.wait()
    output = p.stdout.read().strip()
    if output:
        print(textwrap.indent(output, '>> '))
    if ret != 0:
        raise subprocess.CalledProcessError(ret, args)


def make_tarfile(output_filename, source_dir):
    print("* Packing the backup into a tarball")
    cmd = ["tar", r"--transform=s/\.\///", "-C", str(source_dir), "-cf", str(output_filename), "."]
    run_command(cmd)


def create_tenant(tenant_id):
    print("* Creating a new tenant")
    cmd = ["neon_local", "tenant", "create", f"--tenant-id={tenant_id}"]
    run_command(cmd)


def import_backup(args, backup_dir: Path):
    tar = Path('/tmp/base.tar')
    make_tarfile(tar, backup_dir / 'data')

    print("* Importing the timeline into the pageserver")

    manifest = json.loads((backup_dir / "data" / "backup_manifest").read_text())
    start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
    end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
    print("> LSNs:", start_lsn, end_lsn)

    cmd = (
        "neon_local timeline import "
        f"--tenant-id {args.tenant_id} "
        f"--base-lsn {start_lsn} "
        f"--end-lsn {end_lsn} "
        f"--base-tarfile {tar} "
        f"--timeline-id {args.timeline_id} "
        f"--node-name {args.node}"
    )

    run_command(cmd.split())


def debug_prints(node):
    tuples = node.execute("table foo")
    oid = node.execute("select 'foo'::regclass::oid")[0][0]
    print("> foo's tuples:", tuples, "&", "oid:", oid)
    print("> DBs:", node.execute("select oid, datname from pg_database"))


def main(args):
    print("* Creating a node")
    node = testgres.get_new_node()
    node.init(unix_sockets=False, allow_streaming=True).start()
    node.execute("create table foo as select 1")
    debug_prints(node)
    # node.pgbench_init(scale=1)

    print("* Creating a backup")
    backup = node.backup()
    backup_dir = Path(backup.base_dir)
    print("> Backup dir:", backup_dir)

    # pr = backup.spawn_primary().start()
    # debug_prints(pr)
    # exit(1)

    create_tenant(args.tenant_id)
    import_backup(args, backup_dir)

    print("> Tenant:", args.tenant_id)
    print("> Timeline:", args.timeline_id)
    print("> Node:", args.node)

    print("* Starting postgres")
    cmd = ["neon_local", "pg", "start", f"--tenant-id={args.tenant_id}", f"--timeline-id={args.timeline_id}", args.node]
    run_command(cmd)

    print("* Opening psql session...")
    cmd = ["psql", f"host=127.0.0.1 port=55432 user={os.getlogin()} dbname=postgres"]
    subprocess.call(cmd)


if __name__ == "__main__":
    tenant_id = uuid.uuid4().hex

    parser = argparse.ArgumentParser()
    parser.add_argument("--tenant-id", default=tenant_id)
    parser.add_argument("--timeline-id", default=tenant_id)
    parser.add_argument("node")

    args = parser.parse_args(sys.argv[1:])
    main(args)
libs/pageserver_api/Cargo.toml (new file, 12 lines):

[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"

utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
libs/pageserver_api/src/lib.rs (new file, 9 lines):

use const_format::formatcp;

/// Public API types
pub mod models;

pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
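A hedged sketch of what a downstream crate (control_plane, for example) can now do with the thin pageserver_api crate instead of pulling in the whole pageserver. Only the constants shown in this diff are used; the helper functions and connection-string shapes below are illustrative assumptions, not code from the commit.

```rust
use pageserver_api::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_PORT};

// Hypothetical helpers: build default endpoints from the shared constants.
fn default_pageserver_http_base() -> String {
    // DEFAULT_HTTP_LISTEN_ADDR already includes host and port ("127.0.0.1:9898").
    format!("http://{DEFAULT_HTTP_LISTEN_ADDR}")
}

fn default_pageserver_pg_connstr(user: &str) -> String {
    // Assumes the standard libpq URI form for illustration.
    format!("postgresql://{user}@127.0.0.1:{DEFAULT_PG_LISTEN_PORT}")
}
```

The point of the split, as the control_plane Cargo.toml comment above says, is that clients recompile against a small API crate rather than the full server crate.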
@@ -7,7 +7,17 @@ use utils::{
     lsn::Lsn,
 };
 
-use crate::tenant::TenantState;
+/// A state of a tenant in pageserver's memory.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+pub enum TenantState {
+    /// Tenant is fully operational, its background jobs might be running or not.
+    Active { background_jobs_running: bool },
+    /// A tenant is recognized by pageserver, but not yet ready to operate:
+    /// e.g. not present locally and being downloaded or being read into memory from the file system.
+    Paused,
+    /// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
+    Broken,
+}
 
 #[serde_as]
 #[derive(Serialize, Deserialize)]
@@ -42,6 +52,7 @@ pub struct TenantCreateRequest {
     pub walreceiver_connect_timeout: Option<String>,
     pub lagging_wal_timeout: Option<String>,
     pub max_lsn_wal_lag: Option<NonZeroU64>,
+    pub trace_read_requests: Option<bool>,
 }
 
 #[serde_as]
@@ -3,9 +3,11 @@
 #![allow(non_snake_case)]
 // bindgen creates some unsafe code with no doc comments.
 #![allow(clippy::missing_safety_doc)]
-// suppress warnings on rust 1.53 due to bindgen unit tests.
-// https://github.com/rust-lang/rust-bindgen/issues/1651
-#![allow(deref_nullptr)]
+// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
+#![allow(clippy::useless_transmute)]
+// modules included with the postgres_ffi macro depend on the types of the specific version's
+// types, and trigger a too eager lint.
+#![allow(clippy::duplicate_mod)]
 
 use bytes::Bytes;
 use utils::bin_ser::SerializeError;
@@ -57,12 +57,10 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
 /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
 const XID_CHECKPOINT_INTERVAL: u32 = 1024;
 
-#[allow(non_snake_case)]
 pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
     (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
 }
 
-#[allow(non_snake_case)]
 pub fn XLogSegNoOffsetToRecPtr(
     segno: XLogSegNo,
     offset: u32,
@@ -71,7 +69,6 @@ pub fn XLogSegNoOffsetToRecPtr(
     segno * (wal_segsz_bytes as u64) + (offset as u64)
 }
 
-#[allow(non_snake_case)]
 pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
     format!(
         "{:>08X}{:>08X}{:>08X}",
@@ -81,7 +78,6 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
     )
 }
 
-#[allow(non_snake_case)]
 pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
     let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
     let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
@@ -89,12 +85,10 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
     (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
 }
 
-#[allow(non_snake_case)]
 pub fn IsXLogFileName(fname: &str) -> bool {
     return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
 }
 
-#[allow(non_snake_case)]
 pub fn IsPartialXLogFileName(fname: &str) -> bool {
     fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
 }
libs/safekeeper_api/Cargo.toml (new file, 12 lines):

[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"

utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
libs/safekeeper_api/src/lib.rs (new file, 10 lines):

use const_format::formatcp;

/// Public API types
pub mod models;

pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
@@ -58,6 +58,7 @@ rstar = "0.9.3"
 num-traits = "0.2.15"
 amplify_num = "0.4.1"
 
+pageserver_api = { path = "../libs/pageserver_api" }
 postgres_ffi = { path = "../libs/postgres_ffi" }
 etcd_broker = { path = "../libs/etcd_broker" }
 metrics = { path = "../libs/metrics" }
@@ -66,6 +67,7 @@ remote_storage = { path = "../libs/remote_storage" }
 workspace_hack = { version = "0.1", path = "../workspace_hack" }
 close_fds = "0.3.2"
 walkdir = "2.3.2"
+svg_fmt = "0.4.1"
 
 [dev-dependencies]
 hex-literal = "0.3"
pageserver/src/bin/draw_trace.rs (new file, 185 lines):

use clap::{App, Arg};
use futures::TryFutureExt;
use pageserver::{page_service::PagestreamFeMessage, repository::Key};

use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
use std::io::Write;
use std::{
    fs::{read_dir, File},
    io::BufReader,
    str::FromStr,
};
use svg_fmt::*;
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
    pq_proto::{BeMessage, FeMessage},
};

fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
    let set: BTreeSet<T> = coords.into_iter().collect();

    let mut map: BTreeMap<T, usize> = BTreeMap::new();
    for (i, e) in set.iter().enumerate() {
        map.insert(*e, i);
    }

    (set.len(), map)
}

fn main() -> anyhow::Result<()> {
    // TODO upgrade to struct macro arg parsing
    let arg_matches = App::new("Pageserver trace visualization tool")
        .about("Makes a svg file that displays the read pattern")
        .arg(
            Arg::new("traces_dir")
                .takes_value(true)
                .help("Directory where the read traces are stored"),
        )
        .get_matches();

    // (blkno, lsn)
    let mut dots = Vec::<(u32, Lsn)>::new();

    let mut dump_file = File::create("dump.txt").expect("can't make file");
    let mut deltas = HashMap::<i32, u32>::new();
    let mut prev1: u32 = 0;
    let mut prev2: u32 = 0;
    let mut prev3: u32 = 0;

    println!("scanning trace ...");
    let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
    for tenant_dir in read_dir(traces_dir)? {
        let entry = tenant_dir?;
        let path = entry.path();
        // let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
        // println!("tenant: {tenant_id}");

        println!("opening {path:?}");
        for timeline_dir in read_dir(path)? {
            let entry = timeline_dir?;
            let path = entry.path();
            // let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
            // println!("hi");
            // println!("timeline: {timeline_id}");

            println!("opening {path:?}");
            for trace_dir in read_dir(path)? {
                let entry = trace_dir?;
                let path = entry.path();
                // let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;

                println!("opening {path:?}");
                let file = File::open(path.clone())?;
                let mut reader = BufReader::new(file);
                while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
                    // println!("Parsed message {:?}", msg);
                    match msg {
                        PagestreamFeMessage::Exists(_) => {}
                        PagestreamFeMessage::Nblocks(_) => {}
                        PagestreamFeMessage::GetPage(req) => {
                            writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
                            // dots.push((req.blkno, req.lsn));
                            // HACK
                            dots.push((req.blkno, Lsn::from(dots.len() as u64)));

                            let delta1 = (req.blkno as i32) - (prev1 as i32);
                            let delta2 = (req.blkno as i32) - (prev2 as i32);
                            let delta3 = (req.blkno as i32) - (prev3 as i32);
                            let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
                                delta1
                            } else {
                                delta2
                            };
                            if i32::abs(delta3) < i32::abs(delta) {
                                delta = delta3;
                            }
                            let delta = delta1;

                            prev3 = prev2;
                            prev2 = prev1;
                            prev1 = req.blkno;

                            match deltas.get_mut(&delta) {
                                Some(c) => {*c += 1;},
                                None => {deltas.insert(delta, 1);},
                            };

                            if delta == 9 {
                                println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
                            }
                        },
                        PagestreamFeMessage::DbSize(_) => {}
                    };

                    // HACK
                    // if dots.len() > 1000 {
                    //     break;
                    // }
                }
            }
        }
    }

    let mut other = deltas.len();
    deltas.retain(|_, count| *count > 3);
    other -= deltas.len();
    dbg!(other);
    dbg!(deltas);

    // Collect all coordinates
    let mut keys: Vec<u32> = vec![];
    let mut lsns: Vec<Lsn> = vec![];
    for dot in &dots {
        keys.push(dot.0);
        lsns.push(dot.1);
    }

    // Analyze
    let (key_max, key_map) = analyze(keys);
    let (lsn_max, lsn_map) = analyze(lsns);

    // Draw
    println!("drawing trace ...");
    let mut svg_file = File::create("out.svg").expect("can't make file");
    writeln!(
        &mut svg_file,
        "{}",
        BeginSvg {
            w: (key_max + 1) as f32,
            h: (lsn_max + 1) as f32,
        }
    )?;
    for (key, lsn) in &dots {
        let key = key_map.get(&key).unwrap();
        let lsn = lsn_map.get(&lsn).unwrap();
        writeln!(
            &mut svg_file,
            "    {}",
            rectangle(
                *key as f32,
                *lsn as f32,
                10.0,
                10.0
            )
            .fill(Fill::Color(red()))
            .stroke(Stroke::Color(black(), 0.0))
            .border_radius(0.5)
        )?;
        // println!("    {}",
        //          rectangle(key_start as f32 + stretch * margin,
        //                    stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
        //                    key_diff as f32 - stretch * 2.0 * margin,
        //                    stretch * (lsn_diff - 2.0 * margin))
        //          // .fill(rgb(200, 200, 200))
        //          .fill(fill)
        //          .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
        //          .border_radius(0.4)
        //          );
    }

    writeln!(&mut svg_file, "{}", EndSvg)?;

    Ok(())
}
pageserver/src/bin/replay.rs (new file, 133 lines):

use bytes::BytesMut;
use pageserver::page_service::PagestreamFeMessage;
use std::{
    fs::{read_dir, File},
    io::BufReader,
    path::PathBuf,
    str::FromStr,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};

use clap::{App, Arg};
use utils::{
    id::{TenantId, TimelineId},
    pq_proto::{BeMessage, FeMessage},
};

// TODO put this in library, dedup with stuff in control_plane
/// Client for the pageserver's pagestream API
struct PagestreamApi {
    stream: TcpStream,
}

impl PagestreamApi {
    async fn connect(
        connstr: &str,
        tenant: &TenantId,
        timeline: &TimelineId,
    ) -> anyhow::Result<PagestreamApi> {
        // Parse connstr
        let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
        let tcp_addr = format!("localhost:{}", config.get_ports()[0]);

        // Connect
        let mut stream = TcpStream::connect(tcp_addr).await?;
        let (client, conn) = config
            .connect_raw(&mut stream, tokio_postgres::NoTls)
            .await?;

        // Enter pagestream protocol
        let init_query = format!("pagestream {} {}", tenant, timeline);
        tokio::select! {
            _ = conn => panic!("connection closed during pagestream initialization"),
            _ = client.query(init_query.as_str(), &[]) => (),
        };

        Ok(PagestreamApi { stream })
    }

    async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
        let request = {
            let msg_bytes = msg.serialize();
            let mut buf = BytesMut::new();
            let copy_msg = BeMessage::CopyData(&msg_bytes);

            // TODO it's actually a fe message but it doesn't have a serializer yet
            BeMessage::write(&mut buf, &copy_msg)?;
            buf.freeze()
        };
        self.stream.write_all(&request).await?;

        // TODO It's actually a be message, but it doesn't have a parser.
        // So error response (code b'E' parses incorrectly as FeExecuteMessage)
        let _response = match FeMessage::read_fut(&mut self.stream).await? {
            Some(FeMessage::CopyData(page)) => page,
            r => panic!("Expected CopyData message, got: {:?}", r),
        };

        Ok(())
    }
}

async fn replay_trace<R: std::io::Read>(
    reader: &mut R,
    mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
    while let Ok(msg) = PagestreamFeMessage::parse(reader) {
        println!("Parsed message {:?}", msg);
        pagestream.make_request(msg).await?;
    }

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // TODO upgrade to struct macro arg parsing
    let arg_matches = App::new("Pageserver trace replay tool")
        .about("Replays wal or read traces to test pageserver performance")
        .arg(
            Arg::new("traces_dir")
                .takes_value(true)
                .help("Directory where the read traces are stored"),
        )
        .arg(
            Arg::new("pageserver_connstr")
                .takes_value(true)
                .help("Pageserver pg endpoint to connect to"),
        )
        .get_matches();

    let connstr = arg_matches.value_of("pageserver_connstr").unwrap();

    let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
    for tenant_dir in read_dir(traces_dir)? {
        let entry = tenant_dir?;
        let path = entry.path();
        let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;

        for timeline_dir in read_dir(path)? {
            let entry = timeline_dir?;
            let path = entry.path();
            let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;

            for trace_dir in read_dir(path)? {
                let entry = trace_dir?;
                let path = entry.path();
                let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;

                // TODO The pageserver deletes existing traces?
                // LOL yes because I use tenant ID as trace id
                let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;

                let file = File::open(path.clone())?;
                let mut reader = BufReader::new(file);
                // let len = file.metadata().unwrap().len();
                // println!("replaying {:?} trace {} bytes", path, len);
                replay_trace(&mut reader, pagestream).await?;
            }
        }
    }

    Ok(())
}
@@ -30,10 +30,10 @@ pub mod defaults {
     use crate::tenant_config::defaults::*;
     use const_format::formatcp;
 
-    pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
-    pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
-    pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
-    pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
+    pub use pageserver_api::{
+        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
+        DEFAULT_PG_LISTEN_PORT,
+    };
 
     pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
     pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -364,6 +364,23 @@ impl PageServerConf {
         self.timelines_path(tenant_id).join(timeline_id.to_string())
     }
 
+    pub fn traces_path(&self) -> PathBuf {
+        self.workdir.join("traces")
+    }
+
+    pub fn trace_path(
+        &self,
+        tenant_id: &TenantId,
+        timeline_id: &TimelineId,
+        connection_id: &TimelineId, // TODO make a new type
+    ) -> PathBuf {
+        self.traces_path()
+            .join(tenant_id.to_string())
+            .join(timeline_id.to_string())
+            .join(connection_id.to_string())
+
+    }
+
     /// Points to a place in pageserver's local directory,
     /// where certain timeline's metadata file should be located.
     pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
@@ -1,3 +1,4 @@
-pub mod models;
 pub mod routes;
 pub use routes::make_router;
+
+pub use pageserver_api::models;
@@ -337,9 +337,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
     info!("Handling tenant attach {tenant_id}");
 
     tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
-        Ok(_) => Err(ApiError::Conflict(
-            "Tenant is already present locally".to_owned(),
-        )),
+        Ok(tenant) => {
+            if tenant.list_timelines().is_empty() {
+                info!("Attaching to tenant {tenant_id} with zero timelines");
+                Ok(())
+            } else {
+                Err(ApiError::Conflict(
+                    "Tenant is already present locally".to_owned(),
+                ))
+            }
+        }
         Err(_) => Ok(()),
     })
     .await
@@ -610,6 +617,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
     if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
         tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
     }
+    if let Some(trace_read_requests) = request_data.trace_read_requests {
+        tenant_conf.trace_read_requests = Some(trace_read_requests);
+    }
 
     tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
     if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
@@ -16,6 +16,8 @@ pub mod tenant;
 pub mod tenant_config;
 pub mod tenant_mgr;
 pub mod tenant_tasks;
+// pub mod timelines;
+pub mod trace;
 pub mod virtual_file;
 pub mod walingest;
 pub mod walreceiver;
@@ -10,7 +10,9 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
use anyhow::{bail, ensure, Context, Result};
|
use anyhow::{bail, ensure, Context, Result};
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use byteorder::{BigEndian, ReadBytesExt};
|
||||||
|
use bytes::Buf;
|
||||||
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use std::io;
|
use std::io;
|
||||||
@@ -42,6 +44,7 @@ use crate::task_mgr;
|
|||||||
use crate::task_mgr::TaskKind;
|
use crate::task_mgr::TaskKind;
|
||||||
use crate::tenant::Timeline;
|
use crate::tenant::Timeline;
|
||||||
use crate::tenant_mgr;
|
use crate::tenant_mgr;
|
||||||
|
use crate::trace::Tracer;
|
||||||
use crate::CheckpointConfig;
|
use crate::CheckpointConfig;
|
||||||
|
|
||||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||||
@@ -49,7 +52,9 @@ use postgres_ffi::to_pg_timestamp;
|
|||||||
use postgres_ffi::BLCKSZ;
|
use postgres_ffi::BLCKSZ;
|
||||||
|
|
||||||
// Wrapped in libpq CopyData
|
// Wrapped in libpq CopyData
|
||||||
enum PagestreamFeMessage {
|
// TODO these should be in a library outside the pageserver
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PagestreamFeMessage {
|
||||||
Exists(PagestreamExistsRequest),
|
Exists(PagestreamExistsRequest),
|
||||||
Nblocks(PagestreamNblocksRequest),
|
Nblocks(PagestreamNblocksRequest),
|
||||||
GetPage(PagestreamGetPageRequest),
|
GetPage(PagestreamGetPageRequest),
|
||||||
@@ -57,7 +62,7 @@ enum PagestreamFeMessage {
 }

 // Wrapped in libpq CopyData
-enum PagestreamBeMessage {
+pub enum PagestreamBeMessage {
     Exists(PagestreamExistsResponse),
     Nblocks(PagestreamNblocksResponse),
     GetPage(PagestreamGetPageResponse),
@@ -66,106 +71,152 @@ enum PagestreamBeMessage {
 }

 #[derive(Debug)]
-struct PagestreamExistsRequest {
+pub struct PagestreamExistsRequest {
     latest: bool,
     lsn: Lsn,
     rel: RelTag,
 }

 #[derive(Debug)]
-struct PagestreamNblocksRequest {
+pub struct PagestreamNblocksRequest {
     latest: bool,
     lsn: Lsn,
     rel: RelTag,
 }

 #[derive(Debug)]
-struct PagestreamGetPageRequest {
+pub struct PagestreamGetPageRequest {
     latest: bool,
-    lsn: Lsn,
-    rel: RelTag,
-    blkno: u32,
+    pub lsn: Lsn,
+    pub rel: RelTag,
+    pub blkno: u32,
 }

 #[derive(Debug)]
-struct PagestreamDbSizeRequest {
+pub struct PagestreamDbSizeRequest {
     latest: bool,
     lsn: Lsn,
     dbnode: u32,
 }

 #[derive(Debug)]
-struct PagestreamExistsResponse {
+pub struct PagestreamExistsResponse {
     exists: bool,
 }

 #[derive(Debug)]
-struct PagestreamNblocksResponse {
+pub struct PagestreamNblocksResponse {
     n_blocks: u32,
 }

 #[derive(Debug)]
-struct PagestreamGetPageResponse {
+pub struct PagestreamGetPageResponse {
     page: Bytes,
 }

 #[derive(Debug)]
-struct PagestreamErrorResponse {
+pub struct PagestreamErrorResponse {
     message: String,
 }

 #[derive(Debug)]
-struct PagestreamDbSizeResponse {
+pub struct PagestreamDbSizeResponse {
     db_size: i64,
 }

 impl PagestreamFeMessage {
-    fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
+    pub fn serialize(&self) -> Bytes {
+        let mut bytes = BytesMut::new();
+
+        match self {
+            Self::Exists(req) => {
+                bytes.put_u8(0);
+                bytes.put_u8(if req.latest { 1 } else { 0 });
+                bytes.put_u64(req.lsn.0);
+                bytes.put_u32(req.rel.spcnode);
+                bytes.put_u32(req.rel.dbnode);
+                bytes.put_u32(req.rel.relnode);
+                bytes.put_u8(req.rel.forknum);
+            }
+
+            Self::Nblocks(req) => {
+                bytes.put_u8(1);
+                bytes.put_u8(if req.latest { 1 } else { 0 });
+                bytes.put_u64(req.lsn.0);
+                bytes.put_u32(req.rel.spcnode);
+                bytes.put_u32(req.rel.dbnode);
+                bytes.put_u32(req.rel.relnode);
+                bytes.put_u8(req.rel.forknum);
+            }
+
+            Self::GetPage(req) => {
+                bytes.put_u8(2);
+                bytes.put_u8(if req.latest { 1 } else { 0 });
+                bytes.put_u64(req.lsn.0);
+                bytes.put_u32(req.rel.spcnode);
+                bytes.put_u32(req.rel.dbnode);
+                bytes.put_u32(req.rel.relnode);
+                bytes.put_u8(req.rel.forknum);
+                bytes.put_u32(req.blkno);
+            }
+
+            Self::DbSize(req) => {
+                bytes.put_u8(3);
+                bytes.put_u8(if req.latest { 1 } else { 0 });
+                bytes.put_u64(req.lsn.0);
+                bytes.put_u32(req.dbnode);
+            }
+        }
+
+        bytes.into()
+    }
+
+    pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
         // TODO these gets can fail

         // these correspond to the NeonMessageTag enum in pagestore_client.h
         //
         // TODO: consider using protobuf or serde bincode for less error prone
         // serialization.
-        let msg_tag = body.get_u8();
+        let msg_tag = body.read_u8()?;
         match msg_tag {
             0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
-                latest: body.get_u8() != 0,
-                lsn: Lsn::from(body.get_u64()),
+                latest: body.read_u8()? != 0,
+                lsn: Lsn::from(body.read_u64::<BigEndian>()?),
                 rel: RelTag {
-                    spcnode: body.get_u32(),
-                    dbnode: body.get_u32(),
-                    relnode: body.get_u32(),
-                    forknum: body.get_u8(),
+                    spcnode: body.read_u32::<BigEndian>()?,
+                    dbnode: body.read_u32::<BigEndian>()?,
+                    relnode: body.read_u32::<BigEndian>()?,
+                    forknum: body.read_u8()?,
                 },
             })),
             1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
-                latest: body.get_u8() != 0,
-                lsn: Lsn::from(body.get_u64()),
+                latest: body.read_u8()? != 0,
+                lsn: Lsn::from(body.read_u64::<BigEndian>()?),
                 rel: RelTag {
-                    spcnode: body.get_u32(),
-                    dbnode: body.get_u32(),
-                    relnode: body.get_u32(),
-                    forknum: body.get_u8(),
+                    spcnode: body.read_u32::<BigEndian>()?,
+                    dbnode: body.read_u32::<BigEndian>()?,
+                    relnode: body.read_u32::<BigEndian>()?,
+                    forknum: body.read_u8()?,
                 },
             })),
             2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
-                latest: body.get_u8() != 0,
-                lsn: Lsn::from(body.get_u64()),
+                latest: body.read_u8()? != 0,
+                lsn: Lsn::from(body.read_u64::<BigEndian>()?),
                 rel: RelTag {
-                    spcnode: body.get_u32(),
-                    dbnode: body.get_u32(),
-                    relnode: body.get_u32(),
-                    forknum: body.get_u8(),
+                    spcnode: body.read_u32::<BigEndian>()?,
+                    dbnode: body.read_u32::<BigEndian>()?,
+                    relnode: body.read_u32::<BigEndian>()?,
+                    forknum: body.read_u8()?,
                 },
-                blkno: body.get_u32(),
+                blkno: body.read_u32::<BigEndian>()?,
             })),
             3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
-                latest: body.get_u8() != 0,
-                lsn: Lsn::from(body.get_u64()),
-                dbnode: body.get_u32(),
+                latest: body.read_u8()? != 0,
+                lsn: Lsn::from(body.read_u64::<BigEndian>()?),
+                dbnode: body.read_u32::<BigEndian>()?,
             })),
-            _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
+            _ => bail!("unknown smgr message tag: {:?}", msg_tag),
         }
     }
 }
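The new serialize()/parse() pair is symmetric, so a quick way to sanity-check the wire format is a round trip. The sketch below is not part of the commits above; it assumes it sits inside the same page_service module (so the private `latest` field and the bytes/byteorder imports added by this diff are visible), and the Lsn/RelTag literal values are hypothetical.

#[cfg(test)]
mod pagestream_roundtrip_sketch {
    use super::*;

    #[test]
    fn getpage_message_roundtrips() {
        // Build a GetPage request, write it out with serialize(), read it back with parse().
        let msg = PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
            latest: true,
            lsn: Lsn(0x1234_5678), // assumes Lsn's inner u64 is public, as `req.lsn.0` above suggests
            rel: RelTag {
                spcnode: 1663, // hypothetical example values
                dbnode: 13010,
                relnode: 16384,
                forknum: 0,
            },
            blkno: 42,
        });

        let bytes = msg.serialize();
        // Buf::reader() adapts Bytes to std::io::Read, matching parse()'s new signature.
        let reparsed =
            PagestreamFeMessage::parse(&mut bytes.reader()).expect("round trip should parse");

        match reparsed {
            PagestreamFeMessage::GetPage(req) => {
                assert_eq!(req.blkno, 42);
                assert_eq!(req.lsn, Lsn(0x1234_5678));
            }
            other => panic!("unexpected message: {other:?}"),
        }
    }
}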
@@ -422,6 +473,17 @@ impl PageServerHandler {
         // so there is no need to reset the association
         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));

+        // Make request tracer if needed
+        let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
+        let mut tracer = if tenant.get_trace_read_requests() {
+            let path = tenant
+                .conf
+                .trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
+            Some(Tracer::new(path))
+        } else {
+            None
+        };
+
         // Check that the timeline exists
         let timeline = get_local_timeline(tenant_id, timeline_id)?;

@@ -446,15 +508,24 @@ impl PageServerHandler {

             let copy_data_bytes = match msg? {
                 Some(FeMessage::CopyData(bytes)) => bytes,
+                Some(FeMessage::Sync) => {
+                    // TODO what now?
+                    continue;
+                }
                 Some(m) => {
                     bail!("unexpected message: {m:?} during COPY");
                 }
                 None => break, // client disconnected
             };

+            // Trace request if needed
+            if let Some(t) = tracer.as_mut() {
+                t.trace(&copy_data_bytes)
+            }
+
             trace!("query: {copy_data_bytes:?}");

-            let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
+            let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;

             let response = match neon_fe_msg {
                 PagestreamFeMessage::Exists(req) => {
@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
                 (storage, remote_index_clone, sync_queue),
                 max_sync_errors,
             )
+            .instrument(info_span!("storage_sync_loop"))
             .await;
             Ok(())
         },
@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
 use crate::virtual_file::VirtualFile;
 use crate::walredo::WalRedoManager;
 use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
+pub use pageserver_api::models::TenantState;

 use toml_edit;
 use utils::{
@@ -118,18 +119,6 @@ pub struct Tenant {
     upload_layers: bool,
 }

-/// A state of a tenant in pageserver's memory.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-pub enum TenantState {
-    /// Tenant is fully operational, its background jobs might be running or not.
-    Active { background_jobs_running: bool },
-    /// A tenant is recognized by pageserver, but not yet ready to operate:
-    /// e.g. not present locally and being downloaded or being read into memory from the file system.
-    Paused,
-    /// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
-    Broken,
-}
-
 /// A repository corresponds to one .neon directory. One repository holds multiple
 /// timelines, forked off from the same initial call to 'initdb'.
 impl Tenant {
@@ -400,16 +389,19 @@ impl Tenant {
                 timeline_id,
                 metadata.pg_version()
             );
-            let timeline = self
-                .initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
-                .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
-            match timelines_accessor.entry(timeline.timeline_id) {
-                Entry::Occupied(_) => bail!(
-                    "Found freshly initialized timeline {} in the tenant map",
-                    timeline.timeline_id
+            let ancestor = metadata
+                .ancestor_timeline()
+                .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
+                .cloned();
+            match timelines_accessor.entry(timeline_id) {
+                Entry::Occupied(_) => warn!(
+                    "Timeline {}/{} already exists in the tenant map, skipping its initialization",
+                    self.tenant_id, timeline_id
                 ),
                 Entry::Vacant(v) => {
+                    let timeline = self
+                        .initialize_new_timeline(timeline_id, metadata, ancestor)
+                        .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
                     v.insert(timeline);
                 }
             }
@@ -601,6 +593,14 @@ impl Tenant {
             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
     }

+    // TODO is there a need for all this boilerplate?
+    pub fn get_trace_read_requests(&self) -> bool {
+        let tenant_conf = self.tenant_conf.read().unwrap();
+        tenant_conf
+            .trace_read_requests
+            .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
+    }
+
     pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
         self.tenant_conf.write().unwrap().update(&new_tenant_conf);
     }
@@ -609,21 +609,14 @@ impl Tenant {
         &self,
         new_timeline_id: TimelineId,
         new_metadata: TimelineMetadata,
-        timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
+        ancestor: Option<Arc<Timeline>>,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let ancestor = match new_metadata.ancestor_timeline() {
-            Some(ancestor_timeline_id) => Some(
-                timelines
-                    .get(&ancestor_timeline_id)
-                    .cloned()
-                    .with_context(|| {
-                        format!(
-                            "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
-                        )
-                    })?,
-            ),
-            None => None,
-        };
+        if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
+            anyhow::ensure!(
+                ancestor.is_some(),
+                "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
+            )
+        }

         let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
         let pg_version = new_metadata.pg_version();
@@ -1080,8 +1073,12 @@ impl Tenant {
             )
         })?;

+        let ancestor = new_metadata
+            .ancestor_timeline()
+            .and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
+            .cloned();
         let new_timeline = self
-            .initialize_new_timeline(new_timeline_id, new_metadata, timelines)
+            .initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
             .with_context(|| {
                 format!(
                     "Failed to initialize timeline {}/{}",
@@ -627,7 +627,7 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
         drop(tenant_conf_guard);
         let self_clone = Arc::clone(self);
-        let _ = spawn_connection_manager_task(
+        spawn_connection_manager_task(
             self.conf.broker_etcd_prefix.clone(),
             self_clone,
             walreceiver_connect_timeout,
@@ -82,6 +82,10 @@ pub struct TenantConf {
     /// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
     /// to avoid eager reconnects.
    pub max_lsn_wal_lag: NonZeroU64,
+    /// If enabled, records all read requests for this ... TODO
+    /// Even though read traces are small, there's no delete mechanism so they can
+    /// pile up. So we disable this by default.
+    pub trace_read_requests: bool,
 }

 /// Same as TenantConf, but this struct preserves the information about
@@ -105,6 +109,7 @@ pub struct TenantConfOpt {
     #[serde(with = "humantime_serde")]
     pub lagging_wal_timeout: Option<Duration>,
     pub max_lsn_wal_lag: Option<NonZeroU64>,
+    pub trace_read_requests: Option<bool>,
 }

 impl TenantConfOpt {
@@ -138,6 +143,9 @@ impl TenantConfOpt {
                 .lagging_wal_timeout
                 .unwrap_or(global_conf.lagging_wal_timeout),
             max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
+            trace_read_requests: self
+                .trace_read_requests
+                .unwrap_or(global_conf.trace_read_requests),
         }
     }

@@ -207,6 +215,7 @@ impl TenantConf {
                 .expect("cannot parse default walreceiver lagging wal timeout"),
             max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
                 .expect("cannot parse default max walreceiver Lsn wal lag"),
+            trace_read_requests: false,
         }
     }

@@ -232,6 +241,7 @@ impl TenantConf {
                 .unwrap(),
             max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
                 .unwrap(),
+            trace_read_requests: false,
         }
     }
 }
@@ -107,6 +107,13 @@ pub fn init_tenant_mgr(
 /// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
 /// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
 /// and the load continues.
+///
+/// For a successful tenant attach, the tenant first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
+/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
+/// Alternatively, a tenant is considered loaded successfully if it's already in pageserver's memory (i.e. was loaded already before).
+///
+/// Attach happens on startup and successful timeline downloads
+/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
 pub fn attach_local_tenants(
     conf: &'static PageServerConf,
     remote_index: &RemoteIndex,
@@ -122,18 +129,20 @@ pub fn attach_local_tenants(
         );
         debug!("Timelines to attach: {local_timelines:?}");

-        let tenant = load_local_tenant(conf, tenant_id, remote_index);
-        {
-            match tenants_state::write_tenants().entry(tenant_id) {
-                hash_map::Entry::Occupied(_) => {
-                    error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
-                    continue;
-                }
-                hash_map::Entry::Vacant(v) => {
-                    v.insert(Arc::clone(&tenant));
-                }
-            }
-        }
+        let mut tenants_accessor = tenants_state::write_tenants();
+        let tenant = match tenants_accessor.entry(tenant_id) {
+            hash_map::Entry::Occupied(o) => {
+                info!("Tenant {tenant_id} was found in pageserver's memory");
+                Arc::clone(o.get())
+            }
+            hash_map::Entry::Vacant(v) => {
+                info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
+                let tenant = load_local_tenant(conf, tenant_id, remote_index);
+                v.insert(Arc::clone(&tenant));
+                tenant
+            }
+        };
+        drop(tenants_accessor);

         if tenant.current_state() == TenantState::Broken {
             warn!("Skipping timeline load for broken tenant {tenant_id}")
@@ -168,16 +177,28 @@ fn load_local_tenant(
         remote_index.clone(),
         conf.remote_storage_config.is_some(),
     ));
-    match Tenant::load_tenant_config(conf, tenant_id) {
-        Ok(tenant_conf) => {
-            tenant.update_tenant_config(tenant_conf);
-            tenant.activate(false);
-        }
-        Err(e) => {
-            error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
-            tenant.set_state(TenantState::Broken);
+
+    let tenant_timelines_dir = conf.timelines_path(&tenant_id);
+    if !tenant_timelines_dir.is_dir() {
+        error!(
+            "Tenant {} has no timelines directory at {}",
+            tenant_id,
+            tenant_timelines_dir.display()
+        );
+        tenant.set_state(TenantState::Broken);
+    } else {
+        match Tenant::load_tenant_config(conf, tenant_id) {
+            Ok(tenant_conf) => {
+                tenant.update_tenant_config(tenant_conf);
+                tenant.activate(false);
+            }
+            Err(e) => {
+                error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
+                tenant.set_state(TenantState::Broken);
+            }
         }
     }

     tenant
 }
@@ -625,14 +646,10 @@ fn collect_timelines_for_tenant(
     }

     if tenant_timelines.is_empty() {
-        match remove_if_empty(&timelines_dir) {
-            Ok(true) => info!(
-                "Removed empty tenant timelines directory {}",
-                timelines_dir.display()
-            ),
-            Ok(false) => (),
-            Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
-        }
+        // this is normal, we've removed all broken, empty and temporary timeline dirs
+        // but should allow the tenant to stay functional and allow creating new timelines
+        // on a restart, we require tenants to have the timelines dir, so leave it on disk
+        debug!("Tenant {tenant_id} has no timelines loaded");
     }

     Ok((tenant_id, tenant_timelines))
pageserver/src/trace.rs (new file, 36 lines)
@@ -0,0 +1,36 @@
+use bytes::Bytes;
+use std::{
+    fs::{create_dir_all, File},
+    io::{BufWriter, Write},
+    path::PathBuf,
+};
+
+pub struct Tracer {
+    writer: BufWriter<File>,
+}
+
+impl Drop for Tracer {
+    fn drop(&mut self) {
+        self.flush()
+    }
+}
+
+impl Tracer {
+    pub fn new(path: PathBuf) -> Self {
+        let parent = path.parent().expect("failed to parse parent path");
+        create_dir_all(parent).expect("failed to create trace dir");
+
+        let file = File::create(path).expect("failed to create trace file");
+        Tracer {
+            writer: BufWriter::new(file),
+        }
+    }
+
+    pub fn trace(&mut self, msg: &Bytes) {
+        self.writer.write(msg).expect("failed to write trace");
+    }
+
+    pub fn flush(&mut self) {
+        self.writer.flush().expect("failed to flush trace file");
+    }
+}
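Because Tracer::trace appends each CopyData payload verbatim, a trace file is a plain concatenation of serialized pagestream requests, so it can be decoded by calling the new PagestreamFeMessage::parse in a loop until the reader is exhausted. The `replay` and `draw_trace` binaries used by the test fixtures later in this diff are not shown here; the following is only a rough sketch of such a reader, and the `pageserver::page_service` import path and the stop-on-error handling are assumptions.

use std::fs::File;
use std::io::BufReader;
use std::path::Path;

// Assumed import path; adjust to wherever PagestreamFeMessage is actually exported.
use pageserver::page_service::PagestreamFeMessage;

fn dump_trace(path: &Path) -> anyhow::Result<()> {
    let mut reader = BufReader::new(File::open(path)?);
    // Each parse() call consumes exactly one serialized request; an Err is treated as
    // end of file (or a truncated trailing record) and ends the walk.
    while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
        println!("{msg:?}");
    }
    Ok(())
}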
@@ -264,12 +264,76 @@ pageserver_flush(void)
     }
 }

+// Entry of the page_cache
+typedef struct
+{
+    // HACK Just xor of bytes lol
+    char request_hash;
+
+    // Points directly to a NeonResponse. We can't just own the
+    // NeonResponse because it's a "supertype", so it's not Sized.
+    char* response;
+    int len;
+} NeonRequestResponse;
+
+#define MAX_PAGE_CACHE_SIZE 50
+NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
+int page_cache_size = 0;
+int page_cache_head = 0;
+
 static NeonResponse *
 pageserver_call(NeonRequest * request)
 {
+    // Compute hash
+    char hash = 0;
+    StringInfoData req_buff;
+    req_buff = nm_pack_request(request);
+    for (int i = 0; i < req_buff.len; i++) {
+        hash ^= req_buff.data[i];
+    }
+    pfree(req_buff.data);
+
+    // If result is cached, memcpy and return
+    for (int i = 0; i < page_cache_size; i++) {
+        if (page_cache[i].request_hash == hash) {
+            int len = page_cache[0].response->len;
+            NeonResponse *resp = palloc0(len);
+            // I'd rather Rc than memcpy, but this is not rust :(
+            memcpy(resp, page_cache[0].response->data, len);
+            elog(LOG, "cache hit !!!");
+            return resp;
+        }
+    }
+
+    // Send request, get response
     pageserver_send(request);
     pageserver_flush();
-    return pageserver_receive();
+    NeonResponse *resp = pageserver_receive();
+
+    // Get length
+    int len = -1;
+    switch (resp->tag) {
+        case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
+        case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
+        case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
+        case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
+        case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
+    }
+
+    // Cache result
+    page_cache[page_cache_head].request_hash = hash;
+    if (page_cache_head < page_cache_size) {
+        pfree(page_cache[page_cache_head].response);
+    }
+    page_cache[page_cache_head].len = len;
+    page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
+    memcpy(page_cache[page_cache_head].response, resp, len);
+    page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
+    if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
+        page_cache_size += 1;
+    }
+
+    return resp;
 }

 page_server_api api = {
@@ -129,13 +129,13 @@ mod tests {
             assert!(CANCEL_MAP.contains(&session));

             tx.send(()).expect("failed to send");
-            let () = futures::future::pending().await; // sleep forever
+            futures::future::pending::<()>().await; // sleep forever

             Ok(())
         }));

         // Wait until the task has been spawned.
-        let () = rx.await.context("failed to hear from the task")?;
+        rx.await.context("failed to hear from the task")?;

         // Drop the session's entry by cancelling the task.
         task.abort();
@@ -5,6 +5,7 @@ filterwarnings =
     ignore:record_property is incompatible with junit_family:pytest.PytestWarning
 addopts =
     -m 'not remote_cluster'
+    --ignore=test_runner/performance
 markers =
     remote_cluster
 testpaths =
@@ -1,11 +1,10 @@
 [toolchain]
 # We try to stick to a toolchain version that is widely available on popular distributions, so that most people
 # can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
-# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on
-# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach
-# 'testing' soon (and similarly for the other distributions).
-# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package.
-channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
+# version, we can consider updating.
+# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package,
+# we use "unstable" version number as the highest version used in the project by default.
+channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
 profile = "default"
 # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
 # https://rust-lang.github.io/rustup/concepts/profiles.html
@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
 thiserror = "1"
 parking_lot = "0.12.1"

+safekeeper_api = { path = "../libs/safekeeper_api" }
 postgres_ffi = { path = "../libs/postgres_ffi" }
 metrics = { path = "../libs/metrics" }
 utils = { path = "../libs/utils" }
@@ -1,3 +1,4 @@
-pub mod models;
 pub mod routes;
 pub use routes::make_router;
+
+pub use safekeeper_api::models;
@@ -27,14 +27,13 @@ mod timelines_global_map;
 pub use timelines_global_map::GlobalTimelines;

 pub mod defaults {
-    use const_format::formatcp;
     use std::time::Duration;

-    pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
-    pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
-
-    pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
-    pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
+    pub use safekeeper_api::{
+        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
+        DEFAULT_PG_LISTEN_PORT,
+    };
+
     pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
     pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
 }
@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:

 `./scripts/pytest -k bench`

+To run tests in parallel we utilize the `pytest-xdist` plugin. By default everything runs single threaded. The number of workers can be specified with the `-n` argument:
+
+`./scripts/pytest -n4`
+
+By default performance tests are excluded. To run them explicitly, pass the performance tests selection to the script:
+
+`./scripts/pytest test_runner/performance`
+
 Useful environment variables:

 `NEON_BIN`: The directory where neon binaries can be found.
@@ -91,10 +91,25 @@ class NeonCompare(PgCompare):
         self._pg_bin = pg_bin
         self.pageserver_http_client = self.env.pageserver.http_client()

+        self.tenant, _ = self.env.neon_cli.create_tenant(
+            conf={
+                "trace_read_requests": "true",
+            }
+        )
+
+        # HACK enable request tracing, as an experiment
+        # NOTE This must be done before the pg starts pagestream
+        # TODO why does it not work?
+        # self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
+        #     "trace_read_requests": "true",
+        # })
+
         # We only use one branch and one timeline
-        self.env.neon_cli.create_branch(branch_name, "empty")
-        self._pg = self.env.postgres.create_start(branch_name)
-        self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
+        # self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
+        # self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
+        self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
+        self._pg = self.env.postgres.create_start(
+            branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])

     @property
     def pg(self):
@@ -109,10 +124,10 @@ class NeonCompare(PgCompare):
         return self._pg_bin

     def flush(self):
-        self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
+        self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)

     def compact(self):
-        self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
+        self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)

     def report_peak_memory_use(self) -> None:
         self.zenbenchmark.record(
@@ -124,7 +139,7 @@ class NeonCompare(PgCompare):

     def report_size(self) -> None:
         timeline_size = self.zenbenchmark.get_timeline_size(
-            self.env.repo_dir, self.env.initial_tenant, self.timeline
+            self.env.repo_dir, self.tenant, self.timeline
         )
         self.zenbenchmark.record(
             "size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
     LOCAL_FS = "local_fs"
     MOCK_S3 = "mock_s3"
     REAL_S3 = "real_s3"
+    # Pass to tests that are generic to remote storage
+    # to ensure the test passes with or without the remote storage
+    NOOP = "noop"


 def available_remote_storages() -> List[RemoteStorageKind]:
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
         test_name: str,
         force_enable: bool = True,
     ):
-        if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
+        if remote_storage_kind == RemoteStorageKind.NOOP:
+            return
+        elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
             self.enable_local_fs_remote_storage(force_enable=force_enable)
         elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
             self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1744,6 +1749,37 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
     return PgBin(test_output_dir, pg_version)


+@dataclass
+class ReplayBin:
+    """A helper class for replaying pageserver wal and read traces."""
+
+    traces_dir: str
+
+    def replay_all(self, pageserver_connstr):
+        replay_binpath = os.path.join(str(neon_binpath), "replay")
+        args = [
+            replay_binpath,
+            self.traces_dir,
+            pageserver_connstr,
+        ]
+        # return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
+        subprocess.run(args)
+
+    def draw_all(self):
+        draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
+        args = [
+            draw_binpath,
+            self.traces_dir,
+        ]
+        subprocess.run(args)
+
+
+@pytest.fixture(scope="function")
+def replay_bin(test_output_dir):
+    traces_dir = os.path.join(test_output_dir, "repo", "traces")
+    return ReplayBin(traces_dir)
+
+
 class VanillaPostgres(PgProtocol):
     def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
         super().__init__(host="localhost", port=port, dbname="postgres")
@@ -175,6 +175,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
     run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)


+@pytest.mark.parametrize("scale", get_scales_matrix())
+@pytest.mark.parametrize("duration", get_durations_matrix())
+def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
+    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
+
+
 # Run the pgbench tests, and generate a flamegraph from it
 # This requires that the pageserver was built with the 'profiling' feature.
 #
test_runner/performance/test_trace_replay.py (new file, 57 lines)
@@ -0,0 +1,57 @@
+from contextlib import closing
+
+from fixtures.benchmark_fixture import NeonBenchmarker
+from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
+
+
+# This test is a demonstration of how to do trace playback. With the current
+# workload it uses, it's not testing anything meaningful.
+def test_trace_replay(
+    neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
+):
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start()
+
+    tenant, _ = env.neon_cli.create_tenant(
+        conf={
+            "trace_read_requests": "true",
+        }
+    )
+    # TODO This doesn't work because I haven't updated tenant_config_handler
+    # env.neon_cli.config_tenant(tenant, conf={
+    #     "trace_read_requests": "true",
+    # })
+    env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
+    pg = env.postgres.create_start("test_trace_replay", "main", tenant)
+
+    with zenbenchmark.record_duration("run"):
+        pg.safe_psql("select 1;")
+        pg.safe_psql("select 1;")
+        pg.safe_psql("select 1;")
+        pg.safe_psql("select 1;")
+
+        with closing(pg.connect()) as conn:
+            with conn.cursor() as cur:
+                cur.execute("create table t (i integer);")
+                cur.execute(f"insert into t values (generate_series(1,{10000}));")
+                cur.execute("select count(*) from t;")
+
+    # Stop pg so we drop the connection and flush the traces
+    pg.stop()
+
+    # TODO This doesn't work because I haven't updated tenant_config_handler
+    # env.neon_cli.config_tenant(tenant, conf={
+    #     "trace_read_requests": "false",
+    # })
+
+    # trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
+    # assert trace_path.exists()
+
+    replay_bin.draw_all()
+    return
+
+    print("replaying")
+    ps_connstr = env.pageserver.connstr()
+    with zenbenchmark.record_duration("replay"):
+        output = replay_bin.replay_all(ps_connstr)
+        print(output)
@@ -1,6 +1,7 @@
 from contextlib import closing

 import psycopg2.extras
+import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder

@@ -1,4 +1,5 @@
 import os
+import shutil
 from contextlib import closing
 from datetime import datetime
 from pathlib import Path
@@ -7,8 +8,13 @@ from typing import List
 import pytest
 from fixtures.log_helper import log
 from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
-from fixtures.types import Lsn, TenantId
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    NeonEnvBuilder,
+    RemoteStorageKind,
+    available_remote_storages,
+)
+from fixtures.types import Lsn, TenantId, TimelineId
 from prometheus_client.samples import Sample

@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde

     post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
     assert post_detach_samples == set()
+
+
+# Check that empty tenants work with or without the remote storage
+@pytest.mark.parametrize(
+    "remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
+)
+def test_pageserver_with_empty_tenants(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_pageserver_with_empty_tenants",
+    )
+
+    env = neon_env_builder.init_start()
+    client = env.pageserver.http_client()
+
+    tenant_without_timelines_dir = env.initial_tenant
+    log.info(
+        f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally loses tenants/ directory and is expected to be completely ignored when pageserver restarts"
+    )
+    shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
+
+    tenant_with_empty_timelines_dir = client.tenant_create()
+    log.info(
+        f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
+    )
+    temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
+    for temp_timeline in temp_timelines:
+        client.timeline_delete(
+            tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
+        )
+    files_in_timelines_dir = sum(
+        1
+        for _p in Path.iterdir(
+            Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
+        )
+    )
+    assert (
+        files_in_timelines_dir == 0
+    ), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
+
+    # Trigger timeline reinitialization after pageserver restart
+    env.postgres.stop_all()
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    client = env.pageserver.http_client()
+    tenants = client.tenant_list()
+
+    assert (
+        len(tenants) == 1
+    ), "Pageserver should attach only tenants with empty timelines/ dir on restart"
+    loaded_tenant = tenants[0]
+    assert loaded_tenant["id"] == str(
+        tenant_with_empty_timelines_dir
+    ), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
+    assert loaded_tenant["state"] == {
+        "Active": {"background_jobs_running": False}
+    }, "Empty tenant should be loaded and ready for timeline creation"
@@ -7,19 +7,25 @@
 #

 import asyncio
+import os
+from pathlib import Path
 from typing import List, Tuple

 import pytest
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
+    NeonPageserverHttpClient,
     Postgres,
     RemoteStorageKind,
     available_remote_storages,
     wait_for_last_record_lsn,
     wait_for_upload,
+    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.utils import query_scalar


 async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
         # run final checkpoint manually to flush all the data to remote storage
         pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
         wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
+def test_tenants_attached_after_download(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="remote_storage_kind",
+    )
+
+    data_id = 1
+    data_secret = "very secret secret"
+
+    ##### First start, insert secret data and upload it to the remote storage
+    env = neon_env_builder.init_start()
+    pageserver_http = env.pageserver.http_client()
+    pg = env.postgres.create_start("main")
+
+    client = env.pageserver.http_client()
+
+    tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
+    timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+    for checkpoint_number in range(1, 3):
+        with pg.cursor() as cur:
+            cur.execute(
+                f"""
+                CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
+                INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
+                """
+            )
+            current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+        # wait until pageserver receives that data
+        wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
+
+        # run checkpoint manually to be sure that data landed in remote storage
+        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+        log.info(f"waiting for checkpoint {checkpoint_number} upload")
+        # wait until pageserver successfully uploaded a checkpoint to remote storage
+        wait_for_upload(client, tenant_id, timeline_id, current_lsn)
+        log.info(f"upload of checkpoint {checkpoint_number} is done")
+
+    ##### Stop the pageserver, erase its layer file to force it being downloaded from S3
+    env.postgres.stop_all()
+    env.pageserver.stop()
+
+    timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
+    local_layer_deleted = False
+    for path in Path.iterdir(timeline_dir):
+        if path.name.startswith("00000"):
+            # Looks like a layer file. Remove it
+            os.remove(path)
+            local_layer_deleted = True
+            break
+    assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
+
+    ##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
+    env.pageserver.start()
+    client = env.pageserver.http_client()
+
+    wait_until(
+        number_of_iterations=5,
+        interval=1,
+        func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
+    )
+
+    restored_timelines = client.timeline_list(tenant_id)
+    assert (
+        len(restored_timelines) == 1
+    ), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
+    restored_timeline = restored_timelines[0]
+    assert restored_timeline["timeline_id"] == str(
+        timeline_id
+    ), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
+
+
+def expect_tenant_to_download_timeline(
+    client: NeonPageserverHttpClient,
+    tenant_id: TenantId,
+):
+    for tenant in client.tenant_list():
+        if tenant["id"] == str(tenant_id):
+            assert not tenant.get(
+                "has_in_progress_downloads", True
+            ), f"Tenant {tenant_id} should have no downloads in progress"
+            return
+    assert False, f"Tenant {tenant_id} is missing on pageserver"
@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
 bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
 bytes = { version = "1", features = ["serde", "std"] }
 chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
+crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
 either = { version = "1", features = ["use_std"] }
 fail = { version = "0.5", default-features = false, features = ["failpoints"] }
 hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }