mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-11 20:30:37 +00:00
Compare commits
31 Commits
partial_im
...
ps-trace
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d7bce5f72 | ||
|
|
42a0f5be00 | ||
|
|
d4065f2e85 | ||
|
|
5315614974 | ||
|
|
b99bed510d | ||
|
|
580584c8fc | ||
|
|
d823e84ed5 | ||
|
|
231dfbaed6 | ||
|
|
5cf53786f9 | ||
|
|
9b9bbad462 | ||
|
|
537b2c1ae6 | ||
|
|
293bc69e4f | ||
|
|
e9b158e8f5 | ||
|
|
8c09ff1764 | ||
|
|
a66a3b4ed8 | ||
|
|
6913bedc09 | ||
|
|
8a43dfd573 | ||
|
|
065adcf75b | ||
|
|
a180273dc5 | ||
|
|
6c6aa04ce4 | ||
|
|
cd8c96233f | ||
|
|
ecbda94790 | ||
|
|
0d6d8fefd3 | ||
|
|
d296a76e3e | ||
|
|
646e0f3581 | ||
|
|
0bfc422eb3 | ||
|
|
1209572cec | ||
|
|
42f603cb13 | ||
|
|
6bcfd6441f | ||
|
|
dedb03bb5a | ||
|
|
abb07df028 |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -768,5 +768,5 @@ jobs:
|
||||
- name: Re-deploy proxy
|
||||
run: |
|
||||
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
|
||||
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
|
||||
105
Cargo.lock
generated
105
Cargo.lock
generated
@@ -497,8 +497,10 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap 3.2.16",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"hyper",
|
||||
"log",
|
||||
"notify",
|
||||
"postgres",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -540,11 +542,11 @@ dependencies = [
|
||||
"git-version",
|
||||
"nix",
|
||||
"once_cell",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
"postgres",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"safekeeper",
|
||||
"safekeeper_api",
|
||||
"serde",
|
||||
"serde_with",
|
||||
"tar",
|
||||
@@ -1072,6 +1074,15 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fsevent-sys"
|
||||
version = "4.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.21"
|
||||
@@ -1493,6 +1504,26 @@ dependencies = [
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify-sys"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.12"
|
||||
@@ -1552,6 +1583,26 @@ dependencies = [
|
||||
"simple_asn1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
|
||||
dependencies = [
|
||||
"kqueue-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue-sys"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kstring"
|
||||
version = "1.0.6"
|
||||
@@ -1797,6 +1848,24 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "5.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"crossbeam-channel",
|
||||
"filetime",
|
||||
"fsevent-sys",
|
||||
"inotify",
|
||||
"kqueue",
|
||||
"libc",
|
||||
"mio",
|
||||
"walkdir",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
@@ -1975,6 +2044,7 @@ dependencies = [
|
||||
"nix",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -1989,6 +2059,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"svg_fmt",
|
||||
"tar",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
@@ -2003,6 +2074,17 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"const_format",
|
||||
"serde",
|
||||
"serde_with",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.2"
|
||||
@@ -2891,6 +2973,7 @@ dependencies = [
|
||||
"postgres_ffi",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"safekeeper_api",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
@@ -2906,6 +2989,17 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "safekeeper_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"const_format",
|
||||
"serde",
|
||||
"serde_with",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "same-file"
|
||||
version = "1.0.6"
|
||||
@@ -3240,6 +3334,12 @@ version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||
|
||||
[[package]]
|
||||
name = "svg_fmt"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "8.8.0"
|
||||
@@ -4142,6 +4242,7 @@ dependencies = [
|
||||
"bstr",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crossbeam-utils",
|
||||
"either",
|
||||
"fail",
|
||||
"hashbrown",
|
||||
|
||||
@@ -8,8 +8,10 @@ anyhow = "1.0"
|
||||
chrono = "0.4"
|
||||
clap = "3.0"
|
||||
env_logger = "0.9"
|
||||
futures = "0.3.13"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
log = { version = "0.4", features = ["std", "serde"] }
|
||||
notify = "5.0.0"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
regex = "1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
||||
@@ -258,14 +258,7 @@ impl ComputeNode {
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
|
||||
// Try default Postgres port if it is not provided
|
||||
let port = self
|
||||
.spec
|
||||
.cluster
|
||||
.settings
|
||||
.find("port")
|
||||
.unwrap_or_else(|| "5432".to_string());
|
||||
wait_for_postgres(&mut pg, &port, pgdata_path)?;
|
||||
wait_for_postgres(&mut pg, pgdata_path)?;
|
||||
|
||||
// If connection fails,
|
||||
// it may be the old node with `zenith_admin` superuser.
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
use std::fmt::Write;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::Child;
|
||||
use std::str::FromStr;
|
||||
use std::{fs, thread, time};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use postgres::{Client, Transaction};
|
||||
use serde::Deserialize;
|
||||
|
||||
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
|
||||
use notify::{RecursiveMode, Watcher};
|
||||
|
||||
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
|
||||
|
||||
/// Rust representation of Postgres role info with only those fields
|
||||
/// that matter for us.
|
||||
@@ -230,52 +231,85 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
|
||||
Ok(postgres_dbs)
|
||||
}
|
||||
|
||||
/// Wait for Postgres to become ready to accept connections:
|
||||
/// - state should be `ready` in the `pgdata/postmaster.pid`
|
||||
/// - and we should be able to connect to 127.0.0.1:5432
|
||||
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
|
||||
/// Wait for Postgres to become ready to accept connections. It's ready to
|
||||
/// accept connections when the state-field in `pgdata/postmaster.pid` says
|
||||
/// 'ready'.
|
||||
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
let pid_path = pgdata.join("postmaster.pid");
|
||||
let mut slept: u64 = 0; // ms
|
||||
let pause = time::Duration::from_millis(100);
|
||||
|
||||
let timeout = time::Duration::from_millis(10);
|
||||
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
|
||||
// PostgreSQL writes line "ready" to the postmaster.pid file, when it has
|
||||
// completed initialization and is ready to accept connections. We want to
|
||||
// react quickly and perform the rest of our initialization as soon as
|
||||
// PostgreSQL starts accepting connections. Use 'notify' to be notified
|
||||
// whenever the PID file is changed, and whenever it changes, read it to
|
||||
// check if it's now "ready".
|
||||
//
|
||||
// You cannot actually watch a file before it exists, so we first watch the
|
||||
// data directory, and once the postmaster.pid file appears, we switch to
|
||||
// watch the file instead. We also wake up every 100 ms to poll, just in
|
||||
// case we miss some events for some reason. Not strictly necessary, but
|
||||
// better safe than sorry.
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let mut watcher = notify::recommended_watcher(move |res| {
|
||||
let _ = tx.send(res);
|
||||
})?;
|
||||
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
|
||||
|
||||
let started_at = Instant::now();
|
||||
let mut postmaster_pid_seen = false;
|
||||
loop {
|
||||
// Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout,
|
||||
// but postgres starts listening almost immediately, even if it is not really
|
||||
// ready to accept connections).
|
||||
if slept >= POSTGRES_WAIT_TIMEOUT {
|
||||
bail!("timed out while waiting for Postgres to start");
|
||||
}
|
||||
|
||||
if let Ok(Some(status)) = pg.try_wait() {
|
||||
// Postgres exited, that is not what we expected, bail out earlier.
|
||||
let code = status.code().unwrap_or(-1);
|
||||
bail!("Postgres exited unexpectedly with code {}", code);
|
||||
}
|
||||
|
||||
let res = rx.recv_timeout(Duration::from_millis(100));
|
||||
log::debug!("woken up by notify: {res:?}");
|
||||
// If there are multiple events in the channel already, we only need to be
|
||||
// check once. Swallow the extra events before we go ahead to check the
|
||||
// pid file.
|
||||
while let Ok(res) = rx.try_recv() {
|
||||
log::debug!("swallowing extra event: {res:?}");
|
||||
}
|
||||
|
||||
// Check that we can open pid file first.
|
||||
if let Ok(file) = File::open(&pid_path) {
|
||||
if !postmaster_pid_seen {
|
||||
log::debug!("postmaster.pid appeared");
|
||||
watcher
|
||||
.unwatch(pgdata)
|
||||
.expect("Failed to remove pgdata dir watch");
|
||||
watcher
|
||||
.watch(&pid_path, RecursiveMode::NonRecursive)
|
||||
.expect("Failed to add postmaster.pid file watch");
|
||||
postmaster_pid_seen = true;
|
||||
}
|
||||
|
||||
let file = BufReader::new(file);
|
||||
let last_line = file.lines().last();
|
||||
|
||||
// Pid file could be there and we could read it, but it could be empty, for example.
|
||||
if let Some(Ok(line)) = last_line {
|
||||
let status = line.trim();
|
||||
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
|
||||
log::debug!("last line of postmaster.pid: {status:?}");
|
||||
|
||||
// Now Postgres is ready to accept connections
|
||||
if status == "ready" && can_connect {
|
||||
if status == "ready" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
thread::sleep(pause);
|
||||
slept += 100;
|
||||
// Give up after POSTGRES_WAIT_TIMEOUT.
|
||||
let duration = started_at.elapsed();
|
||||
if duration >= POSTGRES_WAIT_TIMEOUT {
|
||||
bail!("timed out while waiting for Postgres to start");
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("PostgreSQL is now running, continuing to configure it");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,9 @@ thiserror = "1"
|
||||
nix = "0.23"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
||||
|
||||
pageserver = { path = "../pageserver" }
|
||||
safekeeper = { path = "../safekeeper" }
|
||||
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
|
||||
# instead, so that recompile times are better.
|
||||
pageserver_api = { path = "../libs/pageserver_api" }
|
||||
safekeeper_api = { path = "../libs/safekeeper_api" }
|
||||
utils = { path = "../libs/utils" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage::PageServerNode;
|
||||
use control_plane::{etcd, local_env};
|
||||
use pageserver::config::defaults::{
|
||||
use pageserver_api::models::TimelineInfo;
|
||||
use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
|
||||
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
|
||||
};
|
||||
use pageserver::http::models::TimelineInfo;
|
||||
use safekeeper::defaults::{
|
||||
use safekeeper_api::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
};
|
||||
|
||||
@@ -12,7 +12,7 @@ use nix::unistd::Pid;
|
||||
use postgres::Config;
|
||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use safekeeper::http::models::TimelineCreateRequest;
|
||||
use safekeeper_api::models::TimelineCreateRequest;
|
||||
use thiserror::Error;
|
||||
use utils::{
|
||||
connstring::connection_address,
|
||||
|
||||
@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
use pageserver::http::models::{
|
||||
use pageserver_api::models::{
|
||||
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use postgres::{Config, NoTls};
|
||||
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
|
||||
let url = self.url().to_owned();
|
||||
Err(PageserverHttpError::Response(
|
||||
match self.json::<HttpErrorBody>() {
|
||||
Ok(err_body) => format!("Error: {}", err_body.msg),
|
||||
Ok(err_body) => format!("Response error: {}", err_body.msg),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
},
|
||||
))
|
||||
@@ -181,14 +181,15 @@ impl PageServerNode {
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<TimelineId> {
|
||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
|
||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
|
||||
.context("Failed to create tenant")?;
|
||||
let initial_timeline_info = self.timeline_create(
|
||||
initial_tenant_id,
|
||||
new_timeline_id,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
)?;
|
||||
).context("Failed to create timeline")?;
|
||||
Ok(initial_timeline_info.timeline_id)
|
||||
}
|
||||
|
||||
@@ -419,6 +420,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
12
libs/pageserver_api/Cargo.toml
Normal file
12
libs/pageserver_api/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "pageserver_api"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_with = "1.12.0"
|
||||
const_format = "0.2.21"
|
||||
|
||||
utils = { path = "../utils" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
9
libs/pageserver_api/src/lib.rs
Normal file
9
libs/pageserver_api/src/lib.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use const_format::formatcp;
|
||||
|
||||
/// Public API types
|
||||
pub mod models;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
@@ -7,7 +7,17 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::tenant::TenantState;
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TenantState {
|
||||
/// Tenant is fully operational, its background jobs might be running or not.
|
||||
Active { background_jobs_running: bool },
|
||||
/// A tenant is recognized by pageserver, but not yet ready to operate:
|
||||
/// e.g. not present locally and being downloaded or being read into memory from the file system.
|
||||
Paused,
|
||||
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
|
||||
Broken,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -42,6 +52,7 @@ pub struct TenantCreateRequest {
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
12
libs/safekeeper_api/Cargo.toml
Normal file
12
libs/safekeeper_api/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "safekeeper_api"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_with = "1.12.0"
|
||||
const_format = "0.2.21"
|
||||
|
||||
utils = { path = "../utils" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
10
libs/safekeeper_api/src/lib.rs
Normal file
10
libs/safekeeper_api/src/lib.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
use const_format::formatcp;
|
||||
|
||||
/// Public API types
|
||||
pub mod models;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
@@ -58,6 +58,7 @@ rstar = "0.9.3"
|
||||
num-traits = "0.2.15"
|
||||
amplify_num = "0.4.1"
|
||||
|
||||
pageserver_api = { path = "../libs/pageserver_api" }
|
||||
postgres_ffi = { path = "../libs/postgres_ffi" }
|
||||
etcd_broker = { path = "../libs/etcd_broker" }
|
||||
metrics = { path = "../libs/metrics" }
|
||||
@@ -66,6 +67,7 @@ remote_storage = { path = "../libs/remote_storage" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
svg_fmt = "0.4.1"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
|
||||
185
pageserver/src/bin/draw_trace.rs
Normal file
185
pageserver/src/bin/draw_trace.rs
Normal file
@@ -0,0 +1,185 @@
|
||||
use clap::{App, Arg};
|
||||
use futures::TryFutureExt;
|
||||
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
|
||||
|
||||
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
|
||||
use std::io::Write;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
str::FromStr,
|
||||
};
|
||||
use svg_fmt::*;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
|
||||
let set: BTreeSet<T> = coords.into_iter().collect();
|
||||
|
||||
let mut map: BTreeMap<T, usize> = BTreeMap::new();
|
||||
for (i, e) in set.iter().enumerate() {
|
||||
map.insert(*e, i);
|
||||
}
|
||||
|
||||
(set.len(), map)
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
// TODO upgrade to struct macro arg parsing
|
||||
let arg_matches = App::new("Pageserver trace visualization tool")
|
||||
.about("Makes a svg file that displays the read pattern")
|
||||
.arg(
|
||||
Arg::new("traces_dir")
|
||||
.takes_value(true)
|
||||
.help("Directory where the read traces are stored"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
// (blkno, lsn)
|
||||
let mut dots = Vec::<(u32, Lsn)>::new();
|
||||
|
||||
|
||||
let mut dump_file = File::create("dump.txt").expect("can't make file");
|
||||
let mut deltas = HashMap::<i32, u32>::new();
|
||||
let mut prev1: u32 = 0;
|
||||
let mut prev2: u32 = 0;
|
||||
let mut prev3: u32 = 0;
|
||||
|
||||
println!("scanning trace ...");
|
||||
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
// println!("tenant: {tenant_id}");
|
||||
|
||||
println!("opening {path:?}");
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
// println!("hi");
|
||||
// println!("timeline: {timeline_id}");
|
||||
|
||||
println!("opening {path:?}");
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
println!("opening {path:?}");
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
|
||||
// println!("Parsed message {:?}", msg);
|
||||
match msg {
|
||||
PagestreamFeMessage::Exists(_) => {}
|
||||
PagestreamFeMessage::Nblocks(_) => {}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
|
||||
// dots.push((req.blkno, req.lsn));
|
||||
// HACK
|
||||
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
|
||||
|
||||
let delta1 = (req.blkno as i32) - (prev1 as i32);
|
||||
let delta2 = (req.blkno as i32) - (prev2 as i32);
|
||||
let delta3 = (req.blkno as i32) - (prev3 as i32);
|
||||
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
|
||||
delta1
|
||||
} else {
|
||||
delta2
|
||||
};
|
||||
if i32::abs(delta3) < i32::abs(delta) {
|
||||
delta = delta3;
|
||||
}
|
||||
let delta = delta1;
|
||||
|
||||
prev3 = prev2;
|
||||
prev2 = prev1;
|
||||
prev1 = req.blkno;
|
||||
|
||||
match deltas.get_mut(&delta) {
|
||||
Some(c) => {*c += 1;},
|
||||
None => {deltas.insert(delta, 1);},
|
||||
};
|
||||
|
||||
if delta == 9 {
|
||||
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
|
||||
}
|
||||
},
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
|
||||
// HACK
|
||||
// if dots.len() > 1000 {
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut other = deltas.len();
|
||||
deltas.retain(|_, count| *count > 3);
|
||||
other -= deltas.len();
|
||||
dbg!(other);
|
||||
dbg!(deltas);
|
||||
|
||||
// Collect all coordinates
|
||||
let mut keys: Vec<u32> = vec![];
|
||||
let mut lsns: Vec<Lsn> = vec![];
|
||||
for dot in &dots {
|
||||
keys.push(dot.0);
|
||||
lsns.push(dot.1);
|
||||
}
|
||||
|
||||
// Analyze
|
||||
let (key_max, key_map) = analyze(keys);
|
||||
let (lsn_max, lsn_map) = analyze(lsns);
|
||||
|
||||
// Draw
|
||||
println!("drawing trace ...");
|
||||
let mut svg_file = File::create("out.svg").expect("can't make file");
|
||||
writeln!(
|
||||
&mut svg_file,
|
||||
"{}",
|
||||
BeginSvg {
|
||||
w: (key_max + 1) as f32,
|
||||
h: (lsn_max + 1) as f32,
|
||||
}
|
||||
)?;
|
||||
for (key, lsn) in &dots {
|
||||
let key = key_map.get(&key).unwrap();
|
||||
let lsn = lsn_map.get(&lsn).unwrap();
|
||||
writeln!(
|
||||
&mut svg_file,
|
||||
" {}",
|
||||
rectangle(
|
||||
*key as f32,
|
||||
*lsn as f32,
|
||||
10.0,
|
||||
10.0
|
||||
)
|
||||
.fill(Fill::Color(red()))
|
||||
.stroke(Stroke::Color(black(), 0.0))
|
||||
.border_radius(0.5)
|
||||
)?;
|
||||
// println!(" {}",
|
||||
// rectangle(key_start as f32 + stretch * margin,
|
||||
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
|
||||
// key_diff as f32 - stretch * 2.0 * margin,
|
||||
// stretch * (lsn_diff - 2.0 * margin))
|
||||
// // .fill(rgb(200, 200, 200))
|
||||
// .fill(fill)
|
||||
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
|
||||
// .border_radius(0.4)
|
||||
// );
|
||||
}
|
||||
|
||||
writeln!(&mut svg_file, "{}", EndSvg)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
133
pageserver/src/bin/replay.rs
Normal file
133
pageserver/src/bin/replay.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use bytes::BytesMut;
|
||||
use pageserver::page_service::PagestreamFeMessage;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
};
|
||||
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||
|
||||
use clap::{App, Arg};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
// TODO put this in library, dedup with stuff in control_plane
|
||||
/// Client for the pageserver's pagestream API
|
||||
struct PagestreamApi {
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
impl PagestreamApi {
|
||||
async fn connect(
|
||||
connstr: &str,
|
||||
tenant: &TenantId,
|
||||
timeline: &TimelineId,
|
||||
) -> anyhow::Result<PagestreamApi> {
|
||||
// Parse connstr
|
||||
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
|
||||
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
|
||||
|
||||
// Connect
|
||||
let mut stream = TcpStream::connect(tcp_addr).await?;
|
||||
let (client, conn) = config
|
||||
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||
.await?;
|
||||
|
||||
// Enter pagestream protocol
|
||||
let init_query = format!("pagestream {} {}", tenant, timeline);
|
||||
tokio::select! {
|
||||
_ = conn => panic!("connection closed during pagestream initialization"),
|
||||
_ = client.query(init_query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
Ok(PagestreamApi { stream })
|
||||
}
|
||||
|
||||
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||
let request = {
|
||||
let msg_bytes = msg.serialize();
|
||||
let mut buf = BytesMut::new();
|
||||
let copy_msg = BeMessage::CopyData(&msg_bytes);
|
||||
|
||||
// TODO it's actually a fe message but it doesn't have a serializer yet
|
||||
BeMessage::write(&mut buf, ©_msg)?;
|
||||
buf.freeze()
|
||||
};
|
||||
self.stream.write_all(&request).await?;
|
||||
|
||||
// TODO It's actually a be message, but it doesn't have a parser.
|
||||
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
|
||||
let _response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||
Some(FeMessage::CopyData(page)) => page,
|
||||
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn replay_trace<R: std::io::Read>(
|
||||
reader: &mut R,
|
||||
mut pagestream: PagestreamApi,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
|
||||
println!("Parsed message {:?}", msg);
|
||||
pagestream.make_request(msg).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// TODO upgrade to struct macro arg parsing
|
||||
let arg_matches = App::new("Pageserver trace replay tool")
|
||||
.about("Replays wal or read traces to test pageserver performance")
|
||||
.arg(
|
||||
Arg::new("traces_dir")
|
||||
.takes_value(true)
|
||||
.help("Directory where the read traces are stored"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("pageserver_connstr")
|
||||
.takes_value(true)
|
||||
.help("Pageserver pg endpoint to connect to"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
|
||||
|
||||
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
// TODO The pageserver deletes existing traces?
|
||||
// LOL yes because I use tenant ID as trace id
|
||||
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
|
||||
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
// let len = file.metadata().unwrap().len();
|
||||
// println!("replaying {:?} trace {} bytes", path, len);
|
||||
replay_trace(&mut reader, pagestream).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -30,10 +30,10 @@ pub mod defaults {
|
||||
use crate::tenant_config::defaults::*;
|
||||
use const_format::formatcp;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
pub use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
@@ -364,6 +364,23 @@ impl PageServerConf {
|
||||
self.timelines_path(tenant_id).join(timeline_id.to_string())
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> PathBuf {
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
pub fn trace_path(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
connection_id: &TimelineId, // TODO make a new type
|
||||
) -> PathBuf {
|
||||
self.traces_path()
|
||||
.join(tenant_id.to_string())
|
||||
.join(timeline_id.to_string())
|
||||
.join(connection_id.to_string())
|
||||
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
/// where certain timeline's metadata file should be located.
|
||||
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod models;
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use pageserver_api::models;
|
||||
|
||||
@@ -337,9 +337,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
info!("Handling tenant attach {tenant_id}");
|
||||
|
||||
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
|
||||
Ok(_) => Err(ApiError::Conflict(
|
||||
"Tenant is already present locally".to_owned(),
|
||||
)),
|
||||
Ok(tenant) => {
|
||||
if tenant.list_timelines().is_empty() {
|
||||
info!("Attaching to tenant {tenant_id} with zero timelines");
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ApiError::Conflict(
|
||||
"Tenant is already present locally".to_owned(),
|
||||
))
|
||||
}
|
||||
}
|
||||
Err(_) => Ok(()),
|
||||
})
|
||||
.await
|
||||
@@ -610,6 +617,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(trace_read_requests) = request_data.trace_read_requests {
|
||||
tenant_conf.trace_read_requests = Some(trace_read_requests);
|
||||
}
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||
|
||||
@@ -16,6 +16,8 @@ pub mod tenant;
|
||||
pub mod tenant_config;
|
||||
pub mod tenant_mgr;
|
||||
pub mod tenant_tasks;
|
||||
// pub mod timelines;
|
||||
pub mod trace;
|
||||
pub mod virtual_file;
|
||||
pub mod walingest;
|
||||
pub mod walreceiver;
|
||||
|
||||
@@ -10,7 +10,9 @@
|
||||
//
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::Buf;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use futures::{Stream, StreamExt};
|
||||
use regex::Regex;
|
||||
use std::io;
|
||||
@@ -42,6 +44,7 @@ use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant_mgr;
|
||||
use crate::trace::Tracer;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
@@ -49,7 +52,9 @@ use postgres_ffi::to_pg_timestamp;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamFeMessage {
|
||||
// TODO these should be in a library outside the pageserver
|
||||
#[derive(Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
@@ -57,7 +62,7 @@ enum PagestreamFeMessage {
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamBeMessage {
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
@@ -66,106 +71,152 @@ enum PagestreamBeMessage {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamExistsRequest {
|
||||
pub struct PagestreamExistsRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamNblocksRequest {
|
||||
pub struct PagestreamNblocksRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageRequest {
|
||||
pub struct PagestreamGetPageRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
blkno: u32,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamDbSizeRequest {
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamExistsResponse {
|
||||
pub struct PagestreamExistsResponse {
|
||||
exists: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamNblocksResponse {
|
||||
pub struct PagestreamNblocksResponse {
|
||||
n_blocks: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageResponse {
|
||||
pub struct PagestreamGetPageResponse {
|
||||
page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamErrorResponse {
|
||||
pub struct PagestreamErrorResponse {
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamDbSizeResponse {
|
||||
pub struct PagestreamDbSizeResponse {
|
||||
db_size: i64,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(1);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(2);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(3);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// TODO these gets can fail
|
||||
|
||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
let msg_tag = body.get_u8();
|
||||
let msg_tag = body.read_u8()?;
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.get_u32(),
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
dbnode: body.get_u32(),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -422,6 +473,17 @@ impl PageServerHandler {
|
||||
// so there is no need to reset the association
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
// Make request tracer if needed
|
||||
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
let path = tenant
|
||||
.conf
|
||||
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
|
||||
Some(Tracer::new(path))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
|
||||
@@ -446,15 +508,24 @@ impl PageServerHandler {
|
||||
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Sync) => {
|
||||
// TODO what now?
|
||||
continue;
|
||||
}
|
||||
Some(m) => {
|
||||
bail!("unexpected message: {m:?} during COPY");
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
|
||||
// Trace request if needed
|
||||
if let Some(t) = tracer.as_mut() {
|
||||
t.trace(©_data_bytes)
|
||||
}
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
|
||||
@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
|
||||
pub use pageserver_api::models::TenantState;
|
||||
|
||||
use toml_edit;
|
||||
use utils::{
|
||||
@@ -118,18 +119,6 @@ pub struct Tenant {
|
||||
upload_layers: bool,
|
||||
}
|
||||
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TenantState {
|
||||
/// Tenant is fully operational, its background jobs might be running or not.
|
||||
Active { background_jobs_running: bool },
|
||||
/// A tenant is recognized by pageserver, but not yet ready to operate:
|
||||
/// e.g. not present locally and being downloaded or being read into memory from the file system.
|
||||
Paused,
|
||||
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
|
||||
Broken,
|
||||
}
|
||||
|
||||
/// A repository corresponds to one .neon directory. One repository holds multiple
|
||||
/// timelines, forked off from the same initial call to 'initdb'.
|
||||
impl Tenant {
|
||||
@@ -604,6 +593,14 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||
}
|
||||
|
||||
// TODO is there a need for all this boilerplate?
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||
}
|
||||
|
||||
@@ -82,6 +82,10 @@ pub struct TenantConf {
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
/// If enabled, records all read requests for this ... TODO
|
||||
/// Even though read traces are small, there's no delete meachanism so they can
|
||||
/// pile up. So we disable this by default.
|
||||
pub trace_read_requests: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -105,6 +109,7 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -138,6 +143,9 @@ impl TenantConfOpt {
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: self
|
||||
.trace_read_requests
|
||||
.unwrap_or(global_conf.trace_read_requests),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,6 +215,7 @@ impl TenantConf {
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
trace_read_requests: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,6 +241,7 @@ impl TenantConf {
|
||||
.unwrap(),
|
||||
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.unwrap(),
|
||||
trace_read_requests: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,6 +108,10 @@ pub fn init_tenant_mgr(
|
||||
/// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
|
||||
/// and the load continues.
|
||||
///
|
||||
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
|
||||
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
|
||||
/// Alternatively, tenant is considered loaded successfully, if it's already in pageserver's memory (i.e. was loaded already before).
|
||||
///
|
||||
/// Attach happens on startup and sucessful timeline downloads
|
||||
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
|
||||
pub fn attach_local_tenants(
|
||||
@@ -173,16 +177,28 @@ fn load_local_tenant(
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
match Tenant::load_tenant_config(conf, tenant_id) {
|
||||
Ok(tenant_conf) => {
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
tenant.activate(false);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
|
||||
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
|
||||
if !tenant_timelines_dir.is_dir() {
|
||||
error!(
|
||||
"Tenant {} has no timelines directory at {}",
|
||||
tenant_id,
|
||||
tenant_timelines_dir.display()
|
||||
);
|
||||
tenant.set_state(TenantState::Broken);
|
||||
} else {
|
||||
match Tenant::load_tenant_config(conf, tenant_id) {
|
||||
Ok(tenant_conf) => {
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
tenant.activate(false);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tenant
|
||||
}
|
||||
|
||||
@@ -630,14 +646,10 @@ fn collect_timelines_for_tenant(
|
||||
}
|
||||
|
||||
if tenant_timelines.is_empty() {
|
||||
match remove_if_empty(&timelines_dir) {
|
||||
Ok(true) => info!(
|
||||
"Removed empty tenant timelines directory {}",
|
||||
timelines_dir.display()
|
||||
),
|
||||
Ok(false) => (),
|
||||
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
|
||||
}
|
||||
// this is normal, we've removed all broken, empty and temporary timeline dirs
|
||||
// but should allow the tenant to stay functional and allow creating new timelines
|
||||
// on a restart, we require tenants to have the timelines dir, so leave it on disk
|
||||
debug!("Tenant {tenant_id} has no timelines loaded");
|
||||
}
|
||||
|
||||
Ok((tenant_id, tenant_timelines))
|
||||
|
||||
36
pageserver/src/trace.rs
Normal file
36
pageserver/src/trace.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use bytes::Bytes;
|
||||
use std::{
|
||||
fs::{create_dir_all, File},
|
||||
io::{BufWriter, Write},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
pub struct Tracer {
|
||||
writer: BufWriter<File>,
|
||||
}
|
||||
|
||||
impl Drop for Tracer {
|
||||
fn drop(&mut self) {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl Tracer {
|
||||
pub fn new(path: PathBuf) -> Self {
|
||||
let parent = path.parent().expect("failed to parse parent path");
|
||||
create_dir_all(parent).expect("failed to create trace dir");
|
||||
|
||||
let file = File::create(path).expect("failed to create trace file");
|
||||
Tracer {
|
||||
writer: BufWriter::new(file),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trace(&mut self, msg: &Bytes) {
|
||||
self.writer.write(msg).expect("failed to write trace");
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) {
|
||||
self.writer.flush().expect("failed to flush trace file");
|
||||
}
|
||||
}
|
||||
@@ -264,12 +264,76 @@ pageserver_flush(void)
|
||||
}
|
||||
}
|
||||
|
||||
// Entry of the page_cache
|
||||
typedef struct
|
||||
{
|
||||
// HACK Just xor of bytes lol
|
||||
char request_hash;
|
||||
|
||||
// Points directly to a NeonResponse. We can't just own the
|
||||
// NeonResponse because it's a "supertype", so it's not Sized.
|
||||
char* response;
|
||||
int len;
|
||||
} NeonRequestResponse;
|
||||
|
||||
#define MAX_PAGE_CACHE_SIZE 50
|
||||
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
|
||||
int page_cache_size = 0;
|
||||
int page_cache_head = 0;
|
||||
|
||||
static NeonResponse *
|
||||
pageserver_call(NeonRequest * request)
|
||||
{
|
||||
// Compute hash
|
||||
char hash = 0;
|
||||
StringInfoData req_buff;
|
||||
req_buff = nm_pack_request(request);
|
||||
for (int i = 0; i < req_buff.len; i++) {
|
||||
hash ^= req_buff.data[i];
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
|
||||
// If result is cached, memcpy and return
|
||||
for (int i = 0; i < page_cache_size; i++) {
|
||||
if (page_cache[i].request_hash == hash) {
|
||||
int len = page_cache[0].response->len;
|
||||
NeonResponse *resp = palloc0(len);
|
||||
// I'd rather Rc than memcpy, but this is not rust :(
|
||||
memcpy(resp, page_cache[0].response->data, len);
|
||||
elog(LOG, "cache hit !!!");
|
||||
return resp;
|
||||
}
|
||||
}
|
||||
|
||||
// Send request, get response
|
||||
pageserver_send(request);
|
||||
pageserver_flush();
|
||||
return pageserver_receive();
|
||||
NeonResponse *resp = pageserver_receive();
|
||||
|
||||
// Get length
|
||||
int len = -1;
|
||||
switch (resp->tag) {
|
||||
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
|
||||
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
|
||||
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
|
||||
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
|
||||
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
|
||||
}
|
||||
|
||||
// Cache result
|
||||
page_cache[page_cache_head].request_hash = hash;
|
||||
if (page_cache_head < page_cache_size) {
|
||||
pfree(page_cache[page_cache_head].response);
|
||||
}
|
||||
page_cache[page_cache_head].len = len;
|
||||
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
|
||||
memcpy(page_cache[page_cache_head].response, resp, len);
|
||||
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
|
||||
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
|
||||
page_cache_size += 1;
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
page_server_api api = {
|
||||
|
||||
@@ -5,6 +5,7 @@ filterwarnings =
|
||||
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
|
||||
addopts =
|
||||
-m 'not remote_cluster'
|
||||
--ignore=test_runner/performance
|
||||
markers =
|
||||
remote_cluster
|
||||
testpaths =
|
||||
|
||||
@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
|
||||
thiserror = "1"
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
safekeeper_api = { path = "../libs/safekeeper_api" }
|
||||
postgres_ffi = { path = "../libs/postgres_ffi" }
|
||||
metrics = { path = "../libs/metrics" }
|
||||
utils = { path = "../libs/utils" }
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod models;
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use safekeeper_api::models;
|
||||
|
||||
@@ -27,14 +27,13 @@ mod timelines_global_map;
|
||||
pub use timelines_global_map::GlobalTimelines;
|
||||
|
||||
pub mod defaults {
|
||||
use const_format::formatcp;
|
||||
use std::time::Duration;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub use safekeeper_api::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
|
||||
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
|
||||
}
|
||||
|
||||
@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:
|
||||
|
||||
`./scripts/pytest -k bench`
|
||||
|
||||
To run tests in parellel we utilize `pytest-xdist` plugin. By default everything runs single threaded. Number of workers can be specified with `-n` argument:
|
||||
|
||||
`./scripts/pytest -n4`
|
||||
|
||||
By default performance tests are excluded. To run them explicitly pass performance tests selection to the script:
|
||||
|
||||
`./scripts/pytest test_runner/performance`
|
||||
|
||||
Useful environment variables:
|
||||
|
||||
`NEON_BIN`: The directory where neon binaries can be found.
|
||||
|
||||
@@ -91,10 +91,25 @@ class NeonCompare(PgCompare):
|
||||
self._pg_bin = pg_bin
|
||||
self.pageserver_http_client = self.env.pageserver.http_client()
|
||||
|
||||
self.tenant, _ = self.env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"trace_read_requests": "true",
|
||||
}
|
||||
)
|
||||
|
||||
# HACK enable request tracing, as an experiment
|
||||
# NOTE This must be done before the pg starts pagestream
|
||||
# TODO why does it not work?
|
||||
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
|
||||
# "trace_read_requests": "true",
|
||||
# })
|
||||
|
||||
# We only use one branch and one timeline
|
||||
self.env.neon_cli.create_branch(branch_name, "empty")
|
||||
self._pg = self.env.postgres.create_start(branch_name)
|
||||
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
|
||||
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
|
||||
self._pg = self.env.postgres.create_start(
|
||||
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
|
||||
|
||||
@property
|
||||
def pg(self):
|
||||
@@ -109,10 +124,10 @@ class NeonCompare(PgCompare):
|
||||
return self._pg_bin
|
||||
|
||||
def flush(self):
|
||||
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
|
||||
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
|
||||
|
||||
def compact(self):
|
||||
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
|
||||
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
|
||||
|
||||
def report_peak_memory_use(self) -> None:
|
||||
self.zenbenchmark.record(
|
||||
@@ -124,7 +139,7 @@ class NeonCompare(PgCompare):
|
||||
|
||||
def report_size(self) -> None:
|
||||
timeline_size = self.zenbenchmark.get_timeline_size(
|
||||
self.env.repo_dir, self.env.initial_tenant, self.timeline
|
||||
self.env.repo_dir, self.tenant, self.timeline
|
||||
)
|
||||
self.zenbenchmark.record(
|
||||
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
|
||||
|
||||
@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
|
||||
LOCAL_FS = "local_fs"
|
||||
MOCK_S3 = "mock_s3"
|
||||
REAL_S3 = "real_s3"
|
||||
# Pass to tests that are generic to remote storage
|
||||
# to ensure the test pass with or without the remote storage
|
||||
NOOP = "noop"
|
||||
|
||||
|
||||
def available_remote_storages() -> List[RemoteStorageKind]:
|
||||
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
|
||||
test_name: str,
|
||||
force_enable: bool = True,
|
||||
):
|
||||
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP:
|
||||
return
|
||||
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
self.enable_local_fs_remote_storage(force_enable=force_enable)
|
||||
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
|
||||
@@ -1744,6 +1749,37 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_version)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReplayBin:
|
||||
"""A helper class for replaying pageserver wal and read traces."""
|
||||
|
||||
traces_dir: str
|
||||
|
||||
def replay_all(self, pageserver_connstr):
|
||||
replay_binpath = os.path.join(str(neon_binpath), "replay")
|
||||
args = [
|
||||
replay_binpath,
|
||||
self.traces_dir,
|
||||
pageserver_connstr,
|
||||
]
|
||||
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
|
||||
subprocess.run(args)
|
||||
|
||||
def draw_all(self):
|
||||
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
|
||||
args = [
|
||||
draw_binpath,
|
||||
self.traces_dir,
|
||||
]
|
||||
subprocess.run(args)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def replay_bin(test_output_dir):
|
||||
traces_dir = os.path.join(test_output_dir, "repo", "traces")
|
||||
return ReplayBin(traces_dir)
|
||||
|
||||
|
||||
class VanillaPostgres(PgProtocol):
|
||||
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
|
||||
super().__init__(host="localhost", port=port, dbname="postgres")
|
||||
|
||||
@@ -175,6 +175,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
|
||||
|
||||
# Run the pgbench tests, and generate a flamegraph from it
|
||||
# This requires that the pageserver was built with the 'profiling' feature.
|
||||
#
|
||||
|
||||
57
test_runner/performance/test_trace_replay.py
Normal file
57
test_runner/performance/test_trace_replay.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
|
||||
|
||||
|
||||
# This test is a demonstration of how to do trace playback. With the current
|
||||
# workload it uses it's not testing anything meaningful.
|
||||
def test_trace_replay(
|
||||
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
|
||||
):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"trace_read_requests": "true",
|
||||
}
|
||||
)
|
||||
# TODO This doesn't work because I haven't updated tenant_config_handler
|
||||
# env.neon_cli.config_tenant(tenant, conf={
|
||||
# "trace_read_requests": "true",
|
||||
# })
|
||||
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
|
||||
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
|
||||
|
||||
with zenbenchmark.record_duration("run"):
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("create table t (i integer);")
|
||||
cur.execute(f"insert into t values (generate_series(1,{10000}));")
|
||||
cur.execute("select count(*) from t;")
|
||||
|
||||
# Stop pg so we drop the connection and flush the traces
|
||||
pg.stop()
|
||||
|
||||
# TODO This doesn't work because I haven't updated tenant_config_handler
|
||||
# env.neon_cli.config_tenant(tenant, conf={
|
||||
# "trace_read_requests": "false",
|
||||
# })
|
||||
|
||||
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
|
||||
# assert trace_path.exists()
|
||||
|
||||
replay_bin.draw_all()
|
||||
return
|
||||
|
||||
print("replaying")
|
||||
ps_connstr = env.pageserver.connstr()
|
||||
with zenbenchmark.record_duration("replay"):
|
||||
output = replay_bin.replay_all(ps_connstr)
|
||||
print(output)
|
||||
@@ -1,6 +1,7 @@
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2.extras
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import shutil
|
||||
from contextlib import closing
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
@@ -7,8 +8,13 @@ from typing import List
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.types import Lsn, TenantId
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
|
||||
|
||||
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
|
||||
assert post_detach_samples == set()
|
||||
|
||||
|
||||
# Check that empty tenants work with or without the remote storage
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
|
||||
)
|
||||
def test_pageserver_with_empty_tenants(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_pageserver_with_empty_tenants",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_without_timelines_dir = env.initial_tenant
|
||||
log.info(
|
||||
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
|
||||
)
|
||||
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
|
||||
|
||||
tenant_with_empty_timelines_dir = client.tenant_create()
|
||||
log.info(
|
||||
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
|
||||
)
|
||||
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
|
||||
for temp_timeline in temp_timelines:
|
||||
client.timeline_delete(
|
||||
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
|
||||
)
|
||||
files_in_timelines_dir = sum(
|
||||
1
|
||||
for _p in Path.iterdir(
|
||||
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
|
||||
)
|
||||
)
|
||||
assert (
|
||||
files_in_timelines_dir == 0
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
|
||||
|
||||
# Trigger timeline reinitialization after pageserver restart
|
||||
env.postgres.stop_all()
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert (
|
||||
len(tenants) == 1
|
||||
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
|
||||
loaded_tenant = tenants[0]
|
||||
assert loaded_tenant["id"] == str(
|
||||
tenant_with_empty_timelines_dir
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
|
||||
assert loaded_tenant["state"] == {
|
||||
"Active": {"background_jobs_running": False}
|
||||
}, "Empty tenant should be loaded and ready for timeline creation"
|
||||
|
||||
@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
|
||||
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
|
||||
bytes = { version = "1", features = ["serde", "std"] }
|
||||
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
|
||||
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
|
||||
either = { version = "1", features = ["use_std"] }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
|
||||
|
||||
Reference in New Issue
Block a user