Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-24 22:00:37 +00:00.

Compare commits: 20 commits (d/hack ... layer_map_)
| SHA1 |
|---|
| 121c19fcd6 |
| 687ba81366 |
| 47bae68a2e |
| e8b195acb7 |
| 254cb7dc4f |
| ed85d97f17 |
| 4a216c5f7f |
| c5a428a61a |
| ff8c481777 |
| f25dd75be9 |
| b99bed510d |
| 580584c8fc |
| d823e84ed5 |
| 231dfbaed6 |
| 5cf53786f9 |
| 9b9bbad462 |
| 537b2c1ae6 |
| 31123d1fa8 |
| 4f2ac51bdd |
| 7b2f9dc908 |
.github/workflows/build_and_test.yml (vendored, 21 changes)
@@ -127,8 +127,8 @@ jobs:
|
||||
target/
|
||||
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
|
||||
key: |
|
||||
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
|
||||
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-
|
||||
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
|
||||
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg_14
|
||||
@@ -389,7 +389,7 @@ jobs:
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git/
|
||||
target/
|
||||
key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
|
||||
key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
|
||||
|
||||
- name: Get Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
@@ -564,7 +564,7 @@ jobs:
|
||||
|
||||
promote-images:
|
||||
runs-on: dev
|
||||
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-tools-image ]
|
||||
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
|
||||
if: github.event_name != 'workflow_dispatch'
|
||||
container: amazon/aws-cli
|
||||
strategy:
|
||||
@@ -573,7 +573,7 @@ jobs:
|
||||
# compute-node uses postgres 14, which is default now
|
||||
# cloud repo depends on this image name, thus duplicating it
|
||||
# remove compute-node when cloud repo is updated
|
||||
name: [ neon, compute-node, compute-node-v14, compute-tools ]
|
||||
name: [ neon, compute-node, compute-node-v14, compute-node-v15, compute-tools ]
|
||||
|
||||
steps:
|
||||
- name: Promote image to latest
|
||||
@@ -608,6 +608,9 @@ jobs:
|
||||
- name: Pull compute node v14 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
|
||||
|
||||
- name: Pull compute node v15 image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
|
||||
|
||||
- name: Pull rust image from ECR
|
||||
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
|
||||
|
||||
@@ -638,6 +641,9 @@ jobs:
|
||||
- name: Push compute node v14 image to Docker Hub
|
||||
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push compute node v15 image to Docker Hub
|
||||
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Push rust image to Docker Hub
|
||||
run: crane push rust neondatabase/rust:pinned
|
||||
|
||||
@@ -650,6 +656,7 @@ jobs:
|
||||
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
|
||||
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
calculate-deploy-targets:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
@@ -768,5 +775,5 @@ jobs:
|
||||
- name: Re-deploy proxy
|
||||
run: |
|
||||
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
|
||||
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
|
||||
.github/workflows/codestyle.yml (vendored, 2 changes)
@@ -106,7 +106,7 @@ jobs:
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
|
||||
key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
|
||||
|
||||
- name: Run cargo clippy
|
||||
run: ./run_clippy.sh
|
||||
|
||||
Cargo.lock (generated, 98 changes)
@@ -497,8 +497,10 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap 3.2.16",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"hyper",
|
||||
"log",
|
||||
"notify",
|
||||
"postgres",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -540,11 +542,11 @@ dependencies = [
|
||||
"git-version",
|
||||
"nix",
|
||||
"once_cell",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
"postgres",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"safekeeper",
|
||||
"safekeeper_api",
|
||||
"serde",
|
||||
"serde_with",
|
||||
"tar",
|
||||
@@ -1072,6 +1074,15 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fsevent-sys"
|
||||
version = "4.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.21"
|
||||
@@ -1493,6 +1504,26 @@ dependencies = [
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify-sys"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.12"
|
||||
@@ -1552,6 +1583,26 @@ dependencies = [
|
||||
"simple_asn1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
|
||||
dependencies = [
|
||||
"kqueue-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue-sys"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kstring"
|
||||
version = "1.0.6"
|
||||
@@ -1797,6 +1848,24 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "5.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"crossbeam-channel",
|
||||
"filetime",
|
||||
"fsevent-sys",
|
||||
"inotify",
|
||||
"kqueue",
|
||||
"libc",
|
||||
"mio",
|
||||
"walkdir",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
@@ -1975,6 +2044,7 @@ dependencies = [
|
||||
"nix",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -2003,6 +2073,17 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"const_format",
|
||||
"serde",
|
||||
"serde_with",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.2"
|
||||
@@ -2891,6 +2972,7 @@ dependencies = [
|
||||
"postgres_ffi",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"safekeeper_api",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
@@ -2906,6 +2988,17 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "safekeeper_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"const_format",
|
||||
"serde",
|
||||
"serde_with",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "same-file"
|
||||
version = "1.0.6"
|
||||
@@ -4142,6 +4235,7 @@ dependencies = [
|
||||
"bstr",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crossbeam-utils",
|
||||
"either",
|
||||
"fail",
|
||||
"hashbrown",
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
|
||||
ARG TAG=pinned
|
||||
# apparently, ARGs don't get replaced in RUN commands in kaniko
|
||||
# ARG POSTGIS_VERSION=3.3.0
|
||||
# ARG POSTGIS_VERSION=3.3.1
|
||||
# ARG PLV8_VERSION=3.1.4
|
||||
# ARG PG_VERSION=v15
|
||||
|
||||
@@ -13,9 +13,12 @@ ARG TAG=pinned
|
||||
# Layer "build-deps"
|
||||
#
|
||||
FROM debian:bullseye-slim AS build-deps
|
||||
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
|
||||
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
|
||||
apt update
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
|
||||
libcurl4-openssl-dev libossp-uuid-dev
|
||||
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
|
||||
|
||||
#
|
||||
# Layer "pg-build"
|
||||
@@ -42,11 +45,11 @@ RUN cd postgres && \
|
||||
FROM build-deps AS postgis-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
|
||||
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
|
||||
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
|
||||
tar xvzf postgis-3.3.0.tar.gz && \
|
||||
cd postgis-3.3.0 && \
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
|
||||
tar xvzf postgis-3.3.1.tar.gz && \
|
||||
cd postgis-3.3.1 && \
|
||||
./autogen.sh && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
./configure && \
|
||||
@@ -64,15 +67,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
|
||||
# Build plv8
|
||||
#
|
||||
FROM build-deps AS plv8-build
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
RUN apt update && \
|
||||
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
|
||||
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
|
||||
|
||||
# https://github.com/plv8/plv8/issues/475
|
||||
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
|
||||
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
|
||||
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
|
||||
apt update && \
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends -t testing binutils
|
||||
|
||||
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
|
||||
@@ -84,12 +85,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
|
||||
rm -rf /plv8-* && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
|
||||
|
||||
#
|
||||
# Layer "h3-pg-build"
|
||||
# Build h3_pg
|
||||
#
|
||||
FROM build-deps AS h3-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# packaged cmake is too old
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends -t testing cmake
|
||||
|
||||
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
|
||||
tar xvzf h3.tgz && \
|
||||
cd h3-4.0.1 && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
cmake .. -DCMAKE_BUILD_TYPE=Release && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/h3 make install && \
|
||||
cp -R /h3/usr / && \
|
||||
rm -rf build
|
||||
|
||||
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
|
||||
tar xvzf h3-pg.tgz && \
|
||||
cd h3-pg-4.0.1 && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
|
||||
|
||||
#
|
||||
# Layer "neon-pg-ext-build"
|
||||
# compile neon extensions
|
||||
#
|
||||
FROM build-deps AS neon-pg-ext-build
|
||||
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
# plv8 still sometimes crashes during the creation
|
||||
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=h3-pg-build /h3/usr /
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -137,8 +172,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
chmod 0750 /var/db/postgres/compute && \
|
||||
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
|
||||
|
||||
# TODO: Check if we can make the extension setup more modular versus a linear build
|
||||
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc.
|
||||
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
|
||||
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
|
||||
|
||||
|
||||
@@ -8,8 +8,10 @@ anyhow = "1.0"
|
||||
chrono = "0.4"
|
||||
clap = "3.0"
|
||||
env_logger = "0.9"
|
||||
futures = "0.3.13"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
log = { version = "0.4", features = ["std", "serde"] }
|
||||
notify = "5.0.0"
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
regex = "1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
||||
@@ -178,7 +178,6 @@ impl ComputeNode {
|
||||
.args(&["--sync-safekeepers"])
|
||||
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.expect("postgres --sync-safekeepers failed to start");
|
||||
|
||||
@@ -191,10 +190,10 @@ impl ComputeNode {
|
||||
|
||||
if !sync_output.status.success() {
|
||||
anyhow::bail!(
|
||||
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}, stderr: {}",
|
||||
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
|
||||
sync_output.status,
|
||||
String::from_utf8(sync_output.stdout).expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
|
||||
String::from_utf8(sync_output.stderr).expect("postgres --sync-safekeepers exited, and stderr is not utf-8"),
|
||||
String::from_utf8(sync_output.stdout)
|
||||
.expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -258,14 +257,7 @@ impl ComputeNode {
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
|
||||
// Try default Postgres port if it is not provided
|
||||
let port = self
|
||||
.spec
|
||||
.cluster
|
||||
.settings
|
||||
.find("port")
|
||||
.unwrap_or_else(|| "5432".to_string());
|
||||
wait_for_postgres(&mut pg, &port, pgdata_path)?;
|
||||
wait_for_postgres(&mut pg, pgdata_path)?;
|
||||
|
||||
// If connection fails,
|
||||
// it may be the old node with `zenith_admin` superuser.
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
use std::fmt::Write;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::Child;
|
||||
use std::str::FromStr;
|
||||
use std::{fs, thread, time};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use postgres::{Client, Transaction};
|
||||
use serde::Deserialize;
|
||||
|
||||
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
|
||||
use notify::{RecursiveMode, Watcher};
|
||||
|
||||
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // 60 seconds
|
||||
|
||||
/// Rust representation of Postgres role info with only those fields
|
||||
/// that matter for us.
|
||||
@@ -230,52 +231,112 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
|
||||
Ok(postgres_dbs)
|
||||
}
|
||||
|
||||
/// Wait for Postgres to become ready to accept connections:
|
||||
/// - state should be `ready` in the `pgdata/postmaster.pid`
|
||||
/// - and we should be able to connect to 127.0.0.1:5432
|
||||
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
|
||||
/// Wait for Postgres to become ready to accept connections. It's ready to
|
||||
/// accept connections when the state-field in `pgdata/postmaster.pid` says
|
||||
/// 'ready'.
|
||||
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
let pid_path = pgdata.join("postmaster.pid");
|
||||
let mut slept: u64 = 0; // ms
|
||||
let pause = time::Duration::from_millis(100);
|
||||
|
||||
let timeout = time::Duration::from_millis(10);
|
||||
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
|
||||
// PostgreSQL writes line "ready" to the postmaster.pid file, when it has
|
||||
// completed initialization and is ready to accept connections. We want to
|
||||
// react quickly and perform the rest of our initialization as soon as
|
||||
// PostgreSQL starts accepting connections. Use 'notify' to be notified
|
||||
// whenever the PID file is changed, and whenever it changes, read it to
|
||||
// check if it's now "ready".
|
||||
//
|
||||
// You cannot actually watch a file before it exists, so we first watch the
|
||||
// data directory, and once the postmaster.pid file appears, we switch to
|
||||
// watch the file instead. We also wake up every 100 ms to poll, just in
|
||||
// case we miss some events for some reason. Not strictly necessary, but
|
||||
// better safe than sorry.
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
|
||||
let _ = tx.send(res);
|
||||
}) {
|
||||
Ok(watcher) => (Box::new(watcher), rx),
|
||||
Err(e) => {
|
||||
match e.kind {
|
||||
notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
|
||||
// docker on m1 macs does not support recommended_watcher
|
||||
// but returns "Function not implemented (os error 38)"
|
||||
// see https://github.com/notify-rs/notify/issues/423
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
|
||||
loop {
|
||||
// Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout,
|
||||
// but postgres starts listening almost immediately, even if it is not really
|
||||
// ready to accept connections).
|
||||
if slept >= POSTGRES_WAIT_TIMEOUT {
|
||||
bail!("timed out while waiting for Postgres to start");
|
||||
// let's poll faster than the 100 ms interval at which we check the results
|
||||
let config =
|
||||
notify::Config::default().with_poll_interval(Duration::from_millis(50));
|
||||
|
||||
let watcher = notify::PollWatcher::new(
|
||||
move |res| {
|
||||
let _ = tx.send(res);
|
||||
},
|
||||
config,
|
||||
)?;
|
||||
|
||||
(Box::new(watcher), rx)
|
||||
}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
|
||||
|
||||
let started_at = Instant::now();
|
||||
let mut postmaster_pid_seen = false;
|
||||
loop {
|
||||
if let Ok(Some(status)) = pg.try_wait() {
|
||||
// Postgres exited, that is not what we expected, bail out earlier.
|
||||
let code = status.code().unwrap_or(-1);
|
||||
bail!("Postgres exited unexpectedly with code {}", code);
|
||||
}
|
||||
|
||||
let res = rx.recv_timeout(Duration::from_millis(100));
|
||||
log::debug!("woken up by notify: {res:?}");
|
||||
// If there are multiple events in the channel already, we only need to
|
||||
// check once. Swallow the extra events before we go ahead to check the
|
||||
// pid file.
|
||||
while let Ok(res) = rx.try_recv() {
|
||||
log::debug!("swallowing extra event: {res:?}");
|
||||
}
|
||||
|
||||
// Check that we can open pid file first.
|
||||
if let Ok(file) = File::open(&pid_path) {
|
||||
if !postmaster_pid_seen {
|
||||
log::debug!("postmaster.pid appeared");
|
||||
watcher
|
||||
.unwatch(pgdata)
|
||||
.expect("Failed to remove pgdata dir watch");
|
||||
watcher
|
||||
.watch(&pid_path, RecursiveMode::NonRecursive)
|
||||
.expect("Failed to add postmaster.pid file watch");
|
||||
postmaster_pid_seen = true;
|
||||
}
|
||||
|
||||
let file = BufReader::new(file);
|
||||
let last_line = file.lines().last();
|
||||
|
||||
// Pid file could be there and we could read it, but it could be empty, for example.
|
||||
if let Some(Ok(line)) = last_line {
|
||||
let status = line.trim();
|
||||
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
|
||||
log::debug!("last line of postmaster.pid: {status:?}");
|
||||
|
||||
// Now Postgres is ready to accept connections
|
||||
if status == "ready" && can_connect {
|
||||
if status == "ready" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
thread::sleep(pause);
|
||||
slept += 100;
|
||||
// Give up after POSTGRES_WAIT_TIMEOUT.
|
||||
let duration = started_at.elapsed();
|
||||
if duration >= POSTGRES_WAIT_TIMEOUT {
|
||||
bail!("timed out while waiting for Postgres to start");
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("PostgreSQL is now running, continuing to configure it");
|
||||
|
||||
Ok(())
|
||||
}
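
The hunk above replaces a fixed-interval sleep loop with a `notify`-based watch on the data directory and then on `postmaster.pid`. Below is a minimal, self-contained sketch of the same watch-with-fallback pattern in isolation, assuming the `notify` 5.0 crate; the watched path, the bounded event loop, and the `make_watcher` name are illustrative and not part of the patch.

```rust
use std::path::Path;
use std::sync::mpsc;
use std::time::Duration;

use notify::{RecursiveMode, Watcher};

type EventRx = mpsc::Receiver<notify::Result<notify::Event>>;

// Prefer the platform watcher (inotify/kqueue/FSEvents); fall back to polling
// when it is unavailable, e.g. ENOSYS (os error 38) under Docker on Apple Silicon.
fn make_watcher() -> notify::Result<(Box<dyn Watcher>, EventRx)> {
    let (tx, rx) = mpsc::channel();
    match notify::recommended_watcher(move |res| {
        let _ = tx.send(res);
    }) {
        Ok(watcher) => {
            let watcher: Box<dyn Watcher> = Box::new(watcher);
            Ok((watcher, rx))
        }
        Err(e) => match e.kind {
            notify::ErrorKind::Io(ref os) if os.raw_os_error() == Some(38) => {
                let (tx, rx) = mpsc::channel();
                let config =
                    notify::Config::default().with_poll_interval(Duration::from_millis(50));
                let watcher =
                    notify::PollWatcher::new(move |res| { let _ = tx.send(res); }, config)?;
                let watcher: Box<dyn Watcher> = Box::new(watcher);
                Ok((watcher, rx))
            }
            _ => Err(e),
        },
    }
}

fn main() -> notify::Result<()> {
    let (mut watcher, rx) = make_watcher()?;
    // Watch a directory non-recursively, as the patch does with the data directory.
    watcher.watch(Path::new("/tmp"), RecursiveMode::NonRecursive)?;
    for _ in 0..10 {
        // Wake on events, but also time out every 100 ms as a safety net.
        match rx.recv_timeout(Duration::from_millis(100)) {
            Ok(event) => println!("fs event: {event:?}"),
            Err(mpsc::RecvTimeoutError::Timeout) => println!("no event yet"),
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
    Ok(())
}
```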
|
||||
|
||||
|
||||
@@ -19,7 +19,9 @@ thiserror = "1"
|
||||
nix = "0.23"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
||||
|
||||
pageserver = { path = "../pageserver" }
|
||||
safekeeper = { path = "../safekeeper" }
|
||||
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
|
||||
# instead, so that recompile times are better.
|
||||
pageserver_api = { path = "../libs/pageserver_api" }
|
||||
safekeeper_api = { path = "../libs/safekeeper_api" }
|
||||
utils = { path = "../libs/utils" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage::PageServerNode;
|
||||
use control_plane::{etcd, local_env};
|
||||
use pageserver::config::defaults::{
|
||||
use pageserver_api::models::TimelineInfo;
|
||||
use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
|
||||
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
|
||||
};
|
||||
use pageserver::http::models::TimelineInfo;
|
||||
use safekeeper::defaults::{
|
||||
use safekeeper_api::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
};
|
||||
|
||||
@@ -12,7 +12,7 @@ use nix::unistd::Pid;
|
||||
use postgres::Config;
|
||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use safekeeper::http::models::TimelineCreateRequest;
|
||||
use safekeeper_api::models::TimelineCreateRequest;
|
||||
use thiserror::Error;
|
||||
use utils::{
|
||||
connstring::connection_address,
|
||||
|
||||
@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
use pageserver::http::models::{
|
||||
use pageserver_api::models::{
|
||||
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use postgres::{Config, NoTls};
|
||||
|
||||
@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
|
||||
sudo apt install python3.9
|
||||
```
|
||||
- Install `poetry`
|
||||
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`.
|
||||
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
|
||||
- Install dependencies via `./scripts/pysync`.
|
||||
- Note that CI uses a specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile))
|
||||
so if you have a different version, some linting tools can yield different results locally vs in CI.
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! Otherwise, we might not see all metrics registered via
|
||||
//! a default registry.
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
|
||||
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
|
||||
pub use prometheus::opts;
|
||||
pub use prometheus::register;
|
||||
pub use prometheus::{core, default_registry, proto};
|
||||
@@ -17,6 +17,7 @@ pub use prometheus::{register_int_counter_vec, IntCounterVec};
|
||||
pub use prometheus::{register_int_gauge, IntGauge};
|
||||
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
|
||||
pub use prometheus::{Encoder, TextEncoder};
|
||||
use prometheus::{Registry, Result};
|
||||
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
@@ -32,13 +33,27 @@ macro_rules! register_uint_gauge_vec {
|
||||
}};
|
||||
}
|
||||
|
||||
/// Special internal registry, to collect metrics independently from the default registry.
|
||||
/// It was introduced to fix a deadlock with lazy registration of metrics in the default registry.
|
||||
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
|
||||
|
||||
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
|
||||
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
|
||||
/// while holding the lock.
|
||||
pub fn register_internal(c: Box<dyn Collector>) -> Result<()> {
|
||||
INTERNAL_REGISTRY.register(c)
|
||||
}
|
||||
|
||||
/// Gathers all Prometheus metrics and records the I/O stats just before that.
|
||||
///
|
||||
/// Metrics gathering is a relatively simple and standalone operation, so
|
||||
/// it might be fine to do it this way to keep things simple.
|
||||
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
|
||||
update_rusage_metrics();
|
||||
prometheus::gather()
|
||||
let mut mfs = prometheus::gather();
|
||||
let mut internal_mfs = INTERNAL_REGISTRY.gather();
|
||||
mfs.append(&mut internal_mfs);
|
||||
mfs
|
||||
}
|
||||
|
||||
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
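
As a usage note for the new `register_internal` helper: a collector has to be registered before the first `gather()` call, otherwise the lazy-registration deadlock the comment warns about can reappear. A hedged sketch of the intended call pattern, assuming the workspace `metrics` crate shown above plus the `prometheus` crate; the metric name is illustrative.

```rust
use prometheus::IntCounter;

fn main() -> prometheus::Result<()> {
    // IntCounter implements prometheus::core::Collector, so it can be handed
    // to the internal registry directly. Do this before the first gather().
    let restarts = IntCounter::new("example_restarts_total", "illustrative counter")?;
    metrics::register_internal(Box::new(restarts.clone()))?;

    restarts.inc();

    // gather() now returns families from the default registry plus the
    // internal registry, as in the patched implementation above.
    let families = metrics::gather();
    println!("gathered {} metric families", families.len());
    Ok(())
}
```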
|
||||
|
||||
libs/pageserver_api/Cargo.toml (new file, 12 lines)
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "pageserver_api"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_with = "1.12.0"
|
||||
const_format = "0.2.21"
|
||||
|
||||
utils = { path = "../utils" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
libs/pageserver_api/src/lib.rs (new file, 9 lines)
@@ -0,0 +1,9 @@
|
||||
use const_format::formatcp;
|
||||
|
||||
/// Public API types
|
||||
pub mod models;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
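
The new API crates build their default listen addresses at compile time with `const_format::formatcp!`. A small standalone illustration of that macro, using an arbitrary port value rather than the crate's own constants:

```rust
use const_format::formatcp;

const PORT: u16 = 9898;
// formatcp! interpolates constants at compile time and yields a &'static str.
const ADDR: &str = formatcp!("127.0.0.1:{PORT}");

fn main() {
    assert_eq!(ADDR, "127.0.0.1:9898");
    println!("{ADDR}");
}
```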
|
||||
@@ -7,7 +7,17 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::tenant::TenantState;
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TenantState {
|
||||
/// Tenant is fully operational, its background jobs might be running or not.
|
||||
Active { background_jobs_running: bool },
|
||||
/// A tenant is recognized by pageserver, but not yet ready to operate:
|
||||
/// e.g. not present locally and being downloaded or being read into memory from the file system.
|
||||
Paused,
|
||||
/// A tenant is recognized by the pageserver, but no longer used for any operations, as it failed to get activated.
|
||||
Broken,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -3,9 +3,11 @@
|
||||
#![allow(non_snake_case)]
|
||||
// bindgen creates some unsafe code with no doc comments.
|
||||
#![allow(clippy::missing_safety_doc)]
|
||||
// suppress warnings on rust 1.53 due to bindgen unit tests.
|
||||
// https://github.com/rust-lang/rust-bindgen/issues/1651
|
||||
#![allow(deref_nullptr)]
|
||||
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
|
||||
#![allow(clippy::useless_transmute)]
|
||||
// modules included with the postgres_ffi macro depend on the types of the specific version's
|
||||
// types, and trigger a too eager lint.
|
||||
#![allow(clippy::duplicate_mod)]
|
||||
|
||||
use bytes::Bytes;
|
||||
use utils::bin_ser::SerializeError;
|
||||
|
||||
@@ -57,12 +57,10 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
|
||||
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
|
||||
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegNoOffsetToRecPtr(
|
||||
segno: XLogSegNo,
|
||||
offset: u32,
|
||||
@@ -71,7 +69,6 @@ pub fn XLogSegNoOffsetToRecPtr(
|
||||
segno * (wal_segsz_bytes as u64) + (offset as u64)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
|
||||
format!(
|
||||
"{:>08X}{:>08X}{:>08X}",
|
||||
@@ -81,7 +78,6 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
|
||||
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
|
||||
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
|
||||
@@ -89,12 +85,10 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
|
||||
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn IsXLogFileName(fname: &str) -> bool {
|
||||
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn IsPartialXLogFileName(fname: &str) -> bool {
|
||||
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
|
||||
}
|
||||
|
||||
libs/safekeeper_api/Cargo.toml (new file, 12 lines)
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "safekeeper_api"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_with = "1.12.0"
|
||||
const_format = "0.2.21"
|
||||
|
||||
utils = { path = "../utils" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
libs/safekeeper_api/src/lib.rs (new file, 10 lines)
@@ -0,0 +1,10 @@
|
||||
use const_format::formatcp;
|
||||
|
||||
/// Public API types
|
||||
pub mod models;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::RequestInfo;
|
||||
use routerify::{Middleware, Router, RouterBuilder, RouterService};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::info;
|
||||
|
||||
use std::future::Future;
|
||||
@@ -35,7 +36,13 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
|
||||
let metrics = metrics::gather();
|
||||
let metrics = tokio::task::spawn_blocking(move || {
|
||||
// Currently we take a lot of mutexes while collecting metrics, so it's
|
||||
// better to spawn a blocking task to avoid blocking the event loop.
|
||||
metrics::gather()
|
||||
})
|
||||
.await
|
||||
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
let response = Response::builder()
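
The change above moves metrics gathering onto a blocking thread so that the mutex-heavy `metrics::gather()` call cannot stall the async executor. A minimal, hedged illustration of the same `spawn_blocking` pattern, assuming a tokio runtime; the closure body is a stand-in for the real gather call.

```rust
#[tokio::main]
async fn main() {
    let families = tokio::task::spawn_blocking(|| {
        // Stand-in for metrics::gather(), which may take many mutexes.
        vec!["example_metric_family".to_string()]
    })
    .await
    .expect("blocking task panicked");

    println!("gathered {} families", families.len());
}
```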
|
||||
|
||||
@@ -58,6 +58,7 @@ rstar = "0.9.3"
|
||||
num-traits = "0.2.15"
|
||||
amplify_num = "0.4.1"
|
||||
|
||||
pageserver_api = { path = "../libs/pageserver_api" }
|
||||
postgres_ffi = { path = "../libs/postgres_ffi" }
|
||||
etcd_broker = { path = "../libs/etcd_broker" }
|
||||
metrics = { path = "../libs/metrics" }
|
||||
|
||||
@@ -30,10 +30,10 @@ pub mod defaults {
|
||||
use crate::tenant_config::defaults::*;
|
||||
use const_format::formatcp;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
pub use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod models;
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use pageserver_api::models;
|
||||
|
||||
@@ -207,6 +207,62 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: Get LSN by a timestamp
|
||||
parameters:
|
||||
- name: timestamp
|
||||
in: query
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: date-time
|
||||
description: A timestamp to get the LSN
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
"400":
|
||||
description: Error when no tenant id or timeline id is found in the path, or the timestamp is invalid
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
/v1/tenant/{tenant_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
|
||||
@@ -12,6 +12,7 @@ use super::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest,
|
||||
};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::storage_sync;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
|
||||
use crate::tenant::{TenantState, Timeline};
|
||||
@@ -265,6 +266,23 @@ fn query_param_present(request: &Request<Body>, param: &str) -> bool {
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
|
||||
request.uri().query().map_or(
|
||||
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|
||||
|v| {
|
||||
url::form_urlencoded::parse(v.as_bytes())
|
||||
.into_owned()
|
||||
.find(|(k, _)| k == param_name)
|
||||
.map_or(
|
||||
Err(ApiError::BadRequest(anyhow!(
|
||||
"no {param_name} specified in query parameters"
|
||||
))),
|
||||
|(_, v)| Ok(v),
|
||||
)
|
||||
},
|
||||
)
|
||||
}
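
The `get_query_param` helper above leans on `url::form_urlencoded` to pull a single parameter out of the raw query string. A standalone sketch of that parsing step; the `find_param` name and the sample query are illustrative.

```rust
fn find_param(query: &str, name: &str) -> Option<String> {
    // Percent-decodes keys and values while iterating the k=v pairs.
    url::form_urlencoded::parse(query.as_bytes())
        .into_owned()
        .find(|(k, _)| k == name)
        .map(|(_, v)| v)
}

fn main() {
    let q = "timestamp=2022-09-01T00%3A00%3A00Z&other=1";
    assert_eq!(
        find_param(q, "timestamp").as_deref(),
        Some("2022-09-01T00:00:00Z")
    );
    assert_eq!(find_param(q, "missing"), None);
}
```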
|
||||
|
||||
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
@@ -329,6 +347,33 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let timestamp_raw = get_query_param(&request, "timestamp")?;
|
||||
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
|
||||
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
|
||||
|
||||
let timeline = tenant_mgr::get_tenant(tenant_id, true)
|
||||
.and_then(|tenant| tenant.get_timeline(timeline_id))
|
||||
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let result = match timeline
|
||||
.find_lsn_for_timestamp(timestamp_pg)
|
||||
.map_err(ApiError::InternalServerError)?
|
||||
{
|
||||
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
|
||||
LsnForTimestamp::Future(_lsn) => "future".into(),
|
||||
LsnForTimestamp::Past(_lsn) => "past".into(),
|
||||
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
|
||||
};
|
||||
json_response(StatusCode::OK, result)
|
||||
}
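
For reference, a hedged client-side sketch of calling the new endpoint with blocking `reqwest` (assuming its `blocking` and `json` features); the listen address and the tenant/timeline ids are placeholders.

```rust
fn main() -> Result<(), reqwest::Error> {
    let base = "http://127.0.0.1:9898"; // example pageserver HTTP address
    let tenant_id = "0000000000000000000000000000000a"; // placeholder hex id
    let timeline_id = "0000000000000000000000000000000b"; // placeholder hex id

    // The timestamp must be RFC 3339, since the handler parses it with
    // humantime::parse_rfc3339.
    let url = format!("{base}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp=2022-09-01T00:00:00Z");

    // The handler returns a JSON string: an LSN, or "future" / "past" / "nodata".
    let lsn: String = reqwest::blocking::get(url)?.json()?;
    println!("result: {lsn}");
    Ok(())
}
```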
|
||||
|
||||
// TODO: it makes sense to provide the tenant config right away, the same way as it is handled in tenant_create
|
||||
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
@@ -337,9 +382,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
info!("Handling tenant attach {tenant_id}");
|
||||
|
||||
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
|
||||
Ok(_) => Err(ApiError::Conflict(
|
||||
"Tenant is already present locally".to_owned(),
|
||||
)),
|
||||
Ok(tenant) => {
|
||||
if tenant.list_timelines().is_empty() {
|
||||
info!("Attaching to tenant {tenant_id} with zero timelines");
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ApiError::Conflict(
|
||||
"Tenant is already present locally".to_owned(),
|
||||
))
|
||||
}
|
||||
}
|
||||
Err(_) => Ok(()),
|
||||
})
|
||||
.await
|
||||
@@ -901,6 +953,10 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_detail_handler,
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|
||||
get_lsn_by_timestamp_handler,
|
||||
)
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
|
||||
testing_api!("run timeline GC", timeline_gc_handler),
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use futures::{Stream, StreamExt};
|
||||
use regex::Regex;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
@@ -35,7 +34,6 @@ use crate::basebackup;
|
||||
use crate::config::{PageServerConf, ProfilingConfig};
|
||||
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
|
||||
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::profiling::profpoint_start;
|
||||
use crate::reltag::RelTag;
|
||||
use crate::task_mgr;
|
||||
@@ -45,7 +43,6 @@ use crate::tenant_mgr;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -1062,33 +1059,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
|
||||
]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("get_lsn_by_timestamp ") {
|
||||
// Locate the LSN of the last transaction with a timestamp less than or equal to the specified one
|
||||
// TODO lazy static
|
||||
let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$")
|
||||
.unwrap();
|
||||
let caps = re
|
||||
.captures(query_string)
|
||||
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
|
||||
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
|
||||
let timestamp_pg = to_pg_timestamp(timestamp);
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
|
||||
b"lsn",
|
||||
)]))?;
|
||||
let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? {
|
||||
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
|
||||
LsnForTimestamp::Future(_lsn) => "future".into(),
|
||||
LsnForTimestamp::Past(_lsn) => "past".into(),
|
||||
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
|
||||
};
|
||||
pgb.write_message(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else {
|
||||
bail!("unknown command");
|
||||
}
|
||||
|
||||
@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
|
||||
(storage, remote_index_clone, sync_queue),
|
||||
max_sync_errors,
|
||||
)
|
||||
.instrument(info_span!("storage_sync_loop"))
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
|
||||
@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
|
||||
pub use pageserver_api::models::TenantState;
|
||||
|
||||
use toml_edit;
|
||||
use utils::{
|
||||
@@ -118,18 +119,6 @@ pub struct Tenant {
|
||||
upload_layers: bool,
|
||||
}
|
||||
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TenantState {
|
||||
/// Tenant is fully operational, its background jobs might be running or not.
|
||||
Active { background_jobs_running: bool },
|
||||
/// A tenant is recognized by pageserver, but not yet ready to operate:
|
||||
/// e.g. not present locally and being downloaded or being read into memory from the file system.
|
||||
Paused,
|
||||
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
|
||||
Broken,
|
||||
}
|
||||
|
||||
/// A repository corresponds to one .neon directory. One repository holds multiple
|
||||
/// timelines, forked off from the same initial call to 'initdb'.
|
||||
impl Tenant {
|
||||
@@ -400,16 +389,19 @@ impl Tenant {
|
||||
timeline_id,
|
||||
metadata.pg_version()
|
||||
);
|
||||
let timeline = self
|
||||
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
|
||||
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
|
||||
|
||||
match timelines_accessor.entry(timeline.timeline_id) {
|
||||
Entry::Occupied(_) => bail!(
|
||||
"Found freshly initialized timeline {} in the tenant map",
|
||||
timeline.timeline_id
|
||||
let ancestor = metadata
|
||||
.ancestor_timeline()
|
||||
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
|
||||
.cloned();
|
||||
match timelines_accessor.entry(timeline_id) {
|
||||
Entry::Occupied(_) => warn!(
|
||||
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
|
||||
self.tenant_id, timeline_id
|
||||
),
|
||||
Entry::Vacant(v) => {
|
||||
let timeline = self
|
||||
.initialize_new_timeline(timeline_id, metadata, ancestor)
|
||||
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
|
||||
v.insert(timeline);
|
||||
}
|
||||
}
|
||||
@@ -609,21 +601,14 @@ impl Tenant {
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: TimelineMetadata,
|
||||
timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let ancestor = match new_metadata.ancestor_timeline() {
|
||||
Some(ancestor_timeline_id) => Some(
|
||||
timelines
|
||||
.get(&ancestor_timeline_id)
|
||||
.cloned()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
|
||||
)
|
||||
})?,
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
|
||||
anyhow::ensure!(
|
||||
ancestor.is_some(),
|
||||
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
|
||||
)
|
||||
}
|
||||
|
||||
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
|
||||
let pg_version = new_metadata.pg_version();
|
||||
@@ -1080,8 +1065,12 @@ impl Tenant {
|
||||
)
|
||||
})?;
|
||||
|
||||
let ancestor = new_metadata
|
||||
.ancestor_timeline()
|
||||
.and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
|
||||
.cloned();
|
||||
let new_timeline = self
|
||||
.initialize_new_timeline(new_timeline_id, new_metadata, timelines)
|
||||
.initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to initialize timeline {}/{}",
|
||||
|
||||
@@ -15,19 +15,25 @@ use crate::repository::Key;
|
||||
use crate::tenant::inmemory_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::tenant::storage_layer::{range_eq, range_overlaps};
|
||||
use amplify_num::i256;
|
||||
use anyhow::Result;
|
||||
use num_traits::identities::{One, Zero};
|
||||
use num_traits::{Bounded, Num, Signed};
|
||||
use rstar::{RTree, RTreeObject, AABB};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::ops::Range;
|
||||
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
|
||||
struct BTreeKey {
|
||||
lsn: Lsn,
|
||||
seq: usize,
|
||||
}
|
||||
|
||||
impl BTreeKey {
|
||||
fn new(lsn: Lsn) -> BTreeKey {
|
||||
BTreeKey { lsn, seq: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
///
|
||||
@@ -53,165 +59,14 @@ pub struct LayerMap {
|
||||
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
|
||||
|
||||
/// All the historic layers are kept here
|
||||
historic_layers: RTree<LayerRTreeObject>,
|
||||
historic_layers: BTreeMap<BTreeKey, Arc<dyn Layer>>,
|
||||
layers_seqno: usize,
|
||||
|
||||
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
l0_delta_layers: Vec<Arc<dyn Layer>>,
|
||||
}
|
||||
|
||||
struct LayerRTreeObject {
|
||||
layer: Arc<dyn Layer>,
|
||||
}
|
||||
|
||||
// Representation of Key as numeric type.
|
||||
// We can not use native implementation of i128, because rstar::RTree
|
||||
// doesn't handle properly integer overflow during area calculation: sum(Xi*Yi).
|
||||
// Overflow will cause panic in debug mode and incorrect area calculation in release mode,
|
||||
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of the R-Tree).
|
||||
// By using i256 as the type, even though all the actual values would fit in i128, we can be
|
||||
// sure that multiplication doesn't overflow.
|
||||
//
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
|
||||
struct IntKey(i256);
|
||||
|
||||
impl Copy for IntKey {}
|
||||
|
||||
impl IntKey {
|
||||
fn from(i: i128) -> Self {
|
||||
IntKey(i256::from(i))
|
||||
}
|
||||
}
|
||||
|
||||
impl Bounded for IntKey {
|
||||
fn min_value() -> Self {
|
||||
IntKey(i256::MIN)
|
||||
}
|
||||
fn max_value() -> Self {
|
||||
IntKey(i256::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
impl Signed for IntKey {
|
||||
fn is_positive(&self) -> bool {
|
||||
self.0 > i256::ZERO
|
||||
}
|
||||
fn is_negative(&self) -> bool {
|
||||
self.0 < i256::ZERO
|
||||
}
|
||||
fn signum(&self) -> Self {
|
||||
match self.0.cmp(&i256::ZERO) {
|
||||
Ordering::Greater => IntKey(i256::ONE),
|
||||
Ordering::Less => IntKey(-i256::ONE),
|
||||
Ordering::Equal => IntKey(i256::ZERO),
|
||||
}
|
||||
}
|
||||
fn abs(&self) -> Self {
|
||||
IntKey(self.0.abs())
|
||||
}
|
||||
fn abs_sub(&self, other: &Self) -> Self {
|
||||
if self.0 <= other.0 {
|
||||
IntKey(i256::ZERO)
|
||||
} else {
|
||||
IntKey(self.0 - other.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Neg for IntKey {
|
||||
type Output = Self;
|
||||
fn neg(self) -> Self::Output {
|
||||
IntKey(-self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Rem for IntKey {
|
||||
type Output = Self;
|
||||
fn rem(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 % rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Div for IntKey {
|
||||
type Output = Self;
|
||||
fn div(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 / rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Add for IntKey {
|
||||
type Output = Self;
|
||||
fn add(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 + rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sub for IntKey {
|
||||
type Output = Self;
|
||||
fn sub(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 - rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Mul for IntKey {
|
||||
type Output = Self;
|
||||
fn mul(self, rhs: Self) -> Self::Output {
|
||||
IntKey(self.0 * rhs.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl One for IntKey {
|
||||
fn one() -> Self {
|
||||
IntKey(i256::ONE)
|
||||
}
|
||||
}
|
||||
|
||||
impl Zero for IntKey {
|
||||
fn zero() -> Self {
|
||||
IntKey(i256::ZERO)
|
||||
}
|
||||
fn is_zero(&self) -> bool {
|
||||
self.0 == i256::ZERO
|
||||
}
|
||||
}
|
||||
|
||||
impl Num for IntKey {
|
||||
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
|
||||
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
|
||||
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for LayerRTreeObject {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// FIXME: ptr_eq might fail to return true for 'dyn'
|
||||
// references. Clippy complains about this. In practice it
|
||||
// seems to work, the assertion below would be triggered
|
||||
// otherwise but this ought to be fixed.
|
||||
#[allow(clippy::vtable_address_comparisons)]
|
||||
Arc::ptr_eq(&self.layer, &other.layer)
|
||||
}
|
||||
}
|
||||
|
||||
impl RTreeObject for LayerRTreeObject {
|
||||
type Envelope = AABB<[IntKey; 2]>;
|
||||
fn envelope(&self) -> Self::Envelope {
|
||||
let key_range = self.layer.get_key_range();
|
||||
let lsn_range = self.layer.get_lsn_range();
|
||||
AABB::from_corners(
|
||||
[
|
||||
IntKey::from(key_range.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value of LayerMap::search
|
||||
pub struct SearchResult {
|
||||
pub layer: Arc<dyn Layer>,
|
||||
@@ -234,23 +89,17 @@ impl LayerMap {
|
||||
// linear search
|
||||
// Find the latest image layer that covers the given key
|
||||
let mut latest_img: Option<Arc<dyn Layer>> = None;
|
||||
let mut latest_img_lsn: Option<Lsn> = None;
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
|
||||
[
|
||||
IntKey::from(key.to_i128()),
|
||||
IntKey::from(end_lsn.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
let mut latest_img_lsn = Lsn(0);
|
||||
let mut iter = self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
|
||||
while let Some((_key, l)) = iter.next_back() {
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
if !l.get_key_range().contains(&key) {
|
||||
continue;
|
||||
}
|
||||
let img_lsn = l.get_lsn_range().start;
|
||||
assert!(img_lsn < end_lsn);
|
||||
if Lsn(img_lsn.0 + 1) == end_lsn {
|
||||
@@ -260,23 +109,23 @@ impl LayerMap {
|
||||
lsn_floor: img_lsn,
|
||||
}));
|
||||
}
|
||||
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) {
|
||||
latest_img = Some(Arc::clone(l));
|
||||
latest_img_lsn = Some(img_lsn);
|
||||
}
|
||||
latest_img = Some(Arc::clone(l));
|
||||
latest_img_lsn = img_lsn;
|
||||
break;
|
||||
}
|
||||
|
||||
// Search the delta layers
|
||||
let mut latest_delta: Option<Arc<dyn Layer>> = None;
|
||||
for e in self
|
||||
let mut iter = self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
|
||||
while let Some((_key, l)) = iter.next_back() {
|
||||
if !l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
if !l.get_key_range().contains(&key) {
|
||||
continue;
|
||||
}
|
||||
if l.get_lsn_range().start >= end_lsn {
|
||||
info!(
|
||||
"Candidate delta layer {}..{} is too new for lsn {}",
|
||||
@@ -286,6 +135,9 @@ impl LayerMap {
|
||||
);
|
||||
}
|
||||
assert!(l.get_lsn_range().start < end_lsn);
|
||||
if l.get_lsn_range().end <= latest_img_lsn {
|
||||
continue;
|
||||
}
|
||||
if l.get_lsn_range().end >= end_lsn {
|
||||
// this layer contains the requested point in the key/lsn space.
|
||||
// No need to search any further
|
||||
@@ -311,10 +163,7 @@ impl LayerMap {
|
||||
"found (old) layer {} for request on {key} at {end_lsn}",
|
||||
l.filename().display(),
|
||||
);
|
||||
let lsn_floor = std::cmp::max(
|
||||
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
|
||||
l.get_lsn_range().start,
|
||||
);
|
||||
let lsn_floor = std::cmp::max(Lsn(latest_img_lsn.0 + 1), l.get_lsn_range().start);
|
||||
Ok(Some(SearchResult {
|
||||
lsn_floor,
|
||||
layer: l,
|
||||
@@ -322,7 +171,7 @@ impl LayerMap {
|
||||
} else if let Some(l) = latest_img {
|
||||
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
|
||||
Ok(Some(SearchResult {
|
||||
lsn_floor: latest_img_lsn.unwrap(),
|
||||
lsn_floor: latest_img_lsn,
|
||||
layer: l,
|
||||
}))
|
||||
} else {
|
||||
@@ -338,7 +187,14 @@ impl LayerMap {
|
||||
if layer.get_key_range() == (Key::MIN..Key::MAX) {
|
||||
self.l0_delta_layers.push(layer.clone());
|
||||
}
|
||||
self.historic_layers.insert(LayerRTreeObject { layer });
|
||||
self.historic_layers.insert(
|
||||
BTreeKey {
|
||||
lsn: layer.get_lsn_range().start,
|
||||
seq: self.layers_seqno,
|
||||
},
|
||||
layer,
|
||||
);
|
||||
self.layers_seqno += 1;
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
}
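The insert above keys the historic-layer map by the layer's starting LSN plus a monotonically increasing sequence number, so several layers that start at the same LSN can coexist in the BTreeMap. A self-contained sketch of such a key (the `BTreeKey`, `lsn` and `seq` names mirror the diff; the `Lsn` newtype below is just a stand-in):

/// Stand-in for the repo's LSN newtype; deriving Ord gives numeric ordering.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

/// Map key: layers are ordered by start LSN first, with `seq` (a counter
/// bumped on every insert) breaking ties so equal-LSN layers don't collide.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct BTreeKey {
    lsn: Lsn,
    seq: u64,
}

impl BTreeKey {
    /// Smallest key at a given LSN, handy as a range bound, e.g.
    /// `map.range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn))`.
    fn new(lsn: Lsn) -> Self {
        BTreeKey { lsn, seq: 0 }
    }
}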
|
||||
|
||||
@@ -360,10 +216,26 @@ impl LayerMap {
|
||||
.retain(|other| !Arc::ptr_eq(other, &layer));
|
||||
assert_eq!(self.l0_delta_layers.len(), len_before - 1);
|
||||
}
|
||||
assert!(self
|
||||
.historic_layers
|
||||
.remove(&LayerRTreeObject { layer })
|
||||
.is_some());
|
||||
let len_before = self.historic_layers.len();
|
||||
#[allow(clippy::vtable_address_comparisons)]
|
||||
self.historic_layers
|
||||
.retain(|_key, other| !Arc::ptr_eq(other, &layer));
|
||||
if self.historic_layers.len() != len_before - 1 {
|
||||
assert!(self.historic_layers.len() == len_before);
|
||||
error!(
|
||||
"Failed to remove {} layer: {}..{}__{}..{}",
|
||||
if layer.is_incremental() {
|
||||
"inremental"
|
||||
} else {
|
||||
"image"
|
||||
},
|
||||
layer.get_key_range().start,
|
||||
layer.get_key_range().end,
|
||||
layer.get_lsn_range().start,
|
||||
layer.get_lsn_range().end
|
||||
);
|
||||
}
|
||||
assert!(self.historic_layers.len() == len_before - 1);
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
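The removal path above no longer deletes by key; it retains everything except the target Arc and then checks that exactly one entry disappeared. A small sketch of that shape, with plain stand-in types rather than the repo's `Arc<dyn Layer>`:

use std::collections::BTreeMap;
use std::sync::Arc;

/// Drop every entry whose value is the same Arc allocation as `target`,
/// then assert that exactly one entry went away.
fn remove_exactly_one(map: &mut BTreeMap<u64, Arc<String>>, target: &Arc<String>) {
    let len_before = map.len();
    map.retain(|_key, other| !Arc::ptr_eq(other, target));
    assert_eq!(
        map.len(),
        len_before - 1,
        "expected to remove exactly one layer entry"
    );
}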
|
||||
|
||||
@@ -380,21 +252,10 @@ impl LayerMap {
|
||||
|
||||
loop {
|
||||
let mut made_progress = false;
|
||||
let envelope = AABB::from_corners(
|
||||
[
|
||||
IntKey::from(range_remain.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(range_remain.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
for (_key, l) in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
|
||||
{
|
||||
let l = &e.layer;
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
@@ -417,39 +278,30 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
|
||||
self.historic_layers.iter().map(|e| e.layer.clone())
|
||||
self.historic_layers
|
||||
.iter()
|
||||
.map(|(_key, layer)| layer.clone())
|
||||
}
|
||||
|
||||
/// Find the last image layer that covers 'key', ignoring any image layers
|
||||
/// newer than 'lsn'.
|
||||
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
|
||||
let mut candidate_lsn = Lsn(0);
|
||||
let mut candidate = None;
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key.to_i128()), IntKey::from(0)],
|
||||
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
|
||||
);
|
||||
for e in self
|
||||
let mut iter = self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
{
|
||||
let l = &e.layer;
|
||||
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1));
|
||||
while let Some((_key, l)) = iter.next_back() {
|
||||
if l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert!(l.get_key_range().contains(&key));
|
||||
let this_lsn = l.get_lsn_range().start;
|
||||
assert!(this_lsn <= lsn);
|
||||
if this_lsn < candidate_lsn {
|
||||
// our previous candidate was better
|
||||
if !l.get_key_range().contains(&key) {
|
||||
continue;
|
||||
}
|
||||
candidate_lsn = this_lsn;
|
||||
candidate = Some(Arc::clone(l));
|
||||
let this_lsn = l.get_lsn_range().start;
|
||||
assert!(this_lsn <= lsn);
|
||||
return Some(Arc::clone(l));
|
||||
}
|
||||
|
||||
candidate
|
||||
None
|
||||
}
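Because the map is ordered by LSN, "find the newest image layer not newer than `lsn`" becomes a reverse walk over a bounded range, which is what the rewritten function above does with `next_back()`. A generic sketch of that pattern (plain `u64` keys stand in for `BTreeKey`):

use std::collections::BTreeMap;

/// Walk the keys at or below `upto` from the highest downwards and return
/// the first value that matches the predicate.
fn latest_matching<V: Clone>(
    map: &BTreeMap<u64, V>,
    upto: u64,
    matches: impl Fn(&V) -> bool,
) -> Option<V> {
    let mut iter = map.range(..=upto);
    while let Some((_key, value)) = iter.next_back() {
        if matches(value) {
            return Some(value.clone());
        }
    }
    None
}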
|
||||
|
||||
///
|
||||
@@ -466,18 +318,10 @@ impl LayerMap {
|
||||
lsn: Lsn,
|
||||
) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
|
||||
let mut points = vec![key_range.start];
|
||||
let envelope = AABB::from_corners(
|
||||
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128()),
|
||||
IntKey::from(lsn.0 as i128),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
for (_lsn, l) in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
.range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1))
|
||||
{
|
||||
let l = &e.layer;
|
||||
assert!(l.get_lsn_range().start <= lsn);
|
||||
let range = l.get_key_range();
|
||||
if key_range.contains(&range.start) {
|
||||
@@ -514,26 +358,17 @@ impl LayerMap {
|
||||
if lsn_range.start >= lsn_range.end {
|
||||
return Ok(0);
|
||||
}
|
||||
let envelope = AABB::from_corners(
|
||||
[
|
||||
IntKey::from(key_range.start.to_i128()),
|
||||
IntKey::from(lsn_range.start.0 as i128),
|
||||
],
|
||||
[
|
||||
IntKey::from(key_range.end.to_i128() - 1),
|
||||
IntKey::from(lsn_range.end.0 as i128 - 1),
|
||||
],
|
||||
);
|
||||
for e in self
|
||||
for (_lsn, l) in self
|
||||
.historic_layers
|
||||
.locate_in_envelope_intersecting(&envelope)
|
||||
.range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
|
||||
{
|
||||
let l = &e.layer;
|
||||
if !l.is_incremental() {
|
||||
continue;
|
||||
}
|
||||
if !range_overlaps(&l.get_key_range(), key_range) {
|
||||
continue;
|
||||
}
|
||||
assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
|
||||
assert!(range_overlaps(&l.get_key_range(), key_range));
|
||||
|
||||
// We ignore level0 delta layers. Unless the whole keyspace fits
|
||||
// into one partition
|
||||
@@ -569,8 +404,8 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
println!("historic_layers:");
|
||||
for e in self.historic_layers.iter() {
|
||||
e.layer.dump(verbose)?;
|
||||
for (_key, layer) in self.historic_layers.iter() {
|
||||
layer.dump(verbose)?;
|
||||
}
|
||||
println!("End dump LayerMap");
|
||||
Ok(())
|
||||
|
||||
@@ -627,7 +627,7 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
drop(tenant_conf_guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
let _ = spawn_connection_manager_task(
|
||||
spawn_connection_manager_task(
|
||||
self.conf.broker_etcd_prefix.clone(),
|
||||
self_clone,
|
||||
walreceiver_connect_timeout,
|
||||
|
||||
@@ -107,6 +107,13 @@ pub fn init_tenant_mgr(
|
||||
/// Ignores other timelines that might be present for the tenant but were not passed as a parameter.
/// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken"
/// and the load continues.
///
/// For a successful tenant attach, the tenant first has to have a `timelines/` subdirectory and a tenant config file that loads into memory successfully.
/// If either condition fails, the tenant is added to memory in the [`TenantState::Broken`] state; otherwise we start loading its timelines.
/// Alternatively, a tenant is considered loaded successfully if it's already in the pageserver's memory (i.e. was loaded before).
///
/// Attach happens on startup and on successful timeline downloads
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
|
||||
pub fn attach_local_tenants(
|
||||
conf: &'static PageServerConf,
|
||||
remote_index: &RemoteIndex,
|
||||
@@ -122,18 +129,20 @@ pub fn attach_local_tenants(
|
||||
);
|
||||
debug!("Timelines to attach: {local_timelines:?}");
|
||||
|
||||
let tenant = load_local_tenant(conf, tenant_id, remote_index);
|
||||
{
|
||||
match tenants_state::write_tenants().entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(_) => {
|
||||
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
|
||||
continue;
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
v.insert(Arc::clone(&tenant));
|
||||
}
|
||||
let mut tenants_accessor = tenants_state::write_tenants();
|
||||
let tenant = match tenants_accessor.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(o) => {
|
||||
info!("Tenant {tenant_id} was found in pageserver's memory");
|
||||
Arc::clone(o.get())
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
|
||||
let tenant = load_local_tenant(conf, tenant_id, remote_index);
|
||||
v.insert(Arc::clone(&tenant));
|
||||
tenant
|
||||
}
|
||||
};
|
||||
drop(tenants_accessor);
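The hunk above replaces the "already present is an error" behavior with a get-or-create via the HashMap entry API: reuse the tenant handle if it is already attached, otherwise load and register it. A minimal sketch of that shape (the types are stand-ins, not the repo's `Tenant`):

use std::collections::hash_map::{Entry, HashMap};
use std::sync::Arc;

/// Get-or-create pattern: return the existing handle, or "load" and insert one.
fn get_or_load(tenants: &mut HashMap<u64, Arc<String>>, tenant_id: u64) -> Arc<String> {
    match tenants.entry(tenant_id) {
        // Already attached: hand back the existing handle.
        Entry::Occupied(o) => Arc::clone(o.get()),
        // Not in memory yet: construct a placeholder and register it.
        Entry::Vacant(v) => {
            let tenant = Arc::new(format!("tenant-{tenant_id}"));
            v.insert(Arc::clone(&tenant));
            tenant
        }
    }
}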
|
||||
|
||||
if tenant.current_state() == TenantState::Broken {
|
||||
warn!("Skipping timeline load for broken tenant {tenant_id}")
|
||||
@@ -168,16 +177,28 @@ fn load_local_tenant(
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
match Tenant::load_tenant_config(conf, tenant_id) {
|
||||
Ok(tenant_conf) => {
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
tenant.activate(false);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
|
||||
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
|
||||
if !tenant_timelines_dir.is_dir() {
|
||||
error!(
|
||||
"Tenant {} has no timelines directory at {}",
|
||||
tenant_id,
|
||||
tenant_timelines_dir.display()
|
||||
);
|
||||
tenant.set_state(TenantState::Broken);
|
||||
} else {
|
||||
match Tenant::load_tenant_config(conf, tenant_id) {
|
||||
Ok(tenant_conf) => {
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
tenant.activate(false);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tenant
|
||||
}
|
||||
|
||||
@@ -625,14 +646,10 @@ fn collect_timelines_for_tenant(
|
||||
}
|
||||
|
||||
if tenant_timelines.is_empty() {
|
||||
match remove_if_empty(&timelines_dir) {
|
||||
Ok(true) => info!(
|
||||
"Removed empty tenant timelines directory {}",
|
||||
timelines_dir.display()
|
||||
),
|
||||
Ok(false) => (),
|
||||
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
|
||||
}
|
||||
// This is normal: we've removed all broken, empty and temporary timeline dirs,
// but the tenant should stay functional and be able to create new timelines.
// On a restart we require tenants to have the timelines dir, so leave it on disk.
|
||||
debug!("Tenant {tenant_id} has no timelines loaded");
|
||||
}
|
||||
|
||||
Ok((tenant_id, tenant_timelines))
|
||||
|
||||
@@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc};
|
||||
use fail::fail_point;
|
||||
use futures::StreamExt;
|
||||
use postgres::{SimpleQueryMessage, SimpleQueryRow};
|
||||
use postgres_ffi::v14::xlog_utils::normalize_lsn;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use tokio::{pin, select, sync::watch, time};
|
||||
@@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection(
|
||||
// There might be some padding after the last full record, skip it.
|
||||
startpoint += startpoint.calc_padding(8u32);
|
||||
|
||||
// If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
// for anything, and in some corner cases the compute node might never have generated the WAL for the page headers.
// That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
// but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
// header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
// to the safekeepers.
||||
startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
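As a rough sketch of what such a normalization does, assuming it only needs to skip the long page header at a segment start and the short header at an ordinary page boundary; the constants below are the usual PostgreSQL values, taken here as assumptions rather than from the diff:

// Illustrative constants; the real values come from postgres_ffi.
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // assumed default segment size
const XLOG_BLCKSZ: u64 = 8192; // assumed WAL page size
const SIZE_OF_XLOG_LONG_PHD: u64 = 40; // assumed long page header size
const SIZE_OF_XLOG_SHORT_PHD: u64 = 24; // assumed short page header size

/// Hypothetical sketch of LSN normalization: if the LSN sits exactly on a
/// segment or page boundary, move it just past the page header so replay
/// never expects WAL bytes for the header itself.
fn normalize_lsn_sketch(lsn: u64) -> u64 {
    if lsn % WAL_SEGMENT_SIZE == 0 {
        lsn + SIZE_OF_XLOG_LONG_PHD
    } else if lsn % XLOG_BLCKSZ == 0 {
        lsn + SIZE_OF_XLOG_SHORT_PHD
    } else {
        lsn
    }
}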
|
||||
|
||||
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
|
||||
|
||||
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
|
||||
|
||||
@@ -129,13 +129,13 @@ mod tests {
|
||||
assert!(CANCEL_MAP.contains(&session));
|
||||
|
||||
tx.send(()).expect("failed to send");
|
||||
let () = futures::future::pending().await; // sleep forever
|
||||
futures::future::pending::<()>().await; // sleep forever
|
||||
|
||||
Ok(())
|
||||
}));
|
||||
|
||||
// Wait until the task has been spawned.
|
||||
let () = rx.await.context("failed to hear from the task")?;
|
||||
rx.await.context("failed to hear from the task")?;
|
||||
|
||||
// Drop the session's entry by cancelling the task.
|
||||
task.abort();
|
||||
|
||||
@@ -5,6 +5,7 @@ filterwarnings =
|
||||
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
|
||||
addopts =
|
||||
-m 'not remote_cluster'
|
||||
--ignore=test_runner/performance
|
||||
markers =
|
||||
remote_cluster
|
||||
testpaths =
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
[toolchain]
|
||||
# We try to stick to a toolchain version that is widely available on popular distributions, so that most people
|
||||
# can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
|
||||
# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on
|
||||
# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach
|
||||
# 'testing' soon (and similarly for the other distributions).
|
||||
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package.
|
||||
channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
|
||||
# version, we can consider updating.
|
||||
# See https://tracker.debian.org/pkg/rustc for more details on the Debian rustc package;
# by default we treat the Debian "unstable" version as the highest version used in the project.
|
||||
channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
|
||||
thiserror = "1"
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
safekeeper_api = { path = "../libs/safekeeper_api" }
|
||||
postgres_ffi = { path = "../libs/postgres_ffi" }
|
||||
metrics = { path = "../libs/metrics" }
|
||||
utils = { path = "../libs/utils" }
|
||||
|
||||
@@ -291,9 +291,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
|
||||
// Register metrics collector for active timelines. It's important to do this
|
||||
// after daemonizing, otherwise process collector will be upset.
|
||||
let registry = metrics::default_registry();
|
||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
|
||||
registry.register(Box::new(timeline_collector))?;
|
||||
metrics::register_internal(Box::new(timeline_collector))?;
|
||||
|
||||
let signals = signals::install_shutdown_handlers()?;
|
||||
let mut threads = vec![];
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod models;
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use safekeeper_api::models;
|
||||
|
||||
@@ -27,14 +27,13 @@ mod timelines_global_map;
|
||||
pub use timelines_global_map::GlobalTimelines;
|
||||
|
||||
pub mod defaults {
|
||||
use const_format::formatcp;
|
||||
use std::time::Duration;
|
||||
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub use safekeeper_api::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
|
||||
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
|
||||
}
|
||||
|
||||
@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:
|
||||
|
||||
`./scripts/pytest -k bench`
|
||||
|
||||
To run tests in parallel we use the `pytest-xdist` plugin. By default everything runs single-threaded. The number of workers can be specified with the `-n` argument:
|
||||
|
||||
`./scripts/pytest -n4`
|
||||
|
||||
By default performance tests are excluded. To run them, explicitly pass the performance test selection to the script:
|
||||
|
||||
`./scripts/pytest test_runner/performance`
|
||||
|
||||
Useful environment variables:
|
||||
|
||||
`NEON_BIN`: The directory where neon binaries can be found.
|
||||
|
||||
@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
|
||||
LOCAL_FS = "local_fs"
|
||||
MOCK_S3 = "mock_s3"
|
||||
REAL_S3 = "real_s3"
|
||||
# Pass to tests that are generic to remote storage
# to ensure they pass with or without the remote storage
|
||||
NOOP = "noop"
|
||||
|
||||
|
||||
def available_remote_storages() -> List[RemoteStorageKind]:
|
||||
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
|
||||
test_name: str,
|
||||
force_enable: bool = True,
|
||||
):
|
||||
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP:
|
||||
return
|
||||
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
self.enable_local_fs_remote_storage(force_enable=force_enable)
|
||||
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
|
||||
@@ -1131,6 +1136,19 @@ class NeonPageserverHttpClient(requests.Session):
|
||||
assert res_json is None
|
||||
return res_json
|
||||
|
||||
def timeline_get_lsn_by_timestamp(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
|
||||
):
|
||||
log.info(
|
||||
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
|
||||
)
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
return res_json
|
||||
|
||||
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
|
||||
res = self.put(
|
||||
@@ -1182,6 +1200,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
arguments: List[str],
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
check_return_code=True,
|
||||
timeout=None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
"""
|
||||
Run the command with the specified arguments.
|
||||
@@ -1228,6 +1247,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
universal_newlines=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
timeout=timeout,
|
||||
)
|
||||
if not res.returncode:
|
||||
log.info(f"Run success: {res.stdout}")
|
||||
@@ -1601,6 +1621,14 @@ class WalCraft(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
|
||||
|
||||
class ComputeCtl(AbstractNeonCli):
|
||||
"""
|
||||
A typed wrapper around the `compute_ctl` CLI tool.
|
||||
"""
|
||||
|
||||
COMMAND = "compute_ctl"
|
||||
|
||||
|
||||
class NeonPageserver(PgProtocol):
|
||||
"""
|
||||
An object representing a running pageserver.
|
||||
|
||||
@@ -6,6 +6,8 @@ from typing import List
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
|
||||
|
||||
@@ -88,3 +90,39 @@ def test_branching_with_pgbench(
|
||||
for pg in pgs:
|
||||
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
|
||||
assert res[0] == (100000 * scale,)
|
||||
|
||||
|
||||
# Test branching from an "unnormalized" LSN.
|
||||
#
|
||||
# Context:
|
||||
# When doing basebackup for a newly created branch, pageserver generates
|
||||
# 'pg_control' file to bootstrap the WAL segment, setting the redo position
# to a "normalized" LSN based on the timeline's starting LSN:
|
||||
#
|
||||
# checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0;
|
||||
#
|
||||
# This test checks that the pageserver is able to handle an "unnormalized" starting LSN.
|
||||
#
|
||||
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
|
||||
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
XLOG_BLCKSZ = 8192
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("b0")
|
||||
pg0 = env.postgres.create_start("b0")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
|
||||
|
||||
with pg0.cursor() as cur:
|
||||
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# Specify the `start_lsn` as a number that is divisible by `XLOG_BLCKSZ`
|
||||
# and is smaller than `curr_lsn`.
|
||||
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
|
||||
|
||||
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
|
||||
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
|
||||
pg1 = env.postgres.create_start("b1")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])
|
||||
|
||||
203
test_runner/regress/test_compute_ctl.py
Normal file
@@ -0,0 +1,203 @@
|
||||
import os
|
||||
from subprocess import TimeoutExpired
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
|
||||
|
||||
|
||||
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
|
||||
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
ctl = ComputeCtl(env)
|
||||
|
||||
env.neon_cli.create_branch("test_compute_ctl", "main")
|
||||
pg = env.postgres.create_start("test_compute_ctl")
|
||||
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
|
||||
with open(pg.config_file_path(), "r") as f:
|
||||
cfg_lines = f.readlines()
|
||||
cfg_map = {}
|
||||
for line in cfg_lines:
|
||||
if "=" in line:
|
||||
k, v = line.split("=")
|
||||
cfg_map[k] = v.strip("\n '\"")
|
||||
log.info(f"postgres config: {cfg_map}")
|
||||
pgdata = pg.pg_data_dir_path()
|
||||
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
|
||||
|
||||
pg.stop_and_destroy()
|
||||
|
||||
spec = (
|
||||
"""
|
||||
{
|
||||
"format_version": 1.0,
|
||||
|
||||
"timestamp": "2021-05-23T18:25:43.511Z",
|
||||
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
|
||||
|
||||
"cluster": {
|
||||
"cluster_id": "test-cluster-42",
|
||||
"name": "Neon Test",
|
||||
"state": "restarted",
|
||||
"roles": [
|
||||
],
|
||||
"databases": [
|
||||
],
|
||||
"settings": [
|
||||
{
|
||||
"name": "fsync",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "wal_level",
|
||||
"value": "replica",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "hot_standby",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.safekeepers"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_log_hints",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "log_connections",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "shared_buffers",
|
||||
"value": "32768",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "port",
|
||||
"value": """
|
||||
+ f'"{cfg_map["port"]}"'
|
||||
+ """,
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_connections",
|
||||
"value": "100",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_wal_senders",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "listen_addresses",
|
||||
"value": "0.0.0.0",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_sender_timeout",
|
||||
"value": "0",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "password_encryption",
|
||||
"value": "md5",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "maintenance_work_mem",
|
||||
"value": "65536",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_parallel_workers",
|
||||
"value": "8",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_worker_processes",
|
||||
"value": "8",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "neon.tenant_id",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.tenant_id"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_slots",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "neon.timeline_id",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.timeline_id"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "shared_preload_libraries",
|
||||
"value": "neon",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "synchronous_standby_names",
|
||||
"value": "walproposer",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.pageserver_connstring",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
}
|
||||
]
|
||||
},
|
||||
"delta_operations": [
|
||||
]
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
ps_connstr = cfg_map["neon.pageserver_connstring"]
|
||||
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
|
||||
|
||||
# run compute_ctl and wait for 10s
|
||||
try:
|
||||
ctl.raw_cli(
|
||||
["--connstr", ps_connstr, "--pgdata", pgdata, "--spec", spec, "--pgbin", pg_bin_path],
|
||||
timeout=10,
|
||||
)
|
||||
except TimeoutExpired as exc:
|
||||
ctl_logs = exc.stderr.decode("utf-8")
|
||||
log.info("compute_ctl output:\n" + ctl_logs)
|
||||
|
||||
start = "starting safekeepers syncing"
|
||||
end = "safekeepers synced at LSN"
|
||||
start_pos = ctl_logs.index(start)
|
||||
assert start_pos != -1
|
||||
end_pos = ctl_logs.index(end, start_pos)
|
||||
assert end_pos != -1
|
||||
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
|
||||
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
|
||||
|
||||
# assert that --sync-safekeepers logs are present in the output
|
||||
assert "connecting with node" in sync_safekeepers_logs
|
||||
assert "connected with node" in sync_safekeepers_logs
|
||||
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
|
||||
assert "got votes from majority (2)" in sync_safekeepers_logs
|
||||
assert "sending elected msg to node" in sync_safekeepers_logs
|
||||
@@ -15,7 +15,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
pgmain = env.postgres.create_start("test_lsn_mapping")
|
||||
log.info("postgres is running on 'test_lsn_mapping' branch")
|
||||
|
||||
ps_cur = env.pageserver.connect().cursor()
|
||||
cur = pgmain.connect().cursor()
|
||||
# Create table, and insert rows, each in a separate transaction
|
||||
# Disable synchronous_commit to make this initialization go faster.
|
||||
@@ -38,37 +37,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
# Wait until WAL is received by pageserver
|
||||
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
|
||||
|
||||
# Check edge cases: timestamp in the future
|
||||
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
|
||||
result = query_scalar(
|
||||
ps_cur,
|
||||
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
|
||||
)
|
||||
assert result == "future"
|
||||
|
||||
# timestamp too far in the past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = query_scalar(
|
||||
ps_cur,
|
||||
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
|
||||
)
|
||||
assert result == "past"
|
||||
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
for i in range(1, len(tbl), 100):
|
||||
probe_timestamp = tbl[i][1]
|
||||
|
||||
# Call get_lsn_by_timestamp to get the LSN
|
||||
lsn = query_scalar(
|
||||
ps_cur,
|
||||
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
|
||||
with env.pageserver.http_client() as client:
|
||||
# Check edge cases: timestamp in the future
|
||||
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
assert result == "future"
|
||||
|
||||
# Launch a new read-only node at that LSN, and check that only the rows
|
||||
# that were supposed to be committed at that point in time are visible.
|
||||
pg_here = env.postgres.create_start(
|
||||
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
|
||||
# timestamp too far in the past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
|
||||
assert result == "past"
|
||||
|
||||
pg_here.stop_and_destroy()
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
for i in range(1, len(tbl), 100):
|
||||
probe_timestamp = tbl[i][1]
|
||||
lsn = client.timeline_get_lsn_by_timestamp(
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
# Call get_lsn_by_timestamp to get the LSN
|
||||
# Launch a new read-only node at that LSN, and check that only the rows
|
||||
# that were supposed to be committed at that point in time are visible.
|
||||
pg_here = env.postgres.create_start(
|
||||
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
|
||||
)
|
||||
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
|
||||
|
||||
pg_here.stop_and_destroy()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import shutil
|
||||
from contextlib import closing
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
@@ -7,8 +8,13 @@ from typing import List
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.types import Lsn, TenantId
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
|
||||
|
||||
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
|
||||
assert post_detach_samples == set()
|
||||
|
||||
|
||||
# Check that empty tenants work with or without the remote storage
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
|
||||
)
|
||||
def test_pageserver_with_empty_tenants(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_pageserver_with_empty_tenants",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_without_timelines_dir = env.initial_tenant
|
||||
log.info(
|
||||
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
|
||||
)
|
||||
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
|
||||
|
||||
tenant_with_empty_timelines_dir = client.tenant_create()
|
||||
log.info(
|
||||
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
|
||||
)
|
||||
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
|
||||
for temp_timeline in temp_timelines:
|
||||
client.timeline_delete(
|
||||
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
|
||||
)
|
||||
files_in_timelines_dir = sum(
|
||||
1
|
||||
for _p in Path.iterdir(
|
||||
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
|
||||
)
|
||||
)
|
||||
assert (
|
||||
files_in_timelines_dir == 0
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
|
||||
|
||||
# Trigger timeline reinitialization after pageserver restart
|
||||
env.postgres.stop_all()
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert (
|
||||
len(tenants) == 1
|
||||
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
|
||||
loaded_tenant = tenants[0]
|
||||
assert loaded_tenant["id"] == str(
|
||||
tenant_with_empty_timelines_dir
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
|
||||
assert loaded_tenant["state"] == {
|
||||
"Active": {"background_jobs_running": False}
|
||||
}, "Empty tenant should be loaded and ready for timeline creation"
|
||||
|
||||
@@ -7,19 +7,25 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserverHttpClient,
|
||||
Postgres,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
async def tenant_workload(env: NeonEnv, pg: Postgres):
|
||||
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
|
||||
# run final checkpoint manually to flush all the data to remote storage
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_tenants_attached_after_download(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="remote_storage_kind",
|
||||
)
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pg = env.postgres.create_start("main")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
for checkpoint_number in range(1, 3):
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
|
||||
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
|
||||
"""
|
||||
)
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# wait until pageserver receives that data
|
||||
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to be sure that data landed in remote storage
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
log.info(f"waiting for checkpoint {checkpoint_number} upload")
|
||||
# wait until pageserver successfully uploaded a checkpoint to remote storage
|
||||
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
|
||||
log.info(f"upload of checkpoint {checkpoint_number} is done")
|
||||
|
||||
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
|
||||
env.postgres.stop_all()
|
||||
env.pageserver.stop()
|
||||
|
||||
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
local_layer_deleted = False
|
||||
for path in Path.iterdir(timeline_dir):
|
||||
if path.name.startswith("00000"):
|
||||
# Looks like a layer file. Remove it
|
||||
os.remove(path)
|
||||
local_layer_deleted = True
|
||||
break
|
||||
assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
|
||||
|
||||
##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
|
||||
env.pageserver.start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
wait_until(
|
||||
number_of_iterations=5,
|
||||
interval=1,
|
||||
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
|
||||
)
|
||||
|
||||
restored_timelines = client.timeline_list(tenant_id)
|
||||
assert (
|
||||
len(restored_timelines) == 1
|
||||
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
|
||||
restored_timeline = restored_timelines[0]
assert restored_timeline["timeline_id"] == str(
|
||||
timeline_id
|
||||
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
|
||||
|
||||
|
||||
def expect_tenant_to_download_timeline(
|
||||
client: NeonPageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
):
|
||||
for tenant in client.tenant_list():
|
||||
if tenant["id"] == str(tenant_id):
|
||||
assert not tenant.get(
|
||||
"has_in_progress_downloads", True
|
||||
), f"Tenant {tenant_id} should have no downloads in progress"
|
||||
return
|
||||
assert False, f"Tenant {tenant_id} is missing on pageserver"
|
||||
|
||||
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 9383aaa9c2...ff18cec1ee
@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
|
||||
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
|
||||
bytes = { version = "1", features = ["serde", "std"] }
|
||||
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
|
||||
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
|
||||
either = { version = "1", features = ["use_std"] }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
|
||||
|
||||