Compare commits

..

10 Commits

Author SHA1 Message Date
Konstantin Knizhnik
121c19fcd6 Replace R-Tree with B-Tree in layer map 2022-10-07 13:11:24 +03:00
Arthur Petukhovsky
687ba81366 Display sync safekeepers output in compute_ctl (#2571)
Pipe postgres output to compute_ctl stdout and create a test to check that compute_ctl works and prints postgres logs.
2022-10-06 13:53:52 +00:00
Andrés
47bae68a2e Make get_lsn_by_timestamp available in mgmt API (#2536) (#2560)
Co-authored-by: andres <andres.rodriguez@outlook.es>
2022-10-06 12:42:50 +03:00
Joonas Koivunen
e8b195acb7 fix: apply notify workaround on m1 mac docker (#2564)
workaround as discussed in the notify repository.
2022-10-06 11:13:40 +03:00
Anastasia Lubennikova
254cb7dc4f Update CI script to push compute-node-v15 to dockerhub 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
ed85d97f17 bump vendor/postgres-v15. Rebase it to Stamp 15rc2 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
4a216c5f7f Use PostGIS 3.3.1 that is compatible with pg 15 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
c5a428a61a Update Dockerfile.compute-node-v15 to match v14 version.
Fix build script to promote the image for v15 to neon dockerhub
2022-10-06 10:50:08 +03:00
Konstantin Knizhnik
ff8c481777 Normalize last_record LSN in wal receiver (#2529)
* Add test for branching on page boundary

* Normalize start recovery point

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>

Co-authored-by:  Thang Pham <thang@neon.tech>
2022-10-06 09:01:56 +03:00
Arthur Petukhovsky
f25dd75be9 Fix deadlock in safekeeper metrics (#2566)
We had a problem where almost all of the threads were waiting on a futex syscall. More specifically:
- `/metrics` handler was inside `TimelineCollector::collect()`, waiting on a mutex for a single Timeline
- This exact timeline was inside `control_file::FileStorage::persist()`, waiting on a mutex for Lazy initialization of `PERSIST_CONTROL_FILE_SECONDS`
- `PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram>` was blocked on `prometheus::register`
- `prometheus::register` calls `DEFAULT_REGISTRY.write().register()` to take a write lock on Registry and add a new metric
- `DEFAULT_REGISTRY` lock was already taken inside `DEFAULT_REGISTRY.gather()`, which was called by `/metrics` handler to collect all metrics

This commit creates another Registry with a separate lock, to avoid deadlock in a case where `TimelineCollector` triggers registration of new metrics inside default registry.
2022-10-06 01:07:02 +03:00
33 changed files with 656 additions and 1043 deletions

View File

@@ -564,7 +564,7 @@ jobs:
promote-images:
runs-on: dev
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-tools-image ]
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
@@ -573,7 +573,7 @@ jobs:
# compute-node uses postgres 14, which is default now
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
name: [ neon, compute-node, compute-node-v14, compute-tools ]
name: [ neon, compute-node, compute-node-v14, compute-node-v15, compute-tools ]
steps:
- name: Promote image to latest
@@ -608,6 +608,9 @@ jobs:
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -638,6 +641,9 @@ jobs:
- name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
@@ -650,6 +656,7 @@ jobs:
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
calculate-deploy-targets:
runs-on: [ self-hosted, Linux, k8s-runner ]

7
Cargo.lock generated
View File

@@ -2059,7 +2059,6 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"svg_fmt",
"tar",
"tempfile",
"thiserror",
@@ -3334,12 +3333,6 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "svg_fmt"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"

View File

@@ -5,7 +5,7 @@
ARG TAG=pinned
# apparently, ARGs don't get replaced in RUN commands in kaniko
# ARG POSTGIS_VERSION=3.3.0
# ARG POSTGIS_VERSION=3.3.1
# ARG PLV8_VERSION=3.1.4
# ARG PG_VERSION=v15
@@ -13,9 +13,12 @@ ARG TAG=pinned
# Layer "build-deps"
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
#
# Layer "pg-build"
@@ -42,11 +45,11 @@ RUN cd postgres && \
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \
cd postgis-3.3.0 && \
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
tar xvzf postgis-3.3.1.tar.gz && \
cd postgis-3.3.1 && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure && \
@@ -64,15 +67,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
# Build plv8
#
FROM build-deps AS plv8-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -84,12 +85,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#
# Layer "h3-pg-build"
# Build h3_pg
#
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
cd h3-4.0.1 && \
mkdir build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/h3 make install && \
cp -R /h3/usr / && \
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
tar xvzf h3-pg.tgz && \
cd h3-pg-4.0.1 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -137,8 +172,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
chmod 0750 /var/db/postgres/compute && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -178,7 +178,6 @@ impl ComputeNode {
.args(&["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
@@ -191,10 +190,10 @@ impl ComputeNode {
if !sync_output.status.success() {
anyhow::bail!(
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}, stderr: {}",
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
sync_output.status,
String::from_utf8(sync_output.stdout).expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
String::from_utf8(sync_output.stderr).expect("postgres --sync-safekeepers exited, and stderr is not utf-8"),
String::from_utf8(sync_output.stdout)
.expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
);
}

View File

@@ -250,9 +250,36 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res| {
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
let _ = tx.send(res);
})?;
}) {
Ok(watcher) => (Box::new(watcher), rx),
Err(e) => {
match e.kind {
notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
// docker on m1 macs does not support recommended_watcher
// but return "Function not implemented (os error 38)"
// see https://github.com/notify-rs/notify/issues/423
let (tx, rx) = std::sync::mpsc::channel();
// let's poll it faster than what we check the results for (100ms)
let config =
notify::Config::default().with_poll_interval(Duration::from_millis(50));
let watcher = notify::PollWatcher::new(
move |res| {
let _ = tx.send(res);
},
config,
)?;
(Box::new(watcher), rx)
}
_ => return Err(e.into()),
}
}
};
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let started_at = Instant::now();

View File

@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Response error: {}", err_body.msg),
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
@@ -181,15 +181,14 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
.context("Failed to create tenant")?;
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
).context("Failed to create timeline")?;
)?;
Ok(initial_timeline_info.timeline_id)
}
@@ -420,11 +419,6 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -3,7 +3,7 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::{core, default_registry, proto};
@@ -17,6 +17,7 @@ pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
use prometheus::{Registry, Result};
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
@@ -32,13 +33,27 @@ macro_rules! register_uint_gauge_vec {
}};
}
/// Special internal registry, to collect metrics independently from the default registry.
/// Was introduced to fix deadlock with lazy registration of metrics in the default registry.
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> Result<()> {
INTERNAL_REGISTRY.register(c)
}
/// Gathers all Prometheus metrics and records the I/O stats just before that.
///
/// Metrics gathering is a relatively simple and standalone operation, so
/// it might be fine to do it this way to keep things simple.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
update_rusage_metrics();
prometheus::gather()
let mut mfs = prometheus::gather();
let mut internal_mfs = INTERNAL_REGISTRY.gather();
mfs.append(&mut internal_mfs);
mfs
}
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {

View File

@@ -52,7 +52,6 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
#[serde_as]

View File

@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::info;
use std::future::Future;
@@ -35,7 +36,13 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metrics = metrics::gather();
let metrics = tokio::task::spawn_blocking(move || {
// Currently we take a lot of mutexes while collecting metrics, so it's
// better to spawn a blocking task to avoid blocking the event loop.
metrics::gather()
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
encoder.encode(&metrics, &mut buffer).unwrap();
let response = Response::builder()

View File

@@ -67,7 +67,6 @@ remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -1,185 +0,0 @@
use clap::{App, Arg};
use futures::TryFutureExt;
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
use std::io::Write;
use std::{
fs::{read_dir, File},
io::BufReader,
str::FromStr,
};
use svg_fmt::*;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
(set.len(), map)
}
fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace visualization tool")
.about("Makes a svg file that displays the read pattern")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.get_matches();
// (blkno, lsn)
let mut dots = Vec::<(u32, Lsn)>::new();
let mut dump_file = File::create("dump.txt").expect("can't make file");
let mut deltas = HashMap::<i32, u32>::new();
let mut prev1: u32 = 0;
let mut prev2: u32 = 0;
let mut prev3: u32 = 0;
println!("scanning trace ...");
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("tenant: {tenant_id}");
println!("opening {path:?}");
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("hi");
// println!("timeline: {timeline_id}");
println!("opening {path:?}");
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
println!("opening {path:?}");
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
// println!("Parsed message {:?}", msg);
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetPage(req) => {
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
// dots.push((req.blkno, req.lsn));
// HACK
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
let delta1 = (req.blkno as i32) - (prev1 as i32);
let delta2 = (req.blkno as i32) - (prev2 as i32);
let delta3 = (req.blkno as i32) - (prev3 as i32);
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
delta1
} else {
delta2
};
if i32::abs(delta3) < i32::abs(delta) {
delta = delta3;
}
let delta = delta1;
prev3 = prev2;
prev2 = prev1;
prev1 = req.blkno;
match deltas.get_mut(&delta) {
Some(c) => {*c += 1;},
None => {deltas.insert(delta, 1);},
};
if delta == 9 {
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
}
},
PagestreamFeMessage::DbSize(_) => {}
};
// HACK
// if dots.len() > 1000 {
// break;
// }
}
}
}
}
let mut other = deltas.len();
deltas.retain(|_, count| *count > 3);
other -= deltas.len();
dbg!(other);
dbg!(deltas);
// Collect all coordinates
let mut keys: Vec<u32> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for dot in &dots {
keys.push(dot.0);
lsns.push(dot.1);
}
// Analyze
let (key_max, key_map) = analyze(keys);
let (lsn_max, lsn_map) = analyze(lsns);
// Draw
println!("drawing trace ...");
let mut svg_file = File::create("out.svg").expect("can't make file");
writeln!(
&mut svg_file,
"{}",
BeginSvg {
w: (key_max + 1) as f32,
h: (lsn_max + 1) as f32,
}
)?;
for (key, lsn) in &dots {
let key = key_map.get(&key).unwrap();
let lsn = lsn_map.get(&lsn).unwrap();
writeln!(
&mut svg_file,
" {}",
rectangle(
*key as f32,
*lsn as f32,
10.0,
10.0
)
.fill(Fill::Color(red()))
.stroke(Stroke::Color(black(), 0.0))
.border_radius(0.5)
)?;
// println!(" {}",
// rectangle(key_start as f32 + stretch * margin,
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
// key_diff as f32 - stretch * 2.0 * margin,
// stretch * (lsn_diff - 2.0 * margin))
// // .fill(rgb(200, 200, 200))
// .fill(fill)
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
// .border_radius(0.4)
// );
}
writeln!(&mut svg_file, "{}", EndSvg)?;
Ok(())
}

View File

@@ -1,133 +0,0 @@
use bytes::BytesMut;
use pageserver::page_service::PagestreamFeMessage;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use clap::{App, Arg};
use utils::{
id::{TenantId, TimelineId},
pq_proto::{BeMessage, FeMessage},
};
// TODO put this in library, dedup with stuff in control_plane
/// Client for the pageserver's pagestream API
struct PagestreamApi {
stream: TcpStream,
}
impl PagestreamApi {
async fn connect(
connstr: &str,
tenant: &TenantId,
timeline: &TimelineId,
) -> anyhow::Result<PagestreamApi> {
// Parse connstr
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
// Connect
let mut stream = TcpStream::connect(tcp_addr).await?;
let (client, conn) = config
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", tenant, timeline);
tokio::select! {
_ = conn => panic!("connection closed during pagestream initialization"),
_ = client.query(init_query.as_str(), &[]) => (),
};
Ok(PagestreamApi { stream })
}
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let request = {
let msg_bytes = msg.serialize();
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&msg_bytes);
// TODO it's actually a fe message but it doesn't have a serializer yet
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
self.stream.write_all(&request).await?;
// TODO It's actually a be message, but it doesn't have a parser.
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
let _response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
Ok(())
}
}
async fn replay_trace<R: std::io::Read>(
reader: &mut R,
mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
println!("Parsed message {:?}", msg);
pagestream.make_request(msg).await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace replay tool")
.about("Replays wal or read traces to test pageserver performance")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.arg(
Arg::new("pageserver_connstr")
.takes_value(true)
.help("Pageserver pg endpoint to connect to"),
)
.get_matches();
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// TODO The pageserver deletes existing traces?
// LOL yes because I use tenant ID as trace id
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
// let len = file.metadata().unwrap().len();
// println!("replaying {:?} trace {} bytes", path, len);
replay_trace(&mut reader, pagestream).await?;
}
}
}
Ok(())
}

View File

@@ -364,23 +364,6 @@ impl PageServerConf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
pub fn trace_path(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &TimelineId, // TODO make a new type
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {

View File

@@ -207,6 +207,62 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get LSN by a timestamp
parameters:
- name: timestamp
in: query
required: true
schema:
type: string
format: date-time
description: A timestamp to get the LSN
responses:
"200":
description: OK
content:
application/json:
schema:
type: string
"400":
description: Error when no tenant id found in path, no timeline id or invalid timestamp
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id

View File

@@ -12,6 +12,7 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant::{TenantState, Timeline};
@@ -265,6 +266,23 @@ fn query_param_present(request: &Request<Body>, param: &str) -> bool {
.unwrap_or(false)
}
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
request.uri().query().map_or(
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|v| {
url::form_urlencoded::parse(v.as_bytes())
.into_owned()
.find(|(k, _)| k == param_name)
.map_or(
Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in query parameters"
))),
|(_, v)| Ok(v),
)
},
)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -329,6 +347,33 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
}
}
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id))
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
let result = match timeline
.find_lsn_for_timestamp(timestamp_pg)
.map_err(ApiError::InternalServerError)?
{
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
json_response(StatusCode::OK, result)
}
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
@@ -617,9 +662,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
@@ -911,6 +953,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
get_lsn_by_timestamp_handler,
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler),

View File

@@ -16,8 +16,6 @@ pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
// pub mod timelines;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;

View File

@@ -10,11 +10,8 @@
//
use anyhow::{bail, ensure, Context, Result};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Buf;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
use std::net::TcpListener;
use std::str;
@@ -37,24 +34,19 @@ use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
// TODO these should be in a library outside the pageserver
#[derive(Debug)]
pub enum PagestreamFeMessage {
enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
@@ -62,7 +54,7 @@ pub enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
@@ -71,152 +63,106 @@ pub enum PagestreamBeMessage {
}
#[derive(Debug)]
pub struct PagestreamExistsRequest {
struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamNblocksRequest {
struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
pub struct PagestreamGetPageRequest {
struct PagestreamGetPageRequest {
latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
lsn: Lsn,
rel: RelTag,
blkno: u32,
}
#[derive(Debug)]
pub struct PagestreamDbSizeRequest {
struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.read_u32::<BigEndian>()?,
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
@@ -473,17 +419,6 @@ impl PageServerHandler {
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
Some(Tracer::new(path))
} else {
None
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
@@ -508,24 +443,15 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Sync) => {
// TODO what now?
continue;
}
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
@@ -1133,33 +1059,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate LSN of last transaction with timestamp less or equal than sppecified
// TODO lazy static
let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$")
.unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
let timestamp_pg = to_pg_timestamp(timestamp);
self.check_permission(Some(tenant_id))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"lsn",
)]))?;
let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? {
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
pgb.write_message(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {
bail!("unknown command");
}

View File

@@ -593,14 +593,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
// TODO is there a need for all this boilerplate?
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}

View File

@@ -15,19 +15,25 @@ use crate::repository::Key;
use crate::tenant::inmemory_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use anyhow::Result;
use num_traits::identities::{One, Zero};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
struct BTreeKey {
lsn: Lsn,
seq: usize,
}
impl BTreeKey {
fn new(lsn: Lsn) -> BTreeKey {
BTreeKey { lsn, seq: 0 }
}
}
///
/// LayerMap tracks what layers exist on a timeline.
///
@@ -53,165 +59,14 @@ pub struct LayerMap {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject>,
historic_layers: BTreeMap<BTreeKey, Arc<dyn Layer>>,
layers_seqno: usize,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<dyn Layer>>,
}
struct LayerRTreeObject {
layer: Arc<dyn Layer>,
}
// Representation of Key as numeric type.
// We can not use native implementation of i128, because rstar::RTree
// doesn't handle properly integer overflow during area calculation: sum(Xi*Yi).
// Overflow will cause panic in debug mode and incorrect area calculation in release mode,
// which leads to non-optimally balanced R-Tree (but doesn't fit correctness of R-Tree work).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);
impl Copy for IntKey {}
impl IntKey {
fn from(i: i128) -> Self {
IntKey(i256::from(i))
}
}
impl Bounded for IntKey {
fn min_value() -> Self {
IntKey(i256::MIN)
}
fn max_value() -> Self {
IntKey(i256::MAX)
}
}
impl Signed for IntKey {
fn is_positive(&self) -> bool {
self.0 > i256::ZERO
}
fn is_negative(&self) -> bool {
self.0 < i256::ZERO
}
fn signum(&self) -> Self {
match self.0.cmp(&i256::ZERO) {
Ordering::Greater => IntKey(i256::ONE),
Ordering::Less => IntKey(-i256::ONE),
Ordering::Equal => IntKey(i256::ZERO),
}
}
fn abs(&self) -> Self {
IntKey(self.0.abs())
}
fn abs_sub(&self, other: &Self) -> Self {
if self.0 <= other.0 {
IntKey(i256::ZERO)
} else {
IntKey(self.0 - other.0)
}
}
}
impl Neg for IntKey {
type Output = Self;
fn neg(self) -> Self::Output {
IntKey(-self.0)
}
}
impl Rem for IntKey {
type Output = Self;
fn rem(self, rhs: Self) -> Self::Output {
IntKey(self.0 % rhs.0)
}
}
impl Div for IntKey {
type Output = Self;
fn div(self, rhs: Self) -> Self::Output {
IntKey(self.0 / rhs.0)
}
}
impl Add for IntKey {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
IntKey(self.0 + rhs.0)
}
}
impl Sub for IntKey {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
IntKey(self.0 - rhs.0)
}
}
impl Mul for IntKey {
type Output = Self;
fn mul(self, rhs: Self) -> Self::Output {
IntKey(self.0 * rhs.0)
}
}
impl One for IntKey {
fn one() -> Self {
IntKey(i256::ONE)
}
}
impl Zero for IntKey {
fn zero() -> Self {
IntKey(i256::ZERO)
}
fn is_zero(&self) -> bool {
self.0 == i256::ZERO
}
}
impl Num for IntKey {
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
}
}
impl PartialEq for LayerRTreeObject {
fn eq(&self, other: &Self) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work, the assertion below would be triggered
// otherwise but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(&self.layer, &other.layer)
}
}
impl RTreeObject for LayerRTreeObject {
type Envelope = AABB<[IntKey; 2]>;
fn envelope(&self) -> Self::Envelope {
let key_range = self.layer.get_key_range();
let lsn_range = self.layer.get_lsn_range();
AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
)
}
}
/// Return value of LayerMap::search
pub struct SearchResult {
pub layer: Arc<dyn Layer>,
@@ -234,23 +89,17 @@ impl LayerMap {
// linear search
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<dyn Layer>> = None;
let mut latest_img_lsn: Option<Lsn> = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
[
IntKey::from(key.to_i128()),
IntKey::from(end_lsn.0 as i128 - 1),
],
);
for e in self
let mut latest_img_lsn = Lsn(0);
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
while let Some((_key, l)) = iter.next_back() {
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
if !l.get_key_range().contains(&key) {
continue;
}
let img_lsn = l.get_lsn_range().start;
assert!(img_lsn < end_lsn);
if Lsn(img_lsn.0 + 1) == end_lsn {
@@ -260,23 +109,23 @@ impl LayerMap {
lsn_floor: img_lsn,
}));
}
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
latest_img = Some(Arc::clone(l));
latest_img_lsn = Some(img_lsn);
}
latest_img = Some(Arc::clone(l));
latest_img_lsn = img_lsn;
break;
}
// Search the delta layers
let mut latest_delta: Option<Arc<dyn Layer>> = None;
for e in self
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
while let Some((_key, l)) = iter.next_back() {
if !l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
if !l.get_key_range().contains(&key) {
continue;
}
if l.get_lsn_range().start >= end_lsn {
info!(
"Candidate delta layer {}..{} is too new for lsn {}",
@@ -286,6 +135,9 @@ impl LayerMap {
);
}
assert!(l.get_lsn_range().start < end_lsn);
if l.get_lsn_range().end <= latest_img_lsn {
continue;
}
if l.get_lsn_range().end >= end_lsn {
// this layer contains the requested point in the key/lsn space.
// No need to search any further
@@ -311,10 +163,7 @@ impl LayerMap {
"found (old) layer {} for request on {key} at {end_lsn}",
l.filename().display(),
);
let lsn_floor = std::cmp::max(
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start,
);
let lsn_floor = std::cmp::max(Lsn(latest_img_lsn.0 + 1), l.get_lsn_range().start);
Ok(Some(SearchResult {
lsn_floor,
layer: l,
@@ -322,7 +171,7 @@ impl LayerMap {
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Ok(Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
lsn_floor: latest_img_lsn,
layer: l,
}))
} else {
@@ -338,7 +187,14 @@ impl LayerMap {
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
}
self.historic_layers.insert(LayerRTreeObject { layer });
self.historic_layers.insert(
BTreeKey {
lsn: layer.get_lsn_range().start,
seq: self.layers_seqno,
},
layer,
);
self.layers_seqno += 1;
NUM_ONDISK_LAYERS.inc();
}
@@ -360,10 +216,26 @@ impl LayerMap {
.retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
}
assert!(self
.historic_layers
.remove(&LayerRTreeObject { layer })
.is_some());
let len_before = self.historic_layers.len();
#[allow(clippy::vtable_address_comparisons)]
self.historic_layers
.retain(|_key, other| !Arc::ptr_eq(other, &layer));
if self.historic_layers.len() != len_before - 1 {
assert!(self.historic_layers.len() == len_before);
error!(
"Failed to remove {} layer: {}..{}__{}..{}",
if layer.is_incremental() {
"inremental"
} else {
"image"
},
layer.get_key_range().start,
layer.get_key_range().end,
layer.get_lsn_range().start,
layer.get_lsn_range().end
);
}
assert!(self.historic_layers.len() == len_before - 1);
NUM_ONDISK_LAYERS.dec();
}
@@ -380,21 +252,10 @@ impl LayerMap {
loop {
let mut made_progress = false;
let envelope = AABB::from_corners(
[
IntKey::from(range_remain.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(range_remain.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
for (_key, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
@@ -417,39 +278,30 @@ impl LayerMap {
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
self.historic_layers.iter().map(|e| e.layer.clone())
self.historic_layers
.iter()
.map(|(_key, layer)| layer.clone())
}
/// Find the last image layer that covers 'key', ignoring any image layers
/// newer than 'lsn'.
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
let mut candidate_lsn = Lsn(0);
let mut candidate = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0)],
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
);
for e in self
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1));
while let Some((_key, l)) = iter.next_back() {
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
if this_lsn < candidate_lsn {
// our previous candidate was better
if !l.get_key_range().contains(&key) {
continue;
}
candidate_lsn = this_lsn;
candidate = Some(Arc::clone(l));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
return Some(Arc::clone(l));
}
candidate
None
}
///
@@ -466,18 +318,10 @@ impl LayerMap {
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
let mut points = vec![key_range.start];
let envelope = AABB::from_corners(
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
[
IntKey::from(key_range.end.to_i128()),
IntKey::from(lsn.0 as i128),
],
);
for e in self
for (_lsn, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1))
{
let l = &e.layer;
assert!(l.get_lsn_range().start <= lsn);
let range = l.get_key_range();
if key_range.contains(&range.start) {
@@ -514,26 +358,17 @@ impl LayerMap {
if lsn_range.start >= lsn_range.end {
return Ok(0);
}
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
for (_lsn, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{
let l = &e.layer;
if !l.is_incremental() {
continue;
}
if !range_overlaps(&l.get_key_range(), key_range) {
continue;
}
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
assert!(range_overlaps(&l.get_key_range(), key_range));
// We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition
@@ -569,8 +404,8 @@ impl LayerMap {
}
println!("historic_layers:");
for e in self.historic_layers.iter() {
e.layer.dump(verbose)?;
for (_key, layer) in self.historic_layers.iter() {
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())

View File

@@ -82,10 +82,6 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
/// If enabled, records all read requests for this ... TODO
/// Even though read traces are small, there's no delete meachanism so they can
/// pile up. So we disable this by default.
pub trace_read_requests: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -109,7 +105,6 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfOpt {
@@ -143,9 +138,6 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
}
}
@@ -215,7 +207,6 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
}
}
@@ -241,7 +232,6 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
trace_read_requests: false,
}
}
}

View File

@@ -1,36 +0,0 @@
use bytes::Bytes;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
writer: BufWriter<File>,
}
impl Drop for Tracer {
fn drop(&mut self) {
self.flush()
}
}
impl Tracer {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("failed to parse parent path");
create_dir_all(parent).expect("failed to create trace dir");
let file = File::create(path).expect("failed to create trace file");
Tracer {
writer: BufWriter::new(file),
}
}
pub fn trace(&mut self, msg: &Bytes) {
self.writer.write(msg).expect("failed to write trace");
}
pub fn flush(&mut self) {
self.writer.flush().expect("failed to flush trace file");
}
}

View File

@@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
@@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection(
// There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32);
// If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
// for anything, and in some corner cases, the compute node might have never generated the WAL for page headers
//. That happens if you create a branch at page boundary: the start point of the branch is at the page boundary,
// but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
// header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
// to the safekeepers.
startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");

View File

@@ -264,76 +264,12 @@ pageserver_flush(void)
}
}
// Entry of the page_cache
typedef struct
{
// HACK Just xor of bytes lol
char request_hash;
// Points directly to a NeonResponse. We can't just own the
// NeonResponse because it's a "supertype", so it's not Sized.
char* response;
int len;
} NeonRequestResponse;
#define MAX_PAGE_CACHE_SIZE 50
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
int page_cache_size = 0;
int page_cache_head = 0;
static NeonResponse *
pageserver_call(NeonRequest * request)
{
// Compute hash
char hash = 0;
StringInfoData req_buff;
req_buff = nm_pack_request(request);
for (int i = 0; i < req_buff.len; i++) {
hash ^= req_buff.data[i];
}
pfree(req_buff.data);
// If result is cached, memcpy and return
for (int i = 0; i < page_cache_size; i++) {
if (page_cache[i].request_hash == hash) {
int len = page_cache[0].response->len;
NeonResponse *resp = palloc0(len);
// I'd rather Rc than memcpy, but this is not rust :(
memcpy(resp, page_cache[0].response->data, len);
elog(LOG, "cache hit !!!");
return resp;
}
}
// Send request, get response
pageserver_send(request);
pageserver_flush();
NeonResponse *resp = pageserver_receive();
// Get length
int len = -1;
switch (resp->tag) {
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
}
// Cache result
page_cache[page_cache_head].request_hash = hash;
if (page_cache_head < page_cache_size) {
pfree(page_cache[page_cache_head].response);
}
page_cache[page_cache_head].len = len;
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
memcpy(page_cache[page_cache_head].response, resp, len);
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
page_cache_size += 1;
}
return resp;
return pageserver_receive();
}
page_server_api api = {

View File

@@ -291,9 +291,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let registry = metrics::default_registry();
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
registry.register(Box::new(timeline_collector))?;
metrics::register_internal(Box::new(timeline_collector))?;
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];

View File

@@ -91,25 +91,10 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
self.tenant, _ = self.env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# HACK enable request tracing, as an experiment
# NOTE This must be done before the pg starts pagestream
# TODO why does it not work?
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
# "trace_read_requests": "true",
# })
# We only use one branch and one timeline
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
self.env.neon_cli.create_branch(branch_name, "empty")
self._pg = self.env.postgres.create_start(branch_name)
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
@property
def pg(self):
@@ -124,10 +109,10 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
def compact(self):
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(
@@ -139,7 +124,7 @@ class NeonCompare(PgCompare):
def report_size(self) -> None:
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.tenant, self.timeline
self.env.repo_dir, self.env.initial_tenant, self.timeline
)
self.zenbenchmark.record(
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

View File

@@ -1136,6 +1136,19 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
@@ -1187,6 +1200,7 @@ class AbstractNeonCli(abc.ABC):
arguments: List[str],
extra_env_vars: Optional[Dict[str, str]] = None,
check_return_code=True,
timeout=None,
) -> "subprocess.CompletedProcess[str]":
"""
Run the command with the specified arguments.
@@ -1233,6 +1247,7 @@ class AbstractNeonCli(abc.ABC):
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
)
if not res.returncode:
log.info(f"Run success: {res.stdout}")
@@ -1606,6 +1621,14 @@ class WalCraft(AbstractNeonCli):
res.check_returncode()
class ComputeCtl(AbstractNeonCli):
"""
A typed wrapper around the `compute_ctl` CLI tool.
"""
COMMAND = "compute_ctl"
class NeonPageserver(PgProtocol):
"""
An object representing a running pageserver.
@@ -1749,37 +1772,6 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
@dataclass
class ReplayBin:
"""A helper class for replaying pageserver wal and read traces."""
traces_dir: str
def replay_all(self, pageserver_connstr):
replay_binpath = os.path.join(str(neon_binpath), "replay")
args = [
replay_binpath,
self.traces_dir,
pageserver_connstr,
]
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
subprocess.run(args)
def draw_all(self):
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
args = [
draw_binpath,
self.traces_dir,
]
subprocess.run(args)
@pytest.fixture(scope="function")
def replay_bin(test_output_dir):
traces_dir = os.path.join(test_output_dir, "repo", "traces")
return ReplayBin(traces_dir)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host="localhost", port=port, dbname="postgres")

View File

@@ -175,11 +175,6 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#

View File

@@ -1,57 +0,0 @@
from contextlib import closing
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
# This test is a demonstration of how to do trace playback. With the current
# workload it uses it's not testing anything meaningful.
def test_trace_replay(
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "true",
# })
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with zenbenchmark.record_duration("run"):
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
# Stop pg so we drop the connection and flush the traces
pg.stop()
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "false",
# })
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
# assert trace_path.exists()
replay_bin.draw_all()
return
print("replaying")
ps_connstr = env.pageserver.connstr()
with zenbenchmark.record_duration("replay"):
output = replay_bin.replay_all(ps_connstr)
print(output)

View File

@@ -6,6 +6,8 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -88,3 +90,39 @@ def test_branching_with_pgbench(
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
# Test branching from an "unnormalized" LSN.
#
# Context:
# When doing basebackup for a newly created branch, pageserver generates
# 'pg_control' file to bootstrap WAL segment by specifying the redo position
# a "normalized" LSN based on the timeline's starting LSN:
#
# checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0;
#
# This test checks if the pageserver is able to handle a "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
with pg0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
# and is smaller than `curr_lsn`.
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])

View File

@@ -0,0 +1,203 @@
import os
from subprocess import TimeoutExpired
from fixtures.log_helper import log
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
if "=" in line:
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
spec = (
"""
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Neon Test",
"state": "restarted",
"roles": [
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """
+ f'"{cfg_map["neon.safekeepers"]}"'
+ """,
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": """
+ f'"{cfg_map["port"]}"'
+ """,
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": """
+ f'"{cfg_map["neon.tenant_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": """
+ f'"{cfg_map["neon.timeline_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": """
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
+ """,
"vartype": "string"
}
]
},
"delta_operations": [
]
}
"""
)
ps_connstr = cfg_map["neon.pageserver_connstring"]
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
# run compute_ctl and wait for 10s
try:
ctl.raw_cli(
["--connstr", ps_connstr, "--pgdata", pgdata, "--spec", spec, "--pgbin", pg_bin_path],
timeout=10,
)
except TimeoutExpired as exc:
ctl_logs = exc.stderr.decode("utf-8")
log.info("compute_ctl output:\n" + ctl_logs)
start = "starting safekeepers syncing"
end = "safekeepers synced at LSN"
start_pos = ctl_logs.index(start)
assert start_pos != -1
end_pos = ctl_logs.index(end, start_pos)
assert end_pos != -1
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
# assert that --sync-safekeepers logs are present in the output
assert "connecting with node" in sync_safekeepers_logs
assert "connected with node" in sync_safekeepers_logs
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
assert "got votes from majority (2)" in sync_safekeepers_logs
assert "sending elected msg to node" in sync_safekeepers_logs

View File

@@ -15,7 +15,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
ps_cur = env.pageserver.connect().cursor()
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
@@ -38,37 +37,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "future"
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "past"
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
lsn = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result == "future"
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
assert result == "past"
pg_here.stop_and_destroy()
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
lsn = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()

View File

@@ -1,7 +1,6 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder