Compare commits

...

20 Commits

Author SHA1 Message Date
Konstantin Knizhnik
121c19fcd6 Replace R-Tree with B-Tree in layer map 2022-10-07 13:11:24 +03:00
Arthur Petukhovsky
687ba81366 Display sync safekeepers output in compute_ctl (#2571)
Pipe postgres output to compute_ctl stdout and create a test to check that compute_ctl works and prints postgres logs.
2022-10-06 13:53:52 +00:00
Andrés
47bae68a2e Make get_lsn_by_timestamp available in mgmt API (#2536) (#2560)
Co-authored-by: andres <andres.rodriguez@outlook.es>
2022-10-06 12:42:50 +03:00
Joonas Koivunen
e8b195acb7 fix: apply notify workaround on m1 mac docker (#2564)
workaround as discussed in the notify repository.
2022-10-06 11:13:40 +03:00
Anastasia Lubennikova
254cb7dc4f Update CI script to push compute-node-v15 to dockerhub 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
ed85d97f17 bump vendor/postgres-v15. Rebase it to Stamp 15rc2 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
4a216c5f7f Use PostGIS 3.3.1 that is compatible with pg 15 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
c5a428a61a Update Dockerfile.compute-node-v15 to match v14 version.
Fix build script to promote the image for v15 to neon dockerhub
2022-10-06 10:50:08 +03:00
Konstantin Knizhnik
ff8c481777 Normalize last_record LSN in wal receiver (#2529)
* Add test for branching on page boundary

* Normalize start recovery point

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>

Co-authored-by:  Thang Pham <thang@neon.tech>
2022-10-06 09:01:56 +03:00
Arthur Petukhovsky
f25dd75be9 Fix deadlock in safekeeper metrics (#2566)
We had a problem where almost all of the threads were waiting on a futex syscall. More specifically:
- `/metrics` handler was inside `TimelineCollector::collect()`, waiting on a mutex for a single Timeline
- This exact timeline was inside `control_file::FileStorage::persist()`, waiting on a mutex for Lazy initialization of `PERSIST_CONTROL_FILE_SECONDS`
- `PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram>` was blocked on `prometheus::register`
- `prometheus::register` calls `DEFAULT_REGISTRY.write().register()` to take a write lock on Registry and add a new metric
- `DEFAULT_REGISTRY` lock was already taken inside `DEFAULT_REGISTRY.gather()`, which was called by `/metrics` handler to collect all metrics

This commit creates another Registry with a separate lock, to avoid deadlock in a case where `TimelineCollector` triggers registration of new metrics inside default registry.
2022-10-06 01:07:02 +03:00
Sergey Melnikov
b99bed510d Move proxies to neon-proxy namespace (#2555) 2022-10-05 16:14:09 +03:00
sharnoff
580584c8fc Remove control_plane deps on pageserver/safekeeper (#2513)
Creates new `pageserver_api` and `safekeeper_api` crates to serve as the
shared dependencies. Should reduce both recompile times and cold compile
times.

Decreases the size of the optimized `neon_local` binary: 380M -> 179M.
No significant changes for anything else (mostly as expected).
2022-10-04 11:14:45 -07:00
Kirill Bulatov
d823e84ed5 Allow attaching tenants with zero timelines 2022-10-04 18:13:51 +03:00
Kirill Bulatov
231dfbaed6 Do not remove empty timelines/ directory for tenants 2022-10-04 18:13:51 +03:00
Dmitry Rodionov
5cf53786f9 Improve pytest ergonomics
1. Disable perf tests by default
2. Add instruction to run tests in parallel
2022-10-04 14:53:01 +03:00
Heikki Linnakangas
9b9bbad462 Use 'notify' crate to wait for PostgreSQL startup.
Compute node startup time is very important. After launching
PostgreSQL, use 'notify' to be notified immediately when it has
updated the PID file, instead of polling. The polling loop had 100 ms
interval so this shaves up to 100 ms from the startup time.
2022-10-04 13:00:15 +03:00
Heikki Linnakangas
537b2c1ae6 Remove unnecessary check for open PostgreSQL TCP port.
The loop checked if the TCP port is open for connections, by trying to
connect to it. That seems unnecessary. By the time the postmaster.pid
file says that it's ready, the port should be open. Remove that check.
2022-10-04 12:09:13 +03:00
Joonas Koivunen
31123d1fa8 Silence clippies, minor doc fix (#2543)
* doc: remove stray backtick

* chore: clippy::let_unit_value

* chore: silence useless_transmute, duplicate_mod

* chore: remove allowing deref_nullptr

not needed since bindgen 0.60.0.

* chore: remove repeated allowed lints

they are already allowed from the crate root.
2022-10-03 17:44:17 +03:00
Kirill Bulatov
4f2ac51bdd Bump rustc to 1.61 2022-10-03 16:36:03 +03:00
Kirill Bulatov
7b2f9dc908 Reuse existing tenants during attach (#2540) 2022-10-03 13:33:55 +03:00
50 changed files with 1113 additions and 480 deletions

View File

@@ -127,8 +127,8 @@ jobs:
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-
- name: Cache postgres v14 build
id: cache_pg_14
@@ -389,7 +389,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git/
target/
key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact
uses: ./.github/actions/download
@@ -564,7 +564,7 @@ jobs:
promote-images:
runs-on: dev
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-tools-image ]
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
@@ -573,7 +573,7 @@ jobs:
# compute-node uses postgres 14, which is default now
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
name: [ neon, compute-node, compute-node-v14, compute-tools ]
name: [ neon, compute-node, compute-node-v14, compute-node-v15, compute-tools ]
steps:
- name: Promote image to latest
@@ -608,6 +608,9 @@ jobs:
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -638,6 +641,9 @@ jobs:
- name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
@@ -650,6 +656,7 @@ jobs:
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
calculate-deploy-targets:
runs-on: [ self-hosted, Linux, k8s-runner ]
@@ -768,5 +775,5 @@ jobs:
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -106,7 +106,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git
target
key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
- name: Run cargo clippy
run: ./run_clippy.sh

98
Cargo.lock generated
View File

@@ -497,8 +497,10 @@ dependencies = [
"chrono",
"clap 3.2.16",
"env_logger",
"futures",
"hyper",
"log",
"notify",
"postgres",
"regex",
"serde",
@@ -540,11 +542,11 @@ dependencies = [
"git-version",
"nix",
"once_cell",
"pageserver",
"pageserver_api",
"postgres",
"regex",
"reqwest",
"safekeeper",
"safekeeper_api",
"serde",
"serde_with",
"tar",
@@ -1072,6 +1074,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.21"
@@ -1493,6 +1504,26 @@ dependencies = [
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -1552,6 +1583,26 @@ dependencies = [
"simple_asn1",
]
[[package]]
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "kstring"
version = "1.0.6"
@@ -1797,6 +1848,24 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -1975,6 +2044,7 @@ dependencies = [
"nix",
"num-traits",
"once_cell",
"pageserver_api",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -2003,6 +2073,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -2891,6 +2972,7 @@ dependencies = [
"postgres_ffi",
"regex",
"remote_storage",
"safekeeper_api",
"serde",
"serde_json",
"serde_with",
@@ -2906,6 +2988,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -4142,6 +4235,7 @@ dependencies = [
"bstr",
"bytes",
"chrono",
"crossbeam-utils",
"either",
"fail",
"hashbrown",

View File

@@ -5,7 +5,7 @@
ARG TAG=pinned
# apparently, ARGs don't get replaced in RUN commands in kaniko
# ARG POSTGIS_VERSION=3.3.0
# ARG POSTGIS_VERSION=3.3.1
# ARG PLV8_VERSION=3.1.4
# ARG PG_VERSION=v15
@@ -13,9 +13,12 @@ ARG TAG=pinned
# Layer "build-deps"
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
#
# Layer "pg-build"
@@ -42,11 +45,11 @@ RUN cd postgres && \
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \
cd postgis-3.3.0 && \
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
tar xvzf postgis-3.3.1.tar.gz && \
cd postgis-3.3.1 && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure && \
@@ -64,15 +67,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
# Build plv8
#
FROM build-deps AS plv8-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -84,12 +85,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#
# Layer "h3-pg-build"
# Build h3_pg
#
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
cd h3-4.0.1 && \
mkdir build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/h3 make install && \
cp -R /h3/usr / && \
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
tar xvzf h3-pg.tgz && \
cd h3-pg-4.0.1 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -137,8 +172,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
chmod 0750 /var/db/postgres/compute && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -8,8 +8,10 @@ anyhow = "1.0"
chrono = "0.4"
clap = "3.0"
env_logger = "0.9"
futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -178,7 +178,6 @@ impl ComputeNode {
.args(&["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
@@ -191,10 +190,10 @@ impl ComputeNode {
if !sync_output.status.success() {
anyhow::bail!(
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}, stderr: {}",
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
sync_output.status,
String::from_utf8(sync_output.stdout).expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
String::from_utf8(sync_output.stderr).expect("postgres --sync-safekeepers exited, and stderr is not utf-8"),
String::from_utf8(sync_output.stdout)
.expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
);
}
@@ -258,14 +257,7 @@ impl ComputeNode {
.spawn()
.expect("cannot start postgres process");
// Try default Postgres port if it is not provided
let port = self
.spec
.cluster
.settings
.find("port")
.unwrap_or_else(|| "5432".to_string());
wait_for_postgres(&mut pg, &port, pgdata_path)?;
wait_for_postgres(&mut pg, pgdata_path)?;
// If connection fails,
// it may be the old node with `zenith_admin` superuser.

View File

@@ -1,18 +1,19 @@
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
use std::str::FromStr;
use std::{fs, thread, time};
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use postgres::{Client, Transaction};
use serde::Deserialize;
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
use notify::{RecursiveMode, Watcher};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
@@ -230,52 +231,112 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
Ok(postgres_dbs)
}
/// Wait for Postgres to become ready to accept connections:
/// - state should be `ready` in the `pgdata/postmaster.pid`
/// - and we should be able to connect to 127.0.0.1:5432
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");
let mut slept: u64 = 0; // ms
let pause = time::Duration::from_millis(100);
let timeout = time::Duration::from_millis(10);
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
// PostgreSQL writes line "ready" to the postmaster.pid file, when it has
// completed initialization and is ready to accept connections. We want to
// react quickly and perform the rest of our initialization as soon as
// PostgreSQL starts accepting connections. Use 'notify' to be notified
// whenever the PID file is changed, and whenever it changes, read it to
// check if it's now "ready".
//
// You cannot actually watch a file before it exists, so we first watch the
// data directory, and once the postmaster.pid file appears, we switch to
// watch the file instead. We also wake up every 100 ms to poll, just in
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
let _ = tx.send(res);
}) {
Ok(watcher) => (Box::new(watcher), rx),
Err(e) => {
match e.kind {
notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
// docker on m1 macs does not support recommended_watcher
// but return "Function not implemented (os error 38)"
// see https://github.com/notify-rs/notify/issues/423
let (tx, rx) = std::sync::mpsc::channel();
loop {
// Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout,
// but postgres starts listening almost immediately, even if it is not really
// ready to accept connections).
if slept >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
// let's poll it faster than what we check the results for (100ms)
let config =
notify::Config::default().with_poll_interval(Duration::from_millis(50));
let watcher = notify::PollWatcher::new(
move |res| {
let _ = tx.send(res);
},
config,
)?;
(Box::new(watcher), rx)
}
_ => return Err(e.into()),
}
}
};
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier.
let code = status.code().unwrap_or(-1);
bail!("Postgres exited unexpectedly with code {}", code);
}
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to be
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
let file = BufReader::new(file);
let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
log::debug!("last line of postmaster.pid: {status:?}");
// Now Postgres is ready to accept connections
if status == "ready" && can_connect {
if status == "ready" {
break;
}
}
}
thread::sleep(pause);
slept += 100;
// Give up after POSTGRES_WAIT_TIMEOUT.
let duration = started_at.elapsed();
if duration >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
}
log::info!("PostgreSQL is now running, continuing to configure it");
Ok(())
}

View File

@@ -19,7 +19,9 @@ thiserror = "1"
nix = "0.23"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
pageserver = { path = "../pageserver" }
safekeeper = { path = "../safekeeper" }
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.
pageserver_api = { path = "../libs/pageserver_api" }
safekeeper_api = { path = "../libs/safekeeper_api" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env};
use pageserver::config::defaults::{
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};

View File

@@ -12,7 +12,7 @@ use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use safekeeper::http::models::TimelineCreateRequest;
use safekeeper_api::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
connstring::connection_address,

View File

@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{
use pageserver_api::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};

View File

@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
sudo apt install python3.9
```
- Install `poetry`
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`.
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
- Install dependencies via `./scripts/pysync`.
- Note that CI uses specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile))
so if you have different version some linting tools can yield different result locally vs in the CI.

View File

@@ -3,7 +3,7 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::{core, default_registry, proto};
@@ -17,6 +17,7 @@ pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
use prometheus::{Registry, Result};
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
@@ -32,13 +33,27 @@ macro_rules! register_uint_gauge_vec {
}};
}
/// Special internal registry, to collect metrics independently from the default registry.
/// Was introduced to fix deadlock with lazy registration of metrics in the default registry.
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> Result<()> {
INTERNAL_REGISTRY.register(c)
}
/// Gathers all Prometheus metrics and records the I/O stats just before that.
///
/// Metrics gathering is a relatively simple and standalone operation, so
/// it might be fine to do it this way to keep things simple.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
update_rusage_metrics();
prometheus::gather()
let mut mfs = prometheus::gather();
let mut internal_mfs = INTERNAL_REGISTRY.gather();
mfs.append(&mut internal_mfs);
mfs
}
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {

View File

@@ -0,0 +1,12 @@
[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,9 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -7,7 +7,17 @@ use utils::{
lsn::Lsn,
};
use crate::tenant::TenantState;
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
#[serde_as]
#[derive(Serialize, Deserialize)]

View File

@@ -3,9 +3,11 @@
#![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)]
// suppress warnings on rust 1.53 due to bindgen unit tests.
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
#![allow(clippy::useless_transmute)]
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;

View File

@@ -57,12 +57,10 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
@@ -71,7 +69,6 @@ pub fn XLogSegNoOffsetToRecPtr(
segno * (wal_segsz_bytes as u64) + (offset as u64)
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
format!(
"{:>08X}{:>08X}{:>08X}",
@@ -81,7 +78,6 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
@@ -89,12 +85,10 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}
#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}

View File

@@ -0,0 +1,12 @@
[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,10 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::info;
use std::future::Future;
@@ -35,7 +36,13 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metrics = metrics::gather();
let metrics = tokio::task::spawn_blocking(move || {
// Currently we take a lot of mutexes while collecting metrics, so it's
// better to spawn a blocking task to avoid blocking the event loop.
metrics::gather()
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
encoder.encode(&metrics, &mut buffer).unwrap();
let response = Response::builder()

View File

@@ -58,6 +58,7 @@ rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = "0.4.1"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }

View File

@@ -30,10 +30,10 @@ pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;

View File

@@ -207,6 +207,62 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get LSN by a timestamp
parameters:
- name: timestamp
in: query
required: true
schema:
type: string
format: date-time
description: A timestamp to get the LSN
responses:
"200":
description: OK
content:
application/json:
schema:
type: string
"400":
description: Error when no tenant id found in path, no timeline id or invalid timestamp
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id

View File

@@ -12,6 +12,7 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant::{TenantState, Timeline};
@@ -265,6 +266,23 @@ fn query_param_present(request: &Request<Body>, param: &str) -> bool {
.unwrap_or(false)
}
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
request.uri().query().map_or(
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|v| {
url::form_urlencoded::parse(v.as_bytes())
.into_owned()
.find(|(k, _)| k == param_name)
.map_or(
Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in query parameters"
))),
|(_, v)| Ok(v),
)
},
)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -329,6 +347,33 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
}
}
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id))
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
let result = match timeline
.find_lsn_for_timestamp(timestamp_pg)
.map_err(ApiError::InternalServerError)?
{
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
json_response(StatusCode::OK, result)
}
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
@@ -337,9 +382,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
Ok(_) => Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
)),
Ok(tenant) => {
if tenant.list_timelines().is_empty() {
info!("Attaching to tenant {tenant_id} with zero timelines");
Ok(())
} else {
Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
))
}
}
Err(_) => Ok(()),
})
.await
@@ -901,6 +953,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
get_lsn_by_timestamp_handler,
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler),

View File

@@ -12,7 +12,6 @@
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
use std::net::TcpListener;
use std::str;
@@ -35,7 +34,6 @@ use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::task_mgr;
@@ -45,7 +43,6 @@ use crate::tenant_mgr;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
@@ -1062,33 +1059,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate LSN of last transaction with timestamp less or equal than sppecified
// TODO lazy static
let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$")
.unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
let timestamp_pg = to_pg_timestamp(timestamp);
self.check_permission(Some(tenant_id))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"lsn",
)]))?;
let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? {
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
pgb.write_message(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {
bail!("unknown command");
}

View File

@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
(storage, remote_index_clone, sync_queue),
max_sync_errors,
)
.instrument(info_span!("storage_sync_loop"))
.await;
Ok(())
},

View File

@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
use toml_edit;
use utils::{
@@ -118,18 +119,6 @@ pub struct Tenant {
upload_layers: bool,
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Tenant {
@@ -400,16 +389,19 @@ impl Tenant {
timeline_id,
metadata.pg_version()
);
let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
match timelines_accessor.entry(timeline.timeline_id) {
Entry::Occupied(_) => bail!(
"Found freshly initialized timeline {} in the tenant map",
timeline.timeline_id
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
match timelines_accessor.entry(timeline_id) {
Entry::Occupied(_) => warn!(
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
self.tenant_id, timeline_id
),
Entry::Vacant(v) => {
let timeline = self
.initialize_new_timeline(timeline_id, metadata, ancestor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
v.insert(timeline);
}
}
@@ -609,21 +601,14 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
new_metadata: TimelineMetadata,
timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<Arc<Timeline>> {
let ancestor = match new_metadata.ancestor_timeline() {
Some(ancestor_timeline_id) => Some(
timelines
.get(&ancestor_timeline_id)
.cloned()
.with_context(|| {
format!(
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
})?,
),
None => None,
};
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
ancestor.is_some(),
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
}
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
@@ -1080,8 +1065,12 @@ impl Tenant {
)
})?;
let ancestor = new_metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
.cloned();
let new_timeline = self
.initialize_new_timeline(new_timeline_id, new_metadata, timelines)
.initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
.with_context(|| {
format!(
"Failed to initialize timeline {}/{}",

View File

@@ -15,19 +15,25 @@ use crate::repository::Key;
use crate::tenant::inmemory_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use anyhow::Result;
use num_traits::identities::{One, Zero};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
struct BTreeKey {
lsn: Lsn,
seq: usize,
}
impl BTreeKey {
fn new(lsn: Lsn) -> BTreeKey {
BTreeKey { lsn, seq: 0 }
}
}
///
/// LayerMap tracks what layers exist on a timeline.
///
@@ -53,165 +59,14 @@ pub struct LayerMap {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject>,
historic_layers: BTreeMap<BTreeKey, Arc<dyn Layer>>,
layers_seqno: usize,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<dyn Layer>>,
}
struct LayerRTreeObject {
layer: Arc<dyn Layer>,
}
// Representation of Key as numeric type.
// We can not use native implementation of i128, because rstar::RTree
// doesn't handle properly integer overflow during area calculation: sum(Xi*Yi).
// Overflow will cause panic in debug mode and incorrect area calculation in release mode,
// which leads to non-optimally balanced R-Tree (but doesn't fit correctness of R-Tree work).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);
impl Copy for IntKey {}
impl IntKey {
fn from(i: i128) -> Self {
IntKey(i256::from(i))
}
}
impl Bounded for IntKey {
fn min_value() -> Self {
IntKey(i256::MIN)
}
fn max_value() -> Self {
IntKey(i256::MAX)
}
}
impl Signed for IntKey {
fn is_positive(&self) -> bool {
self.0 > i256::ZERO
}
fn is_negative(&self) -> bool {
self.0 < i256::ZERO
}
fn signum(&self) -> Self {
match self.0.cmp(&i256::ZERO) {
Ordering::Greater => IntKey(i256::ONE),
Ordering::Less => IntKey(-i256::ONE),
Ordering::Equal => IntKey(i256::ZERO),
}
}
fn abs(&self) -> Self {
IntKey(self.0.abs())
}
fn abs_sub(&self, other: &Self) -> Self {
if self.0 <= other.0 {
IntKey(i256::ZERO)
} else {
IntKey(self.0 - other.0)
}
}
}
impl Neg for IntKey {
type Output = Self;
fn neg(self) -> Self::Output {
IntKey(-self.0)
}
}
impl Rem for IntKey {
type Output = Self;
fn rem(self, rhs: Self) -> Self::Output {
IntKey(self.0 % rhs.0)
}
}
impl Div for IntKey {
type Output = Self;
fn div(self, rhs: Self) -> Self::Output {
IntKey(self.0 / rhs.0)
}
}
impl Add for IntKey {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
IntKey(self.0 + rhs.0)
}
}
impl Sub for IntKey {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
IntKey(self.0 - rhs.0)
}
}
impl Mul for IntKey {
type Output = Self;
fn mul(self, rhs: Self) -> Self::Output {
IntKey(self.0 * rhs.0)
}
}
impl One for IntKey {
fn one() -> Self {
IntKey(i256::ONE)
}
}
impl Zero for IntKey {
fn zero() -> Self {
IntKey(i256::ZERO)
}
fn is_zero(&self) -> bool {
self.0 == i256::ZERO
}
}
impl Num for IntKey {
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
}
}
impl PartialEq for LayerRTreeObject {
fn eq(&self, other: &Self) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work, the assertion below would be triggered
// otherwise but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(&self.layer, &other.layer)
}
}
impl RTreeObject for LayerRTreeObject {
type Envelope = AABB<[IntKey; 2]>;
fn envelope(&self) -> Self::Envelope {
let key_range = self.layer.get_key_range();
let lsn_range = self.layer.get_lsn_range();
AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
)
}
}
/// Return value of LayerMap::search
pub struct SearchResult {
pub layer: Arc<dyn Layer>,
@@ -234,23 +89,17 @@ impl LayerMap {
// linear search
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<dyn Layer>> = None;
let mut latest_img_lsn: Option<Lsn> = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
[
IntKey::from(key.to_i128()),
IntKey::from(end_lsn.0 as i128 - 1),
],
);
for e in self
let mut latest_img_lsn = Lsn(0);
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
while let Some((_key, l)) = iter.next_back() {
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
if !l.get_key_range().contains(&key) {
continue;
}
let img_lsn = l.get_lsn_range().start;
assert!(img_lsn < end_lsn);
if Lsn(img_lsn.0 + 1) == end_lsn {
@@ -260,23 +109,23 @@ impl LayerMap {
lsn_floor: img_lsn,
}));
}
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
latest_img = Some(Arc::clone(l));
latest_img_lsn = Some(img_lsn);
}
latest_img = Some(Arc::clone(l));
latest_img_lsn = img_lsn;
break;
}
// Search the delta layers
let mut latest_delta: Option<Arc<dyn Layer>> = None;
for e in self
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
while let Some((_key, l)) = iter.next_back() {
if !l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
if !l.get_key_range().contains(&key) {
continue;
}
if l.get_lsn_range().start >= end_lsn {
info!(
"Candidate delta layer {}..{} is too new for lsn {}",
@@ -286,6 +135,9 @@ impl LayerMap {
);
}
assert!(l.get_lsn_range().start < end_lsn);
if l.get_lsn_range().end <= latest_img_lsn {
continue;
}
if l.get_lsn_range().end >= end_lsn {
// this layer contains the requested point in the key/lsn space.
// No need to search any further
@@ -311,10 +163,7 @@ impl LayerMap {
"found (old) layer {} for request on {key} at {end_lsn}",
l.filename().display(),
);
let lsn_floor = std::cmp::max(
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start,
);
let lsn_floor = std::cmp::max(Lsn(latest_img_lsn.0 + 1), l.get_lsn_range().start);
Ok(Some(SearchResult {
lsn_floor,
layer: l,
@@ -322,7 +171,7 @@ impl LayerMap {
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Ok(Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
lsn_floor: latest_img_lsn,
layer: l,
}))
} else {
@@ -338,7 +187,14 @@ impl LayerMap {
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
}
self.historic_layers.insert(LayerRTreeObject { layer });
self.historic_layers.insert(
BTreeKey {
lsn: layer.get_lsn_range().start,
seq: self.layers_seqno,
},
layer,
);
self.layers_seqno += 1;
NUM_ONDISK_LAYERS.inc();
}
@@ -360,10 +216,26 @@ impl LayerMap {
.retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
}
assert!(self
.historic_layers
.remove(&LayerRTreeObject { layer })
.is_some());
let len_before = self.historic_layers.len();
#[allow(clippy::vtable_address_comparisons)]
self.historic_layers
.retain(|_key, other| !Arc::ptr_eq(other, &layer));
if self.historic_layers.len() != len_before - 1 {
assert!(self.historic_layers.len() == len_before);
error!(
"Failed to remove {} layer: {}..{}__{}..{}",
if layer.is_incremental() {
"inremental"
} else {
"image"
},
layer.get_key_range().start,
layer.get_key_range().end,
layer.get_lsn_range().start,
layer.get_lsn_range().end
);
}
assert!(self.historic_layers.len() == len_before - 1);
NUM_ONDISK_LAYERS.dec();
}
@@ -380,21 +252,10 @@ impl LayerMap {
loop {
let mut made_progress = false;
let envelope = AABB::from_corners(
[
IntKey::from(range_remain.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(range_remain.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
for (_key, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{
let l = &e.layer;
if l.is_incremental() {
continue;
}
@@ -417,39 +278,30 @@ impl LayerMap {
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
self.historic_layers.iter().map(|e| e.layer.clone())
self.historic_layers
.iter()
.map(|(_key, layer)| layer.clone())
}
/// Find the last image layer that covers 'key', ignoring any image layers
/// newer than 'lsn'.
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
let mut candidate_lsn = Lsn(0);
let mut candidate = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0)],
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
);
for e in self
let mut iter = self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
{
let l = &e.layer;
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1));
while let Some((_key, l)) = iter.next_back() {
if l.is_incremental() {
continue;
}
assert!(l.get_key_range().contains(&key));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
if this_lsn < candidate_lsn {
// our previous candidate was better
if !l.get_key_range().contains(&key) {
continue;
}
candidate_lsn = this_lsn;
candidate = Some(Arc::clone(l));
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
return Some(Arc::clone(l));
}
candidate
None
}
///
@@ -466,18 +318,10 @@ impl LayerMap {
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
let mut points = vec![key_range.start];
let envelope = AABB::from_corners(
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
[
IntKey::from(key_range.end.to_i128()),
IntKey::from(lsn.0 as i128),
],
);
for e in self
for (_lsn, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1))
{
let l = &e.layer;
assert!(l.get_lsn_range().start <= lsn);
let range = l.get_key_range();
if key_range.contains(&range.start) {
@@ -514,26 +358,17 @@ impl LayerMap {
if lsn_range.start >= lsn_range.end {
return Ok(0);
}
let envelope = AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
for (_lsn, l) in self
.historic_layers
.locate_in_envelope_intersecting(&envelope)
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{
let l = &e.layer;
if !l.is_incremental() {
continue;
}
if !range_overlaps(&l.get_key_range(), key_range) {
continue;
}
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
assert!(range_overlaps(&l.get_key_range(), key_range));
// We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition
@@ -569,8 +404,8 @@ impl LayerMap {
}
println!("historic_layers:");
for e in self.historic_layers.iter() {
e.layer.dump(verbose)?;
for (_key, layer) in self.historic_layers.iter() {
layer.dump(verbose)?;
}
println!("End dump LayerMap");
Ok(())

View File

@@ -627,7 +627,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
let _ = spawn_connection_manager_task(
spawn_connection_manager_task(
self.conf.broker_etcd_prefix.clone(),
self_clone,
walreceiver_connect_timeout,

View File

@@ -107,6 +107,13 @@ pub fn init_tenant_mgr(
/// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
/// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues.
///
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
/// Alternatively, tenant is considered loaded successfully, if it's already in pageserver's memory (i.e. was loaded already before).
///
/// Attach happens on startup and sucessful timeline downloads
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
pub fn attach_local_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
@@ -122,18 +129,20 @@ pub fn attach_local_tenants(
);
debug!("Timelines to attach: {local_timelines:?}");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
{
match tenants_state::write_tenants().entry(tenant_id) {
hash_map::Entry::Occupied(_) => {
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
continue;
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::clone(&tenant));
}
let mut tenants_accessor = tenants_state::write_tenants();
let tenant = match tenants_accessor.entry(tenant_id) {
hash_map::Entry::Occupied(o) => {
info!("Tenant {tenant_id} was found in pageserver's memory");
Arc::clone(o.get())
}
}
hash_map::Entry::Vacant(v) => {
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
v.insert(Arc::clone(&tenant));
tenant
}
};
drop(tenants_accessor);
if tenant.current_state() == TenantState::Broken {
warn!("Skipping timeline load for broken tenant {tenant_id}")
@@ -168,16 +177,28 @@ fn load_local_tenant(
remote_index.clone(),
conf.remote_storage_config.is_some(),
));
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
if !tenant_timelines_dir.is_dir() {
error!(
"Tenant {} has no timelines directory at {}",
tenant_id,
tenant_timelines_dir.display()
);
tenant.set_state(TenantState::Broken);
} else {
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
}
tenant
}
@@ -625,14 +646,10 @@ fn collect_timelines_for_tenant(
}
if tenant_timelines.is_empty() {
match remove_if_empty(&timelines_dir) {
Ok(true) => info!(
"Removed empty tenant timelines directory {}",
timelines_dir.display()
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
// this is normal, we've removed all broken, empty and temporary timeline dirs
// but should allow the tenant to stay functional and allow creating new timelines
// on a restart, we require tenants to have the timelines dir, so leave it on disk
debug!("Tenant {tenant_id} has no timelines loaded");
}
Ok((tenant_id, tenant_timelines))

View File

@@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
@@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection(
// There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32);
// If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
// for anything, and in some corner cases, the compute node might have never generated the WAL for page headers
//. That happens if you create a branch at page boundary: the start point of the branch is at the page boundary,
// but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
// header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
// to the safekeepers.
startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");

View File

@@ -129,13 +129,13 @@ mod tests {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
let () = futures::future::pending().await; // sleep forever
futures::future::pending::<()>().await; // sleep forever
Ok(())
}));
// Wait until the task has been spawned.
let () = rx.await.context("failed to hear from the task")?;
rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task.
task.abort();

View File

@@ -5,6 +5,7 @@ filterwarnings =
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
addopts =
-m 'not remote_cluster'
--ignore=test_runner/performance
markers =
remote_cluster
testpaths =

View File

@@ -1,11 +1,10 @@
[toolchain]
# We try to stick to a toolchain version that is widely available on popular distributions, so that most people
# can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on
# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach
# 'testing' soon (and similarly for the other distributions).
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package.
channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
# version, we can consider updating.
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package,
# we use "unstable" version number as the highest version used in the project by default.
channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

View File

@@ -291,9 +291,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let registry = metrics::default_registry();
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
registry.register(Box::new(timeline_collector))?;
metrics::register_internal(Box::new(timeline_collector))?;
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;

View File

@@ -27,14 +27,13 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;
pub mod defaults {
use const_format::formatcp;
use std::time::Duration;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub use safekeeper_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}

View File

@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench`
To run tests in parellel we utilize `pytest-xdist` plugin. By default everything runs single threaded. Number of workers can be specified with `-n` argument:
`./scripts/pytest -n4`
By default performance tests are excluded. To run them explicitly pass performance tests selection to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.

View File

@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1131,6 +1136,19 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
@@ -1182,6 +1200,7 @@ class AbstractNeonCli(abc.ABC):
arguments: List[str],
extra_env_vars: Optional[Dict[str, str]] = None,
check_return_code=True,
timeout=None,
) -> "subprocess.CompletedProcess[str]":
"""
Run the command with the specified arguments.
@@ -1228,6 +1247,7 @@ class AbstractNeonCli(abc.ABC):
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
)
if not res.returncode:
log.info(f"Run success: {res.stdout}")
@@ -1601,6 +1621,14 @@ class WalCraft(AbstractNeonCli):
res.check_returncode()
class ComputeCtl(AbstractNeonCli):
"""
A typed wrapper around the `compute_ctl` CLI tool.
"""
COMMAND = "compute_ctl"
class NeonPageserver(PgProtocol):
"""
An object representing a running pageserver.

View File

@@ -6,6 +6,8 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -88,3 +90,39 @@ def test_branching_with_pgbench(
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
# Test branching from an "unnormalized" LSN.
#
# Context:
# When doing basebackup for a newly created branch, pageserver generates
# 'pg_control' file to bootstrap WAL segment by specifying the redo position
# a "normalized" LSN based on the timeline's starting LSN:
#
# checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0;
#
# This test checks if the pageserver is able to handle a "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
with pg0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
# and is smaller than `curr_lsn`.
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])

View File

@@ -0,0 +1,203 @@
import os
from subprocess import TimeoutExpired
from fixtures.log_helper import log
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
if "=" in line:
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
spec = (
"""
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Neon Test",
"state": "restarted",
"roles": [
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """
+ f'"{cfg_map["neon.safekeepers"]}"'
+ """,
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": """
+ f'"{cfg_map["port"]}"'
+ """,
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": """
+ f'"{cfg_map["neon.tenant_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": """
+ f'"{cfg_map["neon.timeline_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": """
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
+ """,
"vartype": "string"
}
]
},
"delta_operations": [
]
}
"""
)
ps_connstr = cfg_map["neon.pageserver_connstring"]
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
# run compute_ctl and wait for 10s
try:
ctl.raw_cli(
["--connstr", ps_connstr, "--pgdata", pgdata, "--spec", spec, "--pgbin", pg_bin_path],
timeout=10,
)
except TimeoutExpired as exc:
ctl_logs = exc.stderr.decode("utf-8")
log.info("compute_ctl output:\n" + ctl_logs)
start = "starting safekeepers syncing"
end = "safekeepers synced at LSN"
start_pos = ctl_logs.index(start)
assert start_pos != -1
end_pos = ctl_logs.index(end, start_pos)
assert end_pos != -1
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
# assert that --sync-safekeepers logs are present in the output
assert "connecting with node" in sync_safekeepers_logs
assert "connected with node" in sync_safekeepers_logs
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
assert "got votes from majority (2)" in sync_safekeepers_logs
assert "sending elected msg to node" in sync_safekeepers_logs

View File

@@ -15,7 +15,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
ps_cur = env.pageserver.connect().cursor()
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
@@ -38,37 +37,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "future"
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "past"
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
lsn = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result == "future"
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
assert result == "past"
pg_here.stop_and_destroy()
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
lsn = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()

View File

@@ -1,4 +1,5 @@
import os
import shutil
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -7,8 +8,13 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from prometheus_client.samples import Sample
@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_with_empty_tenants",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_without_timelines_dir = env.initial_tenant
log.info(
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
tenant_with_empty_timelines_dir = client.tenant_create()
log.info(
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
)
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline reinitialization after pageserver restart
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert (
len(tenants) == 1
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
loaded_tenant = tenants[0]
assert loaded_tenant["id"] == str(
tenant_with_empty_timelines_dir
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -7,19 +7,25 @@
#
import asyncio
import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
Postgres,
RemoteStorageKind,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_upload,
wait_until,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
# run final checkpoint manually to flush all the data to remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenants_attached_after_download(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="remote_storage_kind",
)
data_id = 1
data_secret = "very secret secret"
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
for checkpoint_number in range(1, 3):
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(path)
local_layer_deleted = True
break
assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
env.pageserver.start()
client = env.pageserver.http_client()
wait_until(
number_of_iterations=5,
interval=1,
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
)
restored_timelines = client.timeline_list(tenant_id)
assert (
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
retored_timeline = restored_timelines[0]
assert retored_timeline["timeline_id"] == str(
timeline_id
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
def expect_tenant_to_download_timeline(
client: NeonPageserverHttpClient,
tenant_id: TenantId,
):
for tenant in client.tenant_list():
if tenant["id"] == str(tenant_id):
assert not tenant.get(
"has_in_progress_downloads", True
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"

View File

@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }