Compare commits

..

33 Commits

Author SHA1 Message Date
Bojan Serafimov
781897b983 Don't run perf tests in parallel 2022-10-11 10:05:25 -04:00
Lassi Pölönen
e520293090 Add build info metric to pageserver, safekeeper and proxy (#2596)
* Test that we emit build info metric for pageserver, safekeeper and proxy with some non-zero length revision label

* Emit libmetrics_build_info on startup of pageserver, safekeeper and
proxy with label "revision" which tells the git revision.
2022-10-11 09:54:32 +03:00
Sergey Melnikov
241e549757 Switch neon-stress etcd to dedicatd instance (#2602) 2022-10-10 22:07:19 +00:00
Sergey Melnikov
34bea270f0 Fix POSTGRES_DISTRIB_DIR for benchmarks on ec2 runner (#2594) 2022-10-10 09:12:50 +00:00
Kirill Bulatov
13f0e7a5b4 Deploy pageserver_binutils to the envs 2022-10-09 08:21:11 +03:00
Kirill Bulatov
3e35f10adc Add a script to reformat the project 2022-10-09 08:21:11 +03:00
Kirill Bulatov
3be3bb7730 Be more verbose with initdb for pageserver timeline creation 2022-10-09 08:21:11 +03:00
Kirill Bulatov
01d2c52c82 Tidy up feature reporting 2022-10-09 08:21:11 +03:00
Kirill Bulatov
9f79e7edea Merge pageserver helper binaries and provide it for deployment (#2590) 2022-10-08 12:42:17 +00:00
Heikki Linnakangas
a22165d41e Add tests for comparing root and child branch performance.
Author: Thang Pham <thang@neon.tech>
2022-10-08 10:07:33 +03:00
Arseny Sher
725be60bb7 Storage messaging rfc 2. 2022-10-07 21:22:17 +04:00
Dmitry Ivanov
e516c376d6 [proxy] Improve logging (#2554)
* [proxy] Use `tracing::*` instead of `println!` for logging

* Fix a minor misnomer

* Log more stuff
2022-10-07 14:34:57 +03:00
Kirill Bulatov
8e51c27e1a Restore artifact versions (#2578)
Context: https://github.com/neondatabase/neon/pull/2128/files#r989489965

Co-authored-by: Rory de Zoete <rory@neon.tech>
2022-10-07 10:58:31 +00:00
Heikki Linnakangas
9e1eb69d55 Increase default compaction_period setting to 20 s.
The previous default of 1 s caused excessive CPU usage when there were
a lot of projects. Polling every timeline once a second was too aggressive
so let's reduce it.

Fixes https://github.com/neondatabase/neon/issues/2542, but we
probably also want do to something so that we don't poll timelines
that have received no new WAL or layers since last check.
2022-10-07 13:55:19 +03:00
Arthur Petukhovsky
687ba81366 Display sync safekeepers output in compute_ctl (#2571)
Pipe postgres output to compute_ctl stdout and create a test to check that compute_ctl works and prints postgres logs.
2022-10-06 13:53:52 +00:00
Andrés
47bae68a2e Make get_lsn_by_timestamp available in mgmt API (#2536) (#2560)
Co-authored-by: andres <andres.rodriguez@outlook.es>
2022-10-06 12:42:50 +03:00
Joonas Koivunen
e8b195acb7 fix: apply notify workaround on m1 mac docker (#2564)
workaround as discussed in the notify repository.
2022-10-06 11:13:40 +03:00
Anastasia Lubennikova
254cb7dc4f Update CI script to push compute-node-v15 to dockerhub 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
ed85d97f17 bump vendor/postgres-v15. Rebase it to Stamp 15rc2 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
4a216c5f7f Use PostGIS 3.3.1 that is compatible with pg 15 2022-10-06 10:50:08 +03:00
Anastasia Lubennikova
c5a428a61a Update Dockerfile.compute-node-v15 to match v14 version.
Fix build script to promote the image for v15 to neon dockerhub
2022-10-06 10:50:08 +03:00
Konstantin Knizhnik
ff8c481777 Normalize last_record LSN in wal receiver (#2529)
* Add test for branching on page boundary

* Normalize start recovery point

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>

Co-authored-by:  Thang Pham <thang@neon.tech>
2022-10-06 09:01:56 +03:00
Arthur Petukhovsky
f25dd75be9 Fix deadlock in safekeeper metrics (#2566)
We had a problem where almost all of the threads were waiting on a futex syscall. More specifically:
- `/metrics` handler was inside `TimelineCollector::collect()`, waiting on a mutex for a single Timeline
- This exact timeline was inside `control_file::FileStorage::persist()`, waiting on a mutex for Lazy initialization of `PERSIST_CONTROL_FILE_SECONDS`
- `PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram>` was blocked on `prometheus::register`
- `prometheus::register` calls `DEFAULT_REGISTRY.write().register()` to take a write lock on Registry and add a new metric
- `DEFAULT_REGISTRY` lock was already taken inside `DEFAULT_REGISTRY.gather()`, which was called by `/metrics` handler to collect all metrics

This commit creates another Registry with a separate lock, to avoid deadlock in a case where `TimelineCollector` triggers registration of new metrics inside default registry.
2022-10-06 01:07:02 +03:00
Sergey Melnikov
b99bed510d Move proxies to neon-proxy namespace (#2555) 2022-10-05 16:14:09 +03:00
sharnoff
580584c8fc Remove control_plane deps on pageserver/safekeeper (#2513)
Creates new `pageserver_api` and `safekeeper_api` crates to serve as the
shared dependencies. Should reduce both recompile times and cold compile
times.

Decreases the size of the optimized `neon_local` binary: 380M -> 179M.
No significant changes for anything else (mostly as expected).
2022-10-04 11:14:45 -07:00
Kirill Bulatov
d823e84ed5 Allow attaching tenants with zero timelines 2022-10-04 18:13:51 +03:00
Kirill Bulatov
231dfbaed6 Do not remove empty timelines/ directory for tenants 2022-10-04 18:13:51 +03:00
Dmitry Rodionov
5cf53786f9 Improve pytest ergonomics
1. Disable perf tests by default
2. Add instruction to run tests in parallel
2022-10-04 14:53:01 +03:00
Heikki Linnakangas
9b9bbad462 Use 'notify' crate to wait for PostgreSQL startup.
Compute node startup time is very important. After launching
PostgreSQL, use 'notify' to be notified immediately when it has
updated the PID file, instead of polling. The polling loop had 100 ms
interval so this shaves up to 100 ms from the startup time.
2022-10-04 13:00:15 +03:00
Heikki Linnakangas
537b2c1ae6 Remove unnecessary check for open PostgreSQL TCP port.
The loop checked if the TCP port is open for connections, by trying to
connect to it. That seems unnecessary. By the time the postmaster.pid
file says that it's ready, the port should be open. Remove that check.
2022-10-04 12:09:13 +03:00
Joonas Koivunen
31123d1fa8 Silence clippies, minor doc fix (#2543)
* doc: remove stray backtick

* chore: clippy::let_unit_value

* chore: silence useless_transmute, duplicate_mod

* chore: remove allowing deref_nullptr

not needed since bindgen 0.60.0.

* chore: remove repeated allowed lints

they are already allowed from the crate root.
2022-10-03 17:44:17 +03:00
Kirill Bulatov
4f2ac51bdd Bump rustc to 1.61 2022-10-03 16:36:03 +03:00
Kirill Bulatov
7b2f9dc908 Reuse existing tenants during attach (#2540) 2022-10-03 13:33:55 +03:00
86 changed files with 1725 additions and 553 deletions

View File

@@ -23,6 +23,7 @@ docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C neon_install
mkdir neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver_binutils neon_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/

View File

@@ -16,5 +16,5 @@ env_name = neon-stress
console_mgmt_base_url = http://neon-stress-console.local
bucket_name = neon-storage-ireland
bucket_region = eu-west-1
etcd_endpoints = etcd-stress.local:2379
etcd_endpoints = neon-stress-etcd.local:2379
safekeeper_enable_s3_offload = false

View File

@@ -46,7 +46,7 @@ jobs:
runs-on: [self-hosted, zenith-benchmarker]
env:
POSTGRES_DISTRIB_DIR: /tmp/pg_install
POSTGRES_DISTRIB_DIR: /usr/pgsql
DEFAULT_PG_VERSION: 14
steps:
@@ -149,6 +149,7 @@ jobs:
strategy:
fail-fast: false
max-parallel: 1
matrix:
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project

View File

@@ -127,8 +127,8 @@ jobs:
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-
- name: Cache postgres v14 build
id: cache_pg_14
@@ -389,7 +389,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git/
target/
key: v8-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
key: v9-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact
uses: ./.github/actions/download
@@ -494,7 +494,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build neon
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
compute-tools-image:
runs-on: dev
@@ -508,7 +508,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute tools
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
compute-node-image:
runs-on: dev
@@ -527,7 +527,7 @@ jobs:
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
- name: Kaniko build compute node with extensions v14 (compatibility)
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
compute-node-image-v14:
runs-on: dev
@@ -543,7 +543,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v14
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
compute-node-image-v15:
@@ -560,11 +560,11 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v15
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
promote-images:
runs-on: dev
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-tools-image ]
needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
@@ -573,7 +573,7 @@ jobs:
# compute-node uses postgres 14, which is default now
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
name: [ neon, compute-node, compute-node-v14, compute-tools ]
name: [ neon, compute-node, compute-node-v14, compute-node-v15, compute-tools ]
steps:
- name: Promote image to latest
@@ -608,6 +608,9 @@ jobs:
- name: Pull compute node v14 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
- name: Pull compute node v15 image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -638,6 +641,9 @@ jobs:
- name: Push compute node v14 image to Docker Hub
run: crane push compute-node-v14 neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}}
- name: Push compute node v15 image to Docker Hub
run: crane push compute-node-v15 neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}}
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
@@ -650,6 +656,7 @@ jobs:
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
calculate-deploy-targets:
runs-on: [ self-hosted, Linux, k8s-runner ]
@@ -768,5 +775,5 @@ jobs:
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -106,7 +106,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git
target
key: v4-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
key: v5-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust
- name: Run cargo clippy
run: ./run_clippy.sh

101
Cargo.lock generated
View File

@@ -497,8 +497,10 @@ dependencies = [
"chrono",
"clap 3.2.16",
"env_logger",
"futures",
"hyper",
"log",
"notify",
"postgres",
"regex",
"serde",
@@ -540,11 +542,11 @@ dependencies = [
"git-version",
"nix",
"once_cell",
"pageserver",
"pageserver_api",
"postgres",
"regex",
"reqwest",
"safekeeper",
"safekeeper_api",
"serde",
"serde_with",
"tar",
@@ -1072,6 +1074,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.21"
@@ -1493,6 +1504,26 @@ dependencies = [
"str_stack",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -1552,6 +1583,26 @@ dependencies = [
"simple_asn1",
]
[[package]]
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "kstring"
version = "1.0.6"
@@ -1797,6 +1848,24 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -1975,6 +2044,7 @@ dependencies = [
"nix",
"num-traits",
"once_cell",
"pageserver_api",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -2003,6 +2073,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -2371,6 +2452,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"atty",
"base64",
"bstr",
"bytes",
@@ -2404,6 +2486,8 @@ dependencies = [
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls",
"tracing",
"tracing-subscriber",
"url",
"utils",
"uuid",
@@ -2891,6 +2975,7 @@ dependencies = [
"postgres_ffi",
"regex",
"remote_storage",
"safekeeper_api",
"serde",
"serde_json",
"serde_with",
@@ -2906,6 +2991,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"serde",
"serde_with",
"utils",
"workspace_hack",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -4142,6 +4238,7 @@ dependencies = [
"bstr",
"bytes",
"chrono",
"crossbeam-utils",
"either",
"fail",
"hashbrown",

View File

@@ -44,7 +44,7 @@ COPY . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& mold -run cargo build --bin pageserver --bin safekeeper --bin proxy --locked --release \
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin safekeeper --bin proxy --locked --release \
&& cachepot -s
# Build final image
@@ -63,9 +63,10 @@ RUN set -e \
&& useradd -d /data neon \
&& chown -R neon:neon /data
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
@@ -85,4 +86,3 @@ VOLUME ["/data"]
USER neon
EXPOSE 6400
EXPOSE 9898
CMD ["/bin/bash"]

View File

@@ -57,31 +57,6 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_tiger_geocoder.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgis_topology.control
# Layer "pg-stat-contribs-build"
# Build pg_stat_statements, pg_wait_sampling, pg_query_state
FROM build-deps AS pg-query-state-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/postgrespro/pg_query_state/archive/refs/heads/master.tar.gz && \
tar xvzf master.tar.gz && \
cd pg_query_state-master && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
USE_PGXS=true PG_CONFIG=/usr/local/pgsql/bin/pg_config make -j $(getconf _NPROCESSORS_ONLN) && \
USE_PGXS=true PG_CONFIG=/usr/local/pgsql/bin/pg_config make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_query_state.control
FROM build-deps AS pg-wait-sampling-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/postgrespro/pg_wait_sampling/archive/refs/heads/master.tar.gz && \
tar xvzf master.tar.gz && \
cd pg_wait_sampling-master && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
USE_PGXS=true PG_CONFIG=/usr/local/pgsql/bin/pg_config make -j $(getconf _NPROCESSORS_ONLN) && \
USE_PGXS=true PG_CONFIG=/usr/local/pgsql/bin/pg_config make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_wait_sampling.control
#
# Layer "plv8-build"
# Build plv8
@@ -141,8 +116,6 @@ RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-query-state-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-wait-sampling-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/

View File

@@ -5,7 +5,7 @@
ARG TAG=pinned
# apparently, ARGs don't get replaced in RUN commands in kaniko
# ARG POSTGIS_VERSION=3.3.0
# ARG POSTGIS_VERSION=3.3.1
# ARG PLV8_VERSION=3.1.4
# ARG PG_VERSION=v15
@@ -13,9 +13,12 @@ ARG TAG=pinned
# Layer "build-deps"
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
#
# Layer "pg-build"
@@ -42,11 +45,11 @@ RUN cd postgres && \
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \
cd postgis-3.3.0 && \
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.1.tar.gz && \
tar xvzf postgis-3.3.1.tar.gz && \
cd postgis-3.3.1 && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure && \
@@ -64,15 +67,13 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
# Build plv8
#
FROM build-deps AS plv8-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -84,12 +85,46 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#
# Layer "h3-pg-build"
# Build h3_pg
#
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
cd h3-4.0.1 && \
mkdir build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/h3 make install && \
cp -R /h3/usr / && \
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
tar xvzf h3-pg.tgz && \
cd h3-pg-4.0.1 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -137,8 +172,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
chmod 0750 /var/db/postgres/compute && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -116,21 +116,6 @@ postgres-v14: postgres-v14-configure \
+@echo "Compiling pageinspect v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pageinspect install
.PHONY: pg_wait_sampling-v14
pg_wait_sampling-v14: postgres-v14-configure
+@echo "Compiling pg_wait_sampling v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_wait_sampling install
.PHONY: pg_stat_statements-v14
pg_stat_statements-v14: postgres-v14-configure
+@echo "Compiling pg_stat_statements v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_stat_statements install
.PHONY: pg_query_state-v14
pg_query_state-v14: postgres-v14-configure
+@echo "Compiling pg_query_state v14"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/v14/contrib/pg_query_state install
.PHONY: postgres-v15
postgres-v15: postgres-v15-configure \
postgres-v15-headers # to prevent `make install` conflicts with neon's `postgres-headers`

View File

@@ -8,8 +8,10 @@ anyhow = "1.0"
chrono = "0.4"
clap = "3.0"
env_logger = "0.9"
futures = "0.3.13"
hyper = { version = "0.14", features = ["full"] }
log = { version = "0.4", features = ["std", "serde"] }
notify = "5.0.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -26,7 +26,6 @@ use chrono::{DateTime, Utc};
use log::info;
use postgres::{Client, NoTls};
use serde::{Serialize, Serializer};
use tokio_postgres;
use crate::checker::create_writablity_check_data;
use crate::config;
@@ -51,19 +50,6 @@ pub struct ComputeNode {
pub state: RwLock<ComputeState>,
}
#[derive(Debug, Serialize)]
pub struct InsightsRow {
pub query: String,
pub total_runtime: String,
pub mean_runtime: String,
}
#[derive(Serialize)]
pub struct Insights {
count: u64,
statements: Vec<InsightsRow>,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
@@ -192,7 +178,6 @@ impl ComputeNode {
.args(&["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
@@ -205,10 +190,10 @@ impl ComputeNode {
if !sync_output.status.success() {
anyhow::bail!(
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}, stderr: {}",
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
sync_output.status,
String::from_utf8(sync_output.stdout).expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
String::from_utf8(sync_output.stderr).expect("postgres --sync-safekeepers exited, and stderr is not utf-8"),
String::from_utf8(sync_output.stdout)
.expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
);
}
@@ -272,14 +257,7 @@ impl ComputeNode {
.spawn()
.expect("cannot start postgres process");
// Try default Postgres port if it is not provided
let port = self
.spec
.cluster
.settings
.find("port")
.unwrap_or_else(|| "5432".to_string());
wait_for_postgres(&mut pg, &port, pgdata_path)?;
wait_for_postgres(&mut pg, pgdata_path)?;
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
@@ -310,7 +288,6 @@ impl ComputeNode {
Ok(client) => client,
};
create_system_extensions(&mut client)?;
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
@@ -366,38 +343,4 @@ impl ComputeNode {
self.prepare_pgdata()?;
self.run()
}
pub async fn collect_insights(&self) -> String {
let mut result_rows: Vec<String> = Vec::new();
let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
let (client, connection) = connect_result.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
for message in (client
.simple_query(
"
SELECT
row_to_json(pg_stat_statements)
FROM
pg_stat_statements
WHERE
userid != 'cloud_admin'::regrole::oid
ORDER BY
(mean_exec_time + mean_plan_time) DESC
LIMIT 100",
)
.await)
.unwrap()
.iter()
{
if let postgres::SimpleQueryMessage::Row(row) = message {
result_rows.push(row.get(0).unwrap().to_string());
}
}
format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
}
}

View File

@@ -46,13 +46,6 @@ async fn routes(req: Request<Body>, compute: Arc<ComputeNode>) -> Response<Body>
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
}
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");
let insights = compute.collect_insights().await;
Response::new(Body::from(insights))
}
// DEPRECATED, use POST instead
(&Method::GET, "/check_writability") => {
info!("serving /check_writability GET request");

View File

@@ -1,18 +1,19 @@
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
use std::str::FromStr;
use std::{fs, thread, time};
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use postgres::{Client, Transaction};
use serde::Deserialize;
const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds
use notify::{RecursiveMode, Watcher};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
@@ -61,9 +62,7 @@ impl GenericOption {
/// Represent `GenericOption` as configuration option.
pub fn to_pg_setting(&self) -> String {
if let Some(val) = self.value.as_ref() {
// XXX: such overrides don't look as they should be here, but I cannot find a better place now
// Probably, we need to add some override pass before the first compute start steps.
if let Some(val) = &self.value {
let name = match self.name.as_str() {
"safekeepers" => "neon.safekeepers",
"wal_acceptor_reconnect" => "neon.safekeeper_reconnect_timeout",
@@ -71,14 +70,9 @@ impl GenericOption {
it => it,
};
let mut res_val = val.to_owned();
if self.name == "shared_preload_libraries" && !res_val.contains("pg_stat_statements") {
res_val = format!("{},pg_stat_statements", res_val)
}
match self.vartype.as_ref() {
"string" => format!("{} = '{}'", name, res_val),
_ => format!("{} = {}", name, res_val),
"string" => format!("{} = '{}'", name, val),
_ => format!("{} = {}", name, val),
}
} else {
self.name.to_owned()
@@ -237,52 +231,112 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
Ok(postgres_dbs)
}
/// Wait for Postgres to become ready to accept connections:
/// - state should be `ready` in the `pgdata/postmaster.pid`
/// - and we should be able to connect to 127.0.0.1:5432
pub fn wait_for_postgres(pg: &mut Child, port: &str, pgdata: &Path) -> Result<()> {
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");
let mut slept: u64 = 0; // ms
let pause = time::Duration::from_millis(100);
let timeout = time::Duration::from_millis(10);
let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", port)).unwrap();
// PostgreSQL writes line "ready" to the postmaster.pid file, when it has
// completed initialization and is ready to accept connections. We want to
// react quickly and perform the rest of our initialization as soon as
// PostgreSQL starts accepting connections. Use 'notify' to be notified
// whenever the PID file is changed, and whenever it changes, read it to
// check if it's now "ready".
//
// You cannot actually watch a file before it exists, so we first watch the
// data directory, and once the postmaster.pid file appears, we switch to
// watch the file instead. We also wake up every 100 ms to poll, just in
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
let _ = tx.send(res);
}) {
Ok(watcher) => (Box::new(watcher), rx),
Err(e) => {
match e.kind {
notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
// docker on m1 macs does not support recommended_watcher
// but return "Function not implemented (os error 38)"
// see https://github.com/notify-rs/notify/issues/423
let (tx, rx) = std::sync::mpsc::channel();
loop {
// Sleep POSTGRES_WAIT_TIMEOUT at max (a bit longer actually if consider a TCP timeout,
// but postgres starts listening almost immediately, even if it is not really
// ready to accept connections).
if slept >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
// let's poll it faster than what we check the results for (100ms)
let config =
notify::Config::default().with_poll_interval(Duration::from_millis(50));
let watcher = notify::PollWatcher::new(
move |res| {
let _ = tx.send(res);
},
config,
)?;
(Box::new(watcher), rx)
}
_ => return Err(e.into()),
}
}
};
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier.
let code = status.code().unwrap_or(-1);
bail!("Postgres exited unexpectedly with code {}", code);
}
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to be
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
let file = BufReader::new(file);
let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
let can_connect = TcpStream::connect_timeout(&addr, timeout).is_ok();
log::debug!("last line of postmaster.pid: {status:?}");
// Now Postgres is ready to accept connections
if status == "ready" && can_connect {
if status == "ready" {
break;
}
}
}
thread::sleep(pause);
slept += 100;
// Give up after POSTGRES_WAIT_TIMEOUT.
let duration = started_at.elapsed();
if duration >= POSTGRES_WAIT_TIMEOUT {
bail!("timed out while waiting for Postgres to start");
}
}
log::info!("PostgreSQL is now running, continuing to configure it");
Ok(())
}

View File

@@ -426,14 +426,3 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
Ok(())
}
pub fn create_system_extensions(client: &mut Client) -> Result<()> {
for ext in ["pg_stat_statements", "neon"].iter() {
let query: String = format!("CREATE EXTENSION IF NOT EXISTS {}", ext);
info!("creating extension '{}'", ext);
client.execute(query.as_str(), &[])?;
}
Ok(())
}

View File

@@ -28,7 +28,8 @@ mod pg_helpers_tests {
assert_eq!(
spec.cluster.settings.as_pg_settings(),
"fsync = off\nwal_level = replica\nhot_standby = on\nneon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'\nwal_log_hints = on\nlog_connections = on\nshared_buffers = 32768\nport = 55432\nmax_connections = 100\nmax_wal_senders = 10\nlisten_addresses = '0.0.0.0'\nwal_sender_timeout = 0\npassword_encryption = md5\nmaintenance_work_mem = 65536\nmax_parallel_workers = 8\nmax_worker_processes = 8\nneon.tenant_id = 'b0554b632bd4d547a63b86c3630317e8'\nmax_replication_slots = 10\nneon.timeline_id = '2414a61ffc94e428f14b5758fe308e13'\nshared_preload_libraries = 'neon,pg_stat_statements'\nsynchronous_standby_names = 'walproposer'\nneon.pageserver_connstring = 'host=127.0.0.1 port=6400'" );
"fsync = off\nwal_level = replica\nhot_standby = on\nneon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'\nwal_log_hints = on\nlog_connections = on\nshared_buffers = 32768\nport = 55432\nmax_connections = 100\nmax_wal_senders = 10\nlisten_addresses = '0.0.0.0'\nwal_sender_timeout = 0\npassword_encryption = md5\nmaintenance_work_mem = 65536\nmax_parallel_workers = 8\nmax_worker_processes = 8\nneon.tenant_id = 'b0554b632bd4d547a63b86c3630317e8'\nmax_replication_slots = 10\nneon.timeline_id = '2414a61ffc94e428f14b5758fe308e13'\nshared_preload_libraries = 'neon'\nsynchronous_standby_names = 'walproposer'\nneon.pageserver_connstring = 'host=127.0.0.1 port=6400'"
);
}
#[test]

View File

@@ -19,7 +19,9 @@ thiserror = "1"
nix = "0.23"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
pageserver = { path = "../pageserver" }
safekeeper = { path = "../safekeeper" }
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.
pageserver_api = { path = "../libs/pageserver_api" }
safekeeper_api = { path = "../libs/safekeeper_api" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -12,12 +12,12 @@ use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env};
use pageserver::config::defaults::{
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};

View File

@@ -321,10 +321,7 @@ impl PostgresNode {
// uses only needed variables namely host, port, user, password.
format!("postgresql://no_user:{password}@{host}:{port}")
};
conf.append(
"shared_preload_libraries",
"neon, pg_stat_statements, pg_wait_sampling, pg_query_state",
);
conf.append("shared_preload_libraries", "neon");
conf.append_line("");
conf.append("neon.pageserver_connstring", &pageserver_connstr);
conf.append("neon.tenant_id", &self.tenant_id.to_string());

View File

@@ -12,7 +12,7 @@ use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use safekeeper::http::models::TimelineCreateRequest;
use safekeeper_api::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
connstring::connection_address,

View File

@@ -11,7 +11,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{
use pageserver_api::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};

View File

@@ -0,0 +1,163 @@
# Storage messaging
Safekeepers need to communicate to each other to
* Trim WAL on safekeepers;
* Decide on which SK should push WAL to the S3;
* Decide on when to shut down SK<->pageserver connection;
* Understand state of each other to perform peer recovery;
Pageservers need to communicate to safekeepers to decide which SK should provide
WAL to the pageserver.
This is an iteration on [015-storage-messaging](https://github.com/neondatabase/neon/blob/main/docs/rfcs/015-storage-messaging.md) describing current situation,
potential performance issue and ways to address it.
## Background
What we have currently is very close to etcd variant described in
015-storage-messaging. Basically, we have single `SkTimelineInfo` message
periodically sent by all safekeepers to etcd for each timeline.
* Safekeepers subscribe to it to learn status of peers (currently they subscribe to
'everything', but they can and should fetch data only for timelines they hold).
* Pageserver subscribes to it (separate watch per timeline) to learn safekeepers
positions; based on that, it decides from which safekeepers to pull WAL.
Also, safekeepers use etcd elections API to make sure only single safekeeper
offloads WAL.
It works, and callmemaybe is gone. However, this has a performance
hazard. Currently deployed etcd can do about 6k puts per second (using its own
`benchmark` tool); on my 6 core laptop, while running on tmpfs, this gets to
35k. Making benchmark closer to our usage [etcd watch bench](https://github.com/arssher/etcd-client/blob/watch-bench/examples/watch_bench.rs),
I get ~10k received messages per second with various number of publisher-subscribers
(laptop, tmpfs). Diving this by 12 (3 sks generate msg, 1 ps + 3 sk consume them) we
get about 800 active timelines, if message is sent each second. Not extremely
low, but quite reachable.
A lot of idle watches seem to be ok though -- which is good, as pageserver
subscribes to all its timelines regardless of their activity.
Also, running etcd with fsyncs disabled is messy -- data dir must be wiped on
each restart or there is a risk of corruption errors.
The reason is etcd making much more than what we need; it is a fault tolerant
store with strong consistency, but I claim all we need here is just simplest pub
sub with best effort delivery, because
* We already have centralized source of truth for long running data, like which
tlis are on which nodes -- the console.
* Momentary data (safekeeper/pageserver progress) doesn't make sense to persist.
Instead of putting each change to broker, expecting it to reliably deliver it
is better to just have constant flow of data for active timelines: 1) they
serve as natural heartbeats -- if node can't send, we shouldn't pull WAL from
it 2) it is simpler -- no need to track delivery to/from the broker.
Moreover, latency here is important: the faster we obtain fresh data, the
faster we can switch to proper safekeeper after failure.
* As for WAL offloading leader election, it is trivial to achieve through these
heartbeats -- just take suitable node through deterministic rule (min node
id). Once network is stable, this is a converging process (well, except
complicated failure topology, but even then making it converge is not
hard). Such elections bear some risk of several offloaders running
concurrently for a short period of time, but that's harmless.
Generally, if one needs strong consistency, electing leader per se is not
enough; it must be accompanied with number (logical clock ts), checked at
every action to track causality. s3 doesn't provide CAS, so it can't
differentiate old/new leader, this must be solved differently.
We could use etcd CAS (its most powerful/useful primitive actually) to issue
these leader numbers (and e.g. prefix files in s3), but currently I don't see
need for that.
Obviously best effort pub sub is much more simpler and performant; the one proposed is
## gRPC broker
I took tonic and [prototyped](https://github.com/neondatabase/neon/blob/asher/neon-broker/broker/src/broker.rs) the replacement of functionality we currently use
with grpc streams and tokio mpsc channels. The implementation description is at the file header.
It is just 500 lines of code and core functionality is complete. 1-1 pub sub
gives about 120k received messages per second; having multiple subscribers in
different connecitons quickly scales to 1 million received messages per second.
I had concerns about many concurrent streams in singe connection, but 2^20
subscribers still work (though eat memory, with 10 publishers 20GB are consumed;
in this implementation each publisher holds full copy of all subscribers). There
is `bench.rs` nearby which I used for testing.
`SkTimelineInfo` is wired here, but another message can be added (e.g. if
pageservers want to communicate with each other) with templating.
### Fault tolerance
Since such broker is stateless, we can run it under k8s. Or add proxying to
other members, with best-effort this is simple.
### Security implications
Communication happens in a private network that is not exposed to users;
additionaly we can add auth to the broker.
## Alternative: get existing pub-sub
We could take some existing pub sub solution, e.g. RabbitMQ, Redis. But in this
case IMV simplicity of our own outweights external dependency costs (RabbitMQ is
much more complicated and needs VM; Redis Rust client maintenance is not
ideal...). Also note that projects like CockroachDB and TiDB are based on gRPC
as well.
## Alternative: direct communication
Apart from being transport, broker solves one more task: discovery, i.e. letting
safekeepers and pageservers find each other. We can let safekeepers know, for
each timeline, both other safekeepers for this timeline and pageservers serving
it. In this case direct communication is possible:
- each safekeeper pushes to each other safekeeper status of timelines residing
on both of them, letting remove WAL, decide who offloads, decide on peer
recovery;
- each safekeeper pushes to each pageserver status of timelines residing on
both of them, letting pageserver choose from which sk to pull WAL;
It was mostly described in [014-safekeeper-gossip](https://github.com/neondatabase/neon/blob/main/docs/rfcs/014-safekeepers-gossip.md), but I want to recap on that.
The main pro is less one dependency: less moving parts, easier to run Neon
locally/manually, less places to monitor. Fault tolerance for broker disappears,
no kuber or something. To me this is a big thing.
Also (though not a big thing) idle watches for inactive timelines disappear:
naturally safekeepers learn about compute connection first and start pushing
status to pageserver(s), notifying it should pull.
Importantly, I think that eventually knowing and persisting peers and
pageservers on safekeepers is inevitable:
- Knowing peer safekeepers for the timeline is required for correct
automatic membership change -- new member set must be hardened on old
majority before proceeding. It is required to get rid of sync-safekeepers
as well (peer recovery up to flush_lsn).
- Knowing pageservers where the timeline is attached is needed to
1. Understand when to shut down activity on the timeline, i.e. push data to
the broker. We can have a lot of timelines sleeping quietly which
shouldn't occupy resources.
2. Preserve WAL for these (currently we offload to s3 and take it from there,
but serving locally is better, and we get one less condition on which WAL
can be removed from s3).
I suppose this membership data should be passed to safekeepers directly from the
console because
1. Console is the original source of this data, conceptually this is the
simplest way (rather than passing it through compute or something).
2. We already have similar code for deleting timeline on safekeepers
(and attaching/detaching timeline on pageserver), this is a typical
action -- queue operation against storage node and execute it until it
completes (or timeline is dropped).
Cons of direct communication are
- It is more complicated: each safekeeper should maintain set of peers it talks
to, and set of timelines for each such peer -- they ought to be multiplexed
into single connection.
- Totally, we have O(n^2) connections instead of O(n) with broker schema
(still O(n) on each node). However, these are relatively stable, async and
thus not very expensive, I don't think this is a big problem. Up to 10k
storage nodes I doubt connection overhead would be noticeable.
I'd use gRPC for direct communication, and in this sense gRPC based broker is a
step towards it.

View File

@@ -96,7 +96,7 @@ A single virtual environment with all dependencies is described in the single `P
sudo apt install python3.9
```
- Install `poetry`
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation)`.
- Exact version of `poetry` is not important, see installation instructions available at poetry's [website](https://python-poetry.org/docs/#installation).
- Install dependencies via `./scripts/pysync`.
- Note that CI uses specific Python version (look for `PYTHON_VERSION` [here](https://github.com/neondatabase/docker-images/blob/main/rust/Dockerfile))
so if you have different version some linting tools can yield different result locally vs in the CI.

View File

@@ -3,7 +3,7 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::{core, default_registry, proto};
@@ -17,6 +17,7 @@ pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
use prometheus::{Registry, Result};
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
@@ -32,13 +33,27 @@ macro_rules! register_uint_gauge_vec {
}};
}
/// Special internal registry, to collect metrics independently from the default registry.
/// Was introduced to fix deadlock with lazy registration of metrics in the default registry.
static INTERNAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
/// Register a collector in the internal registry. MUST be called before the first call to `gather()`.
/// Otherwise, we can have a deadlock in the `gather()` call, trying to register a new collector
/// while holding the lock.
pub fn register_internal(c: Box<dyn Collector>) -> Result<()> {
INTERNAL_REGISTRY.register(c)
}
/// Gathers all Prometheus metrics and records the I/O stats just before that.
///
/// Metrics gathering is a relatively simple and standalone operation, so
/// it might be fine to do it this way to keep things simple.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
update_rusage_metrics();
prometheus::gather()
let mut mfs = prometheus::gather();
let mut internal_mfs = INTERNAL_REGISTRY.gather();
mfs.append(&mut internal_mfs);
mfs
}
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
@@ -62,6 +77,16 @@ pub const DISK_WRITE_SECONDS_BUCKETS: &[f64] = &[
0.000_050, 0.000_100, 0.000_500, 0.001, 0.003, 0.005, 0.01, 0.05, 0.1, 0.3, 0.5,
];
pub fn set_build_info_metric(revision: &str) {
let metric = register_int_gauge_vec!(
"libmetrics_build_info",
"Build/version information",
&["revision"]
)
.expect("Failed to register build info metric");
metric.with_label_values(&[revision]).set(1);
}
// Records I/O stats in a "cross-platform" way.
// Compiles both on macOS and Linux, but current macOS implementation always returns 0 as values for I/O stats.
// An alternative is to read procfs (`/proc/[pid]/io`) which does not work under macOS at all, hence abandoned.

View File

@@ -0,0 +1,12 @@
[package]
name = "pageserver_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,9 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -7,7 +7,17 @@ use utils::{
lsn::Lsn,
};
use crate::tenant::TenantState;
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
#[serde_as]
#[derive(Serialize, Deserialize)]

View File

@@ -3,9 +3,11 @@
#![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)]
// suppress warnings on rust 1.53 due to bindgen unit tests.
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
#![allow(clippy::useless_transmute)]
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;

View File

@@ -57,12 +57,10 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
@@ -71,7 +69,6 @@ pub fn XLogSegNoOffsetToRecPtr(
segno * (wal_segsz_bytes as u64) + (offset as u64)
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
format!(
"{:>08X}{:>08X}{:>08X}",
@@ -81,7 +78,6 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
@@ -89,12 +85,10 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}
#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}

View File

@@ -0,0 +1,12 @@
[package]
name = "safekeeper_api"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
const_format = "0.2.21"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,10 @@
use const_format::formatcp;
/// Public API types
pub mod models;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");

View File

@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::info;
use std::future::Future;
@@ -35,7 +36,13 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metrics = metrics::gather();
let metrics = tokio::task::spawn_blocking(move || {
// Currently we take a lot of mutexes while collecting metrics, so it's
// better to spawn a blocking task to avoid blocking the event loop.
metrics::gather()
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
encoder.encode(&metrics, &mut buffer).unwrap();
let response = Response::builder()

View File

@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
collections::HashMap,
fmt,
future::Future,
io::{self, Cursor},
str,
@@ -124,6 +125,19 @@ pub struct CancelKeyData {
pub cancel_key: i32,
}
impl fmt::Display for CancelKeyData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hi = (self.backend_pid as u64) << 32;
let lo = self.cancel_key as u64;
let id = hi | lo;
// This format is more compact and might work better for logs.
f.debug_tuple("CancelKeyData")
.field(&format_args!("{:x}", id))
.finish()
}
}
use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {

View File

@@ -58,6 +58,7 @@ rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = "0.4.1"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }

View File

@@ -1,35 +0,0 @@
//! Main entry point for the dump_layerfile executable
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::page_cache;
use pageserver::tenant::dump_layerfile_from_path;
use pageserver::virtual_file;
use std::path::PathBuf;
use utils::project_git_version;
project_git_version!(GIT_VERSION);
fn main() -> Result<()> {
let arg_matches = App::new("Neon dump_layerfile utility")
.about("Dump contents of one layer file, for debugging")
.version(GIT_VERSION)
.arg(
Arg::new("path")
.help("Path to file to dump")
.required(true)
.index(1),
)
.get_matches();
let path = PathBuf::from(arg_matches.value_of("path").unwrap());
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
dump_layerfile_from_path(&path, true)?;
Ok(())
}

View File

@@ -10,6 +10,8 @@ use clap::{App, Arg};
use daemonize::Daemonize;
use fail::FailScenario;
use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling, task_mgr,
@@ -31,11 +33,20 @@ use utils::{
project_git_version!(GIT_VERSION);
const FEATURES: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "fail/failpoints")]
"fail/failpoints",
#[cfg(feature = "profiling")]
"profiling",
];
fn version() -> String {
format!(
"{GIT_VERSION} profiling:{} failpoints:{}",
cfg!(feature = "profiling"),
fail::has_failpoints()
"{GIT_VERSION} failpoints: {}, features: {:?}",
fail::has_failpoints(),
FEATURES,
)
}
@@ -86,13 +97,7 @@ fn main() -> anyhow::Result<()> {
.get_matches();
if arg_matches.is_present("enabled-features") {
let features: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "profiling")]
"profiling",
];
println!("{{\"features\": {features:?} }}");
println!("{{\"features\": {FEATURES:?} }}");
return Ok(());
}
@@ -356,6 +361,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
},
);
set_build_info_metric(GIT_VERSION);
// All started up! Now just sit and wait for shutdown signal.
signals.handle(|signal| match signal {
Signal::Quit => {

View File

@@ -0,0 +1,144 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it with all ways possible
//! and prints its interpreted context.
//!
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use anyhow::Context;
use clap::{App, Arg};
use pageserver::{
page_cache,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
const METADATA_SUBCOMMAND: &str = "metadata";
fn main() -> anyhow::Result<()> {
let arg_matches = App::new("Neon Pageserver binutils")
.about("Reads pageserver (and related) binary files management utility")
.version(GIT_VERSION)
.arg(Arg::new("path").help("Input file path").required(false))
.subcommand(
App::new(METADATA_SUBCOMMAND)
.about("Read and update pageserver metadata file")
.arg(
Arg::new("metadata_path")
.help("Input metadata file path")
.required(false),
)
.arg(
Arg::new("disk_consistent_lsn")
.long("disk_consistent_lsn")
.takes_value(true)
.help("Replace disk consistent Lsn"),
)
.arg(
Arg::new("prev_record_lsn")
.long("prev_record_lsn")
.takes_value(true)
.help("Replace previous record Lsn"),
),
)
.get_matches();
match arg_matches.subcommand() {
Some((subcommand_name, subcommand_matches)) => {
let path = PathBuf::from(
subcommand_matches
.value_of("metadata_path")
.context("'metadata_path' argument is missing")?,
);
anyhow::ensure!(
subcommand_name == METADATA_SUBCOMMAND,
"Unknown subcommand {subcommand_name}"
);
handle_metadata(&path, subcommand_matches)?;
}
None => {
let path = PathBuf::from(
arg_matches
.value_of("path")
.context("'path' argument is missing")?,
);
println!(
"No subcommand specified, attempting to guess the format for file {}",
path.display()
);
if let Err(e) = read_pg_control_file(&path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(&control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
dump_layerfile_from_path(path, true)
}
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(&path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = arg_matches.value_of("disk_consistent_lsn") {
meta = TimelineMetadata::new(
Lsn::from_str(disk_consistent_lsn)?,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = arg_matches.value_of("prev_record_lsn") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(Lsn::from_str(prev_record_lsn)?),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(&path, &metadata_bytes)?;
}
Ok(())
}

View File

@@ -1,75 +0,0 @@
//! Main entry point for the edit_metadata executable
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::tenant::metadata::TimelineMetadata;
use std::path::PathBuf;
use std::str::FromStr;
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
fn main() -> Result<()> {
let arg_matches = App::new("Neon update metadata utility")
.about("Dump or update metadata file")
.version(GIT_VERSION)
.arg(
Arg::new("path")
.help("Path to metadata file")
.required(true),
)
.arg(
Arg::new("disk_lsn")
.short('d')
.long("disk_lsn")
.takes_value(true)
.help("Replace disk constistent lsn"),
)
.arg(
Arg::new("prev_lsn")
.short('p')
.long("prev_lsn")
.takes_value(true)
.help("Previous record LSN"),
)
.get_matches();
let path = PathBuf::from(arg_matches.value_of("path").unwrap());
let metadata_bytes = std::fs::read(&path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{:?}", &meta);
let mut update_meta = false;
if let Some(disk_lsn) = arg_matches.value_of("disk_lsn") {
meta = TimelineMetadata::new(
Lsn::from_str(disk_lsn)?,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_lsn) = arg_matches.value_of("prev_lsn") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(Lsn::from_str(prev_lsn)?),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(&path, &metadata_bytes)?;
}
Ok(())
}

View File

@@ -30,10 +30,10 @@ pub mod defaults {
use crate::tenant_config::defaults::*;
use const_format::formatcp;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;

View File

@@ -207,6 +207,62 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get LSN by a timestamp
parameters:
- name: timestamp
in: query
required: true
schema:
type: string
format: date-time
description: A timestamp to get the LSN
responses:
"200":
description: OK
content:
application/json:
schema:
type: string
"400":
description: Error when no tenant id found in path, no timeline id or invalid timestamp
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id

View File

@@ -12,6 +12,7 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant::{TenantState, Timeline};
@@ -265,6 +266,23 @@ fn query_param_present(request: &Request<Body>, param: &str) -> bool {
.unwrap_or(false)
}
fn get_query_param(request: &Request<Body>, param_name: &str) -> Result<String, ApiError> {
request.uri().query().map_or(
Err(ApiError::BadRequest(anyhow!("empty query in request"))),
|v| {
url::form_urlencoded::parse(v.as_bytes())
.into_owned()
.find(|(k, _)| k == param_name)
.map_or(
Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in query parameters"
))),
|(_, v)| Ok(v),
)
},
)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -329,6 +347,33 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
}
}
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(timestamp_raw.as_str())
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id))
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.map_err(ApiError::NotFound)?;
let result = match timeline
.find_lsn_for_timestamp(timestamp_pg)
.map_err(ApiError::InternalServerError)?
{
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
json_response(StatusCode::OK, result)
}
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
@@ -337,9 +382,16 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || match tenant_mgr::get_tenant(tenant_id, false) {
Ok(_) => Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
)),
Ok(tenant) => {
if tenant.list_timelines().is_empty() {
info!("Attaching to tenant {tenant_id} with zero timelines");
Ok(())
} else {
Err(ApiError::Conflict(
"Tenant is already present locally".to_owned(),
))
}
}
Err(_) => Ok(()),
})
.await
@@ -732,7 +784,7 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, ())
}
#[cfg(any(feature = "testing", feature = "failpoints"))]
#[cfg(feature = "testing")]
async fn failpoints_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow!(
@@ -901,6 +953,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
get_lsn_by_timestamp_handler,
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler),

View File

@@ -12,7 +12,6 @@
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
use std::net::TcpListener;
use std::str;
@@ -35,7 +34,6 @@ use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::task_mgr;
@@ -45,7 +43,6 @@ use crate::tenant_mgr;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
@@ -1062,33 +1059,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate LSN of last transaction with timestamp less or equal than sppecified
// TODO lazy static
let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$")
.unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?;
let timestamp_pg = to_pg_timestamp(timestamp);
self.check_permission(Some(tenant_id))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
pgb.write_message(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"lsn",
)]))?;
let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? {
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
pgb.write_message(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {
bail!("unknown command");
}

View File

@@ -639,6 +639,7 @@ pub fn spawn_storage_sync_task(
(storage, remote_index_clone, sync_queue),
max_sync_errors,
)
.instrument(info_span!("storage_sync_loop"))
.await;
Ok(())
},

View File

@@ -45,6 +45,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
use toml_edit;
use utils::{
@@ -118,18 +119,6 @@ pub struct Tenant {
upload_layers: bool,
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Tenant {
@@ -400,16 +389,19 @@ impl Tenant {
timeline_id,
metadata.pg_version()
);
let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
match timelines_accessor.entry(timeline.timeline_id) {
Entry::Occupied(_) => bail!(
"Found freshly initialized timeline {} in the tenant map",
timeline.timeline_id
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
match timelines_accessor.entry(timeline_id) {
Entry::Occupied(_) => warn!(
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
self.tenant_id, timeline_id
),
Entry::Vacant(v) => {
let timeline = self
.initialize_new_timeline(timeline_id, metadata, ancestor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
v.insert(timeline);
}
}
@@ -609,21 +601,14 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
new_metadata: TimelineMetadata,
timelines: &mut MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<Arc<Timeline>> {
let ancestor = match new_metadata.ancestor_timeline() {
Some(ancestor_timeline_id) => Some(
timelines
.get(&ancestor_timeline_id)
.cloned()
.with_context(|| {
format!(
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
})?,
),
None => None,
};
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
ancestor.is_some(),
"Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found"
)
}
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
@@ -1080,8 +1065,12 @@ impl Tenant {
)
})?;
let ancestor = new_metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines.get(&ancestor_timeline_id))
.cloned();
let new_timeline = self
.initialize_new_timeline(new_timeline_id, new_metadata, timelines)
.initialize_new_timeline(new_timeline_id, new_metadata, ancestor)
.with_context(|| {
format!(
"Failed to initialize timeline {}/{}",
@@ -1105,12 +1094,22 @@ impl Tenant {
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path, pg_version: u32) -> Result<()> {
info!("running initdb in {}... ", initdbpath.display());
fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Path,
pg_version: u32,
) -> Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version);
info!(
"running {} in {}, libdir: {}",
initdb_bin_path.display(),
initdb_target_dir.display(),
initdb_lib_dir.display(),
);
let initdb_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_output = Command::new(initdb_path)
.args(&["-D", &initdbpath.to_string_lossy()])
let initdb_output = Command::new(initdb_bin_path)
.args(&["-D", &initdb_target_dir.to_string_lossy()])
.args(&["-U", &conf.superuser])
.args(&["-E", "utf8"])
.arg("--no-instructions")
@@ -1118,8 +1117,8 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path, pg_version: u32)
// so no need to fsync it
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("LD_LIBRARY_PATH", &initdb_lib_dir)
.env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
.stdout(Stdio::null())
.output()
.context("failed to execute initdb")?;

View File

@@ -556,7 +556,7 @@ impl DeltaLayer {
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'dump_layerfile' binary.
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
pub fn new_for_path<F>(path: &Path, file: F) -> Result<Self>
where
F: FileExt,

View File

@@ -177,7 +177,7 @@ impl fmt::Display for ImageFileName {
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'dump_layerfile' binary, we need to construct a Layer
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.

View File

@@ -357,7 +357,7 @@ impl ImageLayer {
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'dump_layerfile' binary.
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
pub fn new_for_path<F>(path: &Path, file: F) -> Result<ImageLayer>
where
F: std::os::unix::prelude::FileExt,

View File

@@ -627,7 +627,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
let _ = spawn_connection_manager_task(
spawn_connection_manager_task(
self.conf.broker_etcd_prefix.clone(),
self_clone,
walreceiver_connect_timeout,

View File

@@ -24,7 +24,7 @@ pub mod defaults {
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;

View File

@@ -107,6 +107,13 @@ pub fn init_tenant_mgr(
/// Ignores other timelines that might be present for tenant, but were not passed as a parameter.
/// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues.
///
/// For successful tenant attach, it first has to have a `timelines/` subdirectory and a tenant config file that's loaded into memory successfully.
/// If either of the conditions fails, the tenant will be added to memory with [`TenantState::Broken`] state, otherwise we start to load its timelines.
/// Alternatively, tenant is considered loaded successfully, if it's already in pageserver's memory (i.e. was loaded already before).
///
/// Attach happens on startup and sucessful timeline downloads
/// (some subset of timeline files, always including its metadata, after which the new one needs to be registered).
pub fn attach_local_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
@@ -122,18 +129,20 @@ pub fn attach_local_tenants(
);
debug!("Timelines to attach: {local_timelines:?}");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
{
match tenants_state::write_tenants().entry(tenant_id) {
hash_map::Entry::Occupied(_) => {
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
continue;
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::clone(&tenant));
}
let mut tenants_accessor = tenants_state::write_tenants();
let tenant = match tenants_accessor.entry(tenant_id) {
hash_map::Entry::Occupied(o) => {
info!("Tenant {tenant_id} was found in pageserver's memory");
Arc::clone(o.get())
}
}
hash_map::Entry::Vacant(v) => {
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
let tenant = load_local_tenant(conf, tenant_id, remote_index);
v.insert(Arc::clone(&tenant));
tenant
}
};
drop(tenants_accessor);
if tenant.current_state() == TenantState::Broken {
warn!("Skipping timeline load for broken tenant {tenant_id}")
@@ -168,16 +177,28 @@ fn load_local_tenant(
remote_index.clone(),
conf.remote_storage_config.is_some(),
));
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
if !tenant_timelines_dir.is_dir() {
error!(
"Tenant {} has no timelines directory at {}",
tenant_id,
tenant_timelines_dir.display()
);
tenant.set_state(TenantState::Broken);
} else {
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
}
tenant
}
@@ -625,14 +646,10 @@ fn collect_timelines_for_tenant(
}
if tenant_timelines.is_empty() {
match remove_if_empty(&timelines_dir) {
Ok(true) => info!(
"Removed empty tenant timelines directory {}",
timelines_dir.display()
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
// this is normal, we've removed all broken, empty and temporary timeline dirs
// but should allow the tenant to stay functional and allow creating new timelines
// on a restart, we require tenants to have the timelines dir, so leave it on disk
debug!("Tenant {tenant_id} has no timelines loaded");
}
Ok((tenant_id, tenant_timelines))

View File

@@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
@@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection(
// There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32);
// If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
// for anything, and in some corner cases, the compute node might have never generated the WAL for page headers
//. That happens if you create a branch at page boundary: the start point of the branch is at the page boundary,
// but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
// header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
// to the safekeepers.
startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");

View File

@@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
atty = "0.2.14"
base64 = "0.13.0"
bstr = "0.2.17"
bytes = { version = "1.0.1", features = ['serde'] }
@@ -35,6 +35,8 @@ thiserror = "1.0.30"
tokio = { version = "1.17", features = ["macros"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-rustls = "0.23.0"
tracing = "0.1.36"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2.2"
uuid = { version = "0.8.2", features = ["v4", "serde"]}
x509-parser = "0.13.2"
@@ -44,6 +46,7 @@ metrics = { path = "../libs/metrics" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]
async-trait = "0.1"
rcgen = "0.8.14"
rstest = "0.12"
tokio-postgres-rustls = "0.9.0"

View File

@@ -15,6 +15,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
static CPLANE_WAITERS: Lazy<Waiters<mgmt::ComputeReady>> = Lazy::new(Default::default);
@@ -171,6 +172,8 @@ impl BackendType<'_, ClientCredentials<'_>> {
// support SNI or other means of passing the project name.
// We now expect to see a very specific payload in the place of password.
if creds.project().is_none() {
warn!("project name not specified, resorting to the password hack auth flow");
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
.await?
@@ -179,6 +182,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
// Finally we may finish the initialization of `creds`.
// TODO: add missing type safety to ClientCredentials.
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project.into());
let mut config = match &self {
@@ -196,6 +200,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
// We should use a password from payload as well.
config.password(payload.password);
info!("user successfully authenticated (using the password hack)");
return Ok(compute::NodeInfo {
reported_auth_ok: false,
config,
@@ -203,19 +208,31 @@ impl BackendType<'_, ClientCredentials<'_>> {
}
}
match self {
let res = match self {
Console(endpoint, creds) => {
info!(
user = creds.user,
project = creds.project(),
"performing authentication using the console"
);
console::Api::new(&endpoint, extra, &creds)
.handle_user(client)
.await
}
Postgres(endpoint, creds) => {
info!("performing mock authentication using a local postgres instance");
postgres::Api::new(&endpoint, &creds)
.handle_user(client)
.await
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => link::handle_user(&url, client).await,
}
Link(url) => {
info!("performing link authentication");
link::handle_user(&url, client).await
}
}?;
info!("user successfully authenticated");
Ok(res)
}
}

View File

@@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
use std::future::Future;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
const REQUEST_FAILED: &str = "Console request failed";
@@ -148,10 +149,11 @@ impl<'a> Api<'a> {
}
async fn get_auth_info(&self) -> Result<AuthInfo, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
let req = self
.endpoint
.get("proxy_get_role_secret")
.header("X-Request-ID", uuid::Uuid::new_v4().to_string())
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
@@ -160,9 +162,7 @@ impl<'a> Api<'a> {
])
.build()?;
// TODO: use a proper logger
println!("cplane request: {}", req.url());
info!(id = request_id, url = req.url().as_str(), "request");
let resp = self.endpoint.execute(req).await?;
if !resp.status().is_success() {
return Err(TransportError::HttpStatus(resp.status()).into());
@@ -177,10 +177,11 @@ impl<'a> Api<'a> {
/// Wake up the compute node and return the corresponding connection info.
pub(super) async fn wake_compute(&self) -> Result<ComputeConnCfg, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
let req = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", uuid::Uuid::new_v4().to_string())
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
@@ -188,9 +189,7 @@ impl<'a> Api<'a> {
])
.build()?;
// TODO: use a proper logger
println!("cplane request: {}", req.url());
info!(id = request_id, url = req.url().as_str(), "request");
let resp = self.endpoint.execute(req).await?;
if !resp.status().is_success() {
return Err(TransportError::HttpStatus(resp.status()).into());
@@ -227,15 +226,18 @@ where
GetAuthInfo: Future<Output = Result<AuthInfo, GetAuthInfoError>>,
WakeCompute: Future<Output = Result<ComputeConnCfg, WakeComputeError>>,
{
info!("fetching user's authentication info");
let auth_info = get_auth_info(endpoint).await?;
let flow = AuthFlow::new(client);
let scram_keys = match auth_info {
AuthInfo::Md5(_) => {
// TODO: decide if we should support MD5 in api v2
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));
}
AuthInfo::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
Some(compute::ScramKeys {
client_key: flow.begin(scram).await?.authenticate().await?.as_bytes(),

View File

@@ -1,6 +1,7 @@
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
#[derive(Debug, Error)]
@@ -53,14 +54,16 @@ pub async fn handle_user(
let greeting = hello_message(link_uri, &psql_session_id);
let db_info = super::with_waiter(psql_session_id, |waiter| async {
// Give user a URL to spawn a new database
// Give user a URL to spawn a new database.
info!("sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&Be::NoticeResponse(&greeting))
.await?;
// Wait for web console response (see `mgmt`)
// Wait for web console response (see `mgmt`).
info!("waiting for console's reply...");
waiter.await?.map_err(LinkAuthError::AuthFailed)
})
.await?;

View File

@@ -3,6 +3,7 @@
use crate::error::UserFacingError;
use std::borrow::Cow;
use thiserror::Error;
use tracing::info;
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error, PartialEq, Eq, Clone)]
@@ -82,6 +83,13 @@ impl<'a> ClientCredentials<'a> {
}
.transpose()?;
info!(
user = user,
dbname = dbname,
project = project.as_deref(),
"credentials"
);
Ok(Self {
user,
dbname,

View File

@@ -4,6 +4,7 @@ use parking_lot::Mutex;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_postgres::{CancelToken, NoTls};
use tracing::info;
use utils::pq_proto::CancelKeyData;
/// Enables serving `CancelRequest`s.
@@ -18,8 +19,9 @@ impl CancelMap {
.lock()
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("unknown session: {:?}", key))?;
.with_context(|| format!("query cancellation key not found: {key}"))?;
info!("cancelling query per user's request using key {key}");
cancel_closure.try_cancel_query().await
}
@@ -41,14 +43,16 @@ impl CancelMap {
self.0
.lock()
.try_insert(key, None)
.map_err(|_| anyhow!("session already exists: {:?}", key))?;
.map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
self.0.lock().remove(&key);
info!("dropped query cancellation key {key}");
}
info!("registered new query cancellation key {key}");
let session = Session::new(key, self);
f(session).await
}
@@ -102,10 +106,13 @@ impl<'a> Session<'a> {
fn new(key: CancelKeyData, cancel_map: &'a CancelMap) -> Self {
Self { key, cancel_map }
}
}
impl Session<'_> {
/// Store the cancel token for the given session.
/// This enables query cancellation in [`crate::proxy::handshake`].
pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancel_map
.0
.lock()
@@ -129,13 +136,13 @@ mod tests {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
let () = futures::future::pending().await; // sleep forever
futures::future::pending::<()>().await; // sleep forever
Ok(())
}));
// Wait until the task has been spawned.
let () = rx.await.context("failed to hear from the task")?;
rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task.
task.abort();

View File

@@ -5,6 +5,7 @@ use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tracing::{error, info};
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error)]
@@ -54,6 +55,7 @@ impl NodeInfo {
use tokio_postgres::config::Host;
let connect_once = |host, port| {
info!("trying to connect to a compute node at {host}:{port}");
TcpStream::connect((host, port)).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
@@ -72,7 +74,11 @@ impl NodeInfo {
if ports.len() > 1 && ports.len() != hosts.len() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("couldn't connect: bad compute config, ports and hosts entries' count does not match: {:?}", self.config),
format!(
"couldn't connect: bad compute config, \
ports and hosts entries' count does not match: {:?}",
self.config
),
));
}
@@ -88,7 +94,7 @@ impl NodeInfo {
Ok(socket) => return Ok(socket),
Err(err) => {
// We can't throw an error here, as there might be more hosts to try.
println!("failed to connect to compute `{host}:{port}`: {err}");
error!("failed to connect to a compute node at {host}:{port}: {err}");
connection_error = Some(err);
}
}
@@ -160,8 +166,8 @@ impl NodeInfo {
.ok_or(ConnectionError::FailedToFetchPgVersion)?
.into();
info!("connected to user's compute node at {socket_addr}");
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let db = PostgresConnection { stream, version };
Ok((db, cancel_closure))

View File

@@ -1,6 +1,7 @@
use anyhow::anyhow;
use hyper::{Body, Request, Response, StatusCode};
use std::net::TcpListener;
use tracing::info;
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -12,9 +13,9 @@ fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
router.get("/v1/status", status_handler)
}
pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
pub async fn task_main(http_listener: TcpListener) -> anyhow::Result<()> {
scopeguard::defer! {
println!("http has shut down");
info!("http has shut down");
}
let service = || RouterService::new(make_router().build()?);

View File

@@ -23,8 +23,10 @@ use anyhow::{bail, Context};
use clap::{self, Arg};
use config::ProxyConfig;
use futures::FutureExt;
use metrics::set_build_info_metric;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use tracing::info;
use utils::project_git_version;
project_git_version!(GIT_VERSION);
@@ -38,6 +40,11 @@ async fn flatten_err(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_ansi(atty::is(atty::Stream::Stdout))
.with_target(false)
.init();
let arg_matches = clap::App::new("Neon proxy/router")
.version(GIT_VERSION)
.arg(
@@ -140,26 +147,27 @@ async fn main() -> anyhow::Result<()> {
auth_backend,
}));
println!("Version: {GIT_VERSION}");
println!("Authentication backend: {}", config.auth_backend);
info!("Version: {GIT_VERSION}");
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
println!("Starting http on {}", http_address);
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
println!("Starting mgmt on {}", mgmt_address);
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
println!("Starting proxy on {}", proxy_address);
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let tasks = [
tokio::spawn(http::server::thread_main(http_listener)),
tokio::spawn(proxy::thread_main(config, proxy_listener)),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(proxy::task_main(config, proxy_listener)),
tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)),
]
.map(flatten_err);
set_build_info_metric(GIT_VERSION);
// This will block until all tasks have completed.
// Furthermore, the first one to fail will cancel the rest.
let _: Vec<()> = futures::future::try_join_all(tasks).await?;

View File

@@ -5,6 +5,7 @@ use std::{
net::{TcpListener, TcpStream},
thread,
};
use tracing::{error, info};
use utils::{
postgres_backend::{self, AuthType, PostgresBackend},
pq_proto::{BeMessage, SINGLE_COL_ROWDESC},
@@ -19,7 +20,7 @@ use utils::{
///
pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> {
scopeguard::defer! {
println!("mgmt has shut down");
info!("mgmt has shut down");
}
listener
@@ -27,14 +28,14 @@ pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> {
.context("failed to set listener to blocking")?;
loop {
let (socket, peer_addr) = listener.accept().context("failed to accept a new client")?;
println!("accepted connection from {}", peer_addr);
info!("accepted connection from {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set client socket option")?;
thread::spawn(move || {
if let Err(err) = handle_connection(socket) {
println!("error: {}", err);
error!("{err}");
}
});
}
@@ -102,14 +103,14 @@ impl postgres_backend::Handler for MgmtHandler {
let res = try_process_query(pgb, query_string);
// intercept and log error message
if res.is_err() {
println!("Mgmt query failed: #{:?}", res);
error!("mgmt query failed: {res:?}");
}
res
}
}
fn try_process_query(pgb: &mut PostgresBackend, query_string: &str) -> anyhow::Result<()> {
println!("Got mgmt query [redacted]"); // Content contains password, don't print it
info!("got mgmt query [redacted]"); // Content contains password, don't print it
let resp: PsqlSessionResponse = serde_json::from_str(query_string)?;

View File

@@ -8,6 +8,7 @@ use metrics::{register_int_counter, IntCounter};
use once_cell::sync::Lazy;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, Instrument};
use utils::pq_proto::{BeMessage as Be, *};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
@@ -43,17 +44,17 @@ where
F: std::future::Future<Output = anyhow::Result<R>>,
{
future.await.map_err(|err| {
println!("error: {}", err);
error!("{err}");
err
})
}
pub async fn thread_main(
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
) -> anyhow::Result<()> {
scopeguard::defer! {
println!("proxy has shut down");
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
@@ -63,22 +64,29 @@ pub async fn thread_main(
let cancel_map = Arc::new(CancelMap::default());
loop {
let (socket, peer_addr) = listener.accept().await?;
println!("accepted connection from {}", peer_addr);
info!("accepted connection from {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
tokio::spawn(log_error(async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
tokio::spawn(
log_error(async move {
info!("spawned a task for {peer_addr}");
handle_client(config, &cancel_map, socket).await
}));
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket).await
})
.instrument(info_span!("client", session = format_args!("{session_id}"))),
);
}
}
async fn handle_client(
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
@@ -88,7 +96,8 @@ async fn handle_client(
}
let tls = config.tls_config.as_ref();
let (mut stream, params) = match handshake(stream, tls, cancel_map).await? {
let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
@@ -106,7 +115,7 @@ async fn handle_client(
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let client = Client::new(stream, creds, &params);
let client = Client::new(stream, creds, &params, session_id);
cancel_map
.with_session(|session| client.connect_to_db(session))
.await
@@ -127,7 +136,7 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
println!("got message: {:?}", msg);
info!("received {msg:?}");
use FeStartupPacket::*;
match msg {
@@ -164,11 +173,13 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream.throw_error_str(ERR_INSECURE_CONNECTION).await?;
}
info!(session_type = "normal", "successful handshake");
break Ok(Some((stream, params)));
}
CancelRequest(cancel_key_data) => {
cancel_map.cancel_session(cancel_key_data).await?;
info!(session_type = "cancellation", "successful handshake");
break Ok(None);
}
}
@@ -183,6 +194,8 @@ struct Client<'a, S> {
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
/// KV-dictionary with PostgreSQL connection params.
params: &'a StartupMessageParams,
/// Unique connection ID.
session_id: uuid::Uuid,
}
impl<'a, S> Client<'a, S> {
@@ -191,11 +204,13 @@ impl<'a, S> Client<'a, S> {
stream: PqStream<S>,
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
params: &'a StartupMessageParams,
session_id: uuid::Uuid,
) -> Self {
Self {
stream,
creds,
params,
session_id,
}
}
}
@@ -207,17 +222,20 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
mut stream,
creds,
params,
session_id,
} = self;
let extra = auth::ConsoleReqExtra {
// Currently it's OK to generate a new UUID **here**, but
// it might be better to move this to `cancellation::Session`.
session_id: uuid::Uuid::new_v4(),
session_id, // aka this connection's id
application_name: params.get("application_name"),
};
// Authenticate and connect to a compute node.
let auth = creds.authenticate(&extra, &mut stream).await;
let auth = creds
.authenticate(&extra, &mut stream)
.instrument(info_span!("auth"))
.await;
let node = async { auth }.or_else(|e| stream.throw_error(e)).await?;
let reported_auth_ok = node.reported_auth_ok;
@@ -251,6 +269,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
}
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let mut db = MetricsStream::new(db.stream, inc_proxied);
let mut client = MetricsStream::new(stream.into_inner(), inc_proxied);
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;

View File

@@ -5,6 +5,7 @@ filterwarnings =
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
addopts =
-m 'not remote_cluster'
--ignore=test_runner/performance
markers =
remote_cluster
testpaths =

View File

@@ -13,7 +13,7 @@
# avoid running regular linting script that checks every feature.
if [[ "$OSTYPE" == "darwin"* ]]; then
# no extra features to test currently, add more here when needed
cargo clippy --locked --all --all-targets -- -A unknown_lints -D warnings
cargo clippy --locked --all --all-targets --features testing -- -A unknown_lints -D warnings
else
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use

View File

@@ -1,11 +1,10 @@
[toolchain]
# We try to stick to a toolchain version that is widely available on popular distributions, so that most people
# can use the toolchain that comes with their operating system. But if there's a feature we miss badly from a later
# version, we can consider updating. As of this writing, 1.60 is available on Debian 'experimental' but not yet on
# 'testing' or even 'unstable', which is a bit more cutting-edge than we'd like. Hopefully the 1.60 packages reach
# 'testing' soon (and similarly for the other distributions).
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package.
channel = "1.60" # do update GitHub CI cache values for rust builds, when changing this value
# version, we can consider updating.
# See https://tracker.debian.org/pkg/rustc for more details on Debian rustc package,
# we use "unstable" version number as the highest version used in the project by default.
channel = "1.61" # do update GitHub CI cache values for rust builds, when changing this value
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -33,6 +33,7 @@ toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }

View File

@@ -17,6 +17,7 @@ use toml_edit::Document;
use tracing::*;
use url::{ParseError, Url};
use metrics::set_build_info_metric;
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
@@ -291,9 +292,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let registry = metrics::default_registry();
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
registry.register(Box::new(timeline_collector))?;
metrics::register_internal(Box::new(timeline_collector))?;
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
@@ -364,6 +364,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
})?,
);
set_build_info_metric(GIT_VERSION);
// TODO: put more thoughts into handling of failed threads
// We probably should restart them.

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;

View File

@@ -27,14 +27,13 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;
pub mod defaults {
use const_format::formatcp;
use std::time::Duration;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub use safekeeper_api::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}

11
scripts/reformat Executable file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
set -euox pipefail
# Runs all formatting tools to ensure the project is up to date
echo 'Reformatting Rust code'
cargo fmt
echo 'Reformatting Python code'
poetry run isort test_runner
poetry run flake8 test_runner
poetry run black test_runner

View File

@@ -56,6 +56,14 @@ If you want to run all tests that have the string "bench" in their names:
`./scripts/pytest -k bench`
To run tests in parellel we utilize `pytest-xdist` plugin. By default everything runs single threaded. Number of workers can be specified with `-n` argument:
`./scripts/pytest -n4`
By default performance tests are excluded. To run them explicitly pass performance tests selection to the script:
`./scripts/pytest test_runner/performance`
Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.

View File

@@ -455,6 +455,9 @@ class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
@@ -583,7 +586,9 @@ class NeonEnvBuilder:
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
@@ -1131,6 +1136,19 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
@@ -1182,6 +1200,7 @@ class AbstractNeonCli(abc.ABC):
arguments: List[str],
extra_env_vars: Optional[Dict[str, str]] = None,
check_return_code=True,
timeout=None,
) -> "subprocess.CompletedProcess[str]":
"""
Run the command with the specified arguments.
@@ -1228,6 +1247,7 @@ class AbstractNeonCli(abc.ABC):
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
)
if not res.returncode:
log.info(f"Run success: {res.stdout}")
@@ -1601,6 +1621,14 @@ class WalCraft(AbstractNeonCli):
res.check_returncode()
class ComputeCtl(AbstractNeonCli):
"""
A typed wrapper around the `compute_ctl` CLI tool.
"""
COMMAND = "compute_ctl"
class NeonPageserver(PgProtocol):
"""
An object representing a running pageserver.
@@ -1933,6 +1961,11 @@ class NeonProxy(PgProtocol):
def _wait_until_ready(self):
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
def get_metrics(self) -> str:
request_result = requests.get(f"http://{self.host}:{self.http_port}/metrics")
request_result.raise_for_status()
return request_result.text
def __enter__(self):
return self

View File

@@ -3,12 +3,15 @@ import statistics
import threading
import time
import timeit
from contextlib import closing
from typing import List
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import wait_for_last_record_lsn
from fixtures.types import Lsn
def _record_branch_creation_durations(neon_compare: NeonCompare, durs: List[float]):
@@ -107,3 +110,43 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
branch_creation_durations.append(dur)
_record_branch_creation_durations(neon_compare, branch_creation_durations)
# Test measures the branch creation time when branching from a timeline with a lot of relations.
#
# This test measures the latency of branch creation under two scenarios
# 1. The ancestor branch is not under any workloads
# 2. The ancestor branch is under a workload (busy)
#
# To simulate the workload, the test runs a concurrent insertion on the ancestor branch right before branching.
def test_branch_creation_many_relations(neon_compare: NeonCompare):
env = neon_compare.env
timeline_id = env.neon_cli.create_branch("root")
pg = env.postgres.create_start("root")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
for i in range(10000):
cur.execute(f"CREATE TABLE t{i} as SELECT g FROM generate_series(1, 1000) g")
# Wait for the pageserver to finish processing all the pending WALs,
# as we don't want the LSN wait time to be included during the branch creation
flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(
env.pageserver.http_client(), env.initial_tenant, timeline_id, flush_lsn
)
with neon_compare.record_duration("create_branch_time_not_busy_root"):
env.neon_cli.create_branch("child_not_busy", "root")
# run a concurrent insertion to make the ancestor "busy" during the branch creation
thread = threading.Thread(
target=pg.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
)
thread.start()
with neon_compare.record_duration("create_branch_time_busy_root"):
env.neon_cli.create_branch("child_busy", "root")
thread.join()

View File

@@ -0,0 +1,94 @@
import timeit
from pathlib import Path
from typing import List
from fixtures.benchmark_fixture import PgBenchRunResult
from fixtures.compare_fixtures import NeonCompare
from performance.test_perf_pgbench import utc_now_timestamp
# -----------------------------------------------------------------------
# Start of `test_compare_child_and_root_*` tests
# -----------------------------------------------------------------------
# `test_compare_child_and_root_*` tests compare the performance of a branch and its child branch(s).
# A common pattern in those tests is initializing a root branch then creating a child branch(s) from the root.
# Each test then runs a similar workload for both child branch and root branch. Each measures and reports
# some latencies/metrics during the workload for performance comparison between a branch and its ancestor.
def test_compare_child_and_root_pgbench_perf(neon_compare: NeonCompare):
env = neon_compare.env
pg_bin = neon_compare.pg_bin
def run_pgbench_on_branch(branch: str, cmd: List[str]):
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(
cmd,
)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
neon_compare.zenbenchmark.record_pg_bench_result(branch, res)
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
pg_bin.run_capture(["pgbench", "-i", pg_root.connstr(), "-s10"])
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", pg_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", pg_child.connstr()])
def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
pg_root.safe_psql(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
pg_root.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("SELECT count(*) from foo")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("SELECT count(*) from foo")
# -----------------------------------------------------------------------
# End of `test_compare_child_and_root_*` tests
# -----------------------------------------------------------------------

View File

@@ -6,6 +6,8 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -88,3 +90,39 @@ def test_branching_with_pgbench(
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
# Test branching from an "unnormalized" LSN.
#
# Context:
# When doing basebackup for a newly created branch, pageserver generates
# 'pg_control' file to bootstrap WAL segment by specifying the redo position
# a "normalized" LSN based on the timeline's starting LSN:
#
# checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0;
#
# This test checks if the pageserver is able to handle a "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
with pg0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
# and is smaller than `curr_lsn`.
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])

View File

@@ -0,0 +1,19 @@
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import NeonEnvBuilder, NeonProxy
def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonProxy):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
parsed_metrics = {}
parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics())
parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())
for component, metrics in parsed_metrics.items():
sample = metrics.query_one("libmetrics_build_info")
assert "revision" in sample.labels
assert len(sample.labels["revision"]) > 0

View File

@@ -0,0 +1,203 @@
import os
from subprocess import TimeoutExpired
from fixtures.log_helper import log
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
if "=" in line:
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
spec = (
"""
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Neon Test",
"state": "restarted",
"roles": [
],
"databases": [
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """
+ f'"{cfg_map["neon.safekeepers"]}"'
+ """,
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": """
+ f'"{cfg_map["port"]}"'
+ """,
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": """
+ f'"{cfg_map["neon.tenant_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": """
+ f'"{cfg_map["neon.timeline_id"]}"'
+ """,
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": """
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
+ """,
"vartype": "string"
}
]
},
"delta_operations": [
]
}
"""
)
ps_connstr = cfg_map["neon.pageserver_connstring"]
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
# run compute_ctl and wait for 10s
try:
ctl.raw_cli(
["--connstr", ps_connstr, "--pgdata", pgdata, "--spec", spec, "--pgbin", pg_bin_path],
timeout=10,
)
except TimeoutExpired as exc:
ctl_logs = exc.stderr.decode("utf-8")
log.info("compute_ctl output:\n" + ctl_logs)
start = "starting safekeepers syncing"
end = "safekeepers synced at LSN"
start_pos = ctl_logs.index(start)
assert start_pos != -1
end_pos = ctl_logs.index(end, start_pos)
assert end_pos != -1
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
# assert that --sync-safekeepers logs are present in the output
assert "connecting with node" in sync_safekeepers_logs
assert "connected with node" in sync_safekeepers_logs
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
assert "got votes from majority (2)" in sync_safekeepers_logs
assert "sending elected msg to node" in sync_safekeepers_logs

View File

@@ -15,7 +15,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
ps_cur = env.pageserver.connect().cursor()
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
@@ -38,37 +37,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "future"
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
)
assert result == "past"
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
lsn = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant} {new_timeline_id} '{probe_timestamp.isoformat()}Z'",
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result == "future"
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
assert result == "past"
pg_here.stop_and_destroy()
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
lsn = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()

View File

@@ -54,7 +54,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 10000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 100,
@@ -74,7 +74,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 20000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 30,
@@ -102,7 +102,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 15000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 80,
@@ -125,7 +125,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 15000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 80,

View File

@@ -1,4 +1,5 @@
import os
import shutil
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -7,8 +8,13 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.metrics import PAGESERVER_PER_TENANT_METRICS, parse_metrics
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from prometheus_client.samples import Sample
@@ -201,3 +207,63 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
post_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_pageserver_with_empty_tenants",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_without_timelines_dir = env.initial_tenant
log.info(
f"Tenant {tenant_without_timelines_dir} becomes broken: it abnormally looses tenants/ directory and is expected to be completely ignored when pageserver restarts"
)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
tenant_with_empty_timelines_dir = client.tenant_create()
log.info(
f"Tenant {tenant_with_empty_timelines_dir} gets all of its timelines deleted: still should be functional"
)
temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir)
for temp_timeline in temp_timelines:
client.timeline_delete(
tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"])
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines"
)
)
assert (
files_in_timelines_dir == 0
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline reinitialization after pageserver restart
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert (
len(tenants) == 1
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
loaded_tenant = tenants[0]
assert loaded_tenant["id"] == str(
tenant_with_empty_timelines_dir
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Empty tenant should be loaded and ready for timeline creation"

View File

@@ -7,19 +7,25 @@
#
import asyncio
import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
Postgres,
RemoteStorageKind,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_upload,
wait_until,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
async def tenant_workload(env: NeonEnv, pg: Postgres):
@@ -93,3 +99,93 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
# run final checkpoint manually to flush all the data to remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_tenants_attached_after_download(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="remote_storage_kind",
)
data_id = 1
data_secret = "very secret secret"
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
for checkpoint_number in range(1, 3):
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.postgres.stop_all()
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(path)
local_layer_deleted = True
break
assert local_layer_deleted, f"Found no local layer files to delete in directory {timeline_dir}"
##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
env.pageserver.start()
client = env.pageserver.http_client()
wait_until(
number_of_iterations=5,
interval=1,
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
)
restored_timelines = client.timeline_list(tenant_id)
assert (
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
retored_timeline = restored_timelines[0]
assert retored_timeline["timeline_id"] == str(
timeline_id
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
def expect_tenant_to_download_timeline(
client: NeonPageserverHttpClient,
tenant_id: TenantId,
):
for tenant in client.tenant_list():
if tenant["id"] == str(tenant_id):
assert not tenant.get(
"has_in_progress_downloads", True
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"

View File

@@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] }
bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] }
bytes = { version = "1", features = ["serde", "std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }