Compare commits

...

34 Commits

Author SHA1 Message Date
Bojan Serafimov
0d7bce5f72 Fix segfault 2022-10-06 15:35:59 -04:00
Bojan Serafimov
42a0f5be00 WIP 2022-10-06 13:18:31 -04:00
Bojan Serafimov
d4065f2e85 Merge branch 'main' into ps-trace 2022-10-05 13:24:06 -04:00
Bojan Serafimov
5315614974 WIP 2022-10-05 13:21:42 -04:00
Sergey Melnikov
b99bed510d Move proxies to neon-proxy namespace (#2555) 2022-10-05 16:14:09 +03:00
sharnoff
580584c8fc Remove control_plane deps on pageserver/safekeeper (#2513)
Creates new `pageserver_api` and `safekeeper_api` crates to serve as the
shared dependencies. Should reduce both recompile times and cold compile
times.

Decreases the size of the optimized `neon_local` binary: 380M -> 179M.
No significant changes for anything else (mostly as expected).
2022-10-04 11:14:45 -07:00
Kirill Bulatov
d823e84ed5 Allow attaching tenants with zero timelines 2022-10-04 18:13:51 +03:00
Kirill Bulatov
231dfbaed6 Do not remove empty timelines/ directory for tenants 2022-10-04 18:13:51 +03:00
Dmitry Rodionov
5cf53786f9 Improve pytest ergonomics
1. Disable perf tests by default
2. Add instruction to run tests in parallel
2022-10-04 14:53:01 +03:00
Heikki Linnakangas
9b9bbad462 Use 'notify' crate to wait for PostgreSQL startup.
Compute node startup time is very important. After launching
PostgreSQL, use 'notify' to be notified immediately when it has
updated the PID file, instead of polling. The polling loop had 100 ms
interval so this shaves up to 100 ms from the startup time.
2022-10-04 13:00:15 +03:00
Heikki Linnakangas
537b2c1ae6 Remove unnecessary check for open PostgreSQL TCP port.
The loop checked if the TCP port is open for connections, by trying to
connect to it. That seems unnecessary. By the time the postmaster.pid
file says that it's ready, the port should be open. Remove that check.
2022-10-04 12:09:13 +03:00
Joonas Koivunen
31123d1fa8 Silence clippies, minor doc fix (#2543)
* doc: remove stray backtick

* chore: clippy::let_unit_value

* chore: silence useless_transmute, duplicate_mod

* chore: remove allowing deref_nullptr

not needed since bindgen 0.60.0.

* chore: remove repeated allowed lints

they are already allowed from the crate root.
2022-10-03 17:44:17 +03:00
Kirill Bulatov
4f2ac51bdd Bump rustc to 1.61 2022-10-03 16:36:03 +03:00
Kirill Bulatov
7b2f9dc908 Reuse existing tenants during attach (#2540) 2022-10-03 13:33:55 +03:00
Bojan Serafimov
293bc69e4f Implement drawer 2022-09-15 21:57:16 -04:00
Bojan Serafimov
e9b158e8f5 Add draw binary 2022-09-15 21:14:07 -04:00
Bojan Serafimov
8c09ff1764 Add todo 2022-09-15 20:53:26 -04:00
Bojan Serafimov
a66a3b4ed8 Fix merge bugs 2022-09-15 20:38:09 -04:00
Bojan Serafimov
6913bedc09 Merge branch 'main' into ps-trace 2022-09-15 20:15:00 -04:00
Bojan Serafimov
8a43dfd573 Hack read traces into NeonCompare 2022-09-14 14:21:04 -04:00
Bojan Serafimov
065adcf75b Measure duration 2022-09-13 16:38:46 -04:00
Bojan Serafimov
a180273dc5 Add todo 2022-09-13 16:34:56 -04:00
Bojan Serafimov
6c6aa04ce4 Fix bugs 2022-09-13 16:24:09 -04:00
Bojan Serafimov
cd8c96233f Actually connect 2022-09-13 15:29:36 -04:00
Bojan Serafimov
ecbda94790 Connect via pagestream 2022-09-13 14:09:15 -04:00
Bojan Serafimov
0d6d8fefd3 Add replay binary 2022-09-12 16:10:19 -04:00
Bojan Serafimov
d296a76e3e Pass test 2022-09-12 15:28:00 -04:00
Bojan Serafimov
646e0f3581 Write traces to file 2022-09-12 14:43:07 -04:00
Bojan Serafimov
0bfc422eb3 Add trace file path 2022-09-12 13:38:55 -04:00
Bojan Serafimov
1209572cec Pass msg to tracer 2022-09-12 10:36:56 -04:00
Bojan Serafimov
42f603cb13 Fix type error 2022-09-12 10:32:48 -04:00
Bojan Serafimov
6bcfd6441f Add tracer 2022-09-12 10:26:31 -04:00
Bojan Serafimov
dedb03bb5a Parse setting 2022-09-09 14:45:24 -04:00
Bojan Serafimov
abb07df028 Plan request tracing implementation 2022-09-09 00:46:13 -04:00
49 changed files with 1215 additions and 195 deletions

View File

@@ -127,8 +127,8 @@ jobs:
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-
- name: Cache postgres v14 build
id: cache_pg_14
@@ -389,7 +389,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git/
target/
key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact
uses: ./.github/actions/download
@@ -768,5 +768,5 @@ jobs:
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -106,7 +106,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git
target
key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
- name: Run cargo clippy
run: ./run_clippy.sh

105
Cargo.lock generated
View File

@@ -497,8 +497,10 @@ dependencies = [
"chrono",
"clap 3.2.16",
"env_logger",
"futures",
"hyper",
"log",
"notify",
"postgres",
"regex",
"serde",
@@ -540,11 +542,11 @@ dependencies = [
"git-version",
"nix",
"once_cell",
"pageserver",
"pageserver_api",
"postgres",
"regex",
"reqwest",
"safekeeper",
"safekeeper_api",
"serde",
"serde_with",
"tar",
@@ -1072,6 +1074,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.21"
@@ -1493,6 +1504,26 @@ dependencies = [
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -1552,6 +1583,26 @@ dependencies = [
"simple_asn1",
]
[[package]]
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "kstring"
version = "1.0.6"
@@ -1797,6 +1848,24 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -1975,6 +2044,7 @@ dependencies = [
"nix",
"num-traits",
"once_cell",
"pageserver_api",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -1989,6 +2059,7 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"svg_fmt",
"tar",
"tempfile",
"thiserror",
@@ -2003,6 +2074,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -2891,6 +2973,7 @@ dependencies = [
"postgres_ffi",
"regex",
"remote_storage",
"safekeeper_api",
"serde",
"serde_json",
"serde_with",
@@ -2906,6 +2989,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -3240,6 +3334,12 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "svg_fmt"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"
@@ -4142,6 +4242,7 @@ dependencies = [
"bstr",
"bytes",
"chrono",
"crossbeam-utils",
"either",
"fail",
"hashbrown",

View File

@@ -8,8 +8,10 @@ anyhow = "1.0"
chrono = "0.4"
clap = "3.0"
env_logger = "0.9"
futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -258,14 +258,7 @@ impl ComputeNode {
.spawn()
.expect("cannot start postgres process");
// Try default Postgres port if it is not provided
let port = self
.spec
.cluster
.settings
.find("port")
.unwrap_or_else(|| "5432".to_string());
wait_for_postgres(&mut pg, &port, pgdata_path)?;
wait_for_postgres(&mut pg, pgdata_path)?;
// If connection fails,
// it may be the old node with `zenith_admin` superuser.

View File

@@ -1,18 +1,19 @@
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
use std::str::FromStr;
use std::{fs, thread, time};
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use postgres::{Client, Transaction};
use serde::Deserialize;
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
use notify::{RecursiveMode, Watcher};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
@@ -230,52 +231,85 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
Ok(postgres_dbs)
}
/// Wait for Postgres to become ready to accept connections:
/// - state should be `ready` in the `pgdata/postmaster.pid`
/// - and we should be able to connect to 127.0.0.1:5432
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");
let mut slept: u64 = 0; // ms
let pause = time::Duration::from_millis(100);
let timeout = time::Duration::from_millis(10);
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
// PostgreSQL writes line "ready" to the postmaster.pid file, when it has
// completed initialization and is ready to accept connections. We want to
// react quickly and perform the rest of our initialization as soon as
// PostgreSQL starts accepting connections. Use 'notify' to be notified
// whenever the PID file is changed, and whenever it changes, read it to
// check if it's now "ready".
//
// You cannot actually watch a file before it exists, so we first watch the
// data directory, and once the postmaster.pid file appears, we switch to
// watch the file instead. We also wake up every 100 ms to poll, just in
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
})?;
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
// Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout,
// but postgres starts listening almost immediately, even if it is not really
// ready to accept connections).
if slept >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier.
let code = status.code().unwrap_or(-1);
bail!("Postgres exited unexpectedly with code {}", code);
}
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to be
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
let file = BufReader::new(file);
let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
log::debug!("last line of postmaster.pid: {status:?}");
// Now Postgres is ready to accept connections
if status == "ready" && can_connect {
if status == "ready" {
break;
}
}
}
thread::sleep(pause);
slept += 100;
// Give up after POSTGRES_WAIT_TIMEOUT.
let duration = started_at.elapsed();
if duration >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
}
log::info!("PostgreSQL is now running, continuing to configure it");
Ok(())
}

View File

@@ -19,7 +19,9 @@ thiserror = "1"
nix = "0.23"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
pageserver = { path = "../pageserver" }
safekeeper = { path = "../safekeeper" }
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.
pageserver_api = { path = "../libs/pageserver_api" }
safekeeper_api = { path = "../libs/safekeeper_api" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env};
use pageserver::config::defaults::{
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};

View File

@@ -12,7 +12,7 @@ use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use safekeeper::http::models::TimelineCreateRequest;
use safekeeper_api::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
connstring::connection_address,

View File

@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{
use pageserver_api::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Ok(err_body) => format!("Response error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
@@ -181,14 +181,15 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
.context("Failed to create tenant")?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
).context("Failed to create timeline")?;
Ok(initial_timeline_info.timeline_id)
}
@@ -419,6 +420,11 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
sudo apt install python3.9
```
- Install `poetry`
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`.
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
- Install dependencies via `./scripts/pysync`.
- Note that CI uses specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile))
so if you have different version some linting tools can yield different result locally vs in the CI.

View File

@@ -0,0 +1,12 @@
[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,9 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -7,7 +7,17 @@ use utils::{
lsn::Lsn,
};
use crate::tenant::TenantState;
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
@@ -42,6 +52,7 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
#[serde_as]

View File

@@ -3,9 +3,11 @@
#![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)]
// suppress warnings on rust 1.53 due to bindgen unit tests.
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
#![allow(clippy::useless_transmute)]
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;

View File

@@ -57,12 +57,10 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
@@ -71,7 +69,6 @@ pub fn XLogSegNoOffsetToRecPtr(
segno * (wal_segsz_bytes as u64) + (offset as u64)
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
format!(
"{:>08X}{:>08X}{:>08X}",
@@ -81,7 +78,6 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
@@ -89,12 +85,10 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}
#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}

View File

@@ -0,0 +1,12 @@
[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,10 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -58,6 +58,7 @@ rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = "0.4.1"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
@@ -66,6 +67,7 @@ remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -0,0 +1,185 @@
use clap::{App, Arg};
use futures::TryFutureExt;
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
use std::io::Write;
use std::{
fs::{read_dir, File},
io::BufReader,
str::FromStr,
};
use svg_fmt::*;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
(set.len(), map)
}
fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace visualization tool")
.about("Makes a svg file that displays the read pattern")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.get_matches();
// (blkno, lsn)
let mut dots = Vec::<(u32, Lsn)>::new();
let mut dump_file = File::create("dump.txt").expect("can't make file");
let mut deltas = HashMap::<i32, u32>::new();
let mut prev1: u32 = 0;
let mut prev2: u32 = 0;
let mut prev3: u32 = 0;
println!("scanning trace ...");
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("tenant: {tenant_id}");
println!("opening {path:?}");
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("hi");
// println!("timeline: {timeline_id}");
println!("opening {path:?}");
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
println!("opening {path:?}");
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
// println!("Parsed message {:?}", msg);
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetPage(req) => {
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
// dots.push((req.blkno, req.lsn));
// HACK
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
let delta1 = (req.blkno as i32) - (prev1 as i32);
let delta2 = (req.blkno as i32) - (prev2 as i32);
let delta3 = (req.blkno as i32) - (prev3 as i32);
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
delta1
} else {
delta2
};
if i32::abs(delta3) < i32::abs(delta) {
delta = delta3;
}
let delta = delta1;
prev3 = prev2;
prev2 = prev1;
prev1 = req.blkno;
match deltas.get_mut(&delta) {
Some(c) => {*c += 1;},
None => {deltas.insert(delta, 1);},
};
if delta == 9 {
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
}
},
PagestreamFeMessage::DbSize(_) => {}
};
// HACK
// if dots.len() > 1000 {
// break;
// }
}
}
}
}
let mut other = deltas.len();
deltas.retain(|_, count| *count > 3);
other -= deltas.len();
dbg!(other);
dbg!(deltas);
// Collect all coordinates
let mut keys: Vec<u32> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for dot in &dots {
keys.push(dot.0);
lsns.push(dot.1);
}
// Analyze
let (key_max, key_map) = analyze(keys);
let (lsn_max, lsn_map) = analyze(lsns);
// Draw
println!("drawing trace ...");
let mut svg_file = File::create("out.svg").expect("can't make file");
writeln!(
&mut svg_file,
"{}",
BeginSvg {
w: (key_max + 1) as f32,
h: (lsn_max + 1) as f32,
}
)?;
for (key, lsn) in &dots {
let key = key_map.get(&key).unwrap();
let lsn = lsn_map.get(&lsn).unwrap();
writeln!(
&mut svg_file,
" {}",
rectangle(
*key as f32,
*lsn as f32,
10.0,
10.0
)
.fill(Fill::Color(red()))
.stroke(Stroke::Color(black(), 0.0))
.border_radius(0.5)
)?;
// println!(" {}",
// rectangle(key_start as f32 + stretch * margin,
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
// key_diff as f32 - stretch * 2.0 * margin,
// stretch * (lsn_diff - 2.0 * margin))
// // .fill(rgb(200, 200, 200))
// .fill(fill)
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
// .border_radius(0.4)
// );
}
writeln!(&mut svg_file, "{}", EndSvg)?;
Ok(())
}

View File

@@ -0,0 +1,133 @@
use bytes::BytesMut;
use pageserver::page_service::PagestreamFeMessage;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use clap::{App, Arg};
use utils::{
id::{TenantId, TimelineId},
pq_proto::{BeMessage, FeMessage},
};
// TODO put this in library, dedup with stuff in control_plane
/// Client for the pageserver's pagestream API
struct PagestreamApi {
stream: TcpStream,
}
impl PagestreamApi {
async fn connect(
connstr: &str,
tenant: &TenantId,
timeline: &TimelineId,
) -> anyhow::Result<PagestreamApi> {
// Parse connstr
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
// Connect
let mut stream = TcpStream::connect(tcp_addr).await?;
let (client, conn) = config
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", tenant, timeline);
tokio::select! {
_ = conn => panic!("connection closed during pagestream initialization"),
_ = client.query(init_query.as_str(), &[]) => (),
};
Ok(PagestreamApi { stream })
}
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let request = {
let msg_bytes = msg.serialize();
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&msg_bytes);
// TODO it's actually a fe message but it doesn't have a serializer yet
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
self.stream.write_all(&request).await?;
// TODO It's actually a be message, but it doesn't have a parser.
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
let _response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
Ok(())
}
}
async fn replay_trace<R: std::io::Read>(
reader: &mut R,
mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
println!("Parsed message {:?}", msg);
pagestream.make_request(msg).await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace replay tool")
.about("Replays wal or read traces to test pageserver performance")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.arg(
Arg::new("pageserver_connstr")
.takes_value(true)
.help("Pageserver pg endpoint to connect to"),
)
.get_matches();
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// TODO The pageserver deletes existing traces?
// LOL yes because I use tenant ID as trace id
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
// let len = file.metadata().unwrap().len();
// println!("replaying {:?} trace {} bytes", path, len);
replay_trace(&mut reader, pagestream).await?;
}
}
}
Ok(())
}

View File

@@ -30,10 +30,10 @@ pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -364,6 +364,23 @@ impl PageServerConf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
pub fn trace_path(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &TimelineId, // TODO make a new type
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;

View File

@@ -337,9 +337,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
Ok(_) => Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
)),
Ok(tenant) => {
if tenant.list_timelines().is_empty() {
info!("Attaching to tenant {tenant_id} with zero timelines");
Ok(())
} else {
Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
))
}
}
Err(_) => Ok(()),
})
.await
@@ -610,6 +617,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {

View File

@@ -16,6 +16,8 @@ pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
// pub mod timelines;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;

View File

@@ -10,7 +10,9 @@
//
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Buf;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
@@ -42,6 +44,7 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
@@ -49,7 +52,9 @@ use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
// TODO these should be in a library outside the pageserver
#[derive(Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
@@ -57,7 +62,7 @@ enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
enum PagestreamBeMessage {
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
@@ -66,106 +71,152 @@ enum PagestreamBeMessage {
}
#[derive(Debug)]
struct PagestreamExistsRequest {
pub struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamNblocksRequest {
pub struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamGetPageRequest {
pub struct PagestreamGetPageRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
blkno: u32,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug)]
struct PagestreamDbSizeRequest {
pub struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
struct PagestreamExistsResponse {
pub struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
struct PagestreamNblocksResponse {
pub struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
struct PagestreamGetPageResponse {
pub struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
struct PagestreamErrorResponse {
pub struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
struct PagestreamDbSizeResponse {
pub struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
let msg_tag = body.read_u8()?;
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.get_u32(),
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
}
@@ -422,6 +473,17 @@ impl PageServerHandler {
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
Some(Tracer::new(path))
} else {
None
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
@@ -446,15 +508,24 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Sync) => {
// TODO what now?
continue;
}
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {

View File

@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
(storage, remote_index_clone, sync_queue),
max_sync_errors,
)
.instrument(info_span!("storage_sync_loop"))
.await;
Ok(())
},

View File

@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
use toml_edit;
use utils::{
@@ -118,18 +119,6 @@ pub struct Tenant {
upload_layers: bool,
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Tenant {
@@ -400,16 +389,19 @@ impl Tenant {
timeline_id,
metadata.pg_version()
);
let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
match timelines_accessor.entry(timeline.timeline_id) {
Entry::Occupied(_) => bail!(
"Found freshly initialized timeline {} in the tenant map",
timeline.timeline_id
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
match timelines_accessor.entry(timeline_id) {
Entry::Occupied(_) => warn!(
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
self.tenant_id, timeline_id
),
Entry::Vacant(v) => {
let timeline = self
.initialize_new_timeline(timeline_id, metadata, ancestor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
v.insert(timeline);
}
}
@@ -601,6 +593,14 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
// TODO is there a need for all this boilerplate?
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -609,21 +609,14 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
new_metadata: TimelineMetadata,
timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<Arc<Timeline>> {
let ancestor = match new_metadata.ancestor_timeline() {
Some(ancestor_timeline_id) => Some(
timelines
.get(&ancestor_timeline_id)
.cloned()
.with_context(|| {
format!(
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
})?,
),
None => None,
};
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
ancestor.is_some(),
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
}
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
@@ -1080,8 +1073,12 @@ impl Tenant {
)
})?;
let ancestor = new_metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
.cloned();
let new_timeline = self
.initialize_new_timeline(new_timeline_id, new_metadata, timelines)
.initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
.with_context(|| {
format!(
"Failed to initialize timeline {}/{}",

View File

@@ -627,7 +627,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
let _ = spawn_connection_manager_task(
spawn_connection_manager_task(
self.conf.broker_etcd_prefix.clone(),
self_clone,
walreceiver_connect_timeout,

View File

@@ -82,6 +82,10 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
/// If enabled, records all read requests for this ... TODO
/// Even though read traces are small, there's no delete meachanism so they can
/// pile up. So we disable this by default.
pub trace_read_requests: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -105,6 +109,7 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfOpt {
@@ -138,6 +143,9 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
}
}
@@ -207,6 +215,7 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
}
}
@@ -232,6 +241,7 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
trace_read_requests: false,
}
}
}

View File

@@ -107,6 +107,13 @@ pub fn init_tenant_mgr(
/// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
/// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues.
///
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
/// Alternatively, tenant is considered loaded successfully, if it's already in pageserver's memory (i.e. was loaded already before).
///
/// Attach happens on startup and sucessful timeline downloads
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
pub fn attach_local_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
@@ -122,18 +129,20 @@ pub fn attach_local_tenants(
);
debug!("Timelines to attach: {local_timelines:?}");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
{
match tenants_state::write_tenants().entry(tenant_id) {
hash_map::Entry::Occupied(_) => {
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
continue;
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::clone(&tenant));
}
let mut tenants_accessor = tenants_state::write_tenants();
let tenant = match tenants_accessor.entry(tenant_id) {
hash_map::Entry::Occupied(o) => {
info!("Tenant {tenant_id} was found in pageserver's memory");
Arc::clone(o.get())
}
}
hash_map::Entry::Vacant(v) => {
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
v.insert(Arc::clone(&tenant));
tenant
}
};
drop(tenants_accessor);
if tenant.current_state() == TenantState::Broken {
warn!("Skipping timeline load for broken tenant {tenant_id}")
@@ -168,16 +177,28 @@ fn load_local_tenant(
remote_index.clone(),
conf.remote_storage_config.is_some(),
));
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
if !tenant_timelines_dir.is_dir() {
error!(
"Tenant {} has no timelines directory at {}",
tenant_id,
tenant_timelines_dir.display()
);
tenant.set_state(TenantState::Broken);
} else {
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
}
tenant
}
@@ -625,14 +646,10 @@ fn collect_timelines_for_tenant(
}
if tenant_timelines.is_empty() {
match remove_if_empty(&timelines_dir) {
Ok(true) => info!(
"Removed empty tenant timelines directory {}",
timelines_dir.display()
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
// this is normal, we've removed all broken, empty and temporary timeline dirs
// but should allow the tenant to stay functional and allow creating new timelines
// on a restart, we require tenants to have the timelines dir, so leave it on disk
debug!("Tenant {tenant_id} has no timelines loaded");
}
Ok((tenant_id, tenant_timelines))

36
pageserver/src/trace.rs Normal file
View File

@@ -0,0 +1,36 @@
use bytes::Bytes;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
writer: BufWriter<File>,
}
impl Drop for Tracer {
fn drop(&mut self) {
self.flush()
}
}
impl Tracer {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("failed to parse parent path");
create_dir_all(parent).expect("failed to create trace dir");
let file = File::create(path).expect("failed to create trace file");
Tracer {
writer: BufWriter::new(file),
}
}
pub fn trace(&mut self, msg: &Bytes) {
self.writer.write(msg).expect("failed to write trace");
}
pub fn flush(&mut self) {
self.writer.flush().expect("failed to flush trace file");
}
}

View File

@@ -264,12 +264,76 @@ pageserver_flush(void)
}
}
// Entry of the page_cache
typedef struct
{
// HACK Just xor of bytes lol
char request_hash;
// Points directly to a NeonResponse. We can't just own the
// NeonResponse because it's a "supertype", so it's not Sized.
char* response;
int len;
} NeonRequestResponse;
#define MAX_PAGE_CACHE_SIZE 50
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
int page_cache_size = 0;
int page_cache_head = 0;
static NeonResponse *
pageserver_call(NeonRequest * request)
{
// Compute hash
char hash = 0;
StringInfoData req_buff;
req_buff = nm_pack_request(request);
for (int i = 0; i < req_buff.len; i++) {
hash ^= req_buff.data[i];
}
pfree(req_buff.data);
// If result is cached, memcpy and return
for (int i = 0; i < page_cache_size; i++) {
if (page_cache[i].request_hash == hash) {
int len = page_cache[0].response->len;
NeonResponse *resp = palloc0(len);
// I'd rather Rc than memcpy, but this is not rust :(
memcpy(resp, page_cache[0].response->data, len);
elog(LOG, "cache hit !!!");
return resp;
}
}
// Send request, get response
pageserver_send(request);
pageserver_flush();
return pageserver_receive();
NeonResponse *resp = pageserver_receive();
// Get length
int len = -1;
switch (resp->tag) {
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
}
// Cache result
page_cache[page_cache_head].request_hash = hash;
if (page_cache_head < page_cache_size) {
pfree(page_cache[page_cache_head].response);
}
page_cache[page_cache_head].len = len;
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
memcpy(page_cache[page_cache_head].response, resp, len);
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
page_cache_size += 1;
}
return resp;
}
page_server_api api = {

View File

@@ -129,13 +129,13 @@ mod tests {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
let () = futures::future::pending().await; // sleep forever
futures::future::pending::<()>().await; // sleep forever
Ok(())
}));
// Wait until the task has been spawned.
let () = rx.await.context("failed to hear from the task")?;
rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task.
task.abort();

View File

@@ -5,6 +5,7 @@ filterwarnings =
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
addopts =
-m 'not remote_cluster'
--ignore=test_runner/performance
markers =
remote_cluster
testpaths =

View File

@@ -1,11 +1,10 @@
[toolchain]
# We try to stick to a toolchain version that is widely available on popular distributions, so that most people
# can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on
# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach
# 'testing' soon (and similarly for the other distributions).
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package.
channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
# version, we can consider updating.
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package,
# we use "unstable" version number as the highest version used in the project by default.
channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;

View File

@@ -27,14 +27,13 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;
pub mod defaults {
use const_format::formatcp;
use std::time::Duration;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub use safekeeper_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}

View File

@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench`
To run tests in parellel we utilize `pytest-xdist` plugin. By default everything runs single threaded. Number of workers can be specified with `-n` argument:
`./scripts/pytest -n4`
By default performance tests are excluded. To run them explicitly pass performance tests selection to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.

View File

@@ -91,10 +91,25 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
self.tenant, _ = self.env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# HACK enable request tracing, as an experiment
# NOTE This must be done before the pg starts pagestream
# TODO why does it not work?
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
# "trace_read_requests": "true",
# })
# We only use one branch and one timeline
self.env.neon_cli.create_branch(branch_name, "empty")
self._pg = self.env.postgres.create_start(branch_name)
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
@property
def pg(self):
@@ -109,10 +124,10 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
def compact(self):
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(
@@ -124,7 +139,7 @@ class NeonCompare(PgCompare):
def report_size(self) -> None:
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.env.initial_tenant, self.timeline
self.env.repo_dir, self.tenant, self.timeline
)
self.zenbenchmark.record(
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

View File

@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1744,6 +1749,37 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
@dataclass
class ReplayBin:
"""A helper class for replaying pageserver wal and read traces."""
traces_dir: str
def replay_all(self, pageserver_connstr):
replay_binpath = os.path.join(str(neon_binpath), "replay")
args = [
replay_binpath,
self.traces_dir,
pageserver_connstr,
]
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
subprocess.run(args)
def draw_all(self):
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
args = [
draw_binpath,
self.traces_dir,
]
subprocess.run(args)
@pytest.fixture(scope="function")
def replay_bin(test_output_dir):
traces_dir = os.path.join(test_output_dir, "repo", "traces")
return ReplayBin(traces_dir)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host="localhost", port=port, dbname="postgres")

View File

@@ -175,6 +175,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#

View File

@@ -0,0 +1,57 @@
from contextlib import closing
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
# This test is a demonstration of how to do trace playback. With the current
# workload it uses it's not testing anything meaningful.
def test_trace_replay(
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "true",
# })
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with zenbenchmark.record_duration("run"):
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
# Stop pg so we drop the connection and flush the traces
pg.stop()
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "false",
# })
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
# assert trace_path.exists()
replay_bin.draw_all()
return
print("replaying")
ps_connstr = env.pageserver.connstr()
with zenbenchmark.record_duration("replay"):
output = replay_bin.replay_all(ps_connstr)
print(output)

View File

@@ -1,6 +1,7 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder

View File

@@ -1,4 +1,5 @@
import os
import shutil
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -7,8 +8,13 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from prometheus_client.samples import Sample
@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_with_empty_tenants",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_without_timelines_dir = env.initial_tenant
log.info(
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
tenant_with_empty_timelines_dir = client.tenant_create()
log.info(
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
)
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline reinitialization after pageserver restart
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert (
len(tenants) == 1
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
loaded_tenant = tenants[0]
assert loaded_tenant["id"] == str(
tenant_with_empty_timelines_dir
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -7,19 +7,25 @@
#
import asyncio
import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
Postgres,
RemoteStorageKind,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_upload,
wait_until,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
# run final checkpoint manually to flush all the data to remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenants_attached_after_download(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="remote_storage_kind",
)
data_id = 1
data_secret = "very secret secret"
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
for checkpoint_number in range(1, 3):
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(path)
local_layer_deleted = True
break
assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
env.pageserver.start()
client = env.pageserver.http_client()
wait_until(
number_of_iterations=5,
interval=1,
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
)
restored_timelines = client.timeline_list(tenant_id)
assert (
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
retored_timeline = restored_timelines[0]
assert retored_timeline["timeline_id"] == str(
timeline_id
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
def expect_tenant_to_download_timeline(
client: NeonPageserverHttpClient,
tenant_id: TenantId,
):
for tenant in client.tenant_list():
if tenant["id"] == str(tenant_id):
assert not tenant.get(
"has_in_progress_downloads", True
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"

View File

@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }