Compare commits

..

20 Commits

Author SHA1 Message Date
Konstantin Knizhnik
121c19fcd6 Replace R-Tree with B-Tree in layer map 2022-10-07 13:11:24 +03:00
Arthur Petukhovsky
687ba81366 Display sync safekeepers output in compute_ctl (#2571)
Pipe postgres output to compute_ctl stdout and create a test to check that compute_ctl works and prints postgres logs.
2022-10-06 13:53:52 +00:00
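A condensed sketch of what this change amounts to, based on the compute.rs diff further down: only stdout is piped, while stderr stays inherited so it streams straight into compute_ctl's own output. The binary name, error text, and use of anyhow are simplifications, not the exact compute_ctl code.

```rust
use std::process::{Command, Stdio};

fn sync_safekeepers(pgdata: &str) -> anyhow::Result<()> {
    let child = Command::new("postgres")
        .args(["--sync-safekeepers"])
        .env("PGDATA", pgdata) // -D cannot be used in this mode
        .stdout(Stdio::piped()) // captured, reported on failure
        // stderr is intentionally no longer piped: it is inherited and shows
        // up directly in compute_ctl's logs
        .spawn()?;

    let out = child.wait_with_output()?;
    if !out.status.success() {
        anyhow::bail!(
            "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
            out.status,
            String::from_utf8_lossy(&out.stdout),
        );
    }
    Ok(())
}
```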
Andrés
47bae68a2e Make get_lsn_by_timestamp available in mgmt API (#2536) (#2560)
Co-authored-by: andres <andres.rodriguez@outlook.es>
2022-10-06 12:42:50 +03:00
Joonas Koivunen
e8b195acb7 fix: apply notify workaround on m1 mac docker (#2564)
Workaround as discussed in the notify repository.
2022-10-06 11:13:40 +03:00
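The shape of the workaround, condensed from the wait_for_postgres diff further down; the function name and channel wiring are simplified here, but the error-kind match and the PollWatcher fallback mirror the actual change.

```rust
use std::time::Duration;
use notify::{Config, PollWatcher, Watcher};

fn make_watcher(
    tx: std::sync::mpsc::Sender<notify::Result<notify::Event>>,
) -> notify::Result<Box<dyn Watcher>> {
    let tx2 = tx.clone();
    match notify::recommended_watcher(move |res| {
        let _ = tx2.send(res);
    }) {
        Ok(w) => Ok(Box::new(w)),
        // Docker on M1 Macs has no working inotify backend and returns
        // "Function not implemented" (os error 38); fall back to polling.
        Err(e) if matches!(&e.kind, notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38)) => {
            // Poll a bit faster than the 100 ms at which results are checked.
            let config = Config::default().with_poll_interval(Duration::from_millis(50));
            let w = PollWatcher::new(move |res| { let _ = tx.send(res); }, config)?;
            Ok(Box::new(w))
        }
        Err(e) => Err(e),
    }
}
```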
Anastasia Lubennikova
254cb7dc4f Update CI script to push compute-node-v15 to dockerhub 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
ed85d97f17 Bump vendor/postgres-v15; rebase it onto 'Stamp 15rc2' 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
4a216c5f7f Use PostGIS 3.3.1, which is compatible with pg 15 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
c5a428a61a Update Dockerfile.compute-node-v15 to match v14 version.
Fix build script to promote the image for v15 to neon dockerhub
2022-10-06 10:50:08 +03:00
Konstantin Knizhnik
ff8c481777 Normalize last_record LSN in wal receiver (#2529)
* Add test for branching on page boundary

* Normalize start recovery point

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>

Co-authored-by: Thang Pham <thang@neon.tech>
2022-10-06 09:01:56 +03:00
Arthur Petukhovsky
f25dd75be9 Fix deadlock in safekeeper metrics (#2566)
We had a problem where almost all of the threads were waiting on a futex syscall. More specifically:
- `/metrics` handler was inside `TimelineCollector::collect()`, waiting on a mutex for a single Timeline
- This exact timeline was inside `control_file::FileStorage::persist()`, waiting on a mutex for Lazy initialization of `PERSIST_CONTROL_FILE_SECONDS`
- `PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram>` was blocked on `prometheus::register`
- `prometheus::register` calls `DEFAULT_REGISTRY.write().register()` to take a write lock on Registry and add a new metric
- `DEFAULT_REGISTRY` lock was already taken inside `DEFAULT_REGISTRY.gather()`, which was called by `/metrics` handler to collect all metrics

This commit creates another Registry with a separate lock, to avoid a deadlock in the case where `TimelineCollector` triggers registration of new metrics inside the default registry.
2022-10-06 01:07:02 +03:00
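The resulting pattern, condensed from the libs/metrics diff further down: `INTERNAL_REGISTRY` and `register_internal` are the names that diff introduces, while `gather_all` here stands in for the modified `gather()`.

```rust
use once_cell::sync::Lazy;
use prometheus::{core::Collector, Registry};

// A registry with its own lock: registering into it can never contend with
// prometheus::gather(), which only locks the default registry.
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);

// Must be called before the first gather, so that collect() never has to
// register anything lazily while a gather is in progress.
pub fn register_internal(c: Box<dyn Collector>) -> prometheus::Result<()> {
    INTERNAL_REGISTRY.register(c)
}

pub fn gather_all() -> Vec<prometheus::proto::MetricFamily> {
    let mut mfs = prometheus::gather();
    mfs.append(&mut INTERNAL_REGISTRY.gather());
    mfs
}
```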
Sergey Melnikov
b99bed510d Move proxies to neon-proxy namespace (#2555) 2022-10-05 16:14:09 +03:00
sharnoff
580584c8fc Remove control_plane deps on pageserver/safekeeper (#2513)
Creates new `pageserver_api` and `safekeeper_api` crates to serve as the
shared dependencies. Should reduce both recompile times and cold compile
times.

Decreases the size of the optimized `neon_local` binary: 380M -> 179M.
No significant changes for anything else (mostly as expected).
2022-10-04 11:14:45 -07:00
Kirill Bulatov
d823e84ed5 Allow attaching tenants with zero timelines 2022-10-04 18:13:51 +03:00
Kirill Bulatov
231dfbaed6 Do not remove empty timelines/ directory for tenants 2022-10-04 18:13:51 +03:00
Dmitry Rodionov
5cf53786f9 Improve pytest ergonomics
1. Disable perf tests by default
2. Add instructions for running tests in parallel
2022-10-04 14:53:01 +03:00
Heikki Linnakangas
9b9bbad462 Use 'notify' crate to wait for PostgreSQL startup.
Compute node startup time is very important. After launching
PostgreSQL, use 'notify' to be notified immediately when it has
updated the PID file, instead of polling. The polling loop had a 100 ms
interval, so this shaves up to 100 ms off the startup time.
2022-10-04 13:00:15 +03:00
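A stripped-down sketch of the idea; the full version in the diff below also handles the child process exiting, a 60 s timeout, and re-pointing the watch at postmaster.pid once the file appears.

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::time::Duration;
use notify::{RecursiveMode, Watcher};

fn wait_until_ready(pgdata: &Path) -> anyhow::Result<()> {
    let pid_path = pgdata.join("postmaster.pid");
    let (tx, rx) = std::sync::mpsc::channel();
    let mut watcher = notify::recommended_watcher(move |res| {
        let _ = tx.send(res);
    })?;
    // postmaster.pid does not exist yet, so watch the data directory.
    watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
    loop {
        // Wake on filesystem events; the 100 ms timeout doubles as a
        // safety-net poll in case an event is missed.
        let _ = rx.recv_timeout(Duration::from_millis(100));
        if let Ok(file) = File::open(&pid_path) {
            if let Some(Ok(line)) = BufReader::new(file).lines().last() {
                if line.trim() == "ready" {
                    return Ok(());
                }
            }
        }
    }
}
```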
Heikki Linnakangas
537b2c1ae6 Remove unnecessary check for open PostgreSQL TCP port.
The loop checked whether the TCP port was open for connections by trying to
connect to it. That seems unnecessary: by the time the postmaster.pid
file says that it's ready, the port should be open. Remove that check.
2022-10-04 12:09:13 +03:00
Joonas Koivunen
31123d1fa8 Silence clippies, minor doc fix (#2543)
* doc: remove stray backtick

* chore: clippy::let_unit_value

* chore: silence useless_transmute, duplicate_mod

* chore: remove allowing deref_nullptr

not needed since bindgen 0.60.0.

* chore: remove repeated allowed lints

they are already allowed from the crate root.
2022-10-03 17:44:17 +03:00
Kirill Bulatov
4f2ac51bdd Bump rustc to 1.61 2022-10-03 16:36:03 +03:00
Kirill Bulatov
7b2f9dc908 Reuse existing tenants during attach (#2540) 2022-10-03 13:33:55 +03:00
52 changed files with 1114 additions and 592 deletions

View File

@@ -127,8 +127,8 @@ jobs:
target/ target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found # Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: | key: |
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }} v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo- v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-
- name: Cache postgres v14 build - name: Cache postgres v14 build
id: cache_pg_14 id: cache_pg_14
@@ -389,7 +389,7 @@ jobs:
!~/.cargo/registry/src !~/.cargo/registry/src
~/.cargo/git/ ~/.cargo/git/
target/ target/
key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }} key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact - name: Get Neon artifact
uses: ./.github/actions/download uses: ./.github/actions/download
@@ -564,7 +564,7 @@ jobs:
promote-images: promote-images:
runs-on: dev runs-on: dev
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-tools-image ] needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
if: github.event_name != 'workflow_dispatch' if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli container: amazon/aws-cli
strategy: strategy:
@@ -573,7 +573,7 @@ jobs:
# compute-node uses postgres 14, which is default now # compute-node uses postgres 14, which is default now
# cloud repo depends on this image name, thus duplicating it # cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated # remove compute-node when cloud repo is updated
name: [ neon, compute-node, compute-node-v14, compute-tools ] name: [ neon, compute-node, compute-node-v14, compute-node-v15, compute-tools ]
steps: steps:
- name: Promote image to latest - name: Promote image to latest
@@ -608,6 +608,9 @@ jobs:
- name: Pull compute node v14 image from ECR - name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14 run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
- name: Pull rust image from ECR - name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -638,6 +641,9 @@ jobs:
- name: Push compute node v14 image to Docker Hub - name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub - name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned run: crane push rust neondatabase/rust:pinned
@@ -650,6 +656,7 @@ jobs:
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
calculate-deploy-targets: calculate-deploy-targets:
runs-on: [ self-hosted, Linux, k8s-runner ] runs-on: [ self-hosted, Linux, k8s-runner ]
@@ -768,5 +775,5 @@ jobs:
- name: Re-deploy proxy - name: Re-deploy proxy
run: | run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}} DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -106,7 +106,7 @@ jobs:
!~/.cargo/registry/src !~/.cargo/registry/src
~/.cargo/git ~/.cargo/git
target target
key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
- name: Run cargo clippy - name: Run cargo clippy
run: ./run_clippy.sh run: ./run_clippy.sh

Cargo.lock (generated) · 98 changed lines
View File

@@ -497,8 +497,10 @@ dependencies = [
"chrono", "chrono",
"clap 3.2.16", "clap 3.2.16",
"env_logger", "env_logger",
"futures",
"hyper", "hyper",
"log", "log",
"notify",
"postgres", "postgres",
"regex", "regex",
"serde", "serde",
@@ -540,11 +542,11 @@ dependencies = [
"git-version", "git-version",
"nix", "nix",
"once_cell", "once_cell",
"pageserver", "pageserver_api",
"postgres", "postgres",
"regex", "regex",
"reqwest", "reqwest",
"safekeeper", "safekeeper_api",
"serde", "serde",
"serde_with", "serde_with",
"tar", "tar",
@@ -1072,6 +1074,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.21" version = "0.3.21"
@@ -1493,6 +1504,26 @@ dependencies = [
"str_stack", "str_stack",
] ]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.12" version = "0.1.12"
@@ -1552,6 +1583,26 @@ dependencies = [
"simple_asn1", "simple_asn1",
] ]
[[package]]
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]] [[package]]
name = "kstring" name = "kstring"
version = "1.0.6" version = "1.0.6"
@@ -1797,6 +1848,24 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "notify"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"winapi",
]
[[package]] [[package]]
name = "num-bigint" name = "num-bigint"
version = "0.4.3" version = "0.4.3"
@@ -1975,6 +2044,7 @@ dependencies = [
"nix", "nix",
"num-traits", "num-traits",
"once_cell", "once_cell",
"pageserver_api",
"postgres", "postgres",
"postgres-protocol", "postgres-protocol",
"postgres-types", "postgres-types",
@@ -2003,6 +2073,17 @@ dependencies = [
"workspace_hack", "workspace_hack",
] ]
[[package]]
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@@ -2891,6 +2972,7 @@ dependencies = [
"postgres_ffi", "postgres_ffi",
"regex", "regex",
"remote_storage", "remote_storage",
"safekeeper_api",
"serde", "serde",
"serde_json", "serde_json",
"serde_with", "serde_with",
@@ -2906,6 +2988,17 @@ dependencies = [
"workspace_hack", "workspace_hack",
] ]
[[package]]
name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]] [[package]]
name = "same-file" name = "same-file"
version = "1.0.6" version = "1.0.6"
@@ -4142,6 +4235,7 @@ dependencies = [
"bstr", "bstr",
"bytes", "bytes",
"chrono", "chrono",
"crossbeam-utils",
"either", "either",
"fail", "fail",
"hashbrown", "hashbrown",

View File

@@ -5,7 +5,7 @@
ARG TAG=pinned ARG TAG=pinned
# apparently, ARGs don't get replaced in RUN commands in kaniko # apparently, ARGs don't get replaced in RUN commands in kaniko
# ARG POSTGIS_VERSION=3.3.0 # ARG POSTGIS_VERSION=3.3.1
# ARG PLV8_VERSION=3.1.4 # ARG PLV8_VERSION=3.1.4
# ARG PG_VERSION=v15 # ARG PG_VERSION=v15
@@ -13,9 +13,12 @@ ARG TAG=pinned
# Layer "build-deps" # Layer "build-deps"
# #
FROM debian:bullseye-slim AS build-deps FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \ RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
# #
# Layer "pg-build" # Layer "pg-build"
@@ -42,11 +45,11 @@ RUN cd postgres && \
FROM build-deps AS postgis-build FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \ RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \ tar xvzf postgis-3.3.1.tar.gz && \
cd postgis-3.3.0 && \ cd postgis-3.3.1 && \
./autogen.sh && \ ./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \ export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure && \ ./configure && \
@@ -64,15 +67,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
# Build plv8 # Build plv8
# #
FROM build-deps AS plv8-build FROM build-deps AS plv8-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \ RUN apt update && \
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475 # https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary # Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ RUN apt update && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
apt install -y --no-install-recommends -t testing binutils apt install -y --no-install-recommends -t testing binutils
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -84,12 +85,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
rm -rf /plv8-* && \ rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#
# Layer "h3-pg-build"
# Build h3_pg
#
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
cd h3-4.0.1 && \
mkdir build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/h3 make install && \
cp -R /h3/usr / && \
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
tar xvzf h3-pg.tgz && \
cd h3-pg-4.0.1 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
# #
# Layer "neon-pg-ext-build" # Layer "neon-pg-ext-build"
# compile neon extensions # compile neon extensions
# #
FROM build-deps AS neon-pg-ext-build FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/ COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \ RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -137,8 +172,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
chmod 0750 /var/db/postgres/compute && \ chmod 0750 /var/db/postgres/compute && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -8,8 +8,10 @@ anyhow = "1.0"
chrono = "0.4" chrono = "0.4"
clap = "3.0" clap = "3.0"
env_logger = "0.9" env_logger = "0.9"
futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] } hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] } log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1" regex = "1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@@ -178,7 +178,6 @@ impl ComputeNode {
.args(&["--sync-safekeepers"]) .args(&["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn() .spawn()
.expect("postgres --sync-safekeepers failed to start"); .expect("postgres --sync-safekeepers failed to start");
@@ -191,10 +190,10 @@ impl ComputeNode {
if !sync_output.status.success() { if !sync_output.status.success() {
anyhow::bail!( anyhow::bail!(
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}, stderr: {}", "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
sync_output.status, sync_output.status,
String::from_utf8(sync_output.stdout).expect("postgres --sync-safekeepers exited, and stdout is not utf-8"), String::from_utf8(sync_output.stdout)
String::from_utf8(sync_output.stderr).expect("postgres --sync-safekeepers exited, and stderr is not utf-8"), .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
); );
} }
@@ -258,14 +257,7 @@ impl ComputeNode {
.spawn() .spawn()
.expect("cannot start postgres process"); .expect("cannot start postgres process");
// Try default Postgres port if it is not provided wait_for_postgres(&mut pg, pgdata_path)?;
let port = self
.spec
.cluster
.settings
.find("port")
.unwrap_or_else(|| "5432".to_string());
wait_for_postgres(&mut pg, &port, pgdata_path)?;
// If connection fails, // If connection fails,
// it may be the old node with `zenith_admin` superuser. // it may be the old node with `zenith_admin` superuser.

View File

@@ -1,18 +1,19 @@
use std::fmt::Write; use std::fmt::Write;
use std::fs;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::Path; use std::path::Path;
use std::process::Child; use std::process::Child;
use std::str::FromStr; use std::time::{Duration, Instant};
use std::{fs, thread, time};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use postgres::{Client, Transaction}; use postgres::{Client, Transaction};
use serde::Deserialize; use serde::Deserialize;
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds use notify::{RecursiveMode, Watcher};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields /// Rust representation of Postgres role info with only those fields
/// that matter for us. /// that matter for us.
@@ -230,52 +231,112 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
Ok(postgres_dbs) Ok(postgres_dbs)
} }
/// Wait for Postgres to become ready to accept connections: /// Wait for Postgres to become ready to accept connections. It's ready to
/// - state should be `ready` in the `pgdata/postmaster.pid` /// accept connections when the state-field in `pgdata/postmaster.pid` says
/// - and we should be able to connect to 127.0.0.1:5432 /// 'ready'.
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> { pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid"); let pid_path = pgdata.join("postmaster.pid");
let mut slept: u64 = 0; // ms
let pause = time::Duration::from_millis(100);
let timeout = time::Duration::from_millis(10); // PostgreSQL writes line "ready" to the postmaster.pid file, when it has
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap(); // completed initialization and is ready to accept connections. We want to
// react quickly and perform the rest of our initialization as soon as
// PostgreSQL starts accepting connections. Use 'notify' to be notified
// whenever the PID file is changed, and whenever it changes, read it to
// check if it's now "ready".
//
// You cannot actually watch a file before it exists, so we first watch the
// data directory, and once the postmaster.pid file appears, we switch to
// watch the file instead. We also wake up every 100 ms to poll, just in
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
let _ = tx.send(res);
}) {
Ok(watcher) => (Box::new(watcher), rx),
Err(e) => {
match e.kind {
notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
// docker on m1 macs does not support recommended_watcher
// but return "Function not implemented (os error 38)"
// see https://github.com/notify-rs/notify/issues/423
let (tx, rx) = std::sync::mpsc::channel();
loop { // let's poll it faster than what we check the results for (100ms)
// Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout, let config =
// but postgres starts listening almost immediately, even if it is not really notify::Config::default().with_poll_interval(Duration::from_millis(50));
// ready to accept connections).
if slept >= POSTGRES_WAIT_TIMEOUT { let watcher = notify::PollWatcher::new(
bail!("timed out while waiting for Postgres to start"); move |res| {
let _ = tx.send(res);
},
config,
)?;
(Box::new(watcher), rx)
}
_ => return Err(e.into()),
}
} }
};
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
if let Ok(Some(status)) = pg.try_wait() { if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier. // Postgres exited, that is not what we expected, bail out earlier.
let code = status.code().unwrap_or(-1); let code = status.code().unwrap_or(-1);
bail!("Postgres exited unexpectedly with code {}", code); bail!("Postgres exited unexpectedly with code {}", code);
} }
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first. // Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) { if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
let file = BufReader::new(file); let file = BufReader::new(file);
let last_line = file.lines().last(); let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example. // Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line { if let Some(Ok(line)) = last_line {
let status = line.trim(); let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok(); log::debug!("last line of postmaster.pid: {status:?}");
// Now Postgres is ready to accept connections // Now Postgres is ready to accept connections
if status == "ready" && can_connect { if status == "ready" {
break; break;
} }
} }
} }
thread::sleep(pause); // Give up after POSTGRES_WAIT_TIMEOUT.
slept += 100; let duration = started_at.elapsed();
if duration >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
} }
log::info!("PostgreSQL is now running, continuing to configure it");
Ok(()) Ok(())
} }

View File

@@ -19,7 +19,9 @@ thiserror = "1"
nix = "0.23" nix = "0.23"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
pageserver = { path = "../pageserver" } # Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
safekeeper = { path = "../safekeeper" } # instead, so that recompile times are better.
pageserver_api = { path = "../libs/pageserver_api" }
safekeeper_api = { path = "../libs/safekeeper_api" }
utils = { path = "../libs/utils" } utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" } workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode; use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode; use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env}; use control_plane::{etcd, local_env};
use pageserver::config::defaults::{ use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR, DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR, DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
}; };
use pageserver::http::models::TimelineInfo; use safekeeper_api::{
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
}; };

View File

@@ -285,7 +285,7 @@ impl LocalEnv {
branch_name: &str, branch_name: &str,
tenant_id: TenantId, tenant_id: TenantId,
) -> Option<TimelineId> { ) -> Option<TimelineId> {
dbg!(&self.branch_name_mappings) self.branch_name_mappings
.get(branch_name)? .get(branch_name)?
.iter() .iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id) .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)

View File

@@ -12,7 +12,7 @@ use nix::unistd::Pid;
use postgres::Config; use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method}; use reqwest::{IntoUrl, Method};
use safekeeper::http::models::TimelineCreateRequest; use safekeeper_api::models::TimelineCreateRequest;
use thiserror::Error; use thiserror::Error;
use utils::{ use utils::{
connstring::connection_address, connstring::connection_address,

View File

@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno; use nix::errno::Errno;
use nix::sys::signal::{kill, Signal}; use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid; use nix::unistd::Pid;
use pageserver::http::models::{ use pageserver_api::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo, TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
}; };
use postgres::{Config, NoTls}; use postgres::{Config, NoTls};

View File

@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
sudo apt install python3.9 sudo apt install python3.9
``` ```
- Install `poetry` - Install `poetry`
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`. - Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
- Install dependencies via `./scripts/pysync`. - Install dependencies via `./scripts/pysync`.
- Note that CI uses specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile)) - Note that CI uses specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile))
so if you have different version some linting tools can yield different result locally vs in the CI. so if you have different version some linting tools can yield different result locally vs in the CI.

View File

@@ -1,111 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import textwrap
import uuid
from pathlib import Path
import testgres
def run_command(args):
print('> Cmd:', ' '.join(args))
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
ret = p.wait()
output = p.stdout.read().strip()
if output:
print(textwrap.indent(output, '>> '))
if ret != 0:
raise subprocess.CalledProcessError(ret, args)
def make_tarfile(output_filename, source_dir):
print("* Packing the backup into a tarball")
cmd = ["tar", r"--transform=s/\.\///", "-C", str(source_dir), "-cf", str(output_filename), "."]
run_command(cmd)
def create_tenant(tenant_id):
print("* Creating a new tenant")
cmd = ["neon_local", "tenant", "create", f"--tenant-id={tenant_id}"]
run_command(cmd)
def import_backup(args, backup_dir: Path):
tar = Path('/tmp/base.tar')
make_tarfile(tar, backup_dir / 'data')
print("* Importing the timeline into the pageserver")
manifest = json.loads((backup_dir / "data" / "backup_manifest").read_text())
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
print("> LSNs:", start_lsn, end_lsn)
cmd = (
"neon_local timeline import "
f"--tenant-id {args.tenant_id} "
f"--base-lsn {start_lsn} "
f"--end-lsn {end_lsn} "
f"--base-tarfile {tar} "
f"--timeline-id {args.timeline_id} "
f"--node-name {args.node}"
)
run_command(cmd.split())
def debug_prints(node):
tuples = node.execute("table foo")
oid = node.execute("select 'foo'::regclass::oid")[0][0]
print("> foo's tuples:", tuples, "&", "oid:", oid)
print("> DBs:", node.execute("select oid, datname from pg_database"))
def main(args):
print("* Creating a node")
node = testgres.get_new_node()
node.init(unix_sockets=False, allow_streaming=True).start()
node.execute("create table foo as select 1")
debug_prints(node)
# node.pgbench_init(scale=1)
print("* Creating a backup")
backup = node.backup()
backup_dir = Path(backup.base_dir)
print("> Backup dir:", backup_dir)
# pr = backup.spawn_primary().start()
# debug_prints(pr)
# exit(1)
create_tenant(args.tenant_id)
import_backup(args, backup_dir)
print("> Tenant:", args.tenant_id)
print("> Timeline:", args.timeline_id)
print("> Node:", args.node)
print("* Starting postgres")
cmd = ["neon_local", "pg", "start", f"--tenant-id={args.tenant_id}", f"--timeline-id={args.timeline_id}", args.node]
run_command(cmd)
print("* Opening psql session...")
cmd = ["psql", f"host=127.0.0.1 port=55432 user={os.getlogin()} dbname=postgres"]
subprocess.call(cmd)
if __name__ == "__main__":
tenant_id = uuid.uuid4().hex
parser = argparse.ArgumentParser()
parser.add_argument("--tenant-id", default=tenant_id)
parser.add_argument("--timeline-id", default=tenant_id)
parser.add_argument("node")
args = parser.parse_args(sys.argv[1:])
main(args)

View File

@@ -3,7 +3,7 @@
//! Otherwise, we might not see all metrics registered via //! Otherwise, we might not see all metrics registered via
//! a default registry. //! a default registry.
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec}; use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts; pub use prometheus::opts;
pub use prometheus::register; pub use prometheus::register;
pub use prometheus::{core, default_registry, proto}; pub use prometheus::{core, default_registry, proto};
@@ -17,6 +17,7 @@ pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge}; pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec}; pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder}; pub use prometheus::{Encoder, TextEncoder};
use prometheus::{Registry, Result};
mod wrappers; mod wrappers;
pub use wrappers::{CountedReader, CountedWriter}; pub use wrappers::{CountedReader, CountedWriter};
@@ -32,13 +33,27 @@ macro_rules! register_uint_gauge_vec {
}}; }};
} }
/// Special internal registry, to collect metrics independently from the default registry.
/// Was introduced to fix deadlock with lazy registration of metrics in the default registry.
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> Result<()> {
INTERNAL_REGISTRY.register(c)
}
/// Gathers all Prometheus metrics and records the I/O stats just before that. /// Gathers all Prometheus metrics and records the I/O stats just before that.
/// ///
/// Metrics gathering is a relatively simple and standalone operation, so /// Metrics gathering is a relatively simple and standalone operation, so
/// it might be fine to do it this way to keep things simple. /// it might be fine to do it this way to keep things simple.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> { pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
update_rusage_metrics(); update_rusage_metrics();
prometheus::gather() let mut mfs = prometheus::gather();
let mut internal_mfs = INTERNAL_REGISTRY.gather();
mfs.append(&mut internal_mfs);
mfs
} }
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| { static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {

View File

@@ -0,0 +1,12 @@
[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,9 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -7,7 +7,17 @@ use utils::{
lsn::Lsn, lsn::Lsn,
}; };
use crate::tenant::TenantState; /// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
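Illustrative only: with `TenantState` now living in `pageserver_api`, a light client such as control_plane can work with it without depending on the pageserver crate. `serde_json` is an assumed dependency of the caller, not of `pageserver_api` itself.

```rust
use pageserver_api::models::TenantState;

fn parse_state(body: &str) -> serde_json::Result<TenantState> {
    serde_json::from_str(body)
}

fn is_usable(state: &TenantState) -> bool {
    matches!(state, TenantState::Active { .. })
}
```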

View File

@@ -3,9 +3,11 @@
#![allow(non_snake_case)] #![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments. // bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)] #![allow(clippy::missing_safety_doc)]
// suppress warnings on rust 1.53 due to bindgen unit tests. // noted at 1.63 that in many cases there are u32 -> u32 transmutes in bindgen code.
// https://github.com/rust-lang/rust-bindgen/issues/1651 #![allow(clippy::useless_transmute)]
#![allow(deref_nullptr)] // modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
use bytes::Bytes; use bytes::Bytes;
use utils::bin_ser::SerializeError; use utils::bin_ser::SerializeError;

View File

@@ -57,12 +57,10 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG. /// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024; const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
} }
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr( pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo, segno: XLogSegNo,
offset: u32, offset: u32,
@@ -71,7 +69,6 @@ pub fn XLogSegNoOffsetToRecPtr(
segno * (wal_segsz_bytes as u64) + (offset as u64) segno * (wal_segsz_bytes as u64) + (offset as u64)
} }
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String { pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
format!( format!(
"{:>08X}{:>08X}{:>08X}", "{:>08X}{:>08X}{:>08X}",
@@ -81,7 +78,6 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
) )
} }
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) { pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap(); let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo; let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
@@ -89,12 +85,10 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli) (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
} }
#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool { pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit()); return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
} }
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool { pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8]) fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
} }

View File

@@ -0,0 +1,12 @@
[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,10 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
use routerify::ext::RequestExt; use routerify::ext::RequestExt;
use routerify::RequestInfo; use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService}; use routerify::{Middleware, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::info; use tracing::info;
use std::future::Future; use std::future::Future;
@@ -35,7 +36,13 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
let mut buffer = vec![]; let mut buffer = vec![];
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
let metrics = metrics::gather(); let metrics = tokio::task::spawn_blocking(move || {
// Currently we take a lot of mutexes while collecting metrics, so it's
// better to spawn a blocking task to avoid blocking the event loop.
metrics::gather()
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
encoder.encode(&metrics, &mut buffer).unwrap(); encoder.encode(&metrics, &mut buffer).unwrap();
let response = Response::builder() let response = Response::builder()
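The general form of this pattern, reduced to its core (it uses the workspace's `metrics::gather()` shown earlier; the real handler additionally maps the JoinError into an ApiError):

```rust
use tokio::task::JoinError;

// metrics::gather() takes several mutexes, so run it on the blocking pool
// instead of an executor thread.
async fn gather_off_executor() -> Result<Vec<metrics::proto::MetricFamily>, JoinError> {
    tokio::task::spawn_blocking(metrics::gather).await
}
```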

View File

@@ -58,6 +58,7 @@ rstar = "0.9.3"
num-traits = "0.2.15" num-traits = "0.2.15"
amplify_num = "0.4.1" amplify_num = "0.4.1"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" } postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" } etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" } metrics = { path = "../libs/metrics" }

View File

@@ -30,10 +30,10 @@ pub mod defaults {
use crate::tenant_config::defaults::*; use crate::tenant_config::defaults::*;
use const_format::formatcp; use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; pub use pageserver_api::{
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; DEFAULT_PG_LISTEN_PORT,
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); };
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s"; pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes; pub mod routes;
pub use routes::make_router; pub use routes::make_router;
pub use pageserver_api::models;

View File

@@ -207,6 +207,62 @@ paths:
schema: schema:
$ref: "#/components/schemas/Error" $ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get LSN by a timestamp
parameters:
- name: timestamp
in: query
required: true
schema:
type: string
format: date-time
description: A timestamp to get the LSN
responses:
"200":
description: OK
content:
application/json:
schema:
type: string
"400":
description: Error when no tenant id found in path, no timeline id or invalid timestamp
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/attach: /v1/tenant/{tenant_id}/attach:
parameters: parameters:
- name: tenant_id - name: tenant_id
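A hypothetical client call against the endpoint described above, using reqwest's blocking API; the base URL, ids, and timestamp are placeholders, and the timestamp must be an RFC 3339 value, percent-encoded for the query string.

```rust
fn get_lsn_by_timestamp(
    base_url: &str,
    tenant_id: &str,
    timeline_id: &str,
    timestamp: &str, // e.g. "2022-10-06T12:00:00Z", already percent-encoded
) -> anyhow::Result<String> {
    let url = format!(
        "{base_url}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}"
    );
    // The 200 response body is a JSON string: either an LSN, or one of
    // "future", "past", "nodata".
    let lsn: String = reqwest::blocking::get(url)?.error_for_status()?.json()?;
    Ok(lsn)
}
```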

View File

@@ -12,6 +12,7 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineCreateRequest,
}; };
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync; use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline}; use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant::{TenantState, Timeline}; use crate::tenant::{TenantState, Timeline};
@@ -265,6 +266,23 @@ fn query_param_present(request: &Request<Body>, param: &str) -> bool {
.unwrap_or(false) .unwrap_or(false)
} }
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
request.uri().query().map_or(
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|v| {
url::form_urlencoded::parse(v.as_bytes())
.into_owned()
.find(|(k, _)| k == param_name)
.map_or(
Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in query parameters"
))),
|(_, v)| Ok(v),
)
},
)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> { async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -329,6 +347,33 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
} }
} }
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id))
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
let result = match timeline
.find_lsn_for_timestamp(timestamp_pg)
.map_err(ApiError::InternalServerError)?
{
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
json_response(StatusCode::OK, result)
}
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create // TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> { async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
@@ -337,9 +382,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
info!("Handling tenant attach {tenant_id}"); info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) { tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
Ok(_) => Err(ApiError::Conflict( Ok(tenant) => {
"Tenant is already present locally".to_owned(), if tenant.list_timelines().is_empty() {
)), info!("Attaching to tenant {tenant_id} with zero timelines");
Ok(())
} else {
Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
))
}
}
Err(_) => Ok(()), Err(_) => Ok(()),
}) })
.await .await
@@ -901,6 +953,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id", "/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler, timeline_detail_handler,
) )
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
get_lsn_by_timestamp_handler,
)
.put( .put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", "/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler), testing_api!("run timeline GC", timeline_gc_handler),

View File

@@ -12,7 +12,6 @@
use anyhow::{bail, ensure, Context, Result}; use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use regex::Regex;
use std::io; use std::io;
use std::net::TcpListener; use std::net::TcpListener;
use std::str; use std::str;
@@ -35,7 +34,6 @@ use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig}; use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar}; use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start; use crate::profiling::profpoint_start;
use crate::reltag::RelTag; use crate::reltag::RelTag;
use crate::task_mgr; use crate::task_mgr;
@@ -45,7 +43,6 @@ use crate::tenant_mgr;
use crate::CheckpointConfig; use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData // Wrapped in libpq CopyData
@@ -1062,33 +1059,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()), Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))? ]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate LSN of last transaction with timestamp less or equal than sppecified
// TODO lazy static
let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$")
.unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
let timestamp_pg = to_pg_timestamp(timestamp);
self.check_permission(Some(tenant_id))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"lsn",
)]))?;
let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? {
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
pgb.write_message(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else { } else {
bail!("unknown command"); bail!("unknown command");
} }

View File

@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
(storage, remote_index_clone, sync_queue), (storage, remote_index_clone, sync_queue),
max_sync_errors, max_sync_errors,
) )
.instrument(info_span!("storage_sync_loop"))
.await; .await;
Ok(()) Ok(())
}, },
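For context, a minimal, self-contained illustration of the `.instrument(...)` pattern added above; tokio and a tracing subscriber are assumed, and this is not the storage sync code itself.

```rust
use tracing::{info, info_span, Instrument};

async fn storage_sync_loop() {
    // Every event emitted here is tagged with the "storage_sync_loop" span.
    info!("sync loop tick");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    tokio::spawn(storage_sync_loop().instrument(info_span!("storage_sync_loop")))
        .await
        .unwrap();
}
```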

View File

@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile; use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager; use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX}; use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
use toml_edit; use toml_edit;
use utils::{ use utils::{
@@ -118,18 +119,6 @@ pub struct Tenant {
upload_layers: bool, upload_layers: bool,
} }
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple /// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'. /// timelines, forked off from the same initial call to 'initdb'.
impl Tenant { impl Tenant {
@@ -400,16 +389,19 @@ impl Tenant {
timeline_id, timeline_id,
metadata.pg_version() metadata.pg_version()
); );
let timeline = self let ancestor = metadata
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor) .ancestor_timeline()
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?; .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
match timelines_accessor.entry(timeline.timeline_id) { match timelines_accessor.entry(timeline_id) {
Entry::Occupied(_) => bail!( Entry::Occupied(_) => warn!(
"Found freshly initialized timeline {} in the tenant map", "Timeline {}/{} already exists in the tenant map, skipping its initialization",
timeline.timeline_id self.tenant_id, timeline_id
), ),
Entry::Vacant(v) => { Entry::Vacant(v) => {
let timeline = self
.initialize_new_timeline(timeline_id, metadata, ancestor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
v.insert(timeline); v.insert(timeline);
} }
} }
@@ -609,21 +601,14 @@ impl Tenant {
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
new_metadata: TimelineMetadata, new_metadata: TimelineMetadata,
timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>, ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let ancestor = match new_metadata.ancestor_timeline() { if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
Some(ancestor_timeline_id) => Some( anyhow::ensure!(
timelines ancestor.is_some(),
.get(&ancestor_timeline_id) "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
.cloned() )
.with_context(|| { }
format!(
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
})?,
),
None => None,
};
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn(); let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version(); let pg_version = new_metadata.pg_version();
@@ -1080,8 +1065,12 @@ impl Tenant {
) )
})?; })?;
let ancestor = new_metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
.cloned();
let new_timeline = self let new_timeline = self
.initialize_new_timeline(new_timeline_id, new_metadata, timelines) .initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
.with_context(|| { .with_context(|| {
format!( format!(
"Failed to initialize timeline {}/{}", "Failed to initialize timeline {}/{}",

View File

@@ -15,19 +15,25 @@ use crate::repository::Key;
use crate::tenant::inmemory_layer::InMemoryLayer; use crate::tenant::inmemory_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer; use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::{range_eq, range_overlaps}; use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use anyhow::Result; use anyhow::Result;
use num_traits::identities::{One, Zero}; use std::collections::{BTreeMap, VecDeque};
use num_traits::{Bounded, Num, Signed};
use rstar::{RTree, RTreeObject, AABB};
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use std::ops::{Add, Div, Mul, Neg, Rem, Sub};
use std::sync::Arc; use std::sync::Arc;
use tracing::*; use tracing::*;
use utils::lsn::Lsn; use utils::lsn::Lsn;
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
struct BTreeKey {
lsn: Lsn,
seq: usize,
}
impl BTreeKey {
fn new(lsn: Lsn) -> BTreeKey {
BTreeKey { lsn, seq: 0 }
}
}
/// ///
/// LayerMap tracks what layers exist on a timeline. /// LayerMap tracks what layers exist on a timeline.
/// ///
@@ -53,165 +59,14 @@ pub struct LayerMap {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>, pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// All the historic layers are kept here /// All the historic layers are kept here
historic_layers: RTree<LayerRTreeObject>, historic_layers: BTreeMap<BTreeKey, Arc<dyn Layer>>,
layers_seqno: usize,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<dyn Layer>>, l0_delta_layers: Vec<Arc<dyn Layer>>,
} }
struct LayerRTreeObject {
layer: Arc<dyn Layer>,
}
// Representation of Key as numeric type.
// We can not use native implementation of i128, because rstar::RTree
// doesn't properly handle integer overflow during area calculation: sum(Xi*Yi).
// Overflow will cause a panic in debug mode and an incorrect area calculation in release mode,
// which leads to a non-optimally balanced R-Tree (but doesn't affect the correctness of R-Tree operations).
// By using i256 as the type, even though all the actual values would fit in i128, we can be
// sure that multiplication doesn't overflow.
//
#[derive(Clone, PartialEq, Eq, PartialOrd, Debug)]
struct IntKey(i256);
impl Copy for IntKey {}
impl IntKey {
fn from(i: i128) -> Self {
IntKey(i256::from(i))
}
}
impl Bounded for IntKey {
fn min_value() -> Self {
IntKey(i256::MIN)
}
fn max_value() -> Self {
IntKey(i256::MAX)
}
}
impl Signed for IntKey {
fn is_positive(&self) -> bool {
self.0 > i256::ZERO
}
fn is_negative(&self) -> bool {
self.0 < i256::ZERO
}
fn signum(&self) -> Self {
match self.0.cmp(&i256::ZERO) {
Ordering::Greater => IntKey(i256::ONE),
Ordering::Less => IntKey(-i256::ONE),
Ordering::Equal => IntKey(i256::ZERO),
}
}
fn abs(&self) -> Self {
IntKey(self.0.abs())
}
fn abs_sub(&self, other: &Self) -> Self {
if self.0 <= other.0 {
IntKey(i256::ZERO)
} else {
IntKey(self.0 - other.0)
}
}
}
impl Neg for IntKey {
type Output = Self;
fn neg(self) -> Self::Output {
IntKey(-self.0)
}
}
impl Rem for IntKey {
type Output = Self;
fn rem(self, rhs: Self) -> Self::Output {
IntKey(self.0 % rhs.0)
}
}
impl Div for IntKey {
type Output = Self;
fn div(self, rhs: Self) -> Self::Output {
IntKey(self.0 / rhs.0)
}
}
impl Add for IntKey {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
IntKey(self.0 + rhs.0)
}
}
impl Sub for IntKey {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
IntKey(self.0 - rhs.0)
}
}
impl Mul for IntKey {
type Output = Self;
fn mul(self, rhs: Self) -> Self::Output {
IntKey(self.0 * rhs.0)
}
}
impl One for IntKey {
fn one() -> Self {
IntKey(i256::ONE)
}
}
impl Zero for IntKey {
fn zero() -> Self {
IntKey(i256::ZERO)
}
fn is_zero(&self) -> bool {
self.0 == i256::ZERO
}
}
impl Num for IntKey {
type FromStrRadixErr = <i128 as Num>::FromStrRadixErr;
fn from_str_radix(str: &str, radix: u32) -> Result<Self, Self::FromStrRadixErr> {
Ok(IntKey(i256::from(i128::from_str_radix(str, radix)?)))
}
}
impl PartialEq for LayerRTreeObject {
fn eq(&self, other: &Self) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn'
// references. Clippy complains about this. In practice it
// seems to work (the assertion below would be triggered
// otherwise), but this ought to be fixed.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(&self.layer, &other.layer)
}
}
impl RTreeObject for LayerRTreeObject {
type Envelope = AABB<[IntKey; 2]>;
fn envelope(&self) -> Self::Envelope {
let key_range = self.layer.get_key_range();
let lsn_range = self.layer.get_lsn_range();
AABB::from_corners(
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
], // AABB::upper is inclusive, while `key_range.end` and `lsn_range.end` are exclusive
)
}
}
/// Return value of LayerMap::search /// Return value of LayerMap::search
pub struct SearchResult { pub struct SearchResult {
pub layer: Arc<dyn Layer>, pub layer: Arc<dyn Layer>,
@@ -234,23 +89,17 @@ impl LayerMap {
// linear search // linear search
// Find the latest image layer that covers the given key // Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<dyn Layer>> = None; let mut latest_img: Option<Arc<dyn Layer>> = None;
let mut latest_img_lsn: Option<Lsn> = None; let mut latest_img_lsn = Lsn(0);
let envelope = AABB::from_corners( let mut iter = self
[IntKey::from(key.to_i128()), IntKey::from(0i128)],
[
IntKey::from(key.to_i128()),
IntKey::from(end_lsn.0 as i128 - 1),
],
);
for e in self
.historic_layers .historic_layers
.locate_in_envelope_intersecting(&envelope) .range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
{ while let Some((_key, l)) = iter.next_back() {
let l = &e.layer;
if l.is_incremental() { if l.is_incremental() {
continue; continue;
} }
assert!(l.get_key_range().contains(&key)); if !l.get_key_range().contains(&key) {
continue;
}
let img_lsn = l.get_lsn_range().start; let img_lsn = l.get_lsn_range().start;
assert!(img_lsn < end_lsn); assert!(img_lsn < end_lsn);
if Lsn(img_lsn.0 + 1) == end_lsn { if Lsn(img_lsn.0 + 1) == end_lsn {
@@ -260,23 +109,23 @@ impl LayerMap {
lsn_floor: img_lsn, lsn_floor: img_lsn,
})); }));
} }
if img_lsn > latest_img_lsn.unwrap_or(Lsn(0)) { latest_img = Some(Arc::clone(l));
latest_img = Some(Arc::clone(l)); latest_img_lsn = img_lsn;
latest_img_lsn = Some(img_lsn); break;
}
} }
// Search the delta layers // Search the delta layers
let mut latest_delta: Option<Arc<dyn Layer>> = None; let mut latest_delta: Option<Arc<dyn Layer>> = None;
for e in self let mut iter = self
.historic_layers .historic_layers
.locate_in_envelope_intersecting(&envelope) .range(BTreeKey::new(Lsn(0))..BTreeKey::new(end_lsn));
{ while let Some((_key, l)) = iter.next_back() {
let l = &e.layer;
if !l.is_incremental() { if !l.is_incremental() {
continue; continue;
} }
assert!(l.get_key_range().contains(&key)); if !l.get_key_range().contains(&key) {
continue;
}
if l.get_lsn_range().start >= end_lsn { if l.get_lsn_range().start >= end_lsn {
info!( info!(
"Candidate delta layer {}..{} is too new for lsn {}", "Candidate delta layer {}..{} is too new for lsn {}",
@@ -286,6 +135,9 @@ impl LayerMap {
); );
} }
assert!(l.get_lsn_range().start < end_lsn); assert!(l.get_lsn_range().start < end_lsn);
if l.get_lsn_range().end <= latest_img_lsn {
continue;
}
if l.get_lsn_range().end >= end_lsn { if l.get_lsn_range().end >= end_lsn {
// this layer contains the requested point in the key/lsn space. // this layer contains the requested point in the key/lsn space.
// No need to search any further // No need to search any further
@@ -311,10 +163,7 @@ impl LayerMap {
"found (old) layer {} for request on {key} at {end_lsn}", "found (old) layer {} for request on {key} at {end_lsn}",
l.filename().display(), l.filename().display(),
); );
let lsn_floor = std::cmp::max( let lsn_floor = std::cmp::max(Lsn(latest_img_lsn.0 + 1), l.get_lsn_range().start);
Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
l.get_lsn_range().start,
);
Ok(Some(SearchResult { Ok(Some(SearchResult {
lsn_floor, lsn_floor,
layer: l, layer: l,
@@ -322,7 +171,7 @@ impl LayerMap {
} else if let Some(l) = latest_img { } else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}"); trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Ok(Some(SearchResult { Ok(Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(), lsn_floor: latest_img_lsn,
layer: l, layer: l,
})) }))
} else { } else {
@@ -338,7 +187,14 @@ impl LayerMap {
if layer.get_key_range() == (Key::MIN..Key::MAX) { if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone()); self.l0_delta_layers.push(layer.clone());
} }
self.historic_layers.insert(LayerRTreeObject { layer }); self.historic_layers.insert(
BTreeKey {
lsn: layer.get_lsn_range().start,
seq: self.layers_seqno,
},
layer,
);
self.layers_seqno += 1;
NUM_ONDISK_LAYERS.inc(); NUM_ONDISK_LAYERS.inc();
} }
@@ -360,10 +216,26 @@ impl LayerMap {
.retain(|other| !Arc::ptr_eq(other, &layer)); .retain(|other| !Arc::ptr_eq(other, &layer));
assert_eq!(self.l0_delta_layers.len(), len_before - 1); assert_eq!(self.l0_delta_layers.len(), len_before - 1);
} }
assert!(self let len_before = self.historic_layers.len();
.historic_layers #[allow(clippy::vtable_address_comparisons)]
.remove(&LayerRTreeObject { layer }) self.historic_layers
.is_some()); .retain(|_key, other| !Arc::ptr_eq(other, &layer));
if self.historic_layers.len() != len_before - 1 {
assert!(self.historic_layers.len() == len_before);
error!(
"Failed to remove {} layer: {}..{}__{}..{}",
if layer.is_incremental() {
"inremental"
} else {
"image"
},
layer.get_key_range().start,
layer.get_key_range().end,
layer.get_lsn_range().start,
layer.get_lsn_range().end
);
}
assert!(self.historic_layers.len() == len_before - 1);
NUM_ONDISK_LAYERS.dec(); NUM_ONDISK_LAYERS.dec();
} }
@@ -380,21 +252,10 @@ impl LayerMap {
loop { loop {
let mut made_progress = false; let mut made_progress = false;
let envelope = AABB::from_corners( for (_key, l) in self
[
IntKey::from(range_remain.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(range_remain.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
.historic_layers .historic_layers
.locate_in_envelope_intersecting(&envelope) .range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{ {
let l = &e.layer;
if l.is_incremental() { if l.is_incremental() {
continue; continue;
} }
@@ -417,39 +278,30 @@ impl LayerMap {
} }
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> { pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
self.historic_layers.iter().map(|e| e.layer.clone()) self.historic_layers
.iter()
.map(|(_key, layer)| layer.clone())
} }
/// Find the last image layer that covers 'key', ignoring any image layers /// Find the last image layer that covers 'key', ignoring any image layers
/// newer than 'lsn'. /// newer than 'lsn'.
fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> { fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
let mut candidate_lsn = Lsn(0); let mut iter = self
let mut candidate = None;
let envelope = AABB::from_corners(
[IntKey::from(key.to_i128()), IntKey::from(0)],
[IntKey::from(key.to_i128()), IntKey::from(lsn.0 as i128)],
);
for e in self
.historic_layers .historic_layers
.locate_in_envelope_intersecting(&envelope) .range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1));
{ while let Some((_key, l)) = iter.next_back() {
let l = &e.layer;
if l.is_incremental() { if l.is_incremental() {
continue; continue;
} }
assert!(l.get_key_range().contains(&key)); if !l.get_key_range().contains(&key) {
let this_lsn = l.get_lsn_range().start;
assert!(this_lsn <= lsn);
if this_lsn < candidate_lsn {
// our previous candidate was better
continue; continue;
} }
candidate_lsn = this_lsn; let this_lsn = l.get_lsn_range().start;
candidate = Some(Arc::clone(l)); assert!(this_lsn <= lsn);
return Some(Arc::clone(l));
} }
None
candidate
} }
/// ///
@@ -466,18 +318,10 @@ impl LayerMap {
lsn: Lsn, lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> { ) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
let mut points = vec![key_range.start]; let mut points = vec![key_range.start];
let envelope = AABB::from_corners( for (_lsn, l) in self
[IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
[
IntKey::from(key_range.end.to_i128()),
IntKey::from(lsn.0 as i128),
],
);
for e in self
.historic_layers .historic_layers
.locate_in_envelope_intersecting(&envelope) .range(BTreeKey::new(Lsn(0))..BTreeKey::new(lsn + 1))
{ {
let l = &e.layer;
assert!(l.get_lsn_range().start <= lsn); assert!(l.get_lsn_range().start <= lsn);
let range = l.get_key_range(); let range = l.get_key_range();
if key_range.contains(&range.start) { if key_range.contains(&range.start) {
@@ -514,26 +358,17 @@ impl LayerMap {
if lsn_range.start >= lsn_range.end { if lsn_range.start >= lsn_range.end {
return Ok(0); return Ok(0);
} }
let envelope = AABB::from_corners( for (_lsn, l) in self
[
IntKey::from(key_range.start.to_i128()),
IntKey::from(lsn_range.start.0 as i128),
],
[
IntKey::from(key_range.end.to_i128() - 1),
IntKey::from(lsn_range.end.0 as i128 - 1),
],
);
for e in self
.historic_layers .historic_layers
.locate_in_envelope_intersecting(&envelope) .range(BTreeKey::new(lsn_range.start)..BTreeKey::new(lsn_range.end))
{ {
let l = &e.layer;
if !l.is_incremental() { if !l.is_incremental() {
continue; continue;
} }
if !range_overlaps(&l.get_key_range(), key_range) {
continue;
}
assert!(range_overlaps(&l.get_lsn_range(), lsn_range)); assert!(range_overlaps(&l.get_lsn_range(), lsn_range));
assert!(range_overlaps(&l.get_key_range(), key_range));
// We ignore level0 delta layers. Unless the whole keyspace fits // We ignore level0 delta layers. Unless the whole keyspace fits
// into one partition // into one partition
@@ -569,8 +404,8 @@ impl LayerMap {
} }
println!("historic_layers:"); println!("historic_layers:");
for e in self.historic_layers.iter() { for (_key, layer) in self.historic_layers.iter() {
e.layer.dump(verbose)?; layer.dump(verbose)?;
} }
println!("End dump LayerMap"); println!("End dump LayerMap");
Ok(()) Ok(())
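For illustration, a minimal sketch of the lookup pattern the new `LayerMap` uses: layers keyed in a `BTreeMap` by (start LSN, insertion sequence number), with latest-first searches done as reverse iteration over a bounded range and the key-range check applied per layer. The types are simplified stand-ins, not the real `Layer` trait or `Lsn`:

```rust
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;

// Simplified stand-ins; the real LayerMap stores `Arc<dyn Layer>` and uses `utils::lsn::Lsn`.
type Lsn = u64;
type Key = i128;

#[derive(Debug)]
struct FakeLayer {
    key_range: Range<Key>,
    lsn_range: Range<Lsn>,
    incremental: bool, // delta layer if true, image layer if false
}

// Same idea as BTreeKey in the diff: order by start LSN, break ties with an
// insertion sequence number so two layers starting at the same LSN coexist.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct MapKey {
    lsn: Lsn,
    seq: usize,
}

struct SimpleLayerMap {
    historic: BTreeMap<MapKey, Arc<FakeLayer>>,
    seqno: usize,
}

impl SimpleLayerMap {
    fn insert(&mut self, layer: Arc<FakeLayer>) {
        let key = MapKey { lsn: layer.lsn_range.start, seq: self.seqno };
        self.seqno += 1;
        self.historic.insert(key, layer);
    }

    // Walk layers that start below `end_lsn`, newest first, and return the
    // first image layer covering `key` -- mirroring the reverse range scan
    // that replaced the R-Tree envelope query.
    fn latest_image(&self, key: Key, end_lsn: Lsn) -> Option<Arc<FakeLayer>> {
        let lo = MapKey { lsn: 0, seq: 0 };
        let hi = MapKey { lsn: end_lsn, seq: 0 };
        let mut iter = self.historic.range(lo..hi);
        while let Some((_k, layer)) = iter.next_back() {
            if layer.incremental || !layer.key_range.contains(&key) {
                continue;
            }
            return Some(Arc::clone(layer));
        }
        None
    }
}

fn main() {
    let mut map = SimpleLayerMap { historic: BTreeMap::new(), seqno: 0 };
    map.insert(Arc::new(FakeLayer { key_range: 0..100, lsn_range: 10..11, incremental: false }));
    map.insert(Arc::new(FakeLayer { key_range: 0..100, lsn_range: 11..20, incremental: true }));
    println!("{:?}", map.latest_image(42, 30));
}
```

The `seq` tiebreaker plays the same role as `layers_seqno` in the diff: two layers that start at the same LSN still get distinct map keys.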

View File

@@ -627,7 +627,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag); .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard); drop(tenant_conf_guard);
let self_clone = Arc::clone(self); let self_clone = Arc::clone(self);
let _ = spawn_connection_manager_task( spawn_connection_manager_task(
self.conf.broker_etcd_prefix.clone(), self.conf.broker_etcd_prefix.clone(),
self_clone, self_clone,
walreceiver_connect_timeout, walreceiver_connect_timeout,

View File

@@ -107,6 +107,13 @@ pub fn init_tenant_mgr(
/// Ignores other timelines that might be present for tenant, but were not passed as a parameter. /// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
/// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken", /// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues. /// and the load continues.
///
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
/// Alternatively, a tenant is considered loaded successfully if it's already in the pageserver's memory (i.e. it was already loaded before).
///
/// Attach happens on startup and on successful timeline downloads
/// (some subset of timeline files, always including the metadata, after which the new timeline needs to be registered).
pub fn attach_local_tenants( pub fn attach_local_tenants(
conf: &'static PageServerConf, conf: &'static PageServerConf,
remote_index: &RemoteIndex, remote_index: &RemoteIndex,
@@ -122,18 +129,20 @@ pub fn attach_local_tenants(
); );
debug!("Timelines to attach: {local_timelines:?}"); debug!("Timelines to attach: {local_timelines:?}");
let tenant = load_local_tenant(conf, tenant_id, remote_index); let mut tenants_accessor = tenants_state::write_tenants();
{ let tenant = match tenants_accessor.entry(tenant_id) {
match tenants_state::write_tenants().entry(tenant_id) { hash_map::Entry::Occupied(o) => {
hash_map::Entry::Occupied(_) => { info!("Tenant {tenant_id} was found in pageserver's memory");
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state"); Arc::clone(o.get())
continue;
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::clone(&tenant));
}
} }
} hash_map::Entry::Vacant(v) => {
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
v.insert(Arc::clone(&tenant));
tenant
}
};
drop(tenants_accessor);
if tenant.current_state() == TenantState::Broken { if tenant.current_state() == TenantState::Broken {
warn!("Skipping timeline load for broken tenant {tenant_id}") warn!("Skipping timeline load for broken tenant {tenant_id}")
@@ -168,16 +177,28 @@ fn load_local_tenant(
remote_index.clone(), remote_index.clone(),
conf.remote_storage_config.is_some(), conf.remote_storage_config.is_some(),
)); ));
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => { let tenant_timelines_dir = conf.timelines_path(&tenant_id);
tenant.update_tenant_config(tenant_conf); if !tenant_timelines_dir.is_dir() {
tenant.activate(false); error!(
} "Tenant {} has no timelines directory at {}",
Err(e) => { tenant_id,
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}"); tenant_timelines_dir.display()
tenant.set_state(TenantState::Broken); );
tenant.set_state(TenantState::Broken);
} else {
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
} }
} }
tenant tenant
} }
@@ -625,14 +646,10 @@ fn collect_timelines_for_tenant(
} }
if tenant_timelines.is_empty() { if tenant_timelines.is_empty() {
match remove_if_empty(&timelines_dir) { // this is normal, we've removed all broken, empty and temporary timeline dirs
Ok(true) => info!( // but should allow the tenant to stay functional and allow creating new timelines
"Removed empty tenant timelines directory {}", // on a restart, we require tenants to have the timelines dir, so leave it on disk
timelines_dir.display() debug!("Tenant {tenant_id} has no timelines loaded");
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
} }
Ok((tenant_id, tenant_timelines)) Ok((tenant_id, tenant_timelines))
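For illustration, a minimal sketch of the occupied/vacant entry pattern `attach_local_tenants` switches to, so a tenant already present in pageserver memory is reused instead of being skipped with an error. `FakeTenant` and the integer id are stand-ins for the real `Tenant`/`TenantId` types:

```rust
use std::collections::hash_map::{Entry, HashMap};
use std::sync::Arc;

// Simplified stand-ins: the real code keys by TenantId and stores Arc<Tenant>.
type TenantId = u32;
#[derive(Debug)]
struct FakeTenant {
    id: TenantId,
}

// Return the already-registered tenant if present, otherwise load and insert it.
fn get_or_load(
    tenants: &mut HashMap<TenantId, Arc<FakeTenant>>,
    tenant_id: TenantId,
) -> Arc<FakeTenant> {
    match tenants.entry(tenant_id) {
        Entry::Occupied(o) => {
            println!("Tenant {tenant_id} was found in memory");
            Arc::clone(o.get())
        }
        Entry::Vacant(v) => {
            println!("Tenant {tenant_id} was not found in memory, loading it");
            let tenant = Arc::new(FakeTenant { id: tenant_id }); // stands in for load_local_tenant()
            v.insert(Arc::clone(&tenant));
            tenant
        }
    }
}

fn main() {
    let mut tenants = HashMap::new();
    let first = get_or_load(&mut tenants, 1);
    let again = get_or_load(&mut tenants, 1);
    assert!(Arc::ptr_eq(&first, &again)); // the second call reuses the same Arc
    println!("{:?}", first.id);
}
```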

View File

@@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc};
use fail::fail_point; use fail::fail_point;
use futures::StreamExt; use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow}; use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage; use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn; use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time}; use tokio::{pin, select, sync::watch, time};
@@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection(
// There might be some padding after the last full record, skip it. // There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32); startpoint += startpoint.calc_padding(8u32);
// If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
// for anything, and in some corner cases, the compute node might have never generated the WAL for page headers.
// That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
// but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
// header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
// to the safekeepers.
startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."); info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}"); let query = format!("START_REPLICATION PHYSICAL {startpoint}");
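For illustration, a rough sketch of what the `normalize_lsn` call added above does: an LSN that falls exactly on a WAL page boundary is advanced past the page header, since no record can start inside it. The header sizes are assumptions based on typical PostgreSQL builds, not a copy of the `postgres_ffi` implementation:

```rust
// Illustrative only: the real logic lives in postgres_ffi::v14::xlog_utils::normalize_lsn.
const XLOG_BLCKSZ: u64 = 8192; // WAL page size
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // 16 MiB segments
const SIZE_OF_XLOG_LONG_PHD: u64 = 40; // long header at the start of a segment (assumed size)
const SIZE_OF_XLOG_SHORT_PHD: u64 = 24; // short header at the start of every other page (assumed size)

// If the LSN sits exactly on a page boundary, move it past the page header,
// since no WAL record can start inside the header.
fn normalize_lsn_sketch(lsn: u64, seg_size: u64) -> u64 {
    if lsn % XLOG_BLCKSZ == 0 {
        if lsn % seg_size == 0 {
            lsn + SIZE_OF_XLOG_LONG_PHD
        } else {
            lsn + SIZE_OF_XLOG_SHORT_PHD
        }
    } else {
        lsn // already points inside a page; nothing to skip
    }
}

fn main() {
    // Segment boundary: skip the long header.
    assert_eq!(normalize_lsn_sketch(WAL_SEGMENT_SIZE, WAL_SEGMENT_SIZE), WAL_SEGMENT_SIZE + 40);
    // Plain page boundary: skip the short header.
    assert_eq!(normalize_lsn_sketch(XLOG_BLCKSZ, WAL_SEGMENT_SIZE), XLOG_BLCKSZ + 24);
    // A mid-page LSN is left untouched.
    assert_eq!(normalize_lsn_sketch(5000, WAL_SEGMENT_SIZE), 5000);
}
```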

View File

@@ -129,13 +129,13 @@ mod tests {
assert!(CANCEL_MAP.contains(&session)); assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send"); tx.send(()).expect("failed to send");
let () = futures::future::pending().await; // sleep forever futures::future::pending::<()>().await; // sleep forever
Ok(()) Ok(())
})); }));
// Wait until the task has been spawned. // Wait until the task has been spawned.
let () = rx.await.context("failed to hear from the task")?; rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task. // Drop the session's entry by cancelling the task.
task.abort(); task.abort();

View File

@@ -5,6 +5,7 @@ filterwarnings =
ignore:record_property is incompatible with junit_family:pytest.PytestWarning ignore:record_property is incompatible with junit_family:pytest.PytestWarning
addopts = addopts =
-m 'not remote_cluster' -m 'not remote_cluster'
--ignore=test_runner/performance
markers = markers =
remote_cluster remote_cluster
testpaths = testpaths =

View File

@@ -1,11 +1,10 @@
[toolchain] [toolchain]
# We try to stick to a toolchain version that is widely available on popular distributions, so that most people # We try to stick to a toolchain version that is widely available on popular distributions, so that most people
# can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later # can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on # version, we can consider updating.
# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach # See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package,
# 'testing' soon (and similarly for the other distributions). # we use "unstable" version number as the highest version used in the project by default.
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package. channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
profile = "default" profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy. # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html # https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1" thiserror = "1"
parking_lot = "0.12.1" parking_lot = "0.12.1"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" } postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" } metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" } utils = { path = "../libs/utils" }

View File

@@ -291,9 +291,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
// Register metrics collector for active timelines. It's important to do this // Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset. // after daemonizing, otherwise process collector will be upset.
let registry = metrics::default_registry();
let timeline_collector = safekeeper::metrics::TimelineCollector::new(); let timeline_collector = safekeeper::metrics::TimelineCollector::new();
registry.register(Box::new(timeline_collector))?; metrics::register_internal(Box::new(timeline_collector))?;
let signals = signals::install_shutdown_handlers()?; let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![]; let mut threads = vec![];
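For illustration, a minimal sketch of the pattern behind `metrics::register_internal`: custom collectors go into a `prometheus::Registry` that is separate from the default one, and a metrics endpoint can gather from both. This assumes the `prometheus` crate directly; the project's `metrics` wrapper is not reproduced here:

```rust
use prometheus::{Encoder, IntCounter, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A registry separate from prometheus::default_registry(); collectors
    // registered here never take the default registry's lock.
    let internal_registry = Registry::new();

    let requests = IntCounter::new("demo_requests_total", "Requests served")?;
    internal_registry.register(Box::new(requests.clone()))?;
    requests.inc();

    // When serving /metrics, gather from both registries and concatenate.
    let mut families = prometheus::gather();
    families.extend(internal_registry.gather());

    let mut buf = Vec::new();
    TextEncoder::new().encode(&families, &mut buf)?;
    println!("{}", String::from_utf8(buf)?);
    Ok(())
}
```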

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes; pub mod routes;
pub use routes::make_router; pub use routes::make_router;
pub use safekeeper_api::models;

View File

@@ -27,14 +27,13 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines; pub use timelines_global_map::GlobalTimelines;
pub mod defaults { pub mod defaults {
use const_format::formatcp;
use std::time::Duration; use std::time::Duration;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454; pub use safekeeper_api::{
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
} }

View File

@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench` `./scripts/pytest -k bench`
To run tests in parallel we use the `pytest-xdist` plugin. By default everything runs single-threaded. The number of workers can be specified with the `-n` argument:
`./scripts/pytest -n4`
By default, performance tests are excluded. To run them, explicitly pass the performance test selection to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables: Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found. `NEON_BIN`: The directory where neon binaries can be found.

View File

@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs" LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3" MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3" REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test passes with or without remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]: def available_remote_storages() -> List[RemoteStorageKind]:
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
test_name: str, test_name: str,
force_enable: bool = True, force_enable: bool = True,
): ):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS: if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable) self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3: elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable) self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1131,6 +1136,19 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None assert res_json is None
return res_json return res_json
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put( res = self.put(
@@ -1182,6 +1200,7 @@ class AbstractNeonCli(abc.ABC):
arguments: List[str], arguments: List[str],
extra_env_vars: Optional[Dict[str, str]] = None, extra_env_vars: Optional[Dict[str, str]] = None,
check_return_code=True, check_return_code=True,
timeout=None,
) -> "subprocess.CompletedProcess[str]": ) -> "subprocess.CompletedProcess[str]":
""" """
Run the command with the specified arguments. Run the command with the specified arguments.
@@ -1228,6 +1247,7 @@ class AbstractNeonCli(abc.ABC):
universal_newlines=True, universal_newlines=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
timeout=timeout,
) )
if not res.returncode: if not res.returncode:
log.info(f"Run success: {res.stdout}") log.info(f"Run success: {res.stdout}")
@@ -1601,6 +1621,14 @@ class WalCraft(AbstractNeonCli):
res.check_returncode() res.check_returncode()
class ComputeCtl(AbstractNeonCli):
"""
A typed wrapper around the `compute_ctl` CLI tool.
"""
COMMAND = "compute_ctl"
class NeonPageserver(PgProtocol): class NeonPageserver(PgProtocol):
""" """
An object representing a running pageserver. An object representing a running pageserver.

View File

@@ -6,6 +6,8 @@ from typing import List
import pytest import pytest
from fixtures.log_helper import log from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix from performance.test_perf_pgbench import get_scales_matrix
@@ -88,3 +90,39 @@ def test_branching_with_pgbench(
for pg in pgs: for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts") res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,) assert res[0] == (100000 * scale,)
# Test branching from an "unnormalized" LSN.
#
# Context:
# When doing a basebackup for a newly created branch, the pageserver generates
# a 'pg_control' file to bootstrap the WAL segment, specifying the redo position
# as a "normalized" LSN based on the timeline's starting LSN:
#
# checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0;
#
# This test checks that the pageserver is able to handle an "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
with pg0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divisible by `XLOG_BLCKSZ`
# and is smaller than `curr_lsn`.
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])

View File

@@ -0,0 +1,203 @@
import os
from subprocess import TimeoutExpired
from fixtures.log_helper import log
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
if "=" in line:
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
spec = (
"""
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Neon Test",
"state": "restarted",
"roles": [
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """
+ f'"{cfg_map["neon.safekeepers"]}"'
+ """,
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": """
+ f'"{cfg_map["port"]}"'
+ """,
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": """
+ f'"{cfg_map["neon.tenant_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": """
+ f'"{cfg_map["neon.timeline_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": """
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
+ """,
"vartype": "string"
}
]
},
"delta_operations": [
]
}
"""
)
ps_connstr = cfg_map["neon.pageserver_connstring"]
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
# run compute_ctl and wait for 10s
try:
ctl.raw_cli(
["--connstr", ps_connstr, "--pgdata", pgdata, "--spec", spec, "--pgbin", pg_bin_path],
timeout=10,
)
except TimeoutExpired as exc:
ctl_logs = exc.stderr.decode("utf-8")
log.info("compute_ctl output:\n" + ctl_logs)
start = "starting safekeepers syncing"
end = "safekeepers synced at LSN"
start_pos = ctl_logs.index(start)
assert start_pos != -1
end_pos = ctl_logs.index(end, start_pos)
assert end_pos != -1
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
# assert that --sync-safekeepers logs are present in the output
assert "connecting with node" in sync_safekeepers_logs
assert "connected with node" in sync_safekeepers_logs
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
assert "got votes from majority (2)" in sync_safekeepers_logs
assert "sending elected msg to node" in sync_safekeepers_logs

View File

@@ -15,7 +15,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start("test_lsn_mapping") pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch") log.info("postgres is running on 'test_lsn_mapping' branch")
ps_cur = env.pageserver.connect().cursor()
cur = pgmain.connect().cursor() cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction # Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster. # Disable synchronous_commit to make this initialization go faster.
@@ -38,37 +37,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Wait until WAL is received by pageserver # Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id) wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
# Check edge cases: timestamp in the future with env.pageserver.http_client() as client:
probe_timestamp = tbl[-1][1] + timedelta(hours=1) # Check edge cases: timestamp in the future
result = query_scalar( probe_timestamp = tbl[-1][1] + timedelta(hours=1)
ps_cur, result = client.timeline_get_lsn_by_timestamp(
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'", env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result == "future"
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "past"
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
lsn = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
) )
assert result == "future"
# Launch a new read-only node at that LSN, and check that only the rows # timestamp too the far history
# that were supposed to be committed at that point in time are visible. probe_timestamp = tbl[0][1] - timedelta(hours=10)
pg_here = env.postgres.create_start( result = client.timeline_get_lsn_by_timestamp(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
) )
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i assert result == "past"
pg_here.stop_and_destroy() # Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
lsn = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()

View File

@@ -1,4 +1,5 @@
import os import os
import shutil
from contextlib import closing from contextlib import closing
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@@ -7,8 +8,13 @@ from typing import List
import pytest import pytest
from fixtures.log_helper import log from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.neon_fixtures import (
from fixtures.types import Lsn, TenantId NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from prometheus_client.samples import Sample from prometheus_client.samples import Sample
@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)]) post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set() assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_with_empty_tenants",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_without_timelines_dir = env.initial_tenant
log.info(
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
tenant_with_empty_timelines_dir = client.tenant_create()
log.info(
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
)
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline reinitialization after pageserver restart
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert (
len(tenants) == 1
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
loaded_tenant = tenants[0]
assert loaded_tenant["id"] == str(
tenant_with_empty_timelines_dir
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -7,19 +7,25 @@
# #
import asyncio import asyncio
import os
from pathlib import Path
from typing import List, Tuple from typing import List, Tuple
import pytest import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import ( from fixtures.neon_fixtures import (
NeonEnv, NeonEnv,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient,
Postgres, Postgres,
RemoteStorageKind, RemoteStorageKind,
available_remote_storages, available_remote_storages,
wait_for_last_record_lsn, wait_for_last_record_lsn,
wait_for_upload, wait_for_upload,
wait_until,
) )
from fixtures.types import Lsn, TenantId, TimelineId from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
async def tenant_workload(env: NeonEnv, pg: Postgres): async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
# run final checkpoint manually to flush all the data to remote storage # run final checkpoint manually to flush all the data to remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id) pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenants_attached_after_download(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="remote_storage_kind",
)
data_id = 1
data_secret = "very secret secret"
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
for checkpoint_number in range(1, 3):
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(path)
local_layer_deleted = True
break
assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
env.pageserver.start()
client = env.pageserver.http_client()
wait_until(
number_of_iterations=5,
interval=1,
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
)
restored_timelines = client.timeline_list(tenant_id)
assert (
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
restored_timeline = restored_timelines[0]
assert restored_timeline["timeline_id"] == str(
timeline_id
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
def expect_tenant_to_download_timeline(
client: NeonPageserverHttpClient,
tenant_id: TenantId,
):
for tenant in client.tenant_list():
if tenant["id"] == str(tenant_id):
assert not tenant.get(
"has_in_progress_downloads", True
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"

View File

@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] } bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
bytes = { version = "1", features = ["serde", "std"] } bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] } chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] } either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] } hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }