Compare commits


9 Commits

Author  SHA1  Message  Date
Konstantin Knizhnik  10b90506a0  Use B-Tree instead of R-Tree  2022-10-07 14:57:25 +03:00
Konstantin Knizhnik  5ee4524caa  Fix indentation  2022-10-06 22:31:11 +03:00
Konstantin Knizhnik  c5245a9e4f  Make clippy happy  2022-10-06 21:47:26 +03:00
Konstantin Knizhnik  9f10195d7b  Change LSN range assignment rule for partial image layers  2022-10-06 20:16:13 +03:00
Konstantin Knizhnik  51aa53ab90  Check that image contains search key in LayerMap::search  2022-10-06 16:06:39 +03:00
Konstantin Knizhnik  2359106a9d  Fix writer creation in reconstruct_level0  2022-10-06 10:05:35 +03:00
Konstantin Knizhnik  885033ad42  Make reconstruction more intensive  2022-10-06 09:37:37 +03:00
Konstantin Knizhnik  487ec20085  Fix indentation  2022-10-05 15:30:24 +03:00
Konstantin Knizhnik  898937d500  Store partial image layers instead of compaction  2022-10-04 20:42:34 +03:00
47 changed files with 398 additions and 1629 deletions

View File

@@ -768,5 +768,5 @@ jobs:
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

Cargo.lock (generated)
View File

@@ -497,10 +497,8 @@ dependencies = [
"chrono",
"clap 3.2.16",
"env_logger",
"futures",
"hyper",
"log",
"notify",
"postgres",
"regex",
"serde",
@@ -542,11 +540,11 @@ dependencies = [
"git-version",
"nix",
"once_cell",
"pageserver_api",
"pageserver",
"postgres",
"regex",
"reqwest",
"safekeeper_api",
"safekeeper",
"serde",
"serde_with",
"tar",
@@ -1074,15 +1072,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.21"
@@ -1504,26 +1493,6 @@ dependencies = [
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -1583,26 +1552,6 @@ dependencies = [
"simple_asn1",
]
[[package]]
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "kstring"
version = "1.0.6"
@@ -1848,24 +1797,6 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2044,7 +1975,6 @@ dependencies = [
"nix",
"num-traits",
"once_cell",
"pageserver_api",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -2059,7 +1989,6 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"svg_fmt",
"tar",
"tempfile",
"thiserror",
@@ -2074,17 +2003,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -2973,7 +2891,6 @@ dependencies = [
"postgres_ffi",
"regex",
"remote_storage",
"safekeeper_api",
"serde",
"serde_json",
"serde_with",
@@ -2989,17 +2906,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -3334,12 +3240,6 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "svg_fmt"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"
@@ -4242,7 +4142,6 @@ dependencies = [
"bstr",
"bytes",
"chrono",
"crossbeam-utils",
"either",
"fail",
"hashbrown",

View File

@@ -8,10 +8,8 @@ anyhow = "1.0"
chrono = "0.4"
clap = "3.0"
env_logger = "0.9"
futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -258,7 +258,14 @@ impl ComputeNode {
.spawn()
.expect("cannot start postgres process");
wait_for_postgres(&mut pg, pgdata_path)?;
// Use the default Postgres port if none is provided in the spec
let port = self
.spec
.cluster
.settings
.find("port")
.unwrap_or_else(|| "5432".to_string());
wait_for_postgres(&mut pg, &port, pgdata_path)?;
// If connection fails,
// it may be the old node with `zenith_admin` superuser.

View File

@@ -1,19 +1,18 @@
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
use std::time::{Duration, Instant};
use std::str::FromStr;
use std::{fs, thread, time};
use anyhow::{bail, Result};
use postgres::{Client, Transaction};
use serde::Deserialize;
use notify::{RecursiveMode, Watcher};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
@@ -231,85 +230,52 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
Ok(postgres_dbs)
}
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
/// Wait for Postgres to become ready to accept connections:
/// - state should be `ready` in the `pgdata/postmaster.pid`
/// - and we should be able to connect to 127.0.0.1:5432
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");
let mut slept: u64 = 0; // ms
let pause = time::Duration::from_millis(100);
// PostgreSQL writes the line "ready" to the postmaster.pid file when it has
// completed initialization and is ready to accept connections. We want to
// react quickly and perform the rest of our initialization as soon as
// PostgreSQL starts accepting connections. Use 'notify' to be notified
// whenever the PID file is changed, and whenever it changes, read it to
// check if it's now "ready".
//
// You cannot actually watch a file before it exists, so we first watch the
// data directory, and once the postmaster.pid file appears, we switch to
// watch the file instead. We also wake up every 100 ms to poll, just in
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
})?;
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let timeout = time::Duration::from_millis(10);
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
// Sleep POSTGRES_WAIT_TIMEOUT at most (a bit longer, actually, if we consider the TCP timeout,
// but postgres starts listening almost immediately, even if it is not really
// ready to accept connections).
if slept >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier.
let code = status.code().unwrap_or(-1);
bail!("Postgres exited unexpectedly with code {}", code);
}
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
let file = BufReader::new(file);
let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
log::debug!("last line of postmaster.pid: {status:?}");
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
// Now Postgres is ready to accept connections
if status == "ready" {
if status == "ready" && can_connect {
break;
}
}
}
// Give up after POSTGRES_WAIT_TIMEOUT.
let duration = started_at.elapsed();
if duration >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
thread::sleep(pause);
slept += 100;
}
log::info!("PostgreSQL is now running, continuing to configure it");
Ok(())
}
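For readers skimming this diff: the notify-based file watching is replaced by a plain polling loop that declares Postgres ready only when the last line of `postmaster.pid` says `ready` and a TCP connection to `127.0.0.1:{port}` succeeds, giving up after roughly 60 seconds of 100 ms sleeps. The sketch below restates that logic in a self-contained form; the function names and the `Result<(), String>` error type are illustrative stand-ins, not the project's API.

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;

// Readiness predicate: last line of postmaster.pid is "ready" AND the port accepts TCP.
fn postgres_is_ready(pgdata: &Path, port: &str) -> bool {
    let pid_path = pgdata.join("postmaster.pid");
    let addr = match SocketAddr::from_str(&format!("127.0.0.1:{}", port)) {
        Ok(a) => a,
        Err(_) => return false,
    };
    let status_ready = File::open(&pid_path)
        .ok()
        .and_then(|f| BufReader::new(f).lines().last())
        .and_then(|line| line.ok())
        .map(|line| line.trim() == "ready")
        .unwrap_or(false);
    status_ready && TcpStream::connect_timeout(&addr, Duration::from_millis(10)).is_ok()
}

// Poll every 100 ms, give up after 60 s, mirroring the constants in the diff.
fn wait_until_ready(pgdata: &Path, port: &str) -> Result<(), String> {
    let mut slept_ms: u64 = 0;
    while slept_ms < 60_000 {
        if postgres_is_ready(pgdata, port) {
            return Ok(());
        }
        std::thread::sleep(Duration::from_millis(100));
        slept_ms += 100;
    }
    Err("timed out while waiting for Postgres to start".to_string())
}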

View File

@@ -19,9 +19,7 @@ thiserror = "1"
nix = "0.23"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.
pageserver_api = { path = "../libs/pageserver_api" }
safekeeper_api = { path = "../libs/safekeeper_api" }
pageserver = { path = "../pageserver" }
safekeeper = { path = "../safekeeper" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
use pageserver::config::defaults::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use safekeeper_api::{
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};

View File

@@ -284,7 +284,7 @@ impl PostgresNode {
conf.append("max_wal_senders", "10");
// wal_log_hints is mandatory when running against pageserver (see gh issue#192)
// TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE?
conf.append("wal_log_hints", "on");
conf.append("wal_log_hints", "off");
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");

View File

@@ -12,7 +12,7 @@ use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper::http::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
connstring::connection_address,

View File

@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver_api::models::{
use pageserver::http::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Response error: {}", err_body.msg),
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
@@ -181,15 +181,14 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
.context("Failed to create tenant")?;
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
).context("Failed to create timeline")?;
)?;
Ok(initial_timeline_info.timeline_id)
}
@@ -420,11 +419,6 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -1,12 +0,0 @@
[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,9 +0,0 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -1,12 +0,0 @@
[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,10 +0,0 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -58,7 +58,6 @@ rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = "0.4.1"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
@@ -67,7 +66,6 @@ remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -1,185 +0,0 @@
use clap::{App, Arg};
use futures::TryFutureExt;
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
use std::io::Write;
use std::{
fs::{read_dir, File},
io::BufReader,
str::FromStr,
};
use svg_fmt::*;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
(set.len(), map)
}
fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace visualization tool")
.about("Makes a svg file that displays the read pattern")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.get_matches();
// (blkno, lsn)
let mut dots = Vec::<(u32, Lsn)>::new();
let mut dump_file = File::create("dump.txt").expect("can't make file");
let mut deltas = HashMap::<i32, u32>::new();
let mut prev1: u32 = 0;
let mut prev2: u32 = 0;
let mut prev3: u32 = 0;
println!("scanning trace ...");
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("tenant: {tenant_id}");
println!("opening {path:?}");
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("hi");
// println!("timeline: {timeline_id}");
println!("opening {path:?}");
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
println!("opening {path:?}");
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
// println!("Parsed message {:?}", msg);
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetPage(req) => {
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
// dots.push((req.blkno, req.lsn));
// HACK
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
let delta1 = (req.blkno as i32) - (prev1 as i32);
let delta2 = (req.blkno as i32) - (prev2 as i32);
let delta3 = (req.blkno as i32) - (prev3 as i32);
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
delta1
} else {
delta2
};
if i32::abs(delta3) < i32::abs(delta) {
delta = delta3;
}
let delta = delta1;
prev3 = prev2;
prev2 = prev1;
prev1 = req.blkno;
match deltas.get_mut(&delta) {
Some(c) => {*c += 1;},
None => {deltas.insert(delta, 1);},
};
if delta == 9 {
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
}
},
PagestreamFeMessage::DbSize(_) => {}
};
// HACK
// if dots.len() > 1000 {
// break;
// }
}
}
}
}
let mut other = deltas.len();
deltas.retain(|_, count| *count > 3);
other -= deltas.len();
dbg!(other);
dbg!(deltas);
// Collect all coordinates
let mut keys: Vec<u32> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for dot in &dots {
keys.push(dot.0);
lsns.push(dot.1);
}
// Analyze
let (key_max, key_map) = analyze(keys);
let (lsn_max, lsn_map) = analyze(lsns);
// Draw
println!("drawing trace ...");
let mut svg_file = File::create("out.svg").expect("can't make file");
writeln!(
&mut svg_file,
"{}",
BeginSvg {
w: (key_max + 1) as f32,
h: (lsn_max + 1) as f32,
}
)?;
for (key, lsn) in &dots {
let key = key_map.get(&key).unwrap();
let lsn = lsn_map.get(&lsn).unwrap();
writeln!(
&mut svg_file,
" {}",
rectangle(
*key as f32,
*lsn as f32,
10.0,
10.0
)
.fill(Fill::Color(red()))
.stroke(Stroke::Color(black(), 0.0))
.border_radius(0.5)
)?;
// println!(" {}",
// rectangle(key_start as f32 + stretch * margin,
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
// key_diff as f32 - stretch * 2.0 * margin,
// stretch * (lsn_diff - 2.0 * margin))
// // .fill(rgb(200, 200, 200))
// .fill(fill)
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
// .border_radius(0.4)
// );
}
writeln!(&mut svg_file, "{}", EndSvg)?;
Ok(())
}

View File

@@ -1,133 +0,0 @@
use bytes::BytesMut;
use pageserver::page_service::PagestreamFeMessage;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use clap::{App, Arg};
use utils::{
id::{TenantId, TimelineId},
pq_proto::{BeMessage, FeMessage},
};
// TODO put this in library, dedup with stuff in control_plane
/// Client for the pageserver's pagestream API
struct PagestreamApi {
stream: TcpStream,
}
impl PagestreamApi {
async fn connect(
connstr: &str,
tenant: &TenantId,
timeline: &TimelineId,
) -> anyhow::Result<PagestreamApi> {
// Parse connstr
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
// Connect
let mut stream = TcpStream::connect(tcp_addr).await?;
let (client, conn) = config
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", tenant, timeline);
tokio::select! {
_ = conn => panic!("connection closed during pagestream initialization"),
_ = client.query(init_query.as_str(), &[]) => (),
};
Ok(PagestreamApi { stream })
}
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let request = {
let msg_bytes = msg.serialize();
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&msg_bytes);
// TODO it's actually a fe message but it doesn't have a serializer yet
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
self.stream.write_all(&request).await?;
// TODO It's actually a be message, but it doesn't have a parser.
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
let _response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
Ok(())
}
}
async fn replay_trace<R: std::io::Read>(
reader: &mut R,
mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
println!("Parsed message {:?}", msg);
pagestream.make_request(msg).await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace replay tool")
.about("Replays wal or read traces to test pageserver performance")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.arg(
Arg::new("pageserver_connstr")
.takes_value(true)
.help("Pageserver pg endpoint to connect to"),
)
.get_matches();
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// TODO The pageserver deletes existing traces?
// LOL yes because I use tenant ID as trace id
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
// let len = file.metadata().unwrap().len();
// println!("replaying {:?} trace {} bytes", path, len);
replay_trace(&mut reader, pagestream).await?;
}
}
}
Ok(())
}

View File

@@ -30,10 +30,10 @@ pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
pub use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -364,23 +364,6 @@ impl PageServerConf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
pub fn trace_path(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &TimelineId, // TODO make a new type
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {

View File

@@ -1,4 +1,3 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;

View File

@@ -7,17 +7,7 @@ use utils::{
lsn::Lsn,
};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
use crate::tenant::TenantState;
#[serde_as]
#[derive(Serialize, Deserialize)]
@@ -52,7 +42,6 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
#[serde_as]

View File

@@ -337,16 +337,9 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
Ok(tenant) => {
if tenant.list_timelines().is_empty() {
info!("Attaching to tenant {tenant_id} with zero timelines");
Ok(())
} else {
Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
))
}
}
Ok(_) => Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
)),
Err(_) => Ok(()),
})
.await
@@ -617,9 +610,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
@@ -820,7 +810,9 @@ async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Bod
.get_timeline(timeline_id)
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
timeline.compact().map_err(ApiError::InternalServerError)?;
timeline
.reconstruct()
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}

View File

@@ -16,8 +16,6 @@ pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
// pub mod timelines;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;

View File

@@ -10,9 +10,7 @@
//
use anyhow::{bail, ensure, Context, Result};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Buf;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
@@ -44,7 +42,6 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
@@ -52,9 +49,7 @@ use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
// TODO these should be in a library outside the pageserver
#[derive(Debug)]
pub enum PagestreamFeMessage {
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
@@ -62,7 +57,7 @@ pub enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
@@ -71,152 +66,106 @@ pub enum PagestreamBeMessage {
}
#[derive(Debug)]
pub struct PagestreamExistsRequest {
struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamNblocksRequest {
struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamGetPageRequest {
struct PagestreamGetPageRequest {
latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
lsn: Lsn,
rel: RelTag,
blkno: u32,
}
#[derive(Debug)]
pub struct PagestreamDbSizeRequest {
struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.read_u32::<BigEndian>()?,
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
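To make the new `Bytes`-based parser above easier to follow, here is the wire layout of a GetPage request (tag 2) as a standalone round trip with the `bytes` crate, whose `get_u32`/`get_u64` read big-endian just like the old `byteorder::BigEndian` calls: one tag byte, a `latest` flag byte, a u64 LSN, the RelTag fields (three u32s and a forknum byte), and the block number. The struct is a simplified stand-in for `PagestreamGetPageRequest`, not the pageserver's type.

use bytes::{Buf, BufMut, Bytes, BytesMut};

#[derive(Debug, PartialEq)]
struct GetPageRequest {
    latest: bool,
    lsn: u64,
    rel: (u32, u32, u32, u8), // (spcnode, dbnode, relnode, forknum)
    blkno: u32,
}

fn serialize(req: &GetPageRequest) -> Bytes {
    let mut buf = BytesMut::new();
    buf.put_u8(2); // message tag for GetPage
    buf.put_u8(req.latest as u8);
    buf.put_u64(req.lsn);
    buf.put_u32(req.rel.0);
    buf.put_u32(req.rel.1);
    buf.put_u32(req.rel.2);
    buf.put_u8(req.rel.3);
    buf.put_u32(req.blkno);
    buf.freeze()
}

fn parse(mut body: Bytes) -> GetPageRequest {
    assert_eq!(body.get_u8(), 2, "not a GetPage message");
    GetPageRequest {
        latest: body.get_u8() != 0,
        lsn: body.get_u64(),
        rel: (body.get_u32(), body.get_u32(), body.get_u32(), body.get_u8()),
        blkno: body.get_u32(),
    }
}

fn main() {
    let req = GetPageRequest { latest: true, lsn: 0x16B9188, rel: (1663, 13010, 16384, 0), blkno: 7 };
    assert_eq!(parse(serialize(&req)), req);
}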
@@ -473,17 +422,6 @@ impl PageServerHandler {
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
Some(Tracer::new(path))
} else {
None
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
@@ -508,24 +446,15 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Sync) => {
// TODO what now?
continue;
}
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {

View File

@@ -45,7 +45,6 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
use toml_edit;
use utils::{
@@ -119,6 +118,18 @@ pub struct Tenant {
upload_layers: bool,
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Tenant {
@@ -299,7 +310,7 @@ impl Tenant {
for (timeline_id, timeline) in &timelines_to_compact {
let _entered = info_span!("compact_timeline", timeline = %timeline_id).entered();
timeline.compact()?;
timeline.reconstruct()?;
}
Ok(())
@@ -593,14 +604,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
// TODO is there a need for all this boilerplate?
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -1738,7 +1741,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
@@ -1746,7 +1749,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
@@ -1754,7 +1757,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
@@ -1762,7 +1765,7 @@ mod tests {
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10"));
assert_eq!(tline.get(*TEST_KEY, Lsn(0x1f))?, TEST_IMG("foo at 0x10"));
@@ -1810,7 +1813,7 @@ mod tests {
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
tline.gc()?;
}
@@ -1880,7 +1883,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
tline.gc()?;
}
@@ -1959,7 +1962,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.reconstruct()?;
tline.gc()?;
}

View File

@@ -95,6 +95,9 @@ impl From<&DeltaLayer> for Summary {
// Flag indicating that this version initialize the page
const WILL_INIT: u64 = 1;
// Flag indicating page image
const IS_IMAGE: u64 = 2;
///
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
@@ -109,15 +112,22 @@ impl BlobRef {
(self.0 & WILL_INIT) != 0
}
pub fn pos(&self) -> u64 {
self.0 >> 1
pub fn is_image(&self) -> bool {
(self.0 & IS_IMAGE) != 0
}
pub fn new(pos: u64, will_init: bool) -> BlobRef {
let mut blob_ref = pos << 1;
pub fn pos(&self) -> u64 {
self.0 >> 2
}
pub fn new(pos: u64, will_init: bool, is_image: bool) -> BlobRef {
let mut blob_ref = pos << 2;
if will_init {
blob_ref |= WILL_INIT;
}
if is_image {
blob_ref |= IS_IMAGE;
}
BlobRef(blob_ref)
}
}
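The change above widens the flag area of `BlobRef` from one bit to two: the blob offset is now shifted left by 2, with bit 0 carrying `will_init` and bit 1 carrying `is_image`, so `key_iter(skip_images)` can skip image values without deserializing them. A minimal standalone model of that encoding (a toy, not the pageserver's struct):

const WILL_INIT: u64 = 1;
const IS_IMAGE: u64 = 2;

#[derive(Clone, Copy, Debug, PartialEq)]
struct BlobRef(u64);

impl BlobRef {
    fn new(pos: u64, will_init: bool, is_image: bool) -> BlobRef {
        let mut v = pos << 2; // low two bits are reserved for the flags
        if will_init { v |= WILL_INIT; }
        if is_image  { v |= IS_IMAGE; }
        BlobRef(v)
    }
    fn pos(&self) -> u64 { self.0 >> 2 }
    fn will_init(&self) -> bool { self.0 & WILL_INIT != 0 }
    fn is_image(&self) -> bool { self.0 & IS_IMAGE != 0 }
}

fn main() {
    let r = BlobRef::new(4096, false, true);
    assert_eq!((r.pos(), r.will_init(), r.is_image()), (4096, false, true));
}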
@@ -314,13 +324,13 @@ impl Layer for DeltaLayer {
}
}
fn key_iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'a> {
fn key_iter<'a>(&'a self, skip_images: bool) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'a> {
let inner = match self.load() {
Ok(inner) => inner,
Err(e) => panic!("Failed to load a delta layer: {e:?}"),
};
match DeltaKeyIter::new(inner) {
match DeltaKeyIter::new(inner, skip_images) {
Ok(iter) => Box::new(iter),
Err(e) => panic!("Layer index is corrupted: {e:?}"),
}
@@ -414,6 +424,30 @@ impl Layer for DeltaLayer {
Ok(())
}
fn contains(&self, key: &Key) -> Result<bool> {
// Open the file and lock the metadata in memory
let inner = self.load()?;
// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(key, Lsn(0));
let mut found = false;
reader.visit(
&search_key.0,
VisitDirection::Forwards,
|delta_key, _val| {
found = DeltaKey::extract_key_from_buf(delta_key) == *key;
false
},
)?;
Ok(found)
}
}
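The new `contains` implementation above relies on a property of the layer's B-tree index: entries are ordered by (key, lsn), so seeking to `(key, Lsn(0))` and inspecting the first entry at or after that position is enough to decide whether the key is present. The same idea with a plain `BTreeMap` of `(key, lsn)` pairs, as a sketch rather than the `DiskBtreeReader` API:

use std::collections::BTreeMap;

fn contains_key(index: &BTreeMap<(u64, u64), ()>, key: u64) -> bool {
    index
        .range((key, 0)..)                    // seek to (key, lsn = 0)
        .next()                               // first entry at or after it
        .map(|((k, _lsn), _)| *k == key)      // same key => present
        .unwrap_or(false)
}

fn main() {
    let mut index = BTreeMap::new();
    index.insert((10, 0x20), ());
    index.insert((10, 0x30), ());
    index.insert((42, 0x10), ());
    assert!(contains_key(&index, 10));
    assert!(!contains_key(&index, 11));
}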
impl DeltaLayer {
@@ -671,7 +705,13 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
self.put_value_bytes(
key,
lsn,
&Value::ser(&val)?,
val.will_init(),
val.is_image(),
)
}
pub fn put_value_bytes(
@@ -680,12 +720,12 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: &[u8],
will_init: bool,
is_image: bool,
) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(val)?;
let blob_ref = BlobRef::new(off, will_init);
let blob_ref = BlobRef::new(off, will_init, is_image);
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;
@@ -874,7 +914,7 @@ impl Iterator for DeltaKeyIter {
}
impl<'a> DeltaKeyIter {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>, skip_images: bool) -> Result<Self> {
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
@@ -883,29 +923,33 @@ impl<'a> DeltaKeyIter {
);
let mut all_keys: Vec<(DeltaKey, u64)> = Vec::new();
let mut last_pos = 0u64;
let mut last_delta: Option<DeltaKey> = None;
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value| {
let delta_key = DeltaKey::from_slice(key);
let pos = BlobRef(value).pos();
if let Some(last) = all_keys.last_mut() {
if last.0.key() == delta_key.key() {
return true;
} else {
// subtract this key's first blob offset from the next key's first BLOB offset
// to get the total size of values associated with this key
let first_pos = last.1;
last.1 = pos - first_pos;
let blob_ref = BlobRef(value);
if !blob_ref.is_image() || !skip_images {
let next_delta = DeltaKey::from_slice(key);
let pos = blob_ref.pos();
if let Some(prev_delta) = last_delta.take() {
if prev_delta.key() == next_delta.key() {
last_delta = Some(next_delta);
return true;
}
all_keys.push((prev_delta, pos - last_pos));
}
last_delta = Some(next_delta);
last_pos = pos;
}
all_keys.push((delta_key, pos));
true
},
)?;
if let Some(last) = all_keys.last_mut() {
if let Some(prev_delta) = last_delta.take() {
// Last key occupies all space till end of layer
last.1 = std::fs::metadata(&file.file.path)?.len() - last.1;
let file_size = std::fs::metadata(&file.file.path)?.len();
all_keys.push((prev_delta, file_size - last_pos));
}
let iter = DeltaKeyIter {
all_keys,

View File

@@ -223,6 +223,10 @@ impl Layer for ImageLayer {
Ok(())
}
fn contains(&self, key: &Key) -> Result<bool> {
Ok(self.get_key_range().contains(key))
}
}
impl ImageLayer {

View File

@@ -235,6 +235,11 @@ impl Layer for InMemoryLayer {
Ok(())
}
fn contains(&self, key: &Key) -> Result<bool> {
let inner = self.inner.read().unwrap();
Ok(inner.index.get(key).is_some())
}
}
impl InMemoryLayer {
@@ -358,8 +363,14 @@ impl InMemoryLayer {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
let value = Value::des(&buf)?;
delta_layer_writer.put_value_bytes(
key,
*lsn,
&buf,
value.will_init(),
value.is_image(),
)?;
}
}

View File

@@ -15,19 +15,25 @@ use crate::repository::Key;
use crate::tenant::inmemory_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use anyhow::Result;
use num_traits::identities::{One, Zero};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
struct BTreeKey {
lsn: Lsn,
seq: usize,
}
impl BTreeKey {
fn new(lsn: Lsn) -> BTreeKey {
BTreeKey { lsn, seq: 0 }
}
}
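The `BTreeKey` above replaces the R-tree index: historic layers are now kept in a `BTreeMap` ordered by the layer's start LSN, with `seq` as a tiebreaker so several layers starting at the same LSN can coexist. Lookups then become backwards range scans, as in the rewritten `search()` later in this diff. A toy model of that pattern (the layer is just a string label and `Lsn` is approximated by a `u64`):

use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct BTreeKey {
    lsn: u64,   // stands in for utils::lsn::Lsn
    seq: usize, // insertion sequence number, breaks ties between equal LSNs
}

fn main() {
    let mut historic: BTreeMap<BTreeKey, &str> = BTreeMap::new();
    let mut seqno = 0;
    for (lsn, name) in [(0x10, "image@0x10"), (0x20, "delta 0x20..0x30"), (0x20, "image@0x20")] {
        historic.insert(BTreeKey { lsn, seq: seqno }, name);
        seqno += 1;
    }
    // "Find the newest layer that starts below end_lsn": walk the range backwards,
    // the way the new search()/find_latest_image() code does.
    let end_lsn = 0x25;
    let lo = BTreeKey { lsn: 0, seq: 0 };
    let hi = BTreeKey { lsn: end_lsn, seq: 0 };
    let mut iter = historic.range(lo..hi);
    while let Some((key, layer)) = iter.next_back() {
        println!("candidate at lsn {:#x}: {}", key.lsn, layer);
        // A real search would also check key ranges and is_incremental() here.
    }
}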
///
/// LayerMap tracks what layers exist on a timeline.
///
@@ -53,163 +59,11 @@ pub struct LayerMap {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject>,
historic_layers: BTreeMap<BTreeKey, Arc<dyn Layer>>,
layers_seqno: usize,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<dyn Layer>>,
}
struct LayerRTreeObject {
layer: Arc<dyn Layer>,
}
// Representation of Key as numeric type.
// We can not use native implementation of i128, because rstar::RTree
// doesn't handle properly integer overflow during area calculation: sum(Xi*Yi).
// Overflow will cause panic in debug mode and incorrect area calculation in release mode,
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of R-Tree operation).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);
impl Copy for IntKey {}
impl IntKey {
fn from(i: i128) -> Self {
IntKey(i256::from(i))
}
}
impl Bounded for IntKey {
fn min_value() -> Self {
IntKey(i256::MIN)
}
fn max_value() -> Self {
IntKey(i256::MAX)
}
}
impl Signed for IntKey {
fn is_positive(&self) -> bool {
self.0 > i256::ZERO
}
fn is_negative(&self) -> bool {
self.0 < i256::ZERO
}
fn signum(&self) -> Self {
match self.0.cmp(&i256::ZERO) {
Ordering::Greater => IntKey(i256::ONE),
Ordering::Less => IntKey(-i256::ONE),
Ordering::Equal => IntKey(i256::ZERO),
}
}
fn abs(&self) -> Self {
IntKey(self.0.abs())
}
fn abs_sub(&self, other: &Self) -> Self {
if self.0 <= other.0 {
IntKey(i256::ZERO)
} else {
IntKey(self.0 - other.0)
}
}
}
impl Neg for IntKey {
type Output = Self;
fn neg(self) -> Self::Output {
IntKey(-self.0)
}
}
impl Rem for IntKey {
type Output = Self;
fn rem(self, rhs: Self) -> Self::Output {
IntKey(self.0 % rhs.0)
}
}
impl Div for IntKey {
type Output = Self;
fn div(self, rhs: Self) -> Self::Output {
IntKey(self.0 / rhs.0)
}
}
impl Add for IntKey {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
IntKey(self.0 + rhs.0)
}
}
impl Sub for IntKey {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
IntKey(self.0 - rhs.0)
}
}
impl Mul for IntKey {
type Output = Self;
fn mul(self, rhs: Self) -> Self::Output {
IntKey(self.0 * rhs.0)
}
}
impl One for IntKey {
fn one() -> Self {
IntKey(i256::ONE)
}
}
impl Zero for IntKey {
fn zero() -> Self {
IntKey(i256::ZERO)
}
fn is_zero(&self) -> bool {
self.0 == i256::ZERO
}
}
impl Num for IntKey {
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
}
}
impl PartialEq for LayerRTreeObject {
fn eq(&self, other: &Self) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work, the assertion below would be triggered
// otherwise but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(&self.layer, &other.layer)
}
}
impl RTreeObject for LayerRTreeObject {
type Envelope = AABB<[IntKey; 2]>;
fn envelope(&self) -> Self::Envelope {
let key_range = self.layer.get_key_range();
let lsn_range = self.layer.get_lsn_range();
AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
)
}
/// Latest stored delta layer
latest_delta_layer: Option<Arc<dyn Layer>>,
}
/// Return value of LayerMap::search
@@ -234,23 +88,17 @@ impl LayerMap {
// linear search
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<dyn Layer>> = None;
let mut latest_img_lsn: Option<Lsn> = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
[
IntKey::from(key.to_i128()),
IntKey::from(end_lsn.0 as i128 - 1),
],
);
for e in self
let mut latest_img_lsn = Lsn(0);
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
while let Some((_key, l)) = iter.next_back() {
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
if !l.get_key_range().contains(&key) {
continue;
}
let img_lsn = l.get_lsn_range().start;
assert!(img_lsn < end_lsn);
if Lsn(img_lsn.0 + 1) == end_lsn {
@@ -260,23 +108,23 @@ impl LayerMap {
lsn_floor: img_lsn,
}));
}
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
latest_img = Some(Arc::clone(l));
latest_img_lsn = Some(img_lsn);
}
latest_img = Some(Arc::clone(l));
latest_img_lsn = img_lsn;
break;
}
// Search the delta layers
let mut latest_delta: Option<Arc<dyn Layer>> = None;
for e in self
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
while let Some((_key, l)) = iter.next_back() {
if !l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
if !l.get_key_range().contains(&key) {
continue;
}
if l.get_lsn_range().start >= end_lsn {
info!(
"Candidate delta layer {}..{} is too new for lsn {}",
@@ -286,6 +134,9 @@ impl LayerMap {
);
}
assert!(l.get_lsn_range().start < end_lsn);
if l.get_lsn_range().end <= latest_img_lsn {
continue;
}
if l.get_lsn_range().end >= end_lsn {
// this layer contains the requested point in the key/lsn space.
// No need to search any further
@@ -311,10 +162,7 @@ impl LayerMap {
"found (old) layer {} for request on {key} at {end_lsn}",
l.filename().display(),
);
let lsn_floor = std::cmp::max(
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start,
);
let lsn_floor = std::cmp::max(Lsn(latest_img_lsn.0 + 1), l.get_lsn_range().start);
Ok(Some(SearchResult {
lsn_floor,
layer: l,
@@ -322,7 +170,7 @@ impl LayerMap {
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Ok(Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
lsn_floor: latest_img_lsn,
layer: l,
}))
} else {
@@ -336,9 +184,28 @@ impl LayerMap {
///
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
self.latest_delta_layer = Some(layer.clone());
} else if !layer.is_incremental() {
// If the latest delta layer is followed by image layers,
// then reset it, preventing generation of a partial image layer.
if let Some(latest_delta) = &self.latest_delta_layer {
// Maybe it is more correct to use contains() rather than intersects(),
// but one delta layer can be covered by several image layers.
let kr1 = layer.get_key_range();
let kr2 = latest_delta.get_key_range();
if range_overlaps(&kr1, &kr2) {
self.latest_delta_layer = None;
}
}
}
self.historic_layers.insert(LayerRTreeObject { layer });
self.historic_layers.insert(
BTreeKey {
lsn: layer.get_lsn_range().start,
seq: self.layers_seqno,
},
layer,
);
self.layers_seqno += 1;
NUM_ONDISK_LAYERS.inc();
}
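The bookkeeping in `insert_historic` above boils down to: an L0 delta layer (full key range) becomes the candidate for partial image generation, and a subsequently inserted image layer whose key range overlaps it clears the candidate. A toy model of that rule, with plain `u64` ranges standing in for `Key` ranges:

use std::ops::Range;

fn range_overlaps(a: &Range<u64>, b: &Range<u64>) -> bool {
    a.start < b.end && b.start < a.end
}

struct Tracker {
    latest_delta: Option<Range<u64>>, // key range of the latest L0 delta layer
}

impl Tracker {
    fn insert_delta_l0(&mut self, key_range: Range<u64>) {
        self.latest_delta = Some(key_range);
    }
    fn insert_image(&mut self, key_range: Range<u64>) {
        let covered = self
            .latest_delta
            .as_ref()
            .map(|delta| range_overlaps(&key_range, delta))
            .unwrap_or(false);
        if covered {
            // The delta is now covered by an image layer; no partial image is needed.
            self.latest_delta = None;
        }
    }
}

fn main() {
    let mut t = Tracker { latest_delta: None };
    t.insert_delta_l0(0..u64::MAX);
    t.insert_image(100..200);
    assert!(t.latest_delta.is_none());
}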
@@ -349,21 +216,33 @@ impl LayerMap {
///
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
if layer.get_key_range() == (Key::MIN..Key::MAX) {
let len_before = self.l0_delta_layers.len();
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work, the assertion below would be triggered
// otherwise but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
self.l0_delta_layers
.retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
if let Some(latest_layer) = &self.latest_delta_layer {
#[allow(clippy::vtable_address_comparisons)]
if Arc::ptr_eq(&layer, latest_layer) {
self.latest_delta_layer = None;
}
}
}
assert!(self
.historic_layers
.remove(&LayerRTreeObject { layer })
.is_some());
let len_before = self.historic_layers.len();
#[allow(clippy::vtable_address_comparisons)]
self.historic_layers
.retain(|_key, other| !Arc::ptr_eq(other, &layer));
if self.historic_layers.len() != len_before - 1 {
assert!(self.historic_layers.len() == len_before);
error!(
"Failed to remove {} layer: {}..{}__{}..{}",
if layer.is_incremental() {
"inremental"
} else {
"image"
},
layer.get_key_range().start,
layer.get_key_range().end,
layer.get_lsn_range().start,
layer.get_lsn_range().end
);
}
assert!(self.historic_layers.len() == len_before - 1);
NUM_ONDISK_LAYERS.dec();
}
@@ -380,21 +259,10 @@ impl LayerMap {
loop {
let mut made_progress = false;
let envelope = AABB::from_corners(
[
IntKey::from(range_remain.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(range_remain.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
for (_key, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
@@ -417,39 +285,30 @@ impl LayerMap {
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
self.historic_layers.iter().map(|e| e.layer.clone())
self.historic_layers
.iter()
.map(|(_key, layer)| layer.clone())
}
/// Find the last image layer that covers 'key', ignoring any image layers
/// newer than 'lsn'.
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
let mut candidate_lsn = Lsn(0);
let mut candidate = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0)],
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
);
for e in self
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1));
while let Some((_key, l)) = iter.next_back() {
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
if this_lsn < candidate_lsn {
// our previous candidate was better
if !l.get_key_range().contains(&key) {
continue;
}
candidate_lsn = this_lsn;
candidate = Some(Arc::clone(l));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
return Some(Arc::clone(l));
}
candidate
None
}
///
@@ -466,18 +325,10 @@ impl LayerMap {
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
let mut points = vec![key_range.start];
let envelope = AABB::from_corners(
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
[
IntKey::from(key_range.end.to_i128()),
IntKey::from(lsn.0 as i128),
],
);
for e in self
for (_lsn, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1))
{
let l = &e.layer;
assert!(l.get_lsn_range().start <= lsn);
let range = l.get_key_range();
if key_range.contains(&range.start) {
@@ -514,26 +365,17 @@ impl LayerMap {
if lsn_range.start >= lsn_range.end {
return Ok(0);
}
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
for (_lsn, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{
let l = &e.layer;
if !l.is_incremental() {
continue;
}
if !range_overlaps(&l.get_key_range(), key_range) {
continue;
}
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
assert!(range_overlaps(&l.get_key_range(), key_range));
// We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition
@@ -549,8 +391,8 @@ impl LayerMap {
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<dyn Layer>>> {
Ok(self.l0_delta_layers.clone())
pub fn get_latest_delta_layer(&mut self) -> Option<Arc<dyn Layer>> {
self.latest_delta_layer.take()
}
/// debugging function to print out the contents of the layer map
@@ -569,8 +411,8 @@ impl LayerMap {
}
println!("historic_layers:");
for e in self.historic_layers.iter() {
e.layer.dump(verbose)?;
for (_key, layer) in self.historic_layers.iter() {
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())

View File

@@ -139,9 +139,9 @@ pub trait Layer: Send + Sync {
/// Iterate through all keys and values stored in the layer
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
/// Iterate through all keys stored in the layer. Returns key, lsn and value size
/// It is used only for compaction and so is currently implemented only for DeltaLayer
fn key_iter(&self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + '_> {
/// Iterate through all keys stored in the layer. Returns key, lsn and value size.
/// It is used only for reconstruction and so is currently implemented only for DeltaLayer
fn key_iter(&self, _skip_images: bool) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + '_> {
panic!("Not implemented")
}
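
The new `skip_images` flag exists so the reconstruction pass can walk only the delta entries of a layer and ignore materialized page images. As a rough illustration of that intent (the Entry type and field names below are simplified assumptions, not the real iterator item):

// Hypothetical, simplified entry; the real iterator yields (Key, Lsn, u64).
struct Entry {
    key: u64,
    lsn: u64,
    size: u64,
    is_image: bool,
}

// Sketch of what a key_iter(skip_images) implementation might do internally:
// yield every entry, or only the non-image ones when asked to skip images.
fn key_iter(entries: Vec<Entry>, skip_images: bool) -> impl Iterator<Item = (u64, u64, u64)> {
    entries
        .into_iter()
        .filter(move |e| !skip_images || !e.is_image)
        .map(|e| (e.key, e.lsn, e.size))
}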
@@ -150,4 +150,7 @@ pub trait Layer: Send + Sync {
/// Dump summary of the contents of the layer to stdout
fn dump(&self, verbose: bool) -> Result<()>;
// Check if layer contains a particular key
fn contains(&self, key: &Key) -> Result<bool>;
}

View File

@@ -3,7 +3,6 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use tokio::task::spawn_blocking;
use tracing::*;
@@ -119,7 +118,7 @@ pub struct Timeline {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// This lock is acquired in [`Timeline::gc`], [`Timeline::reconstruct`],
/// and [`Tenant::delete_timeline`].
layer_removal_cs: Mutex<()>,
@@ -469,7 +468,7 @@ impl Timeline {
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)?;
self.compact()
self.reconstruct()
}
}
}
@@ -510,13 +509,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
@@ -597,7 +589,7 @@ impl Timeline {
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result.repartition_threshold = result.get_checkpoint_distance() * 3;
result
}
@@ -731,7 +723,7 @@ impl Timeline {
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
self.layer_removal_cs
.try_lock()
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
.map_err(|e| anyhow!("cannot lock reconstruction critical section {e}"))
}
/// Retrieve current logical size of the timeline.
@@ -1333,17 +1325,17 @@ impl Timeline {
Ok(new_delta_path)
}
pub fn compact(&self) -> anyhow::Result<()> {
pub fn reconstruct(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timeline was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
warn!("Skipping reconstruction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
// High level strategy for reconstruction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
@@ -1367,13 +1359,13 @@ impl Timeline {
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// 3. After that, we reconstruct all level0 delta files if there
// are too many of them. While reconstructing, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// Below are functions reconstruct_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
@@ -1400,21 +1392,21 @@ impl Timeline {
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
error!("could not reconstruct, repartitioning keyspace failed: {err:?}");
}
};
// 3. Reconstruct
let timer = self.metrics.reconstruct_time_histo.start_timer();
self.reconstruct_level0(target_file_size)?;
timer.stop_and_record();
Ok(())
}
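
Summarizing the hunk above: after this change the checkpoint path always runs the level-0 reconstruction pass, and only image-layer creation is gated on a successful repartitioning. A stripped-down sketch of that control flow, with locking, metrics, and the real method signatures omitted (the stub Timeline and helper names here are simplified assumptions):

// Minimal stand-ins so the sketch compiles on its own; the real Timeline,
// partitioning and LSN types in the pageserver are richer.
struct Partitioning;
struct Timeline;

impl Timeline {
    fn repartition(&self) -> anyhow::Result<(Partitioning, u64)> { Ok((Partitioning, 0)) }
    fn create_image_layers(&self, _p: &Partitioning, _lsn: u64) -> anyhow::Result<()> { Ok(()) }
    fn reconstruct_level0(&self, _target_file_size: u64) -> anyhow::Result<()> { Ok(()) }
}

// Condensed control flow of reconstruct() after this change.
fn reconstruct(tl: &Timeline, target_file_size: u64) -> anyhow::Result<()> {
    match tl.repartition() {
        // 1-2. Partition the key space and create image layers where needed.
        Ok((partitioning, lsn)) => tl.create_image_layers(&partitioning, lsn)?,
        // A freshly created timeline has nothing to partition yet; log and move on.
        Err(err) => eprintln!("could not reconstruct, repartitioning failed: {err:?}"),
    }
    // 3. Always run the level-0 reconstruction pass afterwards.
    tl.reconstruct_level0(target_file_size)
}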
@@ -1514,7 +1506,7 @@ impl Timeline {
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// Compaction creates multiple image layers. It would be better to create them all
// Reconstruction creates multiple image layers. It would be better to create them all
// and fsync them all in parallel.
let mut all_paths = Vec::from_iter(layer_paths_to_upload.clone());
all_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
@@ -1534,230 +1526,46 @@ impl Timeline {
}
///
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// Collect a bunch of Level 0 layer files, and reconstruct and reshuffle them
/// as Level 1 files.
///
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
fn reconstruct_level0(&self, target_file_size: u64) -> Result<()> {
let mut layers = self.layers.write().unwrap();
let latest_delta_layer = layers.get_latest_delta_layer();
drop(layers);
// Only compact if enough layers have accumulated.
if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() {
return Ok(());
}
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
// level 0 files that form a contiguous sequence, such that the end
// LSN of previous file matches the start LSN of the next file.
//
// Note that if the files don't form such a sequence, we might
// "compact" just a single file. That's a bit pointless, but it allows
// us to get rid of the level 0 file, and compact the other files on
// the next iteration. This could probably be made smarter, but such
// "gaps" in the sequence of level 0 files should only happen in case
// of a crash, partial download from cloud storage, or something like
// that, so it's not a big deal in practice.
level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
let mut level0_deltas_iter = level0_deltas.iter();
let first_level0_delta = level0_deltas_iter.next().unwrap();
let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
for l in level0_deltas_iter {
let lsn_range = l.get_lsn_range();
if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(Arc::clone(l));
prev_lsn_end = lsn_range.end;
}
let lsn_range = Range {
start: deltas_to_compact.first().unwrap().get_lsn_range().start,
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
};
info!(
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
lsn_range.start,
lsn_range.end,
deltas_to_compact.len(),
level0_deltas.len()
);
for l in deltas_to_compact.iter() {
info!("compact includes {}", l.filename().display());
}
// We don't need the original list of layers anymore. Drop it so that
// we don't accidentally use it later in the function.
drop(level0_deltas);
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = deltas_to_compact
.iter()
.map(|l| l.iter())
.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
true
}
});
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = deltas_to_compact
.iter()
.map(|l| l.key_iter())
.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
let (b_key, b_lsn, _) = b;
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
});
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// We split the new delta layers on the key dimension. We iterate through the
// key space and, for each key, check whether including the next key in the
// current output layer would cause the layer to become too large. If so, we
// dump the current output layer and start a new one.
// It's possible that there is a single key with so many page versions that storing all of them in a single layer file
// would be too large. In that case, we also split on the LSN dimension.
//
// LSN
// ^
// |
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
//
//
// If one key (X) has a lot of page versions:
//
// LSN
// ^
// | (X)
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | +--+ |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | +--+ |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
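
The comment above describes the two split rules of the removed compaction path: start a new output layer once the accumulated per-key value sizes would overflow the target file size, and additionally split on the LSN axis when a single key alone exceeds it. A toy sketch of the size-based part of that decision, using plain integers in place of the real Key/Lsn/value types (those stand-ins are assumptions for illustration only):

/// Greedily split a key-ordered stream of (key, total value size for that key)
/// into roughly layer-sized chunks: the core size rule of the old compact_level0().
fn split_by_size(per_key_sizes: &[(u64, u64)], target_file_size: u64) -> Vec<Vec<u64>> {
    let mut layers = Vec::new();
    let mut current_keys = Vec::new();
    let mut current_size = 0u64;
    for &(key, size) in per_key_sizes {
        // If adding this key would overflow the target, flush the current chunk first.
        if !current_keys.is_empty() && current_size + size > target_file_size {
            layers.push(std::mem::take(&mut current_keys));
            current_size = 0;
        }
        current_keys.push(key);
        current_size += size;
        // A single key larger than the whole target is further split on the LSN
        // axis in the real code; that part is omitted here.
    }
    if !current_keys.is_empty() {
        layers.push(current_keys);
    }
    layers
}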
let mut new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
for x in all_values_iter {
let (key, lsn, value) = x?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
let mut next_key_size = 0u64;
let is_dup_layer = dup_end_lsn.is_valid();
dup_start_lsn = Lsn::INVALID;
if !same_key {
dup_end_lsn = Lsn::INVALID;
}
// Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
next_key_size = next_size;
if key != next_key {
if dup_end_lsn.is_valid() {
// We are writing a segment with duplicates:
// place all remaining values of this key in separate segment
dup_start_lsn = dup_end_lsn; // new segments starts where old stops
dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
}
break;
}
key_values_total_size += next_size;
// Check if it is time to split segment: if total keys size is larger than target file size.
// We need to avoid generation of empty segments if next_size > target_file_size.
if key_values_total_size > target_file_size && lsn != next_lsn {
// Split key between multiple layers: such layer can contain only single key
dup_start_lsn = if dup_end_lsn.is_valid() {
dup_end_lsn // new segment with duplicates starts where old one stops
} else {
lsn // start with the first LSN for this key
};
dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
break;
}
}
// handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if this key causes the layer to overflow...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
let mut new_layers = Vec::new();
let mut last_key: Option<Key> = None;
if let Some(last_delta_layer) = latest_delta_layer {
let end_lsn = last_delta_layer.get_lsn_range().end;
let lsn_range = end_lsn..end_lsn + 1;
for (key, lsn, _) in last_delta_layer.key_iter(true) {
let value = self.get(key, lsn)?;
if let Some(curr_writer) = &writer {
if curr_writer.size() > target_file_size {
new_layers.push(writer.take().unwrap().finish(key)?);
writer = None;
}
}
// Remember size of key value because at next iteration we will access next item
key_values_total_size = next_key_size;
}
if writer.is_none() {
// Create writer if not initialized yet
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
key,
if dup_end_lsn.is_valid() {
// this is a layer containing slice of values of the same key
debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
dup_start_lsn..dup_end_lsn
} else {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
)?);
if writer.is_none() {
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
key,
lsn_range.clone(),
)?);
}
writer
.as_mut()
.unwrap()
.put_value(key, end_lsn, Value::Image(value))?;
last_key = Some(key);
}
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
new_layers.push(writer.finish(last_key.unwrap().next())?);
}
// Sync layers
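
In place of the k-merge compaction above, the new reconstruct_level0 path takes only the latest delta layer, materializes each of its keys through a timeline read, and writes the resulting page images (stamped with the layer's end LSN in the real code) into new layers, rolling over to a fresh writer once the current one exceeds the target size. The sketch below restates that loop with simplified stand-ins; Page, ImageWriter, and read_page are assumptions used only for illustration, not the real Value/DeltaLayerWriter/Timeline::get API.

// Simplified stand-ins; the real code uses Key/Lsn/Value and DeltaLayerWriter.
type Key = u64;
type Lsn = u64;
type Page = Vec<u8>;

struct ImageWriter {
    bytes_written: u64,
    images: Vec<(Key, Page)>,
}

impl ImageWriter {
    fn new() -> Self { Self { bytes_written: 0, images: Vec::new() } }
    fn put(&mut self, key: Key, page: Page) {
        self.bytes_written += page.len() as u64;
        self.images.push((key, page));
    }
}

/// Materialize every key of the latest delta layer as a full page image,
/// splitting the output into roughly target-sized chunks.
fn reconstruct_level0(
    latest_delta_entries: &[(Key, Lsn)], // (key, lsn) pairs from key_iter(true)
    target_file_size: u64,
    read_page: impl Fn(Key, Lsn) -> Page, // stands in for Timeline::get()
) -> Vec<ImageWriter> {
    let mut finished = Vec::new();
    let mut writer: Option<ImageWriter> = None;
    for &(key, lsn) in latest_delta_entries {
        let page = read_page(key, lsn);
        // Roll over to a new output layer once the current one is big enough.
        if writer.as_ref().map_or(false, |w| w.bytes_written > target_file_size) {
            finished.push(writer.take().unwrap());
        }
        writer.get_or_insert_with(ImageWriter::new).put(key, page);
    }
    if let Some(w) = writer {
        finished.push(w);
    }
    finished
}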
@@ -1787,23 +1595,6 @@ impl Timeline {
new_layer_paths.insert(new_delta_path);
layers.insert_historic(Arc::new(l));
}
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones
let mut layer_paths_do_delete = HashSet::with_capacity(deltas_to_compact.len());
drop(all_keys_iter);
for l in deltas_to_compact {
if let Some(path) = l.local_path() {
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_do_delete.insert(path);
}
l.delete()?;
layers.remove_historic(l);
}
drop(layers);
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_upload(
self.tenant_id,
@@ -1811,11 +1602,6 @@ impl Timeline {
new_layer_paths,
None,
);
storage_sync::schedule_layer_delete(
self.tenant_id,
self.timeline_id,
layer_paths_do_delete,
);
}
Ok(())
@@ -1823,10 +1609,10 @@ impl Timeline {
/// Update information about which layer files need to be retained on
/// garbage collection. This is separate from actually performing the GC,
/// and is updated more frequently, so that compaction can remove obsolete
/// and is updated more frequently, so that reconstruction can remove obsolete
/// page versions more aggressively.
///
/// TODO: that's wishful thinking, compaction doesn't actually do that
/// TODO: that's wishful thinking, reconstruction doesn't actually do that
/// currently.
///
/// The caller specifies how much history is needed with the 3 arguments:

View File

@@ -82,10 +82,6 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
/// If enabled, records all read requests for this ... TODO
/// Even though read traces are small, there's no delete mechanism so they can
/// pile up. So we disable this by default.
pub trace_read_requests: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -109,7 +105,6 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfOpt {
@@ -143,9 +138,6 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
}
}
@@ -215,7 +207,6 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
}
}
@@ -241,7 +232,6 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
trace_read_requests: false,
}
}
}

View File

@@ -108,10 +108,6 @@ pub fn init_tenant_mgr(
/// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues.
///
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
/// Alternatively, tenant is considered loaded successfully, if it's already in pageserver's memory (i.e. was loaded already before).
///
/// Attach happens on startup and on successful timeline downloads
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
pub fn attach_local_tenants(
@@ -177,28 +173,16 @@ fn load_local_tenant(
remote_index.clone(),
conf.remote_storage_config.is_some(),
));
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
if !tenant_timelines_dir.is_dir() {
error!(
"Tenant {} has no timelines directory at {}",
tenant_id,
tenant_timelines_dir.display()
);
tenant.set_state(TenantState::Broken);
} else {
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
tenant
}
@@ -646,10 +630,14 @@ fn collect_timelines_for_tenant(
}
if tenant_timelines.is_empty() {
// this is normal, we've removed all broken, empty and temporary timeline dirs
// but should allow the tenant to stay functional and allow creating new timelines
// on a restart, we require tenants to have the timelines dir, so leave it on disk
debug!("Tenant {tenant_id} has no timelines loaded");
match remove_if_empty(&timelines_dir) {
Ok(true) => info!(
"Removed empty tenant timelines directory {}",
timelines_dir.display()
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
}
Ok((tenant_id, tenant_timelines))

View File

@@ -1,36 +0,0 @@
use bytes::Bytes;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
writer: BufWriter<File>,
}
impl Drop for Tracer {
fn drop(&mut self) {
self.flush()
}
}
impl Tracer {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("failed to parse parent path");
create_dir_all(parent).expect("failed to create trace dir");
let file = File::create(path).expect("failed to create trace file");
Tracer {
writer: BufWriter::new(file),
}
}
pub fn trace(&mut self, msg: &Bytes) {
self.writer.write(msg).expect("failed to write trace");
}
pub fn flush(&mut self) {
self.writer.flush().expect("failed to flush trace file");
}
}

View File

@@ -264,76 +264,12 @@ pageserver_flush(void)
}
}
// Entry of the page_cache
typedef struct
{
// HACK Just xor of bytes lol
char request_hash;
// Points directly to a NeonResponse. We can't just own the
// NeonResponse because it's a "supertype", so it's not Sized.
char* response;
int len;
} NeonRequestResponse;
#define MAX_PAGE_CACHE_SIZE 50
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
int page_cache_size = 0;
int page_cache_head = 0;
static NeonResponse *
pageserver_call(NeonRequest * request)
{
// Compute hash
char hash = 0;
StringInfoData req_buff;
req_buff = nm_pack_request(request);
for (int i = 0; i < req_buff.len; i++) {
hash ^= req_buff.data[i];
}
pfree(req_buff.data);
// If result is cached, memcpy and return
for (int i = 0; i < page_cache_size; i++) {
if (page_cache[i].request_hash == hash) {
int len = page_cache[0].response->len;
NeonResponse *resp = palloc0(len);
// I'd rather Rc than memcpy, but this is not rust :(
memcpy(resp, page_cache[0].response->data, len);
elog(LOG, "cache hit !!!");
return resp;
}
}
// Send request, get response
pageserver_send(request);
pageserver_flush();
NeonResponse *resp = pageserver_receive();
// Get length
int len = -1;
switch (resp->tag) {
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
}
// Cache result
page_cache[page_cache_head].request_hash = hash;
if (page_cache_head < page_cache_size) {
pfree(page_cache[page_cache_head].response);
}
page_cache[page_cache_head].len = len;
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
memcpy(page_cache[page_cache_head].response, resp, len);
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
page_cache_size += 1;
}
return resp;
return pageserver_receive();
}
page_server_api api = {

View File

@@ -5,7 +5,6 @@ filterwarnings =
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
addopts =
-m 'not remote_cluster'
--ignore=test_runner/performance
markers =
remote_cluster
testpaths =

View File

@@ -33,7 +33,6 @@ toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

View File

@@ -1,4 +1,3 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;

View File

@@ -27,13 +27,14 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;
pub mod defaults {
use const_format::formatcp;
use std::time::Duration;
pub use safekeeper_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}

View File

@@ -56,14 +56,6 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench`
To run tests in parallel we use the `pytest-xdist` plugin. By default everything runs single-threaded. The number of workers can be specified with the `-n` argument:
`./scripts/pytest -n4`
By default performance tests are excluded. To run them explicitly pass performance tests selection to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.

View File

@@ -91,25 +91,10 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
self.tenant, _ = self.env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# HACK enable request tracing, as an experiment
# NOTE This must be done before the pg starts pagestream
# TODO why does it not work?
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
# "trace_read_requests": "true",
# })
# We only use one branch and one timeline
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
self.env.neon_cli.create_branch(branch_name, "empty")
self._pg = self.env.postgres.create_start(branch_name)
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
@property
def pg(self):
@@ -124,10 +109,10 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
def compact(self):
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(
@@ -139,7 +124,7 @@ class NeonCompare(PgCompare):
def report_size(self) -> None:
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.tenant, self.timeline
self.env.repo_dir, self.env.initial_tenant, self.timeline
)
self.zenbenchmark.record(
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

View File

@@ -455,9 +455,6 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the tests pass with or without the remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
@@ -586,9 +583,7 @@ class NeonEnvBuilder:
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1749,37 +1744,6 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
@dataclass
class ReplayBin:
"""A helper class for replaying pageserver wal and read traces."""
traces_dir: str
def replay_all(self, pageserver_connstr):
replay_binpath = os.path.join(str(neon_binpath), "replay")
args = [
replay_binpath,
self.traces_dir,
pageserver_connstr,
]
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
subprocess.run(args)
def draw_all(self):
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
args = [
draw_binpath,
self.traces_dir,
]
subprocess.run(args)
@pytest.fixture(scope="function")
def replay_bin(test_output_dir):
traces_dir = os.path.join(test_output_dir, "repo", "traces")
return ReplayBin(traces_dir)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host="localhost", port=port, dbname="postgres")

View File

@@ -175,11 +175,6 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#

View File

@@ -1,57 +0,0 @@
from contextlib import closing
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
# This test is a demonstration of how to do trace playback. With the current
# workload it uses, it's not testing anything meaningful.
def test_trace_replay(
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "true",
# })
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with zenbenchmark.record_duration("run"):
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
# Stop pg so we drop the connection and flush the traces
pg.stop()
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "false",
# })
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
# assert trace_path.exists()
replay_bin.draw_all()
return
print("replaying")
ps_connstr = env.pageserver.connstr()
with zenbenchmark.record_duration("replay"):
output = replay_bin.replay_all(ps_connstr)
print(output)

View File

@@ -1,7 +1,6 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder

View File

@@ -1,5 +1,4 @@
import os
import shutil
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -8,13 +7,8 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
from prometheus_client.samples import Sample
@@ -207,63 +201,3 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_with_empty_tenants",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_without_timelines_dir = env.initial_tenant
log.info(
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
tenant_with_empty_timelines_dir = client.tenant_create()
log.info(
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
)
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline reinitialization after pageserver restart
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert (
len(tenants) == 1
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
loaded_tenant = tenants[0]
assert loaded_tenant["id"] == str(
tenant_with_empty_timelines_dir
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -19,7 +19,6 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }