mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-27 15:20:38 +00:00
Compare commits
10 Commits
ps-trace
...
layer_map_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
121c19fcd6 | ||
|
|
687ba81366 | ||
|
|
47bae68a2e | ||
|
|
e8b195acb7 | ||
|
|
254cb7dc4f | ||
|
|
ed85d97f17 | ||
|
|
4a216c5f7f | ||
|
|
c5a428a61a | ||
|
|
ff8c481777 | ||
|
|
f25dd75be9 |
11
.github/workflows/build_and_test.yml
vendored
11
.github/workflows/build_and_test.yml
vendored
@@ -564,7 +564,7 @@ jobs:
|
||||
|
||||
promote-images:
|
||||
runs-on: dev
|
||||
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-tools-image ]
|
||||
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
|
||||
if: github.event_name != 'workflow_dispatch'
|
||||
container: amazon/aws-cli
|
||||
strategy:
|
||||
@@ -573,7 +573,7 @@ jobs:
|
||||
# compute-node uses postgres 14, which is default now
|
||||
# cloud repo depends on this image name, thus duplicating it
|
||||
# remove compute-node when cloud repo is updated
|
||||
name: [ neon, compute-node, compute-node-v14, compute-tools ]
|
||||
name: [ neon, compute-node, compute-node-v14, compute-node-v15, compute-tools ]
|
||||
|
||||
steps:
|
||||
- name: Promote image to latest
|
||||
@@ -608,6 +608,9 @@ jobs:
|
||||
- name: Pull compute node v14 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
|
||||
|
||||
- name: Pull compute node v15 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
|
||||
|
||||
- name: Pull rust image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
|
||||
|
||||
@@ -638,6 +641,9 @@ jobs:
|
||||
- name: Push compute node v14 image to Docker Hub
|
||||
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute node v15 image to Docker Hub
|
||||
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push rust image to Docker Hub
|
||||
run: crane push rust neondatabase/rust:pinned
|
||||
|
||||
@@ -650,6 +656,7 @@ jobs:
|
||||
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
calculate-deploy-targets:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
|
||||
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -2059,7 +2059,6 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"svg_fmt",
|
||||
"tar",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
@@ -3334,12 +3333,6 @@ version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||
|
||||
[[package]]
|
||||
name = "svg_fmt"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "8.8.0"
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
|
||||
ARG TAG=pinned
|
||||
# apparently, ARGs don't get replaced in RUN commands in kaniko
|
||||
# ARG POSTGIS_VERSION=3.3.0
|
||||
# ARG POSTGIS_VERSION=3.3.1
|
||||
# ARG PLV8_VERSION=3.1.4
|
||||
# ARG PG_VERSION=v15
|
||||
|
||||
@@ -13,9 +13,12 @@ ARG TAG=pinned
|
||||
# Layer "build-deps"
|
||||
#
|
||||
FROM debian:bullseye-slim AS build-deps
|
||||
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
|
||||
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
|
||||
apt update
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
|
||||
libcurl4-openssl-dev libossp-uuid-dev
|
||||
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
|
||||
|
||||
#
|
||||
# Layer "pg-build"
|
||||
@@ -42,11 +45,11 @@ RUN cd postgres && \
|
||||
FROM build-deps AS postgis-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
|
||||
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
|
||||
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
|
||||
tar xvzf postgis-3.3.0.tar.gz && \
|
||||
cd postgis-3.3.0 && \
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
|
||||
tar xvzf postgis-3.3.1.tar.gz && \
|
||||
cd postgis-3.3.1 && \
|
||||
./autogen.sh && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
./configure && \
|
||||
@@ -64,15 +67,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
|
||||
# Build plv8
|
||||
#
|
||||
FROM build-deps AS plv8-build
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
|
||||
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
|
||||
|
||||
# https://github.com/plv8/plv8/issues/475
|
||||
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
|
||||
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
|
||||
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
|
||||
apt update && \
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends -t testing binutils
|
||||
|
||||
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
|
||||
@@ -84,12 +85,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
|
||||
rm -rf /plv8-* && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
|
||||
|
||||
#
|
||||
# Layer "h3-pg-build"
|
||||
# Build h3_pg
|
||||
#
|
||||
FROM build-deps AS h3-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# packaged cmake is too old
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends -t testing cmake
|
||||
|
||||
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
|
||||
tar xvzf h3.tgz && \
|
||||
cd h3-4.0.1 && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
cmake .. -DCMAKE_BUILD_TYPE=Release && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/h3 make install && \
|
||||
cp -R /h3/usr / && \
|
||||
rm -rf build
|
||||
|
||||
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
|
||||
tar xvzf h3-pg.tgz && \
|
||||
cd h3-pg-4.0.1 && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
|
||||
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
# compile neon extensions
|
||||
#
|
||||
FROM build-deps AS neon-pg-ext-build
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
# plv8 still sometimes crashes during the creation
|
||||
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=h3-pg-build /h3/usr /
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -137,8 +172,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
chmod 0750 /var/db/postgres/compute && \
|
||||
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
|
||||
|
||||
# TODO: Check if we can make the extension setup more modular versus a linear build
|
||||
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
|
||||
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
|
||||
|
||||
|
||||
@@ -178,7 +178,6 @@ impl ComputeNode {
|
||||
.args(&["--sync-safekeepers"])
|
||||
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.expect("postgres --sync-safekeepers failed to start");
|
||||
|
||||
@@ -191,10 +190,10 @@ impl ComputeNode {
|
||||
|
||||
if !sync_output.status.success() {
|
||||
anyhow::bail!(
|
||||
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}, stderr: {}",
|
||||
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
|
||||
sync_output.status,
|
||||
String::from_utf8(sync_output.stdout).expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
|
||||
String::from_utf8(sync_output.stderr).expect("postgres --sync-safekeepers exited, and stderr is not utf-8"),
|
||||
String::from_utf8(sync_output.stdout)
|
||||
.expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -250,9 +250,36 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
// case we miss some events for some reason. Not strictly necessary, but
|
||||
// better safe than sorry.
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let mut watcher = notify::recommended_watcher(move |res| {
|
||||
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
|
||||
let _ = tx.send(res);
|
||||
})?;
|
||||
}) {
|
||||
Ok(watcher) => (Box::new(watcher), rx),
|
||||
Err(e) => {
|
||||
match e.kind {
|
||||
notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
|
||||
// docker on m1 macs does not support recommended_watcher
|
||||
// but return "Function not implemented (os error 38)"
|
||||
// see https://github.com/notify-rs/notify/issues/423
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
|
||||
// let's poll it faster than what we check the results for (100ms)
|
||||
let config =
|
||||
notify::Config::default().with_poll_interval(Duration::from_millis(50));
|
||||
|
||||
let watcher = notify::PollWatcher::new(
|
||||
move |res| {
|
||||
let _ = tx.send(res);
|
||||
},
|
||||
config,
|
||||
)?;
|
||||
|
||||
(Box::new(watcher), rx)
|
||||
}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
|
||||
|
||||
let started_at = Instant::now();
|
||||
|
||||
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
|
||||
let url = self.url().to_owned();
|
||||
Err(PageserverHttpError::Response(
|
||||
match self.json::<HttpErrorBody>() {
|
||||
Ok(err_body) => format!("Response error: {}", err_body.msg),
|
||||
Ok(err_body) => format!("Error: {}", err_body.msg),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
},
|
||||
))
|
||||
@@ -181,15 +181,14 @@ impl PageServerNode {
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<TimelineId> {
|
||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
|
||||
.context("Failed to create tenant")?;
|
||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
|
||||
let initial_timeline_info = self.timeline_create(
|
||||
initial_tenant_id,
|
||||
new_timeline_id,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
).context("Failed to create timeline")?;
|
||||
)?;
|
||||
Ok(initial_timeline_info.timeline_id)
|
||||
}
|
||||
|
||||
@@ -420,11 +419,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! Otherwise, we might not see all metrics registered via
|
||||
//! a default registry.
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
|
||||
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
|
||||
pub use prometheus::opts;
|
||||
pub use prometheus::register;
|
||||
pub use prometheus::{core, default_registry, proto};
|
||||
@@ -17,6 +17,7 @@ pub use prometheus::{register_int_counter_vec, IntCounterVec};
|
||||
pub use prometheus::{register_int_gauge, IntGauge};
|
||||
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
|
||||
pub use prometheus::{Encoder, TextEncoder};
|
||||
use prometheus::{Registry, Result};
|
||||
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
@@ -32,13 +33,27 @@ macro_rules! register_uint_gauge_vec {
|
||||
}};
|
||||
}
|
||||
|
||||
/// Special internal registry, to collect metrics independently from the default registry.
|
||||
/// Was introduced to fix deadlock with lazy registration of metrics in the default registry.
|
||||
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
|
||||
|
||||
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
|
||||
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
|
||||
/// while holding the lock.
|
||||
pub fn register_internal(c: Box<dyn Collector>) -> Result<()> {
|
||||
INTERNAL_REGISTRY.register(c)
|
||||
}
|
||||
|
||||
/// Gathers all Prometheus metrics and records the I/O stats just before that.
|
||||
///
|
||||
/// Metrics gathering is a relatively simple and standalone operation, so
|
||||
/// it might be fine to do it this way to keep things simple.
|
||||
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
|
||||
update_rusage_metrics();
|
||||
prometheus::gather()
|
||||
let mut mfs = prometheus::gather();
|
||||
let mut internal_mfs = INTERNAL_REGISTRY.gather();
|
||||
mfs.append(&mut internal_mfs);
|
||||
mfs
|
||||
}
|
||||
|
||||
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
|
||||
@@ -52,7 +52,6 @@ pub struct TenantCreateRequest {
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::RequestInfo;
|
||||
use routerify::{Middleware, Router, RouterBuilder, RouterService};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::info;
|
||||
|
||||
use std::future::Future;
|
||||
@@ -35,7 +36,13 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
|
||||
let metrics = metrics::gather();
|
||||
let metrics = tokio::task::spawn_blocking(move || {
|
||||
// Currently we take a lot of mutexes while collecting metrics, so it's
|
||||
// better to spawn a blocking task to avoid blocking the event loop.
|
||||
metrics::gather()
|
||||
})
|
||||
.await
|
||||
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
let response = Response::builder()
|
||||
|
||||
@@ -67,7 +67,6 @@ remote_storage = { path = "../libs/remote_storage" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
svg_fmt = "0.4.1"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
|
||||
@@ -1,185 +0,0 @@
|
||||
use clap::{App, Arg};
|
||||
use futures::TryFutureExt;
|
||||
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
|
||||
|
||||
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
|
||||
use std::io::Write;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
str::FromStr,
|
||||
};
|
||||
use svg_fmt::*;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
|
||||
let set: BTreeSet<T> = coords.into_iter().collect();
|
||||
|
||||
let mut map: BTreeMap<T, usize> = BTreeMap::new();
|
||||
for (i, e) in set.iter().enumerate() {
|
||||
map.insert(*e, i);
|
||||
}
|
||||
|
||||
(set.len(), map)
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
// TODO upgrade to struct macro arg parsing
|
||||
let arg_matches = App::new("Pageserver trace visualization tool")
|
||||
.about("Makes a svg file that displays the read pattern")
|
||||
.arg(
|
||||
Arg::new("traces_dir")
|
||||
.takes_value(true)
|
||||
.help("Directory where the read traces are stored"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
// (blkno, lsn)
|
||||
let mut dots = Vec::<(u32, Lsn)>::new();
|
||||
|
||||
|
||||
let mut dump_file = File::create("dump.txt").expect("can't make file");
|
||||
let mut deltas = HashMap::<i32, u32>::new();
|
||||
let mut prev1: u32 = 0;
|
||||
let mut prev2: u32 = 0;
|
||||
let mut prev3: u32 = 0;
|
||||
|
||||
println!("scanning trace ...");
|
||||
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
// println!("tenant: {tenant_id}");
|
||||
|
||||
println!("opening {path:?}");
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
// println!("hi");
|
||||
// println!("timeline: {timeline_id}");
|
||||
|
||||
println!("opening {path:?}");
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
println!("opening {path:?}");
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
|
||||
// println!("Parsed message {:?}", msg);
|
||||
match msg {
|
||||
PagestreamFeMessage::Exists(_) => {}
|
||||
PagestreamFeMessage::Nblocks(_) => {}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
|
||||
// dots.push((req.blkno, req.lsn));
|
||||
// HACK
|
||||
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
|
||||
|
||||
let delta1 = (req.blkno as i32) - (prev1 as i32);
|
||||
let delta2 = (req.blkno as i32) - (prev2 as i32);
|
||||
let delta3 = (req.blkno as i32) - (prev3 as i32);
|
||||
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
|
||||
delta1
|
||||
} else {
|
||||
delta2
|
||||
};
|
||||
if i32::abs(delta3) < i32::abs(delta) {
|
||||
delta = delta3;
|
||||
}
|
||||
let delta = delta1;
|
||||
|
||||
prev3 = prev2;
|
||||
prev2 = prev1;
|
||||
prev1 = req.blkno;
|
||||
|
||||
match deltas.get_mut(&delta) {
|
||||
Some(c) => {*c += 1;},
|
||||
None => {deltas.insert(delta, 1);},
|
||||
};
|
||||
|
||||
if delta == 9 {
|
||||
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
|
||||
}
|
||||
},
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
|
||||
// HACK
|
||||
// if dots.len() > 1000 {
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut other = deltas.len();
|
||||
deltas.retain(|_, count| *count > 3);
|
||||
other -= deltas.len();
|
||||
dbg!(other);
|
||||
dbg!(deltas);
|
||||
|
||||
// Collect all coordinates
|
||||
let mut keys: Vec<u32> = vec![];
|
||||
let mut lsns: Vec<Lsn> = vec![];
|
||||
for dot in &dots {
|
||||
keys.push(dot.0);
|
||||
lsns.push(dot.1);
|
||||
}
|
||||
|
||||
// Analyze
|
||||
let (key_max, key_map) = analyze(keys);
|
||||
let (lsn_max, lsn_map) = analyze(lsns);
|
||||
|
||||
// Draw
|
||||
println!("drawing trace ...");
|
||||
let mut svg_file = File::create("out.svg").expect("can't make file");
|
||||
writeln!(
|
||||
&mut svg_file,
|
||||
"{}",
|
||||
BeginSvg {
|
||||
w: (key_max + 1) as f32,
|
||||
h: (lsn_max + 1) as f32,
|
||||
}
|
||||
)?;
|
||||
for (key, lsn) in &dots {
|
||||
let key = key_map.get(&key).unwrap();
|
||||
let lsn = lsn_map.get(&lsn).unwrap();
|
||||
writeln!(
|
||||
&mut svg_file,
|
||||
" {}",
|
||||
rectangle(
|
||||
*key as f32,
|
||||
*lsn as f32,
|
||||
10.0,
|
||||
10.0
|
||||
)
|
||||
.fill(Fill::Color(red()))
|
||||
.stroke(Stroke::Color(black(), 0.0))
|
||||
.border_radius(0.5)
|
||||
)?;
|
||||
// println!(" {}",
|
||||
// rectangle(key_start as f32 + stretch * margin,
|
||||
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
|
||||
// key_diff as f32 - stretch * 2.0 * margin,
|
||||
// stretch * (lsn_diff - 2.0 * margin))
|
||||
// // .fill(rgb(200, 200, 200))
|
||||
// .fill(fill)
|
||||
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
|
||||
// .border_radius(0.4)
|
||||
// );
|
||||
}
|
||||
|
||||
writeln!(&mut svg_file, "{}", EndSvg)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,133 +0,0 @@
|
||||
use bytes::BytesMut;
|
||||
use pageserver::page_service::PagestreamFeMessage;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
};
|
||||
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||
|
||||
use clap::{App, Arg};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
// TODO put this in library, dedup with stuff in control_plane
|
||||
/// Client for the pageserver's pagestream API
|
||||
struct PagestreamApi {
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
impl PagestreamApi {
|
||||
async fn connect(
|
||||
connstr: &str,
|
||||
tenant: &TenantId,
|
||||
timeline: &TimelineId,
|
||||
) -> anyhow::Result<PagestreamApi> {
|
||||
// Parse connstr
|
||||
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
|
||||
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
|
||||
|
||||
// Connect
|
||||
let mut stream = TcpStream::connect(tcp_addr).await?;
|
||||
let (client, conn) = config
|
||||
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||
.await?;
|
||||
|
||||
// Enter pagestream protocol
|
||||
let init_query = format!("pagestream {} {}", tenant, timeline);
|
||||
tokio::select! {
|
||||
_ = conn => panic!("connection closed during pagestream initialization"),
|
||||
_ = client.query(init_query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
Ok(PagestreamApi { stream })
|
||||
}
|
||||
|
||||
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||
let request = {
|
||||
let msg_bytes = msg.serialize();
|
||||
let mut buf = BytesMut::new();
|
||||
let copy_msg = BeMessage::CopyData(&msg_bytes);
|
||||
|
||||
// TODO it's actually a fe message but it doesn't have a serializer yet
|
||||
BeMessage::write(&mut buf, ©_msg)?;
|
||||
buf.freeze()
|
||||
};
|
||||
self.stream.write_all(&request).await?;
|
||||
|
||||
// TODO It's actually a be message, but it doesn't have a parser.
|
||||
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
|
||||
let _response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||
Some(FeMessage::CopyData(page)) => page,
|
||||
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn replay_trace<R: std::io::Read>(
|
||||
reader: &mut R,
|
||||
mut pagestream: PagestreamApi,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
|
||||
println!("Parsed message {:?}", msg);
|
||||
pagestream.make_request(msg).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// TODO upgrade to struct macro arg parsing
|
||||
let arg_matches = App::new("Pageserver trace replay tool")
|
||||
.about("Replays wal or read traces to test pageserver performance")
|
||||
.arg(
|
||||
Arg::new("traces_dir")
|
||||
.takes_value(true)
|
||||
.help("Directory where the read traces are stored"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("pageserver_connstr")
|
||||
.takes_value(true)
|
||||
.help("Pageserver pg endpoint to connect to"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
|
||||
|
||||
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
// TODO The pageserver deletes existing traces?
|
||||
// LOL yes because I use tenant ID as trace id
|
||||
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
|
||||
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
// let len = file.metadata().unwrap().len();
|
||||
// println!("replaying {:?} trace {} bytes", path, len);
|
||||
replay_trace(&mut reader, pagestream).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -364,23 +364,6 @@ impl PageServerConf {
|
||||
self.timelines_path(tenant_id).join(timeline_id.to_string())
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> PathBuf {
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
pub fn trace_path(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
connection_id: &TimelineId, // TODO make a new type
|
||||
) -> PathBuf {
|
||||
self.traces_path()
|
||||
.join(tenant_id.to_string())
|
||||
.join(timeline_id.to_string())
|
||||
.join(connection_id.to_string())
|
||||
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
/// where certain timeline's metadata file should be located.
|
||||
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
|
||||
|
||||
@@ -207,6 +207,62 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: Get LSN by a timestamp
|
||||
parameters:
|
||||
- name: timestamp
|
||||
in: query
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: date-time
|
||||
description: A timestamp to get the LSN
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
"400":
|
||||
description: Error when no tenant id found in path, no timeline id or invalid timestamp
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
/v1/tenant/{tenant_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
|
||||
@@ -12,6 +12,7 @@ use super::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest,
|
||||
};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::storage_sync;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
|
||||
use crate::tenant::{TenantState, Timeline};
|
||||
@@ -265,6 +266,23 @@ fn query_param_present(request: &Request<Body>, param: &str) -> bool {
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
|
||||
request.uri().query().map_or(
|
||||
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|
||||
|v| {
|
||||
url::form_urlencoded::parse(v.as_bytes())
|
||||
.into_owned()
|
||||
.find(|(k, _)| k == param_name)
|
||||
.map_or(
|
||||
Err(ApiError::BadRequest(anyhow!(
|
||||
"no {param_name} specified in query parameters"
|
||||
))),
|
||||
|(_, v)| Ok(v),
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
@@ -329,6 +347,33 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let timestamp_raw = get_query_param(&request, "timestamp")?;
|
||||
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
|
||||
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
|
||||
|
||||
let timeline = tenant_mgr::get_tenant(tenant_id, true)
|
||||
.and_then(|tenant| tenant.get_timeline(timeline_id))
|
||||
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let result = match timeline
|
||||
.find_lsn_for_timestamp(timestamp_pg)
|
||||
.map_err(ApiError::InternalServerError)?
|
||||
{
|
||||
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
|
||||
LsnForTimestamp::Future(_lsn) => "future".into(),
|
||||
LsnForTimestamp::Past(_lsn) => "past".into(),
|
||||
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
|
||||
};
|
||||
json_response(StatusCode::OK, result)
|
||||
}
|
||||
|
||||
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
|
||||
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
@@ -617,9 +662,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(trace_read_requests) = request_data.trace_read_requests {
|
||||
tenant_conf.trace_read_requests = Some(trace_read_requests);
|
||||
}
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||
@@ -911,6 +953,10 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_detail_handler,
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|
||||
get_lsn_by_timestamp_handler,
|
||||
)
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
|
||||
testing_api!("run timeline GC", timeline_gc_handler),
|
||||
|
||||
@@ -16,8 +16,6 @@ pub mod tenant;
|
||||
pub mod tenant_config;
|
||||
pub mod tenant_mgr;
|
||||
pub mod tenant_tasks;
|
||||
// pub mod timelines;
|
||||
pub mod trace;
|
||||
pub mod virtual_file;
|
||||
pub mod walingest;
|
||||
pub mod walreceiver;
|
||||
|
||||
@@ -10,11 +10,8 @@
|
||||
//
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::Buf;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use futures::{Stream, StreamExt};
|
||||
use regex::Regex;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
@@ -37,24 +34,19 @@ use crate::basebackup;
|
||||
use crate::config::{PageServerConf, ProfilingConfig};
|
||||
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
|
||||
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::profiling::profpoint_start;
|
||||
use crate::reltag::RelTag;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant_mgr;
|
||||
use crate::trace::Tracer;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
// TODO these should be in a library outside the pageserver
|
||||
#[derive(Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
@@ -62,7 +54,7 @@ pub enum PagestreamFeMessage {
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
pub enum PagestreamBeMessage {
|
||||
enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
@@ -71,152 +63,106 @@ pub enum PagestreamBeMessage {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsRequest {
|
||||
struct PagestreamExistsRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamNblocksRequest {
|
||||
struct PagestreamNblocksRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
struct PagestreamGetPageRequest {
|
||||
latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
struct PagestreamDbSizeRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
struct PagestreamExistsResponse {
|
||||
exists: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamNblocksResponse {
|
||||
struct PagestreamNblocksResponse {
|
||||
n_blocks: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageResponse {
|
||||
struct PagestreamGetPageResponse {
|
||||
page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
struct PagestreamErrorResponse {
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamDbSizeResponse {
|
||||
struct PagestreamDbSizeResponse {
|
||||
db_size: i64,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(1);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(2);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(3);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
|
||||
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// TODO these gets can fail
|
||||
|
||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
let msg_tag = body.read_u8()?;
|
||||
let msg_tag = body.get_u8();
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
blkno: body.get_u32(),
|
||||
})),
|
||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
dbnode: body.get_u32(),
|
||||
})),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -473,17 +419,6 @@ impl PageServerHandler {
|
||||
// so there is no need to reset the association
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
// Make request tracer if needed
|
||||
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
let path = tenant
|
||||
.conf
|
||||
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
|
||||
Some(Tracer::new(path))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
|
||||
@@ -508,24 +443,15 @@ impl PageServerHandler {
|
||||
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Sync) => {
|
||||
// TODO what now?
|
||||
continue;
|
||||
}
|
||||
Some(m) => {
|
||||
bail!("unexpected message: {m:?} during COPY");
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
|
||||
// Trace request if needed
|
||||
if let Some(t) = tracer.as_mut() {
|
||||
t.trace(©_data_bytes)
|
||||
}
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
|
||||
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
@@ -1133,33 +1059,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("get_lsn_by_timestamp ") {
|
||||
// Locate LSN of last transaction with timestamp less or equal than sppecified
|
||||
// TODO lazy static
|
||||
let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$")
|
||||
.unwrap();
|
||||
let caps = re
|
||||
.captures(query_string)
|
||||
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
|
||||
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
|
||||
let timestamp_pg = to_pg_timestamp(timestamp);
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
|
||||
b"lsn",
|
||||
)]))?;
|
||||
let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? {
|
||||
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
|
||||
LsnForTimestamp::Future(_lsn) => "future".into(),
|
||||
LsnForTimestamp::Past(_lsn) => "past".into(),
|
||||
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
|
||||
};
|
||||
pgb.write_message(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else {
|
||||
bail!("unknown command");
|
||||
}
|
||||
|
||||
@@ -593,14 +593,6 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||
}
|
||||
|
||||
// TODO is there a need for all this boilerplate?
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||
}
|
||||
|
||||
@@ -15,19 +15,25 @@ use crate::repository::Key;
|
||||
use crate::tenant::inmemory_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::tenant::storage_layer::{range_eq, range_overlaps};
|
||||
use amplify_num::i256;
|
||||
use anyhow::Result;
|
||||
use num_traits::identities::{One, Zero};
|
||||
use num_traits::{Bounded, Num, Signed};
|
||||
use rstar::{RTree, RTreeObject, AABB};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::ops::Range;
|
||||
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
|
||||
struct BTreeKey {
|
||||
lsn: Lsn,
|
||||
seq: usize,
|
||||
}
|
||||
|
||||
impl BTreeKey {
|
||||
fn new(lsn: Lsn) -> BTreeKey {
|
||||
BTreeKey { lsn, seq: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
///
|
||||
@@ -53,165 +59,14 @@ pub struct LayerMap {
|
||||
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
|
||||
|
||||
/// All the historic layers are kept here
|
||||
historic_layers: RTree<LayerRTreeObject>,
|
||||
historic_layers: BTreeMap<BTreeKey, Arc<dyn Layer>>,
|
||||
layers_seqno: usize,
|
||||
|
||||
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
l0_delta_layers: Vec<Arc<dyn Layer>>,
|
||||
}
|
||||
|
||||
struct LayerRTreeObject {
|
||||
layer: Arc<dyn Layer>,
|
||||
}
|
||||
|
||||
// Representation of Key as numeric type.
|
||||
// We can not use native implementation of i128, because rstar::RTree
|
||||
// doesn't handle properly integer overflow during area calculation: sum(Xi*Yi).
|
||||
// Overflow will cause panic in debug mode and incorrect area calculation in release mode,
|
||||
// which leads to non-optimally balanced R-Tree (but doesn't fit correctness of R-Tree work).
|
||||
// By using i256 as the type, even though all the actual values would fit in i128, we can be
|
||||
// sure that multiplication doesn't overflow.
|
||||
//
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
|
||||
struct IntKey(i256);
|
||||
|
||||
impl Copy for IntKey {}
|
||||
|
||||
impl IntKey {
|
||||
fn from(i: i128) -> Self {
|
||||
IntKey(i256::from(i))
|
||||
}
|
||||
}
|
||||
|
||||
impl Bounded for IntKey {
|
||||
fn min_value() -> Self {
|
||||
IntKey(i256::MIN)
|
||||
}
|
||||
fn max_value() -> Self {
|
||||
IntKey(i256::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
impl Signed for IntKey {
|
||||
fn is_positive(&self) -> bool {
|
||||
self.0 > i256::ZERO
|
||||
}
|
||||
fn is_negative(&self) -> bool {
|
||||
self.0 < i256::ZERO
|
||||
}
|
||||
fn signum(&self) -> Self {
|
||||
match self.0.cmp(&i256::ZERO) {
|
||||
Ordering::Greater => IntKey(i256::ONE),
|
||||
Ordering::Less => IntKey(-i256::ONE),
|
||||
Ordering::Equal => IntKey(i256::ZERO),
|
||||
}
|
||||
}
|
||||
fn abs(&self) -> Self {
|
||||
IntKey(self.0.abs())
|
||||
}
|
||||
fn abs_sub(&self, other: &Self) -> Self {
|
||||
if self.0 <= other.0 {
|
||||
IntKey(i256::ZERO)
|
||||
} else {
|
||||
IntKey(self.0 - other.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Neg for IntKey {
|
||||
type Output = Self;
|
||||
fn neg(self) -> Self::Output {
|
||||
IntKey(-self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Rem for IntKey {
|
||||
type Output = Self;
|
||||
fn rem(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 % rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Div for IntKey {
|
||||
type Output = Self;
|
||||
fn div(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 / rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Add for IntKey {
|
||||
type Output = Self;
|
||||
fn add(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 + rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sub for IntKey {
|
||||
type Output = Self;
|
||||
fn sub(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 - rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Mul for IntKey {
|
||||
type Output = Self;
|
||||
fn mul(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 * rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl One for IntKey {
|
||||
fn one() -> Self {
|
||||
IntKey(i256::ONE)
|
||||
}
|
||||
}
|
||||
|
||||
impl Zero for IntKey {
|
||||
fn zero() -> Self {
|
||||
IntKey(i256::ZERO)
|
||||
}
|
||||
fn is_zero(&self) -> bool {
|
||||
self.0 == i256::ZERO
|
||||
}
|
||||
}
|
||||
|
||||
impl Num for IntKey {
|
||||
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
|
||||
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
|
||||
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for LayerRTreeObject {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// FIXME: ptr_eq might fail to return true for 'dyn'
|
||||
// references. Clippy complains about this. In practice it
|
||||
// seems to work, the assertion below would be triggered
|
||||
// otherwise but this ought to be fixed.
|
||||
#[allow(clippy::vtable_address_comparisons)]
|
||||
Arc::ptr_eq(&self.layer, &other.layer)
|
||||
}
|
||||
}
|
||||
|
||||
impl RTreeObject for LayerRTreeObject {
|
||||
type Envelope = AABB<[IntKey; 2]>;
|
||||
fn envelope(&self) -> Self::Envelope {
|
||||
let key_range = self.layer.get_key_range();
|
||||
let lsn_range = self.layer.get_lsn_range();
|
||||
AABB::from_corners(
|
||||
[
|
||||
IntKey::from(key_range.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value of LayerMap::search
|
||||
pub struct SearchResult {
|
||||
pub layer: Arc<dyn Layer>,
|
||||
@@ -234,23 +89,17 @@ impl LayerMap {
|
||||
// linear search
|
||||
// Find the latest image layer that covers the given key
|
||||
let mut latest_img: Option<Arc<dyn Layer>> = None;
|
||||
let mut latest_img_lsn: Option<Lsn> = None;
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
|
||||
[
|
||||
IntKey::from(key.to_i128()),
|
||||
IntKey::from(end_lsn.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
let mut latest_img_lsn = Lsn(0);
|
||||
let mut iter = self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
|
||||
while let Some((_key, l)) = iter.next_back() {
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
if !l.get_key_range().contains(&key) {
|
||||
continue;
|
||||
}
|
||||
let img_lsn = l.get_lsn_range().start;
|
||||
assert!(img_lsn < end_lsn);
|
||||
if Lsn(img_lsn.0 + 1) == end_lsn {
|
||||
@@ -260,23 +109,23 @@ impl LayerMap {
|
||||
lsn_floor: img_lsn,
|
||||
}));
|
||||
}
|
||||
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
|
||||
latest_img = Some(Arc::clone(l));
|
||||
latest_img_lsn = Some(img_lsn);
|
||||
}
|
||||
latest_img = Some(Arc::clone(l));
|
||||
latest_img_lsn = img_lsn;
|
||||
break;
|
||||
}
|
||||
|
||||
// Search the delta layers
|
||||
let mut latest_delta: Option<Arc<dyn Layer>> = None;
|
||||
for e in self
|
||||
let mut iter = self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
|
||||
while let Some((_key, l)) = iter.next_back() {
|
||||
if !l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
if !l.get_key_range().contains(&key) {
|
||||
continue;
|
||||
}
|
||||
if l.get_lsn_range().start >= end_lsn {
|
||||
info!(
|
||||
"Candidate delta layer {}..{} is too new for lsn {}",
|
||||
@@ -286,6 +135,9 @@ impl LayerMap {
|
||||
);
|
||||
}
|
||||
assert!(l.get_lsn_range().start < end_lsn);
|
||||
if l.get_lsn_range().end <= latest_img_lsn {
|
||||
continue;
|
||||
}
|
||||
if l.get_lsn_range().end >= end_lsn {
|
||||
// this layer contains the requested point in the key/lsn space.
|
||||
// No need to search any further
|
||||
@@ -311,10 +163,7 @@ impl LayerMap {
|
||||
"found (old) layer {} for request on {key} at {end_lsn}",
                    l.filename().display(),
                );
                let lsn_floor = std::cmp::max(
                    Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
                    l.get_lsn_range().start,
                );
                let lsn_floor = std::cmp::max(Lsn(latest_img_lsn.0 + 1), l.get_lsn_range().start);
                Ok(Some(SearchResult {
                    lsn_floor,
                    layer: l,
@@ -322,7 +171,7 @@ impl LayerMap {
        } else if let Some(l) = latest_img {
            trace!("found img layer and no deltas for request on {key} at {end_lsn}");
            Ok(Some(SearchResult {
                lsn_floor: latest_img_lsn.unwrap(),
                lsn_floor: latest_img_lsn,
                layer: l,
            }))
        } else {
@@ -338,7 +187,14 @@ impl LayerMap {
        if layer.get_key_range() == (Key::MIN..Key::MAX) {
            self.l0_delta_layers.push(layer.clone());
        }
        self.historic_layers.insert(LayerRTreeObject { layer });
        self.historic_layers.insert(
            BTreeKey {
                lsn: layer.get_lsn_range().start,
                seq: self.layers_seqno,
            },
            layer,
        );
        self.layers_seqno += 1;
        NUM_ONDISK_LAYERS.inc();
    }

@@ -360,10 +216,26 @@ impl LayerMap {
                .retain(|other| !Arc::ptr_eq(other, &layer));
            assert_eq!(self.l0_delta_layers.len(), len_before - 1);
        }
        assert!(self
            .historic_layers
            .remove(&LayerRTreeObject { layer })
            .is_some());
        let len_before = self.historic_layers.len();
        #[allow(clippy::vtable_address_comparisons)]
        self.historic_layers
            .retain(|_key, other| !Arc::ptr_eq(other, &layer));
        if self.historic_layers.len() != len_before - 1 {
            assert!(self.historic_layers.len() == len_before);
            error!(
                "Failed to remove {} layer: {}..{}__{}..{}",
                if layer.is_incremental() {
                    "incremental"
                } else {
                    "image"
                },
                layer.get_key_range().start,
                layer.get_key_range().end,
                layer.get_lsn_range().start,
                layer.get_lsn_range().end
            );
        }
        assert!(self.historic_layers.len() == len_before - 1);
        NUM_ONDISK_LAYERS.dec();
    }

@@ -380,21 +252,10 @@ impl LayerMap {

        loop {
            let mut made_progress = false;
            let envelope = AABB::from_corners(
                [
                    IntKey::from(range_remain.start.to_i128()),
                    IntKey::from(lsn_range.start.0 as i128),
                ],
                [
                    IntKey::from(range_remain.end.to_i128() - 1),
                    IntKey::from(lsn_range.end.0 as i128 - 1),
                ],
            );
            for e in self
            for (_key, l) in self
                .historic_layers
                .locate_in_envelope_intersecting(&envelope)
                .range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
            {
                let l = &e.layer;
                if l.is_incremental() {
                    continue;
                }
@@ -417,39 +278,30 @@ impl LayerMap {
    }

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
        self.historic_layers.iter().map(|e| e.layer.clone())
        self.historic_layers
            .iter()
            .map(|(_key, layer)| layer.clone())
    }

    /// Find the last image layer that covers 'key', ignoring any image layers
    /// newer than 'lsn'.
    fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
        let mut candidate_lsn = Lsn(0);
        let mut candidate = None;
        let envelope = AABB::from_corners(
            [IntKey::from(key.to_i128()), IntKey::from(0)],
            [IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
        );
        for e in self
        let mut iter = self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
        {
            let l = &e.layer;
            .range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1));
        while let Some((_key, l)) = iter.next_back() {
            if l.is_incremental() {
                continue;
            }

            assert!(l.get_key_range().contains(&key));
            let this_lsn = l.get_lsn_range().start;
            assert!(this_lsn <= lsn);
            if this_lsn < candidate_lsn {
                // our previous candidate was better
            if !l.get_key_range().contains(&key) {
                continue;
            }
            candidate_lsn = this_lsn;
            candidate = Some(Arc::clone(l));
            let this_lsn = l.get_lsn_range().start;
            assert!(this_lsn <= lsn);
            return Some(Arc::clone(l));
        }

        candidate
        None
    }

    ///
@@ -466,18 +318,10 @@ impl LayerMap {
        lsn: Lsn,
    ) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
        let mut points = vec![key_range.start];
        let envelope = AABB::from_corners(
            [IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
            [
                IntKey::from(key_range.end.to_i128()),
                IntKey::from(lsn.0 as i128),
            ],
        );
        for e in self
        for (_lsn, l) in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
            .range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1))
        {
            let l = &e.layer;
            assert!(l.get_lsn_range().start <= lsn);
            let range = l.get_key_range();
            if key_range.contains(&range.start) {
@@ -514,26 +358,17 @@ impl LayerMap {
        if lsn_range.start >= lsn_range.end {
            return Ok(0);
        }
        let envelope = AABB::from_corners(
            [
                IntKey::from(key_range.start.to_i128()),
                IntKey::from(lsn_range.start.0 as i128),
            ],
            [
                IntKey::from(key_range.end.to_i128() - 1),
                IntKey::from(lsn_range.end.0 as i128 - 1),
            ],
        );
        for e in self
        for (_lsn, l) in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
            .range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
        {
            let l = &e.layer;
            if !l.is_incremental() {
                continue;
            }
            if !range_overlaps(&l.get_key_range(), key_range) {
                continue;
            }
            assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
            assert!(range_overlaps(&l.get_key_range(), key_range));

            // We ignore level0 delta layers. Unless the whole keyspace fits
            // into one partition
@@ -569,8 +404,8 @@ impl LayerMap {
    }

        println!("historic_layers:");
        for e in self.historic_layers.iter() {
            e.layer.dump(verbose)?;
        for (_key, layer) in self.historic_layers.iter() {
            layer.dump(verbose)?;
        }
        println!("End dump LayerMap");
        Ok(())

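The hunks above replace LayerMap's R-tree index (LayerRTreeObject, AABB envelopes) with a BTreeMap keyed by the layer's start LSN plus an insertion sequence number, so lookups become ordered range scans. A minimal sketch of that idea follows; DemoLayer, DemoLayerMap and their fields are simplified stand-ins for illustration only, not the actual pageserver types.

use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;

// Simplified stand-in for the pageserver's Lsn newtype.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

// Layers are ordered by their start LSN; `seq` breaks ties between layers
// that start at the same LSN, so every key stays unique.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct BTreeKey {
    lsn: Lsn,
    seq: u64,
}

impl BTreeKey {
    // Smallest possible key at a given LSN, handy as a range bound.
    fn new(lsn: Lsn) -> Self {
        BTreeKey { lsn, seq: 0 }
    }
}

struct DemoLayer {
    key_range: Range<i128>,
    lsn_start: Lsn,
    is_incremental: bool,
}

struct DemoLayerMap {
    historic_layers: BTreeMap<BTreeKey, Arc<DemoLayer>>,
    layers_seqno: u64,
}

impl DemoLayerMap {
    fn insert_historic(&mut self, layer: Arc<DemoLayer>) {
        self.historic_layers.insert(
            BTreeKey {
                lsn: layer.lsn_start,
                seq: self.layers_seqno,
            },
            layer,
        );
        self.layers_seqno += 1;
    }

    // Walk layers whose start LSN is <= `lsn`, from newest to oldest, and
    // return the first image layer covering `key` — the same shape as the
    // rewritten find_latest_image above.
    fn find_latest_image(&self, key: i128, lsn: Lsn) -> Option<Arc<DemoLayer>> {
        let mut iter = self
            .historic_layers
            .range(BTreeKey::new(Lsn(0))..BTreeKey::new(Lsn(lsn.0 + 1)));
        while let Some((_key, l)) = iter.next_back() {
            if l.is_incremental || !l.key_range.contains(&key) {
                continue;
            }
            return Some(Arc::clone(l));
        }
        None
    }
}

Compared with the R-tree, the BTreeMap cannot filter on the key dimension, which is why the new code re-checks `get_key_range().contains(&key)` inside the loop.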
@@ -82,10 +82,6 @@ pub struct TenantConf {
    /// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
    /// to avoid eager reconnects.
    pub max_lsn_wal_lag: NonZeroU64,
    /// If enabled, records all read requests for this ... TODO
    /// Even though read traces are small, there's no delete mechanism so they can
    /// pile up. So we disable this by default.
    pub trace_read_requests: bool,
}

/// Same as TenantConf, but this struct preserves the information about
@@ -109,7 +105,6 @@ pub struct TenantConfOpt {
    #[serde(with = "humantime_serde")]
    pub lagging_wal_timeout: Option<Duration>,
    pub max_lsn_wal_lag: Option<NonZeroU64>,
    pub trace_read_requests: Option<bool>,
}

impl TenantConfOpt {
@@ -143,9 +138,6 @@ impl TenantConfOpt {
                .lagging_wal_timeout
                .unwrap_or(global_conf.lagging_wal_timeout),
            max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
            trace_read_requests: self
                .trace_read_requests
                .unwrap_or(global_conf.trace_read_requests),
        }
    }

@@ -215,7 +207,6 @@ impl TenantConf {
                .expect("cannot parse default walreceiver lagging wal timeout"),
            max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
                .expect("cannot parse default max walreceiver Lsn wal lag"),
            trace_read_requests: false,
        }
    }

@@ -241,7 +232,6 @@ impl TenantConf {
                .unwrap(),
            max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
                .unwrap(),
            trace_read_requests: false,
        }
    }
}

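The TenantConf hunks above remove the per-tenant trace_read_requests flag, and in passing show the pattern used for per-tenant settings: every field of the optional struct falls back to the global default. A hedged sketch of that merge pattern, with made-up field names (GlobalConf, TenantOverrides, gc_period and compaction_threshold are illustrative, not the real structs):

use std::time::Duration;

// Global defaults: every field has a concrete value.
struct GlobalConf {
    gc_period: Duration,
    compaction_threshold: usize,
}

// Per-tenant overrides: every field is optional.
#[derive(Default)]
struct TenantOverrides {
    gc_period: Option<Duration>,
    compaction_threshold: Option<usize>,
}

impl TenantOverrides {
    // Effective config = "override if set, otherwise the global value",
    // the same unwrap_or chain seen in the TenantConfOpt hunk above.
    fn merge(&self, global: &GlobalConf) -> GlobalConf {
        GlobalConf {
            gc_period: self.gc_period.unwrap_or(global.gc_period),
            compaction_threshold: self
                .compaction_threshold
                .unwrap_or(global.compaction_threshold),
        }
    }
}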
@@ -1,36 +0,0 @@
use bytes::Bytes;
use std::{
    fs::{create_dir_all, File},
    io::{BufWriter, Write},
    path::PathBuf,
};

pub struct Tracer {
    writer: BufWriter<File>,
}

impl Drop for Tracer {
    fn drop(&mut self) {
        self.flush()
    }
}

impl Tracer {
    pub fn new(path: PathBuf) -> Self {
        let parent = path.parent().expect("failed to parse parent path");
        create_dir_all(parent).expect("failed to create trace dir");

        let file = File::create(path).expect("failed to create trace file");
        Tracer {
            writer: BufWriter::new(file),
        }
    }

    pub fn trace(&mut self, msg: &Bytes) {
        self.writer.write(msg).expect("failed to write trace");
    }

    pub fn flush(&mut self) {
        self.writer.flush().expect("failed to flush trace file");
    }
}
@@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
@@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection(
    // There might be some padding after the last full record, skip it.
    startpoint += startpoint.calc_padding(8u32);

    // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
    // for anything, and in some corner cases, the compute node might have never generated the WAL for page headers.
    // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
    // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
    // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
    // to the safekeepers.
    startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);

    info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");

    let query = format!("START_REPLICATION PHYSICAL {startpoint}");

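normalize_lsn is the helper behind the comment above: if the starting LSN sits exactly on a WAL segment or page boundary, it is bumped past the page header so streaming begins at a position the compute node actually wrote. A hedged sketch of that intent; the header-size constants and the function name/signature here are assumptions for illustration, not the postgres_ffi definitions:

// Illustrative constants; the real values come from postgres_ffi.
const XLOG_BLCKSZ: u64 = 8192; // WAL page size
const SIZEOF_XLOG_SHORT_PHD: u64 = 24; // header on an ordinary page (assumed)
const SIZEOF_XLOG_LONG_PHD: u64 = 40; // header on the first page of a segment (assumed)

// Sketch: move an LSN that points exactly at a page boundary past the page
// header, leaving any other LSN untouched.
fn normalize_lsn_sketch(lsn: u64, seg_sz: u64) -> u64 {
    if lsn % seg_sz == 0 {
        // Start of a segment: the first page carries the long header.
        lsn + SIZEOF_XLOG_LONG_PHD
    } else if lsn % XLOG_BLCKSZ == 0 {
        // Start of an ordinary page: skip the short header.
        lsn + SIZEOF_XLOG_SHORT_PHD
    } else {
        lsn
    }
}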
@@ -264,76 +264,12 @@ pageserver_flush(void)
	}
}

// Entry of the page_cache
typedef struct
{
	// HACK Just xor of bytes lol
	char request_hash;

	// Points directly to a NeonResponse. We can't just own the
	// NeonResponse because it's a "supertype", so it's not Sized.
	char* response;
	int len;
} NeonRequestResponse;

#define MAX_PAGE_CACHE_SIZE 50
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
int page_cache_size = 0;
int page_cache_head = 0;

static NeonResponse *
pageserver_call(NeonRequest * request)
{
	// Compute hash
	char hash = 0;
	StringInfoData req_buff;
	req_buff = nm_pack_request(request);
	for (int i = 0; i < req_buff.len; i++) {
		hash ^= req_buff.data[i];
	}
	pfree(req_buff.data);

	// If result is cached, memcpy and return
	for (int i = 0; i < page_cache_size; i++) {
		if (page_cache[i].request_hash == hash) {
			int len = page_cache[i].len;
			NeonResponse *resp = palloc0(len);
			// I'd rather Rc than memcpy, but this is not rust :(
			memcpy(resp, page_cache[i].response, len);
			elog(LOG, "cache hit !!!");
			return resp;
		}
	}

	// Send request, get response
	pageserver_send(request);
	pageserver_flush();
	NeonResponse *resp = pageserver_receive();

	// Get length
	int len = -1;
	switch (resp->tag) {
		case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); break; }
		case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); break; }
		case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; break; }
		case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); break; }
		case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); break; }
	}

	// Cache result
	page_cache[page_cache_head].request_hash = hash;
	if (page_cache_head < page_cache_size) {
		pfree(page_cache[page_cache_head].response);
	}
	page_cache[page_cache_head].len = len;
	page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
	memcpy(page_cache[page_cache_head].response, resp, len);
	page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
	if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
		page_cache_size += 1;
	}

	return resp;
	return pageserver_receive();
}

page_server_api api = {

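The hunk above backs out an experimental response cache from the compute extension: pageserver responses were memoized in a small ring buffer keyed by an xor hash of the packed request. A minimal Rust sketch of that caching idea, with illustrative types (ResponseCache and the byte-vector payloads are stand-ins, not the Neon message structs):

// Fixed-size, ring-replacement response cache keyed by a request hash.
const MAX_PAGE_CACHE_SIZE: usize = 50;

struct CachedResponse {
    request_hash: u8,
    response: Vec<u8>,
}

#[derive(Default)]
struct ResponseCache {
    entries: Vec<CachedResponse>, // grows up to MAX_PAGE_CACHE_SIZE
    head: usize,                  // next slot to overwrite once full
}

// Same single-byte xor "hash" as the C hack.
fn xor_hash(request: &[u8]) -> u8 {
    request.iter().fold(0, |acc, b| acc ^ b)
}

impl ResponseCache {
    fn get(&self, request: &[u8]) -> Option<&[u8]> {
        let hash = xor_hash(request);
        self.entries
            .iter()
            .find(|e| e.request_hash == hash)
            .map(|e| e.response.as_slice())
    }

    fn put(&mut self, request: &[u8], response: Vec<u8>) {
        let entry = CachedResponse {
            request_hash: xor_hash(request),
            response,
        };
        if self.entries.len() < MAX_PAGE_CACHE_SIZE {
            self.entries.push(entry);
        } else {
            self.entries[self.head] = entry;
            self.head = (self.head + 1) % MAX_PAGE_CACHE_SIZE;
        }
    }
}

Note that a single-byte xor hash collides easily, so a real cache would also need to compare the full request bytes on a hash hit.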
@@ -291,9 +291,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo

    // Register metrics collector for active timelines. It's important to do this
    // after daemonizing, otherwise process collector will be upset.
    let registry = metrics::default_registry();
    let timeline_collector = safekeeper::metrics::TimelineCollector::new();
    registry.register(Box::new(timeline_collector))?;
    metrics::register_internal(Box::new(timeline_collector))?;

    let signals = signals::install_shutdown_handlers()?;
    let mut threads = vec![];

@@ -91,25 +91,10 @@ class NeonCompare(PgCompare):
        self._pg_bin = pg_bin
        self.pageserver_http_client = self.env.pageserver.http_client()

        self.tenant, _ = self.env.neon_cli.create_tenant(
            conf={
                "trace_read_requests": "true",
            }
        )

        # HACK enable request tracing, as an experiment
        # NOTE This must be done before the pg starts pagestream
        # TODO why does it not work?
        # self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
        #     "trace_read_requests": "true",
        # })

        # We only use one branch and one timeline
        # self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
        # self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
        self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
        self._pg = self.env.postgres.create_start(
            branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
        self.env.neon_cli.create_branch(branch_name, "empty")
        self._pg = self.env.postgres.create_start(branch_name)
        self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]

    @property
    def pg(self):
@@ -124,10 +109,10 @@ class NeonCompare(PgCompare):
        return self._pg_bin

    def flush(self):
        self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
        self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)

    def compact(self):
        self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
        self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)

    def report_peak_memory_use(self) -> None:
        self.zenbenchmark.record(
@@ -139,7 +124,7 @@ class NeonCompare(PgCompare):

    def report_size(self) -> None:
        timeline_size = self.zenbenchmark.get_timeline_size(
            self.env.repo_dir, self.tenant, self.timeline
            self.env.repo_dir, self.env.initial_tenant, self.timeline
        )
        self.zenbenchmark.record(
            "size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

|
||||
assert res_json is None
|
||||
return res_json
|
||||
|
||||
def timeline_get_lsn_by_timestamp(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
|
||||
):
|
||||
log.info(
|
||||
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
|
||||
)
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
return res_json
|
||||
|
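The new fixture method above wraps the pageserver's get_lsn_by_timestamp HTTP endpoint, which answers with a JSON string: an LSN, or "past"/"future" when the timestamp falls outside the timeline's history. For reference, the same call sketched in Rust; this is a hypothetical helper assuming the reqwest crate with its blocking and json features, not an API from this repository:

// Hypothetical helper: build the request URL and fetch it with reqwest.
fn get_lsn_by_timestamp(
    base: &str,       // e.g. "http://localhost:9898"
    tenant_id: &str,
    timeline_id: &str,
    timestamp: &str,  // ISO-8601, e.g. "2022-08-10T12:00:00Z"
) -> Result<String, reqwest::Error> {
    let url = format!(
        "{base}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}"
    );
    // The response body is a JSON string: an LSN, or "past"/"future".
    reqwest::blocking::get(url)?.json::<String>()
}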

    def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
        log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
        res = self.put(
@@ -1187,6 +1200,7 @@ class AbstractNeonCli(abc.ABC):
        arguments: List[str],
        extra_env_vars: Optional[Dict[str, str]] = None,
        check_return_code=True,
        timeout=None,
    ) -> "subprocess.CompletedProcess[str]":
        """
        Run the command with the specified arguments.
@@ -1233,6 +1247,7 @@ class AbstractNeonCli(abc.ABC):
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
        if not res.returncode:
            log.info(f"Run success: {res.stdout}")
@@ -1606,6 +1621,14 @@ class WalCraft(AbstractNeonCli):
        res.check_returncode()


class ComputeCtl(AbstractNeonCli):
    """
    A typed wrapper around the `compute_ctl` CLI tool.
    """

    COMMAND = "compute_ctl"


class NeonPageserver(PgProtocol):
    """
    An object representing a running pageserver.
@@ -1749,37 +1772,6 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
    return PgBin(test_output_dir, pg_version)


@dataclass
class ReplayBin:
    """A helper class for replaying pageserver wal and read traces."""

    traces_dir: str

    def replay_all(self, pageserver_connstr):
        replay_binpath = os.path.join(str(neon_binpath), "replay")
        args = [
            replay_binpath,
            self.traces_dir,
            pageserver_connstr,
        ]
        # return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
        subprocess.run(args)

    def draw_all(self):
        draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
        args = [
            draw_binpath,
            self.traces_dir,
        ]
        subprocess.run(args)


@pytest.fixture(scope="function")
def replay_bin(test_output_dir):
    traces_dir = os.path.join(test_output_dir, "repo", "traces")
    return ReplayBin(traces_dir)


class VanillaPostgres(PgProtocol):
    def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
        super().__init__(host="localhost", port=port, dbname="postgres")

@@ -175,11 +175,6 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)


@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
    run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)

# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#

@@ -1,57 +0,0 @@
from contextlib import closing

from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin


# This test is a demonstration of how to do trace playback. With the current
# workload it uses, it's not testing anything meaningful.
def test_trace_replay(
    neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
):
    neon_env_builder.num_safekeepers = 1
    env = neon_env_builder.init_start()

    tenant, _ = env.neon_cli.create_tenant(
        conf={
            "trace_read_requests": "true",
        }
    )
    # TODO This doesn't work because I haven't updated tenant_config_handler
    # env.neon_cli.config_tenant(tenant, conf={
    #     "trace_read_requests": "true",
    # })
    env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
    pg = env.postgres.create_start("test_trace_replay", "main", tenant)

    with zenbenchmark.record_duration("run"):
        pg.safe_psql("select 1;")
        pg.safe_psql("select 1;")
        pg.safe_psql("select 1;")
        pg.safe_psql("select 1;")

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("create table t (i integer);")
            cur.execute(f"insert into t values (generate_series(1,{10000}));")
            cur.execute("select count(*) from t;")

    # Stop pg so we drop the connection and flush the traces
    pg.stop()

    # TODO This doesn't work because I haven't updated tenant_config_handler
    # env.neon_cli.config_tenant(tenant, conf={
    #     "trace_read_requests": "false",
    # })

    # trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
    # assert trace_path.exists()

    replay_bin.draw_all()
    return

    print("replaying")
    ps_connstr = env.pageserver.connstr()
    with zenbenchmark.record_duration("replay"):
        output = replay_bin.replay_all(ps_connstr)
        print(output)
@@ -6,6 +6,8 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix


@@ -88,3 +90,39 @@ def test_branching_with_pgbench(
    for pg in pgs:
        res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
        assert res[0] == (100000 * scale,)


# Test branching from an "unnormalized" LSN.
#
# Context:
# When doing basebackup for a newly created branch, pageserver generates a
# 'pg_control' file to bootstrap the WAL segment by specifying the redo position
# as a "normalized" LSN based on the timeline's starting LSN:
#
#     checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0;
#
# This test checks that the pageserver is able to handle an "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
    XLOG_BLCKSZ = 8192

    env = neon_simple_env

    env.neon_cli.create_branch("b0")
    pg0 = env.postgres.create_start("b0")

    pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])

    with pg0.cursor() as cur:
        curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))

    # Specify `start_lsn` as a number that is divisible by `XLOG_BLCKSZ`
    # and is smaller than `curr_lsn`.
    start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)

    log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
    env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
    pg1 = env.postgres.create_start("b1")

    pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])

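The start_lsn arithmetic above rounds the current flush LSN down to the previous 8 KiB WAL page boundary, producing a branch point that sits exactly on a page boundary. The same computation as a small Rust snippet (the constant mirrors the test's assumption that XLOG_BLCKSZ is 8192):

const XLOG_BLCKSZ: u64 = 8192;

// Round `curr_lsn` down to the start of the previous WAL page, matching
// `(int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ` in the test.
fn unnormalized_start_lsn(curr_lsn: u64) -> u64 {
    (curr_lsn - XLOG_BLCKSZ) / XLOG_BLCKSZ * XLOG_BLCKSZ
}

// Example: 23_702_872 maps to 23_691_264, which is 2892 * 8192 and therefore
// sits exactly on a page boundary.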
test_runner/regress/test_compute_ctl.py (new file, 203 lines)
@@ -0,0 +1,203 @@
import os
from subprocess import TimeoutExpired

from fixtures.log_helper import log
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin


# Test that compute_ctl works and prints "--sync-safekeepers" logs.
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()
    ctl = ComputeCtl(env)

    env.neon_cli.create_branch("test_compute_ctl", "main")
    pg = env.postgres.create_start("test_compute_ctl")
    pg.safe_psql("CREATE TABLE t(key int primary key, value text)")

    with open(pg.config_file_path(), "r") as f:
        cfg_lines = f.readlines()
    cfg_map = {}
    for line in cfg_lines:
        if "=" in line:
            k, v = line.split("=")
            cfg_map[k] = v.strip("\n '\"")
    log.info(f"postgres config: {cfg_map}")
    pgdata = pg.pg_data_dir_path()
    pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")

    pg.stop_and_destroy()

    spec = (
        """
{
    "format_version": 1.0,

    "timestamp": "2021-05-23T18:25:43.511Z",
    "operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",

    "cluster": {
        "cluster_id": "test-cluster-42",
        "name": "Neon Test",
        "state": "restarted",
        "roles": [
        ],
        "databases": [
        ],
        "settings": [
            {
                "name": "fsync",
                "value": "off",
                "vartype": "bool"
            },
            {
                "name": "wal_level",
                "value": "replica",
                "vartype": "enum"
            },
            {
                "name": "hot_standby",
                "value": "on",
                "vartype": "bool"
            },
            {
                "name": "neon.safekeepers",
                "value": """
        + f'"{cfg_map["neon.safekeepers"]}"'
        + """,
                "vartype": "string"
            },
            {
                "name": "wal_log_hints",
                "value": "on",
                "vartype": "bool"
            },
            {
                "name": "log_connections",
                "value": "on",
                "vartype": "bool"
            },
            {
                "name": "shared_buffers",
                "value": "32768",
                "vartype": "integer"
            },
            {
                "name": "port",
                "value": """
        + f'"{cfg_map["port"]}"'
        + """,
                "vartype": "integer"
            },
            {
                "name": "max_connections",
                "value": "100",
                "vartype": "integer"
            },
            {
                "name": "max_wal_senders",
                "value": "10",
                "vartype": "integer"
            },
            {
                "name": "listen_addresses",
                "value": "0.0.0.0",
                "vartype": "string"
            },
            {
                "name": "wal_sender_timeout",
                "value": "0",
                "vartype": "integer"
            },
            {
                "name": "password_encryption",
                "value": "md5",
                "vartype": "enum"
            },
            {
                "name": "maintenance_work_mem",
                "value": "65536",
                "vartype": "integer"
            },
            {
                "name": "max_parallel_workers",
                "value": "8",
                "vartype": "integer"
            },
            {
                "name": "max_worker_processes",
                "value": "8",
                "vartype": "integer"
            },
            {
                "name": "neon.tenant_id",
                "value": """
        + f'"{cfg_map["neon.tenant_id"]}"'
        + """,
                "vartype": "string"
            },
            {
                "name": "max_replication_slots",
                "value": "10",
                "vartype": "integer"
            },
            {
                "name": "neon.timeline_id",
                "value": """
        + f'"{cfg_map["neon.timeline_id"]}"'
        + """,
                "vartype": "string"
            },
            {
                "name": "shared_preload_libraries",
                "value": "neon",
                "vartype": "string"
            },
            {
                "name": "synchronous_standby_names",
                "value": "walproposer",
                "vartype": "string"
            },
            {
                "name": "neon.pageserver_connstring",
                "value": """
        + f'"{cfg_map["neon.pageserver_connstring"]}"'
        + """,
                "vartype": "string"
            }
        ]
    },
    "delta_operations": [
    ]
}
"""
    )

    ps_connstr = cfg_map["neon.pageserver_connstring"]
    log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")

    # run compute_ctl and wait for 10s
    try:
        ctl.raw_cli(
            ["--connstr", ps_connstr, "--pgdata", pgdata, "--spec", spec, "--pgbin", pg_bin_path],
            timeout=10,
        )
    except TimeoutExpired as exc:
        ctl_logs = exc.stderr.decode("utf-8")
        log.info("compute_ctl output:\n" + ctl_logs)

    start = "starting safekeepers syncing"
    end = "safekeepers synced at LSN"
    start_pos = ctl_logs.index(start)
    assert start_pos != -1
    end_pos = ctl_logs.index(end, start_pos)
    assert end_pos != -1
    sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
    log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)

    # assert that --sync-safekeepers logs are present in the output
    assert "connecting with node" in sync_safekeepers_logs
    assert "connected with node" in sync_safekeepers_logs
    assert "proposer connected to quorum (2)" in sync_safekeepers_logs
    assert "got votes from majority (2)" in sync_safekeepers_logs
    assert "sending elected msg to node" in sync_safekeepers_logs
@@ -15,7 +15,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
    pgmain = env.postgres.create_start("test_lsn_mapping")
    log.info("postgres is running on 'test_lsn_mapping' branch")

    ps_cur = env.pageserver.connect().cursor()
    cur = pgmain.connect().cursor()
    # Create table, and insert rows, each in a separate transaction
    # Disable synchronous_commit to make this initialization go faster.
@@ -38,37 +37,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
    # Wait until WAL is received by pageserver
    wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)

    # Check edge cases: timestamp in the future
    probe_timestamp = tbl[-1][1] + timedelta(hours=1)
    result = query_scalar(
        ps_cur,
        f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
    )
    assert result == "future"

    # timestamp too far in the past
    probe_timestamp = tbl[0][1] - timedelta(hours=10)
    result = query_scalar(
        ps_cur,
        f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
    )
    assert result == "past"

    # Probe a bunch of timestamps in the valid range
    for i in range(1, len(tbl), 100):
        probe_timestamp = tbl[i][1]

        # Call get_lsn_by_timestamp to get the LSN
        lsn = query_scalar(
            ps_cur,
            f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
    with env.pageserver.http_client() as client:
        # Check edge cases: timestamp in the future
        probe_timestamp = tbl[-1][1] + timedelta(hours=1)
        result = client.timeline_get_lsn_by_timestamp(
            env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
        )
        assert result == "future"

        # Launch a new read-only node at that LSN, and check that only the rows
        # that were supposed to be committed at that point in time are visible.
        pg_here = env.postgres.create_start(
            branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
        # timestamp too far in the past
        probe_timestamp = tbl[0][1] - timedelta(hours=10)
        result = client.timeline_get_lsn_by_timestamp(
            env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
        )
        assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
        assert result == "past"

        pg_here.stop_and_destroy()
        # Probe a bunch of timestamps in the valid range
        for i in range(1, len(tbl), 100):
            probe_timestamp = tbl[i][1]
            lsn = client.timeline_get_lsn_by_timestamp(
                env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
            )
            # Call get_lsn_by_timestamp to get the LSN
            # Launch a new read-only node at that LSN, and check that only the rows
            # that were supposed to be committed at that point in time are visible.
            pg_here = env.postgres.create_start(
                branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
            )
            assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i

            pg_here.stop_and_destroy()

@@ -1,7 +1,6 @@
from contextlib import closing

import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder

Submodule vendor/postgres-v15 updated: 9383aaa9c2...ff18cec1ee