Compare commits

..

83 Commits

Author SHA1 Message Date
Bojan Serafimov
edc77e8a24 Add pyo3 prototype 2022-08-30 11:28:22 -04:00
Bojan Serafimov
324c09d19f Cleanup 2022-08-18 15:17:30 -04:00
Bojan Serafimov
1501dbd5a5 Cleanup 2022-08-18 15:16:14 -04:00
Bojan Serafimov
4e91ff31ca Fmt 2022-08-18 13:20:01 -04:00
Bojan Serafimov
fe1b471cfc Cleanup 2022-08-18 13:14:05 -04:00
Bojan Serafimov
3e0b57e766 Convert bin to lib 2022-08-18 12:41:46 -04:00
Bojan Serafimov
7bff7c4014 Add library 2022-08-18 12:40:14 -04:00
Bojan Serafimov
11a1f2a1d8 Initial experiment 2022-08-18 12:27:43 -04:00
Rory de Zoete
2db675a2f2 Re-enable test dependency for deploy (#2300)
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
2022-08-18 15:18:59 +02:00
Anton Galitsyn
77a2bdf3d7 on safekeeper registration pass availability zone param (#2292) 2022-08-18 15:05:40 +03:00
Arthur Petukhovsky
976576ae59 Fix walreceiver and safekeeper bugs (#2295)
- There was an issue with zero commit_lsn `reason: LaggingWal { current_commit_lsn: 0/0, new_commit_lsn: 1/6FD90D38, threshold: 10485760 } }`. The problem was in `send_wal.rs`, where we initialized `end_pos = Lsn(0)` and in some cases sent it to the pageserver.
- IDENTIFY_SYSTEM previously returned `flush_lsn` as a physical end of WAL. Now it returns `flush_lsn` (as it was) to walproposer and `commit_lsn` to everyone else including pageserver.
- There was an issue with backoff where connection was cancelled right after initialization: `connected!` -> `safekeeper_handle_db: Connection cancelled` -> `Backoff: waiting 3 seconds`. The problem was in sleeping before establishing the connection. This is fixed by reworking retry logic.
- There was an issue with getting `NoKeepAlives` reason in a loop. The issue is probably the same as the previous.
- There was an issue with filtering safekeepers based on retry attempts, which could filter some safekeepers indefinetely. This is fixed by using retry cooldown duration instead of retry attempts.
- Some `send_wal.rs` connections failed with errors without context. This is fixed by adding a timeline to safekeepers errors.

New retry logic works like this:
- Every candidate has a `next_retry_at` timestamp and is not considered for connection until that moment
- When walreceiver connection is closed, we update `next_retry_at` using exponential backoff, increasing the cooldown on every disconnect.
- When `last_record_lsn` was advanced using the WAL from the safekeeper, we reset the retry cooldown and exponential backoff, allowing walreceiver to reconnect to the same safekeeper instantly.
2022-08-18 13:38:23 +03:00
Anastasia Lubennikova
1a07ddae5f fix cargo test 2022-08-18 13:25:00 +03:00
Heikki Linnakangas
9bc12f7444 Move auto-generated 'bindings' to a separate inner module.
Re-export only things that are used by other modules.

In the future, I'm imagining that we run bindgen twice, for Postgres
v14 and v15. The two sets of bindings would go into separate
'bindings_v14' and 'bindings_v15' modules.

Rearrange postgres_ffi modules.

Move function, to avoid Postgres version dependency in timelines.rs
Move function to generate a logical-message WAL record to postgres_ffi.
2022-08-18 13:25:00 +03:00
Rory de Zoete
92bdf04758 Fix: Always build images (#2296)
* Always build images

* Remove unused

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-18 09:41:24 +02:00
Kirill Bulatov
67e091c906 Rework init in pageserver CLI (#2272)
* Do not create initial tenant and timeline (adjust Python tests for that)
* Rework config handling during init, add --update-config to manage local config updates
2022-08-17 23:24:47 +03:00
Alexander Bayandin
dc102197df workflows/benchmarking: increase timeout (#2294) 2022-08-17 17:16:26 +01:00
Rory de Zoete
262cdf8344 Update cachepot endpoint (#2290)
* Update cachepot endpoint

* Update dockerfile & remove env

* Update image building process

* Cannot use metadata endpoint for this

* Update workflow

* Conform to kaniko syntax

* Update syntax

* Update approach

* Update dockerfiles

* Force update

* Update dockerfiles

* Update dockerfile

* Cleanup dockerfiles

* Update s3 test location

* Revert s3 experiment

* Add more debug

* Specify aws region

* Remove debug, add prefix

* Remove one more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-17 18:02:03 +02:00
Kirill Bulatov
3b819ee159 Remove extra type aliases (#2280) 2022-08-17 17:51:53 +03:00
bojanserafimov
e9a3499e87 Fix flaky pageserver restarts in tests (#2261) 2022-08-17 08:17:35 -04:00
bojanserafimov
3414feae03 Make local mypy behave like CI mypy (#2291) 2022-08-17 08:17:09 -04:00
Heikki Linnakangas
e94a5ce360 Rename pg_control_ffi.h to bindgen_deps.h, for clarity.
The pg_control_ffi.h name implies that it only includes stuff related to
pg_control.h. That's mostly true currently, but really the point of the
file is to include everything that we need to generate Rust definitions
from.
2022-08-16 19:37:36 +03:00
Dmitry Rodionov
d5ec84b87b reset rust cache for clippy run to avoid an ICE
additionally remove trailing whitespaces
2022-08-16 18:49:32 +03:00
Dmitry Rodionov
b21f7382cc split out timeline metrics, track layer map loading and size calculation 2022-08-16 18:49:32 +03:00
Kirill Bulatov
648e8bbefe Fix 1.63 clippy lints (#2282) 2022-08-16 18:49:22 +03:00
Rory de Zoete
9218426e41 Fix docker zombie process issue (#2289)
* Fix docker zombie process issue

* Init everywhere

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-16 17:24:58 +02:00
Rory de Zoete
1d4114183c Use main, not branch for ref check (#2288)
* Use main, not branch for ref check

* Add more debug

* Count main, not head

* Try new approach

* Conform to syntax

* Update approach

* Get full history

* Skip checkout

* Cleanup debug

* Remove more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-16 15:41:31 +02:00
Rory de Zoete
4cde0e7a37 Error for fatal not git repo (#2286)
Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-16 13:59:41 +02:00
Rory de Zoete
83f7b8ed22 Add missing step output, revert one deploy step (#2285)
* Add missing step output, revert one deploy step

* Conform to syntax

* Update approach

* Add missing value

* Add missing needs

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
2022-08-16 13:41:51 +02:00
Rory de Zoete
b8f0f37de2 Gen2 GH runner (#2128)
* Re-add rustup override

* Try s3 bucket

* Set git version

* Use v4 cache key to prevent problems

* Switch to v5 for key

* Add second rustup fix

* Rebase

* Add kaniko steps

* Fix typo and set compress level

* Disable global run default

* Specify shell for step

* Change approach with kaniko

* Try less verbose shell spec

* Add submodule pull

* Add promote step

* Adjust dependency chain

* Try default swap again

* Use env

* Don't override aws key

* Make kaniko build conditional

* Specify runs on

* Try without dependency link

* Try soft fail

* Use image with git

* Try passing to next step

* Fix duplicate

* Try other approach

* Try other approach

* Fix typo

* Try other syntax

* Set env

* Adjust setup

* Try step 1

* Add link

* Try global env

* Fix mistake

* Debug

* Try other syntax

* Try other approach

* Change order

* Move output one step down

* Put output up one level

* Try other syntax

* Skip build

* Try output

* Re-enable build

* Try other syntax

* Skip middle step

* Update check

* Try first step of dockerhub push

* Update needs dependency

* Try explicit dir

* Add missing package

* Try other approach

* Try other approach

* Specify region

* Use with

* Try other approach

* Add debug

* Try other approach

* Set region

* Follow AWS example

* Try github approach

* Skip Qemu

* Try stdin

* Missing steps

* Add missing close

* Add echo debug

* Try v2 endpoint

* Use v1 endpoint

* Try without quotes

* Revert

* Try crane

* Add debug

* Split steps

* Fix duplicate

* Add shell step

* Conform to options

* Add verbose flag

* Try single step

* Try workaround

* First request fails hunch

* Try bullseye image

* Try other approach

* Adjust verbose level

* Try previous step

* Add more debug

* Remove debug step

* Remove rogue indent

* Try with larger image

* Add build tag step

* Update workflow for testing

* Add tag step for test

* Remove unused

* Update dependency chain

* Add ownership fix

* Use matrix for promote

* Force update

* Force build

* Remove unused

* Add new image

* Add missing argument

* Update dockerfile copy

* Update Dockerfile

* Update clone

* Update dockerfile

* Go to correct folder

* Use correct format

* Update dockerfile

* Remove cd

* Debug find where we are

* Add debug on first step

* Changedir to postgres

* Set workdir

* Use v1 approach

* Use other dependency

* Try other approach

* Try other approach

* Update dockerfile

* Update approach

* Update dockerfile

* Update approach

* Update dockerfile

* Update dockerfile

* Add workspace hack

* Update Dockerfile

* Update Dockerfile

* Update Dockerfile

* Change last step

* Cleanup pull in prep for review

* Force build images

* Add condition for latest tagging

* Use pinned version

* Try without name value

* Remove more names

* Shorten names

* Add kaniko comments

* Pin kaniko

* Pin crane and ecr helper

* Up one level

* Switch to pinned tag for rust image

* Force update for test

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@b04468bf-cdf4-41eb-9c94-aff4ca55e4bf.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@4795e9ee-4f32-401f-85f3-f316263b62b8.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@2f8bc4e5-4ec2-4ea2-adb1-65d863c4a558.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@27565b2b-72d5-4742-9898-a26c9033e6f9.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@ecc96c26-c6c4-4664-be6e-34f7c3f89a3c.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@7caff3a5-bf03-4202-bd0e-f1a93c86bdae.fritz.box>
2022-08-16 11:15:35 +02:00
Kirill Bulatov
18f251384d Check for entire range during sasl validation (#2281) 2022-08-16 11:10:38 +03:00
Alexander Bayandin
4cddb0f1a4 Set up a workflow to run pgbench against captest (#2077) 2022-08-15 18:54:31 +01:00
Arseny Sher
7b12deead7 Bump vendor/postgres to include XLP_FIRST_IS_CONTRECORD fix. (#2274) 2022-08-15 18:24:24 +03:00
Dmitry Rodionov
63a72d99bb increase timeout in wait_for_upload to avoid spurious failures when testing with real s3 2022-08-15 18:02:27 +03:00
Arthur Petukhovsky
116ecdf87a Improve walreceiver logic (#2253)
This patch makes walreceiver logic more complicated, but it should work better in most cases. Added `test_wal_lagging` to test scenarios where alive safekeepers can lag behind other alive safekeepers.

- There was a bug which looks like `etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())` filtered all safekeepers in some strange cases. I removed this filter, it should probably help with #2237
- Now walreceiver_connection reports status, including commit_lsn. This allows keeping safekeeper connection even when etcd is down.
- Safekeeper connection now fails if pageserver doesn't receive safekeeper messages for some time. Usually safekeeper sends messages at least once per second.
- `LaggingWal` check now uses `commit_lsn` directly from safekeeper. This fixes the issue with often reconnects, when compute generates WAL really fast.
- `NoWalTimeout` is rewritten to trigger only when we know about the new WAL and the connected safekeeper doesn't stream any WAL. This allows setting a small `lagging_wal_timeout` because it will trigger only when we observe that the connected safekeeper has stuck.
2022-08-15 13:31:26 +03:00
Arseny Sher
431393e361 Find end of WAL on safekeepers using WalStreamDecoder.
We could make it inside wal_storage.rs, but taking into account that
 - wal_storage.rs reading is async
 - we don't need s3 here
 - error handling is different; error during decoding is normal
I decided to put it separately.

Test
cargo test test_find_end_of_wal_last_crossing_segment
prepared earlier by @yeputons passes now.

Fixes https://github.com/neondatabase/neon/issues/544
      https://github.com/neondatabase/cloud/issues/2004
Supersedes https://github.com/neondatabase/neon/pull/2066
2022-08-14 14:47:14 +03:00
Kirill Bulatov
f38f45b01d Better storage sync logs (#2268) 2022-08-13 10:58:14 +03:00
Andrey Taranik
a5154dce3e get_binaries script fix (#2263)
* get_binaries uses DOCKER_TAG taken from docker image build step

* remove docker tag discovery at all and fix get_binaries for version variable
2022-08-12 20:35:26 +03:00
Alexander Bayandin
da5f8486ce test_runner/pg_clients: collect docker logs (#2259) 2022-08-12 17:03:09 +01:00
Dmitry Ivanov
ad08c273d3 [proxy] Rework wire format of the password hack and some errors (#2236)
The new format has a few benefits: it's shorter, simpler and
human-readable as well. We don't use base64 anymore, since
url encoding got us covered.

We also show a better error in case we couldn't parse the
payload; the users should know it's all about passing the
correct project name.
2022-08-12 17:38:43 +03:00
Andrey Taranik
7f97269277 get_binaries uses DOCKER_TAG taken from docker image build step (#2260) 2022-08-12 16:01:22 +03:00
Thang Pham
6d99b4f1d8 disable test_import_from_pageserver_multisegment (#2258)
This test failed consistently on `main` now. It's better to temporarily disable it to avoid blocking others' PRs while investigating the root cause for the test failure.

See: #2255, #2256
2022-08-12 19:13:42 +07:00
Egor Suvorov
a7bf60631f postgres_ffi/waldecoder: introduce explicit enum State
Previously it was emulated with a combination of nullable fields.
This change should make the logic more readable.
2022-08-12 11:40:46 +03:00
Egor Suvorov
07bb7a2afe postgres_ffi/waldecoder: remove unused startlsn 2022-08-12 11:40:46 +03:00
Egor Suvorov
142e247e85 postgres_ffi/waldecoder: validate more header fields 2022-08-12 11:40:46 +03:00
Thang Pham
7da47d8a0a Fix timeline physical size flaky tests (#2244)
Resolves #2212.

- use `wait_for_last_flush_lsn` in `test_timeline_physical_size_*` tests

## Context
Need to wait for the pageserver to catch up with the compute's last flush LSN because during the timeline physical size API call, it's possible that there are running `LayerFlushThread` threads. These threads flush new layers into disk and hence update the physical size. This results in a mismatch between the physical size reported by the API and the actual physical size on disk.

### Note
The `LayerFlushThread` threads are processed **concurrently**, so it's possible that the above error still persists even with this patch. However, making the tests wait to finish processing all the WALs (not flushing) before calculating the physical size should help reduce the "flakiness" significantly
2022-08-12 14:28:50 +07:00
Thang Pham
dc52436a8f Fix bug when import large (>1GB) relations (#2172)
Resolves #2097 

- use timeline modification's `lsn` and timeline's `last_record_lsn` to determine the corresponding LSN to query data in `DatadirModification::get`
- update `test_import_from_pageserver`. Split the test into 2 variants: `small` and `multisegment`. 
  + `small` is the old test
  + `multisegment` is to simulate #2097 by using a larger number of inserted rows to create multiple segment files of a relation. `multisegment` is configured to only run with a `release` build
2022-08-12 09:24:20 +07:00
Kirill Bulatov
995a2de21e Share exponential backoff code and fix logic for delete task failure (#2252) 2022-08-11 23:21:06 +03:00
Arseny Sher
e593cbaaba Add pageserver checkpoint_timeout option.
To flush inmemory layer eventually when no new data arrives, which helps
safekeepers to suspend activity (stop pushing to the broker). Default 10m should
be ok.
2022-08-11 22:54:09 +03:00
Heikki Linnakangas
4b9e02be45 Update back vendor/postgres back; it was changed accidentally. (#2251)
Commit 4227cfc96e accidentally reverted vendor/postgres to an older
version. Update it back.
2022-08-11 19:25:08 +03:00
Kirill Bulatov
7a36d06cc2 Fix exponential backoff values 2022-08-11 08:34:57 +03:00
Konstantin Knizhnik
4227cfc96e Safe truncate (#2218)
* Move relation sie cache to layered timeline

* Fix obtaining current LSN for relation size cache

* Resolve merge conflicts

* Resolve merge conflicts

* Reestore 'lsn' field in DatadirModification

* adjust DatadirModification lsn in ingest_record

* Fix formatting

* Pass lsn to get_relsize

* Fix merge conflict

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Check if relation exists before trying to truncat it

refer #1932

* Add test reporducing FSM truncate problem

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
2022-08-09 22:45:33 +03:00
Dmitry Rodionov
1fc761983f support node id and remote storage params in docker_entrypoint.sh 2022-08-09 18:59:00 +03:00
Stas Kelvich
227d47d2f3 Update CONTRIBUTING.md 2022-08-09 14:18:25 +03:00
Stas Kelvich
0290893bcc Update CONTRIBUTING.md 2022-08-09 14:18:25 +03:00
Heikki Linnakangas
32fd709b34 Fix links to safekeeper protocol docs. (#2188)
safekeeper/README_PROTO.md was moved to docs/safekeeper-protocol.md in
commit 0b14fdb078, as part of reorganizing the docs into 'mdbook' format.

Fixes issue #1475. Thanks to @banks for spotting the outdated references.

In addition to fixing the above issue, this patch also fixes other broken links as a result of 0b14fdb078. See https://github.com/neondatabase/neon/pull/2188#pullrequestreview-1055918480.

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Thang Pham <thang@neon.tech>
2022-08-09 10:19:18 +07:00
Kirill Bulatov
3a9bff81db Fix etcd typos 2022-08-08 19:04:46 +03:00
bojanserafimov
743370de98 Major migration script (#2073)
This script can be used to migrate a tenant across breaking storage versions, or (in the future) upgrading postgres versions. See the comment at the top for an overview.

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2022-08-08 17:52:28 +02:00
Dmitry Rodionov
cdfa9fe705 avoid duplicate parameter, increase timeout 2022-08-08 12:15:16 +03:00
Dmitry Rodionov
7cd68a0c27 increase timeout to pass test with real s3 2022-08-08 12:15:16 +03:00
Dmitry Rodionov
beaa991f81 remove debug log 2022-08-08 12:15:16 +03:00
Dmitry Rodionov
9430abae05 use event so it fires only if workload thread successfully finished 2022-08-08 12:15:16 +03:00
Dmitry Rodionov
4da4c7f769 increase statement timeout 2022-08-08 12:15:16 +03:00
Dmitry Rodionov
0d14d4a1a8 ignore record property warning to fix benchmarks 2022-08-08 12:15:16 +03:00
bojanserafimov
8c8431ebc6 Add more buckets to pageserver latency metrics (#2225) 2022-08-06 11:45:47 +02:00
Ankur Srivastava
84d1bc06a9 refactor: replace lazy-static with once-cell (#2195)
- Replacing all the occurrences of lazy-static with `once-cell::sync::Lazy`
- fixes #1147

Signed-off-by: Ankur Srivastava <best.ankur@gmail.com>
2022-08-05 19:34:04 +02:00
Konstantin Knizhnik
5133db44e1 Move relation size cache from WalIngest to DatadirTimeline (#2094)
* Move relation sie cache to layered timeline

* Fix obtaining current LSN for relation size cache

* Resolve merge conflicts

* Resolve merge conflicts

* Reestore 'lsn' field in DatadirModification

* adjust DatadirModification lsn in ingest_record

* Fix formatting

* Pass lsn to get_relsize

* Fix merge conflict

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
2022-08-05 16:28:59 +03:00
Alexander Bayandin
4cb1074fe5 github/workflows: Fix git dubious ownership (#2223) 2022-08-05 13:44:57 +01:00
Arthur Petukhovsky
0a958b0ea1 Check find_end_of_wal errors instead of unwrap 2022-08-04 17:56:19 +03:00
Vadim Kharitonov
1bbc8090f3 [issue #1591] Add neon_local pageserver status handler 2022-08-04 16:38:29 +03:00
Dmitry Rodionov
f7d8db7e39 silence https://github.com/neondatabase/neon/issues/2211 2022-08-04 16:32:19 +03:00
Dmitry Rodionov
e54941b811 treat pytest warnings as errors 2022-08-04 16:32:19 +03:00
Heikki Linnakangas
52ce1c9d53 Speed up test shutdown, by polling more frequently.
A fair amount of the time in our python tests is spent waiting for the
pageserver and safekeeper processes to shut down. It doesn't matter so
much when you're running a lot of tests in parallel, but it's quite
noticeable when running them sequentially.

A big part of the slowness is that is that after sending the SIGTERM
signal, we poll to see if the process is still running, and the
polling happened at 1 s interval. Reduce it to 0.1 s.
2022-08-04 12:57:15 +03:00
Dmitry Rodionov
bc2cb5382b run real s3 tests in CI 2022-08-04 11:14:05 +03:00
Dmitry Rodionov
5f71aa09d3 support running tests against real s3 implementation without mocking 2022-08-04 11:14:05 +03:00
Dmitry Rodionov
b4f2c5b514 run benchmarks conditionally, on main or if run_benchmarks label is set 2022-08-03 01:36:14 +03:00
Alexander Bayandin
71f39bac3d github/workflows: upload artifacts to S3 (#2071) 2022-08-02 13:57:26 +01:00
Stas Kelvich
177d5b1f22 Bump postgres to get uuid extension 2022-08-02 11:16:26 +03:00
dependabot[bot]
8ba41b8c18 Bump pywin32 from 227 to 301 (#2202) 2022-08-01 19:08:09 +01:00
Dmitry Rodionov
1edf3eb2c8 increase timeout so mac os job can finish the build with all cache misses 2022-08-01 18:28:49 +03:00
Dmitry Rodionov
0ebb6bc4b0 Temporary pin Werkzeug version because moto hangs with newer one. See https://github.com/spulec/moto/issues/5341 2022-08-01 18:28:49 +03:00
Dmitry Rodionov
092a9b74d3 use only s3 in boto3-stubs and update mypy
Newer version of mypy fixes buggy error when trying to update only boto3 stubs.
However it brings new checks and starts to yell when we index into
cusror.fetchone without checking for None first. So this introduces a wrapper
to simplify quering for scalar values. I tried to use cursor_factory connection
argument but without success. There can be a better way to do that,
but this looks the simplest
2022-08-01 18:28:49 +03:00
Ankur Srivastava
e73b95a09d docs: linked poetry related step in tests section
Added the link to the dependencies which should be installed
before running the tests.
2022-08-01 18:13:01 +03:00
Alexander Bayandin
539007c173 github/workflows: make bash more strict (#2197) 2022-08-01 12:54:39 +01:00
181 changed files with 6498 additions and 3994 deletions

56
.github/actions/download/action.yml vendored Normal file
View File

@@ -0,0 +1,56 @@
name: "Download an artifact"
description: "Custom download action"
inputs:
name:
description: "Artifact name"
required: true
path:
description: "A directory to put artifact into"
default: "."
required: false
skip-if-does-not-exist:
description: "Allow to skip if file doesn't exist, fail otherwise"
default: false
required: false
runs:
using: "composite"
steps:
- name: Download artifact
id: download-artifact
shell: bash -euxo pipefail {0}
env:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
echo '::set-output name=SKIPPED::true'
exit 0
else
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
fi
echo '::set-output name=SKIPPED::false'
mkdir -p $(dirname $ARCHIVE)
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} ${ARCHIVE}
- name: Extract artifact
if: ${{ steps.download-artifact.outputs.SKIPPED == 'false' }}
shell: bash -euxo pipefail {0}
env:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
run: |
mkdir -p ${TARGET}
time tar -xf ${ARCHIVE} -C ${TARGET}
rm -f ${ARCHIVE}

View File

@@ -27,22 +27,35 @@ inputs:
description: 'Whether to upload the performance report'
required: false
default: 'false'
run_with_real_s3:
description: 'Whether to pass real s3 credentials to the test suite'
required: false
default: 'false'
real_s3_bucket:
description: 'Bucket name for real s3 tests'
required: false
default: ''
real_s3_region:
description: 'Region name for real s3 tests'
required: false
default: ''
real_s3_access_key_id:
description: 'Access key id'
required: false
default: ''
real_s3_secret_access_key:
description: 'Secret access key'
required: false
default: ''
runs:
using: "composite"
steps:
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
- name: Get Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-artifact
path: ./neon-artifact/
- name: Extract Neon artifact
shell: bash -ex {0}
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
rm -rf ./neon-artifact/
path: /tmp/neon
- name: Checkout
if: inputs.needs_postgres_source == 'true'
@@ -59,7 +72,7 @@ runs:
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Run pytest
@@ -70,7 +83,10 @@ runs:
# this variable will be embedded in perf test report
# and is needed to distinguish different environments
PLATFORM: github-actions-selfhosted
shell: bash -ex {0}
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
shell: bash -euxo pipefail {0}
run: |
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
rm -rf $PERF_REPORT_DIR
@@ -84,6 +100,14 @@ runs:
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi
if [[ "${{ inputs.run_with_real_s3 }}" == "true" ]]; then
echo "REAL S3 ENABLED"
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=${{ inputs.real_s3_bucket }}
export REMOTE_STORAGE_S3_REGION=${{ inputs.real_s3_region }}
fi
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
mkdir -p "$PERF_REPORT_DIR"
@@ -123,7 +147,7 @@ runs:
fi
- name: Delete all data but logs
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
if: always()
run: |
du -sh /tmp/test_output/*
@@ -132,9 +156,7 @@ runs:
- name: Upload python test logs
if: always()
uses: actions/upload-artifact@v3
uses: ./.github/actions/upload
with:
retention-days: 7
if-no-files-found: error
name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs
path: /tmp/test_output/

View File

@@ -5,13 +5,18 @@ runs:
using: "composite"
steps:
- name: Merge coverage data
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Upload coverage data
uses: actions/upload-artifact@v3
- name: Download previous coverage data into the same directory
uses: ./.github/actions/download
with:
retention-days: 7
if-no-files-found: error
name: coverage-data-artifact
path: /tmp/coverage/
path: /tmp/coverage
skip-if-does-not-exist: true # skip if there's no previous coverage to download
- name: Upload coverage data
uses: ./.github/actions/upload
with:
name: coverage-data-artifact
path: /tmp/coverage

55
.github/actions/upload/action.yml vendored Normal file
View File

@@ -0,0 +1,55 @@
name: "Upload an artifact"
description: "Custom upload action"
inputs:
name:
description: "Artifact name"
required: true
path:
description: "A directory or file to upload"
required: true
runs:
using: "composite"
steps:
- name: Prepare artifact
shell: bash -euxo pipefail {0}
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
run: |
mkdir -p $(dirname $ARCHIVE)
if [ -f ${ARCHIVE} ]; then
echo 2>&1 "File ${ARCHIVE} already exist. Something went wrong before"
exit 1
fi
ZSTD_NBTHREADS=0
if [ -d ${SOURCE} ]; then
time tar -C ${SOURCE} -cf ${ARCHIVE} --zstd .
elif [ -f ${SOURCE} ]; then
time tar -cf ${ARCHIVE} --zstd ${SOURCE}
elif ! ls ${SOURCE} > /dev/null 2>&1; then
echo 2>&1 "${SOURCE} does not exist"
exit 2
else
echo 2>&1 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
exit 3
fi
- name: Upload artifact
shell: bash -euxo pipefail {0}
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
FILESIZE=$(du -sh ${ARCHIVE} | cut -f1)
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}
# Ref https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-job-summary
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}

View File

@@ -2,30 +2,14 @@
set -e
RELEASE=${RELEASE:-false}
# look at docker hub for latest tag for neon docker image
if [ "${RELEASE}" = "true" ]; then
echo "search latest release tag"
VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/neondatabase/neon/tags |jq -r -S '.[].name' | grep release | sed 's/release-//g' | grep -E '^[0-9]+$' | sort -n | tail -1)
if [ -z "${VERSION}" ]; then
echo "no any docker tags found, exiting..."
exit 1
else
TAG="release-${VERSION}"
fi
if [ -n "${DOCKER_TAG}" ]; then
# Verson is DOCKER_TAG but without prefix
VERSION=$(echo $DOCKER_TAG | sed 's/^.*-//g')
else
echo "search latest dev tag"
VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/neondatabase/neon/tags |jq -r -S '.[].name' | grep -E '^[0-9]+$' | sort -n | tail -1)
if [ -z "${VERSION}" ]; then
echo "no any docker tags found, exiting..."
exit 1
else
TAG="${VERSION}"
fi
echo "Please set DOCKER_TAG environment variable"
exit 1
fi
echo "found ${VERSION}"
# do initial cleanup
rm -rf neon_install postgres_install.tar.gz neon_install.tar.gz .neon_current_version
@@ -33,8 +17,8 @@ mkdir neon_install
# retrieve binaries from docker image
echo "getting binaries from docker image"
docker pull --quiet neondatabase/neon:${TAG}
ID=$(docker create neondatabase/neon:${TAG})
docker pull --quiet neondatabase/neon:${DOCKER_TAG}
ID=$(docker create neondatabase/neon:${DOCKER_TAG})
docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C neon_install
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/

View File

@@ -1,7 +1,8 @@
#!/bin/sh
# get instance id from meta-data service
# fetch params from meta-data service
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
AZ_ID=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
# store fqdn hostname in var
HOST=$(hostname -f)
@@ -14,7 +15,8 @@ cat <<EOF | tee /tmp/payload
"port": 6500,
"http_port": 7676,
"region_id": {{ console_region_id }},
"instance_id": "${INSTANCE_ID}"
"instance_id": "${INSTANCE_ID}",
"availability_zone_id": "${AZ_ID}"
}
EOF

View File

@@ -1,4 +1,4 @@
name: benchmarking
name: Benchmarking
on:
# uncomment to run on push for debugging your PR
@@ -15,6 +15,15 @@ on:
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
bench:
# this workflow runs on self hosteed runner
@@ -60,7 +69,6 @@ jobs:
- name: Setup cluster
env:
BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
shell: bash
run: |
set -e
@@ -96,7 +104,9 @@ jobs:
# since it might generate duplicates when calling ingest_perf_test_result.py
rm -rf perf-report-staging
mkdir -p perf-report-staging
./scripts/pytest test_runner/performance/ -v -m "remote_cluster" --skip-interfering-proc-check --out-dir perf-report-staging --timeout 3600
# Set --sparse-ordering option of pytest-order plugin to ensure tests are running in order of appears in the file,
# it's important for test_perf_pgbench.py::test_pgbench_remote_* tests
./scripts/pytest test_runner/performance/ -v -m "remote_cluster" --sparse-ordering --skip-interfering-proc-check --out-dir perf-report-staging --timeout 5400
- name: Submit result
env:
@@ -113,3 +123,106 @@ jobs:
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
pgbench-compare:
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
REMOTE_ENV: "1"
POSTGRES_DISTRIB_DIR: /usr
TEST_OUTPUT: /tmp/test_output
strategy:
fail-fast: false
matrix:
connstr: [ BENCHMARK_CAPTEST_CONNSTR, BENCHMARK_RDS_CONNSTR ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2817580636
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: Calculate platform
id: calculate-platform
env:
CONNSTR: ${{ matrix.connstr }}
run: |
if [ "${CONNSTR}" = "BENCHMARK_CAPTEST_CONNSTR" ]; then
PLATFORM=neon-captest
elif [ "${CONNSTR}" = "BENCHMARK_RDS_CONNSTR" ]; then
PLATFORM=rds-aurora
else
echo 2>&1 "Unknown CONNSTR=${CONNSTR}. Allowed are BENCHMARK_CAPTEST_CONNSTR, and BENCHMARK_RDS_CONNSTR only"
exit 1
fi
echo "::set-output name=PLATFORM::${PLATFORM}"
- name: Install Deps
run: |
echo "deb http://apt.postgresql.org/pub/repos/apt focal-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt -y update
sudo apt install -y postgresql-14 postgresql-client-14
- name: Benchmark init
env:
PLATFORM: ${{ steps.calculate-platform.outputs.PLATFORM }}
BENCHMARK_CONNSTR: ${{ secrets[matrix.connstr] }}
run: |
mkdir -p perf-report-captest
psql $BENCHMARK_CONNSTR -c "SELECT 1;"
./scripts/pytest test_runner/performance/test_perf_pgbench.py::test_pgbench_remote_init -v -m "remote_cluster" --skip-interfering-proc-check --out-dir perf-report-captest --timeout 21600
- name: Benchmark simple-update
env:
PLATFORM: ${{ steps.calculate-platform.outputs.PLATFORM }}
BENCHMARK_CONNSTR: ${{ secrets[matrix.connstr] }}
run: |
psql $BENCHMARK_CONNSTR -c "SELECT 1;"
./scripts/pytest test_runner/performance/test_perf_pgbench.py::test_pgbench_remote_simple_update -v -m "remote_cluster" --skip-interfering-proc-check --out-dir perf-report-captest --timeout 21600
- name: Benchmark select-only
env:
PLATFORM: ${{ steps.calculate-platform.outputs.PLATFORM }}
BENCHMARK_CONNSTR: ${{ secrets[matrix.connstr] }}
run: |
psql $BENCHMARK_CONNSTR -c "SELECT 1;"
./scripts/pytest test_runner/performance/test_perf_pgbench.py::test_pgbench_remote_select_only -v -m "remote_cluster" --skip-interfering-proc-check --out-dir perf-report-captest --timeout 21600
- name: Submit result
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
run: |
REPORT_FROM=$(realpath perf-report-captest) REPORT_TO=staging scripts/generate_and_push_perf_report.sh
- name: Upload logs
if: always()
uses: ./.github/actions/upload
with:
name: bench-captest-${{ steps.calculate-platform.outputs.PLATFORM }}
path: /tmp/test_output/
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -3,14 +3,10 @@ name: Test and Deploy
on:
push:
branches:
- main
- release
- main
- release
pull_request:
defaults:
run:
shell: bash -ex {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
@@ -21,8 +17,39 @@ env:
COPT: '-Werror'
jobs:
tag:
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::$(git rev-list --count HEAD)"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release-$(git rev-list --count HEAD)"
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
echo "::set-output name=tag::$GITHUB_RUN_ID"
fi
shell: bash
id: build-tag
build-neon:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
strategy:
fail-fast: false
matrix:
@@ -31,8 +58,19 @@ jobs:
env:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.sha }}
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Checkout
uses: actions/checkout@v3
with:
@@ -42,6 +80,7 @@ jobs:
- name: Set pg revision for caching
id: pg_ver
run: echo ::set-output name=pg_rev::$(git rev-parse HEAD:vendor/postgres)
shell: bash -euxo pipefail {0}
# Set some environment variables used by all the steps.
#
@@ -65,6 +104,7 @@ jobs:
echo "cov_prefix=${cov_prefix}" >> $GITHUB_ENV
echo "CARGO_FEATURES=${CARGO_FEATURES}" >> $GITHUB_ENV
echo "CARGO_FLAGS=${CARGO_FLAGS}" >> $GITHUB_ENV
shell: bash -euxo pipefail {0}
# Don't include the ~/.cargo/registry/src directory. It contains just
# uncompressed versions of the crates in ~/.cargo/registry/cache
@@ -81,8 +121,8 @@ jobs:
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
v6-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v6-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
- name: Cache postgres build
id: cache_pg
@@ -94,14 +134,17 @@ jobs:
- name: Build postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: mold -run make postgres -j$(nproc)
shell: bash -euxo pipefail {0}
- name: Run cargo build
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
shell: bash -euxo pipefail {0}
- name: Run cargo test
run: |
${cov_prefix} cargo test $CARGO_FLAGS
shell: bash -euxo pipefail {0}
- name: Install rust binaries
run: |
@@ -123,6 +166,7 @@ jobs:
mkdir -p /tmp/coverage/
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS --message-format=json --no-run |
jq -r '.executable | select(. != null)'
@@ -141,29 +185,28 @@ jobs:
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
done
fi
shell: bash -euxo pipefail {0}
- name: Install postgres binaries
run: cp -a tmp_install /tmp/neon/pg_install
shell: bash -euxo pipefail {0}
- name: Prepare neon artifact
run: ZSTD_NBTHREADS=0 tar -C /tmp/neon/ -cf ./neon.tar.zst --zstd .
- name: Upload neon binaries
uses: actions/upload-artifact@v3
- name: Upload Neon artifact
uses: ./.github/actions/upload
with:
retention-days: 7
if-no-files-found: error
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon.tar.zst
path: /tmp/neon
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
pg_regress-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ build-neon ]
strategy:
fail-fast: false
@@ -190,7 +233,10 @@ jobs:
uses: ./.github/actions/save-coverage-data
other-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ build-neon ]
strategy:
fail-fast: false
@@ -210,14 +256,22 @@ jobs:
build_type: ${{ matrix.build_type }}
rust_toolchain: ${{ matrix.rust_toolchain }}
test_selection: batch_others
run_with_real_s3: true
real_s3_bucket: ci-tests-s3
real_s3_region: us-west-2
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
benchmarks:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ build-neon ]
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
strategy:
fail-fast: false
matrix:
@@ -245,7 +299,10 @@ jobs:
# while coverage is currently collected for the debug ones
coverage-report:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ other-tests, pg_regress-tests ]
strategy:
fail-fast: false
@@ -268,28 +325,23 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git/
target/
key: v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
key: v5-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
- name: Get Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon-artifact/
path: /tmp/neon
- name: Extract Neon artifact
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
rm -rf ./neon-artifact/
- name: Restore coverage data
uses: actions/download-artifact@v3
- name: Get coverage artifact
uses: ./.github/actions/download
with:
name: coverage-data-artifact
path: /tmp/coverage/
path: /tmp/coverage
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
shell: bash -euxo pipefail {0}
- name: Build and upload coverage report
run: |
@@ -322,187 +374,171 @@ jobs:
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
shell: bash -euxo pipefail {0}
trigger-e2e-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\"
}
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\"
}
}"
neon-image:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
docker-image:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ pg_regress-tests, other-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v1 # v3 won't work with kaniko
with:
submodules: true
fetch-depth: 0
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
driver: docker
- name: Kaniko build neon
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
- name: Get build tag
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::$(git rev-list --count HEAD)"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release-$(git rev-list --count HEAD)"
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
id: build-tag
compute-tools-image:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
- name: Get legacy build tag
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::latest"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release"
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
id: legacy-build-tag
- name: Build neon Docker image
uses: docker/build-push-action@v2
with:
context: .
build-args: |
GIT_VERSION="${{github.sha}}"
AWS_ACCESS_KEY_ID="${{secrets.CACHEPOT_AWS_ACCESS_KEY_ID}}"
AWS_SECRET_ACCESS_KEY="${{secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY}}"
pull: true
push: true
tags: neondatabase/neon:${{steps.legacy-build-tag.outputs.tag}}, neondatabase/neon:${{steps.build-tag.outputs.tag}}
docker-image-compute:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ pg_regress-tests, other-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v1 # v3 won't work with kaniko
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute tools
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
compute-node-image:
runs-on: dev
container: gcr.io/kaniko-project/executor:v1.9.0-debug
steps:
- name: Checkout
uses: actions/checkout@v1 # v3 won't work with kaniko
with:
submodules: true
fetch-depth: 0
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Configure ECR login
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
driver: docker
- name: Kaniko build compute node
working-directory: ./vendor/postgres/
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
- name: Get build tag
promote-images:
runs-on: dev
needs: [ neon-image, compute-tools-image, compute-node-image ]
if: github.event_name != 'workflow_dispatch'
container: amazon/aws-cli
strategy:
fail-fast: false
matrix:
name: [ neon, compute-tools, compute-node ]
steps:
- name: Promote image to latest
run:
MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=$GITHUB_RUN_ID --query 'images[].imageManifest' --output text) && aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
push-docker-hub:
runs-on: dev
needs: [ promote-images, tag ]
container: golang:1.19-bullseye
steps:
- name: Install Crane & ECR helper
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::$(git rev-list --count HEAD)"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release-$(git rev-list --count HEAD)"
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
id: build-tag
go install github.com/google/go-containerregistry/cmd/crane@31786c6cbb82d6ec4fb8eb79cd9387905130534e # v0.11.0
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0
# - name: Get build tag
# run: |
# if [[ "$GITHUB_REF_NAME" == "main" ]]; then
# echo "::set-output name=tag::$(git rev-list --count HEAD)"
# elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
# echo "::set-output name=tag::release-$(git rev-list --count HEAD)"
# else
# echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release' "
# echo "::set-output name=tag::$GITHUB_RUN_ID"
# fi
# id: build-tag
- name: Get legacy build tag
- name: Configure ECR login
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "::set-output name=tag::latest"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "::set-output name=tag::release"
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1
fi
id: legacy-build-tag
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Build compute-tools Docker image
uses: docker/build-push-action@v2
with:
context: .
build-args: |
GIT_VERSION="${{github.sha}}"
AWS_ACCESS_KEY_ID="${{secrets.CACHEPOT_AWS_ACCESS_KEY_ID}}"
AWS_SECRET_ACCESS_KEY="${{secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY}}"
push: false
file: Dockerfile.compute-tools
tags: neondatabase/compute-tools:local
- name: Pull neon image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:latest neon
- name: Push compute-tools Docker image
uses: docker/build-push-action@v2
with:
context: .
build-args: |
GIT_VERSION="${{github.sha}}"
AWS_ACCESS_KEY_ID="${{secrets.CACHEPOT_AWS_ACCESS_KEY_ID}}"
AWS_SECRET_ACCESS_KEY="${{secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY}}"
push: true
file: Dockerfile.compute-tools
tags: neondatabase/compute-tools:${{steps.legacy-build-tag.outputs.tag}}
- name: Pull compute tools image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest compute-tools
- name: Build compute-node Docker image
uses: docker/build-push-action@v2
with:
context: ./vendor/postgres/
build-args:
COMPUTE_TOOLS_TAG=local
push: true
tags: neondatabase/compute-node:${{steps.legacy-build-tag.outputs.tag}}, neondatabase/compute-node:${{steps.build-tag.outputs.tag}}
- name: Pull compute node image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:latest compute-node
- name: Configure docker login
run: |
# ECR Credential Helper & Docker Hub don't work together in config, hence reset
echo "" > /github/home/.docker/config.json
crane auth login -u ${{ secrets.NEON_DOCKERHUB_USERNAME }} -p ${{ secrets.NEON_DOCKERHUB_PASSWORD }} index.docker.io
- name: Push neon image to Docker Hub
run: crane push neon neondatabase/neon:${{needs.tag.outputs.build-tag}}
- name: Push compute tools image to Docker Hub
run: crane push compute-tools neondatabase/compute-tools:${{needs.tag.outputs.build-tag}}
- name: Push compute node image to Docker Hub
run: crane push compute-node neondatabase/compute-node:${{needs.tag.outputs.build-tag}}
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane tag neondatabase/neon:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node:${{needs.tag.outputs.build-tag}} latest
calculate-deploy-targets:
runs-on: [ self-hosted, Linux, k8s-runner ]
@@ -528,14 +564,16 @@ jobs:
deploy:
runs-on: [ self-hosted, Linux, k8s-runner ]
# We need both storage **and** compute images for deploy, because control plane
# picks the compute version based on the storage version. If it notices a fresh
# storage it may bump the compute version. And if compute image failed to build
# it may break things badly.
needs: [ docker-image, docker-image-compute, calculate-deploy-targets ]
#container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
# We need both storage **and** compute images for deploy, because control plane picks the compute version based on the storage version.
# If it notices a fresh storage it may bump the compute version. And if compute image failed to build it may break things badly
needs: [ push-docker-hub, calculate-deploy-targets, tag, other-tests, pg_regress-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include: ${{fromJSON(needs.calculate-deploy-targets.outputs.matrix-include)}}
@@ -546,12 +584,19 @@ jobs:
submodules: true
fetch-depth: 0
- name: Setup python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Setup ansible
run: |
export PATH="/root/.local/bin:$PATH"
pip install --progress-bar off --user ansible boto3
- name: Redeploy
run: |
export DOCKER_TAG=${{needs.tag.outputs.build-tag}}
cd "$(pwd)/.github/ansible"
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
@@ -574,13 +619,16 @@ jobs:
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy:
runs-on: [ self-hosted, Linux, k8s-runner ]
# Compute image isn't strictly required for proxy deploy, but let's still wait for it
# to run all deploy jobs consistently.
needs: [ docker-image, docker-image-compute, calculate-deploy-targets ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, other-tests, pg_regress-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
strategy:
matrix:
include: ${{fromJSON(needs.calculate-deploy-targets.outputs.matrix-include)}}
@@ -593,6 +641,9 @@ jobs:
submodules: true
fetch-depth: 0
- name: Add curl
run: apt update && apt install curl -y
- name: Store kubeconfig file
run: |
echo "${{ secrets[matrix.kubeconfig_secret] }}" | base64 --decode > ${KUBECONFIG}
@@ -605,6 +656,6 @@ jobs:
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.docker-image.outputs.build-tag}}
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace default --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -8,7 +8,7 @@ on:
defaults:
run:
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
@@ -27,7 +27,7 @@ jobs:
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
rust_toolchain: [1.58]
os: [ubuntu-latest, macos-latest]
timeout-minutes: 50
timeout-minutes: 60
name: run regression test suite
runs-on: ${{ matrix.os }}
@@ -101,7 +101,7 @@ jobs:
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }}
key: v2-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }}
- name: Run cargo clippy
run: ./run_clippy.sh

View File

@@ -19,8 +19,12 @@ concurrency:
jobs:
test-postgres-client-libs:
# TODO: switch to gen2 runner, requires docker
runs-on: [ ubuntu-latest ]
env:
TEST_OUTPUT: /tmp/test_output
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -40,16 +44,16 @@ jobs:
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Run pytest
env:
REMOTE_ENV: 1
BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
TEST_OUTPUT: /tmp/test_output
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: |
# Test framework expects we have psql binary;
# but since we don't really need it in this test, let's mock it
@@ -61,9 +65,18 @@ jobs:
-m "remote_cluster" \
-rA "test_runner/pg_clients"
# We use GitHub's action upload-artifact because `ubuntu-latest` doesn't have configured AWS CLI.
# It will be fixed after switching to gen2 runner
- name: Upload python test logs
if: always()
uses: actions/upload-artifact@v3
with:
retention-days: 7
name: python-test-pg_clients-${{ runner.os }}-stage-logs
path: ${{ env.TEST_OUTPUT }}
- name: Post to a Slack channel
if: failure()
id: slack
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
/target
/bindings/python/neon-dev-utils/target
/tmp_check
/tmp_install
/tmp_check_cli

View File

@@ -11,17 +11,15 @@ than it was before.
## Submitting changes
1. Make a PR for every change.
Even seemingly trivial patches can break things in surprising ways.
Use of common sense is OK. If you're only fixing a typo in a comment,
it's probably fine to just push it. But if in doubt, open a PR.
2. Get at least one +1 on your PR before you push.
1. Get at least one +1 on your PR before you push.
For simple patches, it will only take a minute for someone to review
it.
2. Don't force push small changes after making the PR ready for review.
Doing so will force readers to re-read your entire PR, which will delay
the review process.
3. Always keep the CI green.
Do not push, if the CI failed on your PR. Even if you think it's not

83
Cargo.lock generated
View File

@@ -48,9 +48,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.58"
version = "1.0.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704"
checksum = "1485d4d2cc45e7b201ee3767015c96faa5904387c9d87c6efdd0fb511f12d305"
dependencies = [
"backtrace",
]
@@ -154,9 +154,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.12"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e"
checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
dependencies = [
"async-trait",
"axum-core",
@@ -317,15 +317,6 @@ dependencies = [
"serde",
]
[[package]]
name = "cast"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a"
dependencies = [
"rustc_version",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -504,8 +495,8 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"lazy_static",
"nix",
"once_cell",
"pageserver",
"postgres",
"regex",
@@ -579,7 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f"
dependencies = [
"atty",
"cast 0.3.0",
"cast",
"clap 2.34.0",
"criterion-plot",
"csv",
@@ -600,11 +591,11 @@ dependencies = [
[[package]]
name = "criterion-plot"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57"
checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876"
dependencies = [
"cast 0.2.7",
"cast",
"itertools",
]
@@ -680,9 +671,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ccfd8c0ee4cce11e45b3fd6f9d5e69e0cc62912aa6a0cb1bf4617b0eba5a12f"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
@@ -1116,9 +1107,9 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.26.1"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d"
[[package]]
name = "git-version"
@@ -1184,9 +1175,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.12.2"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "heck"
@@ -1388,7 +1379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg",
"hashbrown 0.12.2",
"hashbrown 0.12.3",
]
[[package]]
@@ -1418,6 +1409,17 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "integration_tests"
version = "0.1.0"
dependencies = [
"anyhow",
"pg_bin",
"tokio",
"tokio-postgres",
"utils",
]
[[package]]
name = "ipnet"
version = "2.5.0"
@@ -1600,8 +1602,8 @@ dependencies = [
name = "metrics"
version = "0.1.0"
dependencies = [
"lazy_static",
"libc",
"once_cell",
"prometheus",
"workspace_hack",
]
@@ -1851,9 +1853,9 @@ dependencies = [
[[package]]
name = "os_str_bytes"
version = "6.1.0"
version = "6.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4"
[[package]]
name = "pageserver"
@@ -1879,7 +1881,6 @@ dependencies = [
"humantime-serde",
"hyper",
"itertools",
"lazy_static",
"metrics",
"nix",
"once_cell",
@@ -1988,6 +1989,14 @@ dependencies = [
"indexmap",
]
[[package]]
name = "pg_bin"
version = "0.1.0"
dependencies = [
"tokio-postgres",
"utils",
]
[[package]]
name = "phf"
version = "0.10.1"
@@ -2125,9 +2134,9 @@ dependencies = [
"crc32c",
"env_logger",
"hex",
"lazy_static",
"log",
"memoffset",
"once_cell",
"postgres",
"rand",
"regex",
@@ -2279,6 +2288,7 @@ dependencies = [
"anyhow",
"async-trait",
"base64",
"bstr",
"bytes",
"clap 3.2.12",
"futures",
@@ -2287,9 +2297,9 @@ dependencies = [
"hex",
"hmac 0.12.1",
"hyper",
"lazy_static",
"md5",
"metrics",
"once_cell",
"parking_lot 0.12.1",
"pin-project-lite",
"rand",
@@ -2735,9 +2745,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0a5f7c728f5d284929a1cccb5bc19884422bfe6ef4d6c409da2c41838983fcf"
checksum = "24c8ad4f0c00e1eb5bc7614d236a7f1300e3dbd76b68cac8e06fb00b015ad8d8"
[[package]]
name = "ryu"
@@ -2763,7 +2773,6 @@ dependencies = [
"hex",
"humantime",
"hyper",
"lazy_static",
"metrics",
"once_cell",
"postgres",
@@ -3617,9 +3626,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c"
checksum = "15c61ba63f9235225a22310255a29b806b907c9b8c964bcbd0a2c70f3f2deea7"
[[package]]
name = "unicode-normalization"
@@ -3680,9 +3689,9 @@ dependencies = [
"hex-literal",
"hyper",
"jsonwebtoken",
"lazy_static",
"metrics",
"nix",
"once_cell",
"pin-project-lite",
"postgres",
"postgres-protocol",

View File

@@ -7,8 +7,13 @@ members = [
"safekeeper",
"workspace_hack",
"neon_local",
"integration_tests",
"libs/*",
]
exclude = [
"bindings/python/neon-dev-utils",
]
[profile.release]
# This is useful for profiling and, to some extent, debug.

View File

@@ -1,8 +1,6 @@
# Build Postgres
FROM neondatabase/rust:1.58 AS pg-build
WORKDIR /pg
USER root
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned AS pg-build
WORKDIR /home/nonroot
COPY vendor/postgres vendor/postgres
COPY Makefile Makefile
@@ -11,27 +9,30 @@ ENV BUILD_TYPE release
RUN set -e \
&& mold -run make -j $(nproc) -s postgres \
&& rm -rf tmp_install/build \
&& tar -C tmp_install -czf /postgres_install.tar.gz .
&& tar -C tmp_install -czf /home/nonroot/postgres_install.tar.gz .
# Build zenith binaries
FROM neondatabase/rust:1.58 AS build
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build
ARG RUSTC_WRAPPER=cachepot
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ENV AWS_REGION=eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY
COPY --from=pg-build /pg/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY --from=pg-build /home/nonroot/tmp_install/include/postgresql/server tmp_install/include/postgresql/server
COPY . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& sudo -E "PATH=$PATH" mold -run cargo build --release \
&& mold -run cargo build --release \
&& cachepot -s
# Build final image
@@ -40,8 +41,8 @@ FROM debian:bullseye-slim
WORKDIR /data
RUN set -e \
&& apt-get update \
&& apt-get install -y \
&& apt update \
&& apt install -y \
libreadline-dev \
libseccomp-dev \
openssl \
@@ -50,17 +51,14 @@ RUN set -e \
&& useradd -d /data zenith \
&& chown -R zenith:zenith /data
COPY --from=build --chown=zenith:zenith /home/runner/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/runner/target/release/proxy /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=zenith:zenith /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=pg-build /pg/tmp_install/ /usr/local/
COPY --from=pg-build /postgres_install.tar.gz /data/
COPY docker-entrypoint.sh /docker-entrypoint.sh
COPY --from=pg-build /home/nonroot/tmp_install/ /usr/local/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
VOLUME ["/data"]
USER zenith
EXPOSE 6400
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["pageserver"]

View File

@@ -1,22 +1,25 @@
# First transient image to build compute_tools binaries
# NB: keep in sync with rust image version in .github/workflows/build_and_test.yml
FROM neondatabase/rust:1.58 AS rust-build
FROM 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned AS rust-build
WORKDIR /home/nonroot
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build.
ARG RUSTC_WRAPPER=cachepot
ARG CACHEPOT_BUCKET=zenith-rust-cachepot
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ENV AWS_REGION=eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY
COPY . .
RUN set -e \
&& sudo -E "PATH=$PATH" mold -run cargo build -p compute_tools --release \
&& mold -run cargo build -p compute_tools --release \
&& cachepot -s
# Final image that only has one binary
FROM debian:buster-slim
FROM debian:bullseye-slim
COPY --from=rust-build /home/runner/target/release/compute_ctl /usr/local/bin/compute_ctl
COPY --from=rust-build /home/nonroot/target/release/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -204,6 +204,8 @@ postgres=# select * from t;
## Running tests
Ensure your dependencies are installed as described [here](https://github.com/neondatabase/neon#dependency-installation-notes).
```sh
git clone --recursive https://github.com/neondatabase/neon.git
make # builds also postgres and installs it to ./tmp_install

264
bindings/python/neon-dev-utils/Cargo.lock generated Normal file
View File

@@ -0,0 +1,264 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "indoc"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47741a8bc60fb26eb8d6e0238bbb26d8575ff623fdc97b1a2c00c050b9684ed8"
dependencies = [
"indoc-impl",
"proc-macro-hack",
]
[[package]]
name = "indoc-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce046d161f000fffde5f432a0d034d0341dc152643b2598ed5bfce44c4f3a8f0"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
"unindent",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "libc"
version = "0.2.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
[[package]]
name = "lock_api"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "neon-dev-utils"
version = "0.1.0"
dependencies = [
"pyo3",
]
[[package]]
name = "once_cell"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
name = "paste"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45ca20c77d80be666aef2b45486da86238fabe33e38306bd3118fe4af33fa880"
dependencies = [
"paste-impl",
"proc-macro-hack",
]
[[package]]
name = "paste-impl"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d95a7db200b97ef370c8e6de0088252f7e0dfff7d047a28528e47456c0fc98b6"
dependencies = [
"proc-macro-hack",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
dependencies = [
"unicode-ident",
]
[[package]]
name = "pyo3"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d41d50a7271e08c7c8a54cd24af5d62f73ee3a6f6a314215281ebdec421d5752"
dependencies = [
"cfg-if",
"indoc",
"libc",
"parking_lot",
"paste",
"pyo3-build-config",
"pyo3-macros",
"unindent",
]
[[package]]
name = "pyo3-build-config"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "779239fc40b8e18bc8416d3a37d280ca9b9fb04bda54b98037bb6748595c2410"
dependencies = [
"once_cell",
]
[[package]]
name = "pyo3-macros"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b247e8c664be87998d8628e86f282c25066165f1f8dda66100c48202fdb93a"
dependencies = [
"pyo3-macros-backend",
"quote",
"syn",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a8c2812c412e00e641d99eeb79dd478317d981d938aa60325dfa7157b607095"
dependencies = [
"proc-macro2",
"pyo3-build-config",
"quote",
"syn",
]
[[package]]
name = "quote"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "smallvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
[[package]]
name = "syn"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
[[package]]
name = "unindent"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ee9362deb4a96cef4d437d1ad49cffc9b9e92d202b6995674e928ce684f112"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

View File

@@ -0,0 +1,16 @@
[package]
name = "neon-dev-utils"
version = "0.1.0"
edition = "2021"
[lib]
name = "neon_dev_utils"
# "cdylib" is necessary to produce a shared library for Python to import from.
#
# Downstream Rust code (including code in `bin/`, `examples/`, and `tests/`) will not be able
# to `use string_sum;` unless the "rlib" or "lib" crate type is also included, e.g.:
# crate-type = ["cdylib", "rlib"]
crate-type = ["cdylib"]
[dependencies]
pyo3 = { version = "0.15.1", features = ["extension-module"] }

View File

@@ -0,0 +1,31 @@
[[package]]
name = "maturin"
version = "0.13.2"
description = "Build and publish crates with pyo3, rust-cpython and cffi bindings as well as rust binaries as python packages"
category = "dev"
optional = false
python-versions = ">=3.7"
[package.dependencies]
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
[package.extras]
zig = ["ziglang (>=0.9.0,<0.10.0)"]
patchelf = ["patchelf"]
[[package]]
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "dev"
optional = false
python-versions = ">=3.7"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "4e177514d6cf74b58bcd8ca30ef300c10a833b3e6b1d809aa57337ee20efeb47"
[metadata.files]
maturin = []
tomli = []

View File

@@ -0,0 +1,15 @@
[tool.poetry]
name = "neon-dev-utils"
version = "0.1.0"
description = "Python bindings for common neon development utils"
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.10"
[tool.poetry.dev-dependencies]
maturin = "^0.13.2"
[build-system]
requires = ["maturin>=0.13.2", "poetry-core>=1.0.0"]
build-backend = "maturin"

View File

@@ -0,0 +1,17 @@
use pyo3::prelude::*;
/// Formats the sum of two numbers as string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
Ok((a + b).to_string())
}
/// A Python module implemented in Rust. The name of this function must match
/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to
/// import the module.
#[pymodule]
fn neon_dev_utils(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
Ok(())
}

View File

@@ -9,7 +9,7 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
toml = "0.5"
lazy_static = "1.4"
once_cell = "1.13.0"
regex = "1"
anyhow = "1.0"
thiserror = "1"

View File

@@ -30,14 +30,14 @@ pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
let etcd_stdout_file =
fs::File::create(etcd_data_dir.join("etcd.stdout.log")).with_context(|| {
format!(
"Failed to create ectd stout file in directory {}",
"Failed to create etcd stout file in directory {}",
etcd_data_dir.display()
)
})?;
let etcd_stderr_file =
fs::File::create(etcd_data_dir.join("etcd.stderr.log")).with_context(|| {
format!(
"Failed to create ectd stderr file in directory {}",
"Failed to create etcd stderr file in directory {}",
etcd_data_dir.display()
)
})?;

View File

@@ -51,7 +51,11 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
for env_key in [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}

View File

@@ -24,7 +24,7 @@ use crate::safekeeper::SafekeeperNode;
// This data structures represents neon_local CLI config
//
// It is deserialized from the .neon/config file, or the config file passed
// to 'zenith init --config=<path>' option. See control_plane/simple.conf for
// to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[serde_as]
@@ -320,7 +320,7 @@ impl LocalEnv {
if !repopath.exists() {
bail!(
"Zenith config is not found in {}. You need to run 'zenith init' first",
"Zenith config is not found in {}. You need to run 'neon_local init' first",
repopath.to_str().unwrap()
);
}
@@ -337,12 +337,12 @@ impl LocalEnv {
}
pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
// Currently, the user first passes a config file with 'zenith init --config=<path>'
// Currently, the user first passes a config file with 'neon_local init --config=<path>'
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
// to .neon/config. TODO: We lose any formatting and comments along the way, which is
// a bit sad.
let mut conf_content = r#"# This file describes a locale deployment of the page server
# and safekeeeper node. It is read by the 'zenith' command-line
# and safekeeeper node. It is read by the 'neon_local' command-line
# utility.
"#
.to_string();
@@ -382,7 +382,7 @@ impl LocalEnv {
}
//
// Initialize a new Zenith repository
// Initialize a new Neon repository
//
pub fn init(&mut self) -> anyhow::Result<()> {
// check if config already exists

View File

@@ -5,7 +5,7 @@
/// enough to extract a few settings we need in Zenith, assuming you don't do
/// funny stuff like include-directives or funny escaping.
use anyhow::{bail, Context, Result};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fmt;
@@ -19,9 +19,7 @@ pub struct PostgresConf {
hash: HashMap<String, String>,
}
lazy_static! {
static ref CONF_LINE_RE: Regex = Regex::new(r"^((?:\w|\.)+)\s*=\s*(\S+)$").unwrap();
}
static CONF_LINE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^((?:\w|\.)+)\s*=\s*(\S+)$").unwrap());
impl PostgresConf {
pub fn new() -> PostgresConf {
@@ -139,10 +137,10 @@ fn escape_str(s: &str) -> String {
//
// This regex is a bit more conservative than the rules in guc-file.l, so we quote some
// strings that PostgreSQL would accept without quoting, but that's OK.
lazy_static! {
static ref UNQUOTED_RE: Regex =
Regex::new(r"(^[-+]?[0-9]+[a-zA-Z]*$)|(^[a-zA-Z][a-zA-Z0-9]*$)").unwrap();
}
static UNQUOTED_RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(^[-+]?[0-9]+[a-zA-Z]*$)|(^[a-zA-Z][a-zA-Z0-9]*$)").unwrap());
if UNQUOTED_RE.is_match(s) {
s.to_string()
} else {

View File

@@ -1,5 +1,4 @@
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Arc;
@@ -52,7 +51,7 @@ impl ResponseErrorMessageExt for Response {
Err(SafekeeperHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {url}.", status.as_u16()),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
}
@@ -241,40 +240,28 @@ impl SafekeeperNode {
),
}
let address = connection_address(&self.pg_connection_config);
// Wait until process is gone
for i in 0..600 {
let signal = None; // Send no signal, just get the error code
match kill(pid, signal) {
Ok(_) => (), // Process exists, keep waiting
Err(Errno::ESRCH) => {
// Process not found, we're done
println!("done!");
return Ok(());
}
Err(err) => bail!(
"Failed to send signal to pageserver with pid {}: {}",
pid,
err.desc()
),
};
// TODO Remove this "timeout" and handle it on caller side instead.
// Shutting down may take a long time,
// if safekeeper flushes a lot of data
let mut tcp_stopped = false;
for _ in 0..100 {
if !tcp_stopped {
if let Err(err) = TcpStream::connect(&address) {
tcp_stopped = true;
if err.kind() != io::ErrorKind::ConnectionRefused {
eprintln!("\nSafekeeper connection failed with error: {err}");
}
}
if i % 10 == 0 {
print!(".");
io::stdout().flush().unwrap();
}
if tcp_stopped {
// Also check status on the HTTP port
match self.check_status() {
Err(SafekeeperHttpError::Transport(err)) if err.is_connect() => {
println!("done!");
return Ok(());
}
Err(err) => {
eprintln!("\nSafekeeper status check failed with error: {err}");
return Ok(());
}
Ok(()) => {
// keep waiting
}
}
}
print!(".");
io::stdout().flush().unwrap();
thread::sleep(Duration::from_secs(1));
thread::sleep(Duration::from_millis(100));
}
bail!("Failed to stop safekeeper with pid {}", pid);

View File

@@ -1,9 +1,8 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Write};
use std::net::TcpStream;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::Duration;
use std::{io, result, thread};
@@ -103,23 +102,19 @@ impl PageServerNode {
/// Construct libpq connection string for connecting to the pageserver.
fn pageserver_connection_config(password: &str, listen_addr: &str) -> Config {
format!("postgresql://no_user:{}@{}/no_db", password, listen_addr)
format!("postgresql://no_user:{password}@{listen_addr}/no_db")
.parse()
.unwrap()
}
pub fn init(
pub fn initialize(
&self,
create_tenant: Option<ZTenantId>,
initial_timeline_id: Option<ZTimelineId>,
config_overrides: &[&str],
) -> anyhow::Result<ZTimelineId> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
@@ -139,67 +134,52 @@ impl PageServerNode {
.collect::<Vec<_>>()
.join(",")
);
let mut args = Vec::with_capacity(20);
args.push("--init");
args.extend(["-D", &base_data_dir_param]);
args.extend(["-c", &pg_distrib_dir_param]);
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);
args.extend(["-c", &broker_endpoints_param]);
args.extend(["-c", &id]);
let broker_etcd_prefix_param = self
.env
.etcd_broker
.broker_etcd_prefix
.as_ref()
.map(|prefix| format!("broker_etcd_prefix='{prefix}'"));
if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() {
args.extend(["-c", broker_etcd_prefix_param]);
}
for config_override in config_overrides {
args.extend(["-c", config_override]);
let mut init_config_overrides = config_overrides.to_vec();
init_config_overrides.push(&id);
init_config_overrides.push(&pg_distrib_dir_param);
init_config_overrides.push(&authg_type_param);
init_config_overrides.push(&listen_http_addr_param);
init_config_overrides.push(&listen_pg_addr_param);
init_config_overrides.push(&broker_endpoints_param);
if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() {
init_config_overrides.push(broker_etcd_prefix_param);
}
if self.env.pageserver.auth_type != AuthType::Trust {
args.extend([
"-c",
"auth_validation_public_key_path='auth_public_key.pem'",
]);
init_config_overrides.push("auth_validation_public_key_path='auth_public_key.pem'");
}
let create_tenant = create_tenant.map(|id| id.to_string());
if let Some(tenant_id) = create_tenant.as_deref() {
args.extend(["--create-tenant", tenant_id])
self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
let init_result = self
.try_init_timeline(create_tenant, initial_timeline_id)
.context("Failed to create initial tenant and timeline for pageserver");
match &init_result {
Ok(initial_timeline_id) => {
println!("Successfully initialized timeline {initial_timeline_id}")
}
Err(e) => eprintln!("{e:#}"),
}
self.stop(false)?;
init_result
}
let initial_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
let initial_timeline_id_string = initial_timeline_id.to_string();
args.extend(["--initial-timeline-id", &initial_timeline_id_string]);
let cmd_with_args = cmd.args(args);
let init_output = fill_rust_env_vars(cmd_with_args)
.output()
.with_context(|| {
format!("failed to init pageserver with command {:?}", cmd_with_args)
})?;
if !init_output.status.success() {
bail!(
"init invocation failed, {}\nStdout: {}\nStderr: {}",
init_output.status,
String::from_utf8_lossy(&init_output.stdout),
String::from_utf8_lossy(&init_output.stderr)
);
}
// echo the captured output of the init command
println!("{}", String::from_utf8_lossy(&init_output.stdout));
Ok(initial_timeline_id)
fn try_init_timeline(
&self,
new_tenant_id: Option<ZTenantId>,
new_timeline_id: Option<ZTimelineId>,
) -> anyhow::Result<ZTimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info =
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?;
Ok(initial_timeline_info.timeline_id)
}
pub fn repo_path(&self) -> PathBuf {
@@ -211,15 +191,35 @@ impl PageServerNode {
}
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
print!(
self.start_node(config_overrides, &self.repo_path(), false)
}
fn start_node(
&self,
config_overrides: &[&str],
datadir: &Path,
update_config: bool,
) -> anyhow::Result<()> {
println!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
self.repo_path().display()
datadir.display()
);
io::stdout().flush().unwrap();
io::stdout().flush()?;
let repo_path = self.repo_path();
let mut args = vec!["-D", repo_path.to_str().unwrap()];
let mut args = vec![
"-D",
datadir.to_str().with_context(|| {
format!(
"Datadir path '{}' cannot be represented as a unicode string",
datadir.display()
)
})?,
];
if update_config {
args.push("--update-config");
}
for config_override in config_overrides {
args.extend(["-c", config_override]);
@@ -231,8 +231,8 @@ impl PageServerNode {
if !filled_cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",
self.repo_path().join("pageserver.log").display()
"Pageserver failed to start. See console output and '{}' for details.",
datadir.join("pageserver.log").display()
);
}
@@ -241,7 +241,7 @@ impl PageServerNode {
const RETRIES: i8 = 15;
for retries in 1..RETRIES {
match self.check_status() {
Ok(_) => {
Ok(()) => {
println!("\nPageserver started");
return Ok(());
}
@@ -255,21 +255,18 @@ impl PageServerNode {
if retries == 5 {
println!() // put a line break after dots for second message
}
println!(
"Pageserver not responding yet, err {} retrying ({})...",
err, retries
);
println!("Pageserver not responding yet, err {err} retrying ({retries})...");
}
}
PageserverHttpError::Response(msg) => {
bail!("pageserver failed to start: {} ", msg)
bail!("pageserver failed to start: {msg} ")
}
}
thread::sleep(Duration::from_secs(1));
}
}
}
bail!("pageserver failed to start in {} seconds", RETRIES);
bail!("pageserver failed to start in {RETRIES} seconds");
}
///
@@ -299,63 +296,46 @@ impl PageServerNode {
match kill(pid, sig) {
Ok(_) => (),
Err(Errno::ESRCH) => {
println!(
"Pageserver with pid {} does not exist, but a PID file was found",
pid
);
println!("Pageserver with pid {pid} does not exist, but a PID file was found");
return Ok(());
}
Err(err) => bail!(
"Failed to send signal to pageserver with pid {}: {}",
pid,
"Failed to send signal to pageserver with pid {pid}: {}",
err.desc()
),
}
let address = connection_address(&self.pg_connection_config);
// TODO Remove this "timeout" and handle it on caller side instead.
// Shutting down may take a long time,
// if pageserver checkpoints a lot of data
let mut tcp_stopped = false;
for _ in 0..100 {
if !tcp_stopped {
if let Err(err) = TcpStream::connect(&address) {
tcp_stopped = true;
if err.kind() != io::ErrorKind::ConnectionRefused {
eprintln!("\nPageserver connection failed with error: {err}");
}
// Wait until process is gone
for i in 0..600 {
let signal = None; // Send no signal, just get the error code
match kill(pid, signal) {
Ok(_) => (), // Process exists, keep waiting
Err(Errno::ESRCH) => {
// Process not found, we're done
println!("done!");
return Ok(());
}
}
if tcp_stopped {
// Also check status on the HTTP port
Err(err) => bail!(
"Failed to send signal to pageserver with pid {}: {}",
pid,
err.desc()
),
};
match self.check_status() {
Err(PageserverHttpError::Transport(err)) if err.is_connect() => {
println!("done!");
return Ok(());
}
Err(err) => {
eprintln!("\nPageserver status check failed with error: {err}");
return Ok(());
}
Ok(()) => {
// keep waiting
}
}
if i % 10 == 0 {
print!(".");
io::stdout().flush().unwrap();
}
print!(".");
io::stdout().flush().unwrap();
thread::sleep(Duration::from_secs(1));
thread::sleep(Duration::from_millis(100));
}
bail!("Failed to stop pageserver with pid {}", pid);
bail!("Failed to stop pageserver with pid {pid}");
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
println!("Pageserver query: '{}'", sql);
println!("Pageserver query: '{sql}'");
client.simple_query(sql).unwrap()
}
@@ -390,15 +370,15 @@ impl PageServerNode {
&self,
new_tenant_id: Option<ZTenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
) -> anyhow::Result<ZTenantId> {
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()?,
checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>())
@@ -430,18 +410,16 @@ impl PageServerNode {
})
.send()?
.error_from_body()?
.json::<Option<String>>()?;
tenant_id_string
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
id
)
.json::<Option<String>>()
.with_context(|| {
format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
})?
.context("No tenant id was found in the tenant creation response")
.and_then(|tenant_id_string| {
tenant_id_string.parse().with_context(|| {
format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
})
})
.transpose()
}
pub fn tenant_config(&self, tenant_id: ZTenantId, settings: HashMap<&str, &str>) -> Result<()> {
@@ -453,6 +431,7 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>())
@@ -511,22 +490,27 @@ impl PageServerNode {
new_timeline_id: Option<ZTimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<ZTimelineId>,
) -> anyhow::Result<Option<TimelineInfo>> {
let timeline_info_response = self
.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
) -> anyhow::Result<TimelineInfo> {
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)
.json(&TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
})
.send()?
.error_from_body()?
.json::<Option<TimelineInfo>>()
.with_context(|| {
format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
})?
.with_context(|| {
format!(
"No timeline id was found in the timeline creation response for tenant {tenant_id}"
)
.json(&TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
})
.send()?
.error_from_body()?
.json::<Option<TimelineInfo>>()?;
Ok(timeline_info_response)
})
}
/// Import a basebackup prepared using either:

View File

@@ -1,20 +0,0 @@
#!/bin/sh
set -eux
broker_endpoints_param="${BROKER_ENDPOINT:-absent}"
if [ "$broker_endpoints_param" != "absent" ]; then
broker_endpoints_param="-c broker_endpoints=['$broker_endpoints_param']"
else
broker_endpoints_param=''
fi
if [ "$1" = 'pageserver' ]; then
if [ ! -d "/data/tenants" ]; then
echo "Initializing pageserver data directory"
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10" $broker_endpoints_param
fi
echo "Staring pageserver at 0.0.0.0:6400"
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" $broker_endpoints_param -D /data
else
"$@"
fi

View File

@@ -52,10 +52,8 @@
- [multitenancy.md](./multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
- [settings.md](./settings.md)
#FIXME: move these under sourcetree.md
#- [pageserver/README.md](/pageserver/README.md)
#- [postgres_ffi/README.md](/libs/postgres_ffi/README.md)
#- [test_runner/README.md](/test_runner/README.md)
#- [safekeeper/README.md](/safekeeper/README.md)
# RFCs

View File

@@ -75,7 +75,7 @@ layer's Segment and range of LSNs.
There are two kinds of layers, in-memory and on-disk layers. In-memory
layers are used to ingest incoming WAL, and provide fast access
to the recent page versions. On-disk layers are stored as files on disk, and
are immutable. See pageserver/src/layered_repository/README.md for more.
are immutable. See [pageserver-storage.md](./pageserver-storage.md) for more.
### Layer file (on-disk layer)
@@ -111,7 +111,7 @@ PostgreSQL LSNs and functions to monitor them:
* `pg_last_wal_replay_lsn ()` - Returns the last write-ahead log location that has been replayed during recovery. If recovery is still in progress this will increase monotonically.
[source PostgreSQL documentation](https://www.postgresql.org/docs/devel/functions-admin.html):
Neon safekeeper LSNs. For more check [safekeeper/README_PROTO.md](/safekeeper/README_PROTO.md)
Neon safekeeper LSNs. See [safekeeper protocol section](safekeeper-protocol.md) for more information.
* `CommitLSN`: position in WAL confirmed by quorum safekeepers.
* `RestartLSN`: position in WAL confirmed by all safekeepers.
* `FlushLSN`: part of WAL persisted to the disk by safekeeper.

View File

@@ -68,8 +68,6 @@ There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
The backup service is disabled by default and can be enabled to interact with a single remote storage.
CLI examples:
@@ -118,7 +116,7 @@ implemented by the LayeredRepository object in
`layered_repository.rs`. There is only that one implementation of the
Repository trait, but it's still a useful abstraction that keeps the
interface for the low-level storage functionality clean. The layered
storage format is described in layered_repository/README.md.
storage format is described in [pageserver-storage.md](./pageserver-storage.md).
Each repository consists of multiple Timelines. Timeline is a
workhorse that accepts page changes from the WAL, and serves

View File

@@ -15,7 +15,7 @@ listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
checkpoint_distance = '268435456' # in bytes
checkpoint_period = '1 s'
checkpoint_timeout = '10m'
gc_period = '100 s'
gc_horizon = '67108864'
@@ -46,7 +46,7 @@ Note the `[remote_storage]` section: it's a [table](https://toml.io/en/v1.0.0#ta
All values can be passed as an argument to the pageserver binary, using the `-c` parameter and specified as a valid TOML string. All tables should be passed in the inline form.
Example: `${PAGESERVER_BIN} -c "checkpoint_period = '100 s'" -c "remote_storage={local_path='/some/local/path/'}"`
Example: `${PAGESERVER_BIN} -c "checkpoint_timeout = '10 m'" -c "remote_storage={local_path='/some/local/path/'}"`
Note that TOML distinguishes between strings and integers, the former require single or double quotes around them.
@@ -82,6 +82,14 @@ S3.
The unit is # of bytes.
#### checkpoint_timeout
Apart from `checkpoint_distance`, open layer flushing is also triggered
`checkpoint_timeout` after the last flush. This makes WAL eventually uploaded to
s3 when activity is stopped.
The default is 10m.
#### compaction_period
Every `compaction_period` seconds, the page server checks if

View File

@@ -28,7 +28,7 @@ The pageserver has a few different duties:
- Receive WAL from the WAL service and decode it.
- Replay WAL that's applicable to the chunks that the Page Server maintains
For more detailed info, see [/pageserver/README](/pageserver/README.md)
For more detailed info, see [pageserver-services.md](./pageserver-services.md)
`/proxy`:
@@ -57,7 +57,7 @@ PostgreSQL extension that contains functions needed for testing and debugging.
The zenith WAL service that receives WAL from a primary compute nodes and streams it to the pageserver.
It acts as a holding area and redistribution center for recently generated WAL.
For more detailed info, see [/safekeeper/README](/safekeeper/README.md)
For more detailed info, see [walservice.md](./walservice.md)
`/workspace_hack`:
The workspace_hack crate exists only to pin down some dependencies.

View File

@@ -75,8 +75,8 @@ safekeepers. The Paxos and crash recovery algorithm ensures that only
one primary node can be actively streaming WAL to the quorum of
safekeepers.
See README_PROTO.md for a more detailed description of the consensus
protocol. spec/ contains TLA+ specification of it.
See [this section](safekeeper-protocol.md) for a more detailed description of
the consensus protocol. spec/ contains TLA+ specification of it.
# Q&A

View File

@@ -0,0 +1,13 @@
[package]
name = "integration_tests"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
utils = { path = "../libs/utils" }
pg_bin = { path = "../libs/pg_bin" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
anyhow = "1.0.62"

View File

@@ -0,0 +1,36 @@
#[cfg(test)]
mod tests {
use pg_bin::PgDatadir;
use std::path::PathBuf;
use tokio_postgres::NoTls;
#[tokio::test]
async fn test_postgres_select_1() -> anyhow::Result<()> {
// Test setup
let output = PathBuf::from("/home/bojan/tmp/");
let pg_prefix = PathBuf::from("/home/bojan/src/neondatabase/neon/tmp_install/bin/");
// Init datadir
let pg_datadir_path = PathBuf::from("/home/bojan/tmp/t1/");
let pg_datadir = PgDatadir::new_initdb(pg_datadir_path, &pg_prefix, &output, true);
// Get a postgres
let postgres = pg_datadir.spawn_postgres(pg_prefix, output);
let conn_info = postgres.admin_conn_info();
// Get client, run connection
let (client, connection) = conn_info.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
// Run "select 1"
let rows = client.query("SELECT 'hello';", &[]).await?;
let value: &str = rows[0].get(0);
assert_eq!(value, "hello");
Ok(())
}
}

View File

@@ -0,0 +1 @@
mod basic;

View File

@@ -9,7 +9,7 @@
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.12.0"
once_cell = "1.8.0"
once_cell = "1.13.0"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -6,5 +6,5 @@ edition = "2021"
[dependencies]
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
libc = "0.2"
lazy_static = "1.4"
once_cell = "1.13.0"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -2,7 +2,7 @@
//! make sure that we use the same dep version everywhere.
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
@@ -41,19 +41,22 @@ pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
prometheus::gather()
}
lazy_static! {
static ref DISK_IO_BYTES: IntGaugeVec = register_int_gauge_vec!(
static DISK_IO_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"libmetrics_disk_io_bytes_total",
"Bytes written and read from disk, grouped by the operation (read|write)",
&["io_operation"]
)
.expect("Failed to register disk i/o bytes int gauge vec");
static ref MAXRSS_KB: IntGauge = register_int_gauge!(
.expect("Failed to register disk i/o bytes int gauge vec")
});
static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"libmetrics_maxrss_kb",
"Memory usage (Maximum Resident Set Size)"
)
.expect("Failed to register maxrss_kb int gauge");
}
.expect("Failed to register maxrss_kb int gauge")
});
pub const DISK_WRITE_SECONDS_BUCKETS: &[f64] = &[
0.000_050, 0.000_100, 0.000_500, 0.001, 0.003, 0.005, 0.01, 0.05, 0.1, 0.3, 0.5,

View File

@@ -10,13 +10,13 @@ use std::io::{Read, Result, Write};
/// # use std::io::{Result, Read};
/// # use metrics::{register_int_counter, IntCounter};
/// # use metrics::CountedReader;
/// # use once_cell::sync::Lazy;
/// #
/// # lazy_static::lazy_static! {
/// # static ref INT_COUNTER: IntCounter = register_int_counter!(
/// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
/// # "int_counter",
/// # "let's count something!"
/// # ).unwrap();
/// # }
/// # ).unwrap()
/// # });
/// #
/// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
/// let mut reader = CountedReader::new(stream, |cnt| {
@@ -85,13 +85,13 @@ impl<T: Read> Read for CountedReader<'_, T> {
/// # use std::io::{Result, Write};
/// # use metrics::{register_int_counter, IntCounter};
/// # use metrics::CountedWriter;
/// # use once_cell::sync::Lazy;
/// #
/// # lazy_static::lazy_static! {
/// # static ref INT_COUNTER: IntCounter = register_int_counter!(
/// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
/// # "int_counter",
/// # "let's count something!"
/// # ).unwrap();
/// # }
/// # ).unwrap()
/// # });
/// #
/// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
/// let mut writer = CountedWriter::new(stream, |cnt| {

10
libs/pg_bin/Cargo.toml Normal file
View File

@@ -0,0 +1,10 @@
[package]
name = "pg_bin"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
utils = { path = "../utils" }

106
libs/pg_bin/src/lib.rs Normal file
View File

@@ -0,0 +1,106 @@
//! Utils for runnig postgres binaries as subprocesses.
use std::{fs::{File, remove_dir_all}, path::PathBuf, process::{Child, Command}, time::Duration};
use std::io::Write;
use utils::command_extensions::NeonCommandExtensions;
pub struct PgDatadir {
path: PathBuf
}
impl PgDatadir {
pub fn new_initdb(
path: PathBuf,
pg_prefix: &PathBuf,
command_output_dir: &PathBuf,
remove_if_exists: bool
) -> Self {
if remove_if_exists {
remove_dir_all(path.clone()).ok();
}
let status = Command::new(pg_prefix.join("initdb"))
.arg("-D")
.arg(path.clone())
.capture_to_files(command_output_dir.clone(), "initdb")
.status()
.expect("failed to get status");
assert!(status.success());
Self {
path
}
}
pub fn load_existing(path: PathBuf) -> Self{
Self {
path
}
}
pub fn path(&self) -> PathBuf {
self.path.clone()
}
pub fn spawn_postgres(self, pg_prefix: PathBuf, command_output_dir: PathBuf) -> LocalPostgres {
let port = 54729;
// Write conf
// TODO don't override existing conf
// - instead infer port from conf
let mut conf = File::create(self.path().join("postgresql.conf")).expect("failed to create file");
writeln!(&mut conf, "port = {}", port).expect("failed to write conf");
let process = Command::new(pg_prefix.join("postgres"))
.env("PGDATA", self.path())
.capture_to_files(command_output_dir, "pg")
.spawn()
.expect("postgres failed to spawn");
// Wait until ready. TODO improve this
std::thread::sleep(Duration::from_millis(300));
LocalPostgres {
datadir: self,
port: 54729,
process,
}
}
}
pub struct LocalPostgres {
datadir: PgDatadir,
port: u16,
process: Child,
}
impl LocalPostgres {
pub fn admin_conn_info(&self) -> tokio_postgres::Config {
// I don't like this, but idk what else to do
let whoami = Command::new("whoami").output().unwrap().stdout;
let user = String::from_utf8_lossy(&whoami);
let user = user.trim();
let mut config = tokio_postgres::Config::new();
config
.host("127.0.0.1")
.port(self.port)
.dbname("postgres")
.user(&user);
config
}
pub fn stop(mut self) -> PgDatadir {
self.process.kill().expect("failed to kill child");
PgDatadir {
path: self.datadir.path.clone()
}
}
}
impl Drop for LocalPostgres {
fn drop(&mut self) {
self.process.kill().expect("failed to kill child");
}
}

View File

@@ -12,7 +12,7 @@ byteorder = "1.4.3"
anyhow = "1.0"
crc32c = "0.6.0"
hex = "0.4.3"
lazy_static = "1.4"
once_cell = "1.13.0"
log = "0.4.14"
memoffset = "0.6.2"
thiserror = "1.0"

View File

@@ -44,7 +44,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
fn main() {
// Tell cargo to invalidate the built crate whenever the wrapper changes
println!("cargo:rerun-if-changed=pg_control_ffi.h");
println!("cargo:rerun-if-changed=bindgen_deps.h");
// Finding the location of C headers for the Postgres server:
// - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `<project_root>/tmp_install`
@@ -88,9 +88,9 @@ fn main() {
// the resulting bindings.
let bindings = bindgen::Builder::default()
//
// All the needed PostgreSQL headers are included from 'pg_control_ffi.h'
// All the needed PostgreSQL headers are included from 'bindgen_deps.h'
//
.header("pg_control_ffi.h")
.header("bindgen_deps.h")
//
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.

View File

@@ -23,7 +23,7 @@
//! information. You can use PostgreSQL's pg_controldata utility to view its
//! contents.
//!
use crate::{ControlFileData, PG_CONTROL_FILE_SIZE};
use super::bindings::{ControlFileData, PG_CONTROL_FILE_SIZE};
use anyhow::{bail, Result};
use bytes::{Bytes, BytesMut};

View File

@@ -7,21 +7,62 @@
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
use serde::{Deserialize, Serialize};
use utils::lsn::Lsn;
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
macro_rules! postgres_ffi {
($version:ident) => {
#[path = "."]
pub mod $version {
// fixme: does this have to be 'pub'?
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod waldecoder;
pub mod xlog_utils;
use serde::{Deserialize, Serialize};
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod waldecoder;
pub mod xlog_utils;
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
}
};
}
postgres_ffi!(v14);
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
pub use v14::bindings::{MultiXactId, TransactionId};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
pub const XLOG_BLCKSZ: usize = 8192;
// PG timeline is always 1, changing it doesn't have any useful meaning in Neon.
//
// NOTE: this is not to be confused with Neon timelines; different concept!
//
// It's a shaky assumption, that it's always 1. We might import a
// PostgreSQL data directory that has gone through timeline bumps,
// for example. FIXME later.
pub const PG_TLI: u32 = 1;
// See TransactionIdIsNormal in transam.h
pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID
}
// See TransactionIdPrecedes in transam.c

View File

@@ -1,11 +1,12 @@
//!
//! Common utilities for dealing with PostgreSQL non-relation files.
//!
use crate::{pg_constants, transaction_id_precedes};
use crate::transaction_id_precedes;
use super::pg_constants;
use bytes::BytesMut;
use log::*;
use crate::MultiXactId;
use super::bindings::MultiXactId;
pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) {
trace!(

View File

@@ -7,7 +7,8 @@
//! comments on them.
//!
use crate::PageHeaderData;
use super::bindings::PageHeaderData;
use crate::BLCKSZ;
//
// From pg_tablespace_d.h
@@ -31,11 +32,6 @@ pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// From bufpage.h
//
@@ -213,7 +209,6 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* FIXME: pageserver should request wal_seg_size from compute node */
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_LONG_HEADER: u16 = 0x0002;

View File

@@ -1,11 +1,11 @@
//!
//! Common utilities for dealing with PostgreSQL relation files.
//!
use crate::pg_constants;
use lazy_static::lazy_static;
use super::pg_constants;
use once_cell::sync::OnceCell;
use regex::Regex;
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
@@ -54,11 +54,14 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
/// See functions relpath() and _mdfd_segpath() in PostgreSQL sources.
///
pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
lazy_static! {
static ref RELFILE_RE: Regex =
Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
}
static RELFILE_RE: OnceCell<Regex> = OnceCell::new();
RELFILE_RE.get_or_init(|| {
Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap()
});
let caps = RELFILE_RE
.get()
.unwrap()
.captures(fname)
.ok_or(FilePathError::InvalidFileName)?;

View File

@@ -10,27 +10,30 @@
//!
use super::pg_constants;
use super::xlog_utils::*;
use super::XLogLongPageHeaderData;
use super::XLogPageHeaderData;
use super::XLogRecord;
use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;
enum State {
WaitingForRecord,
ReassemblingRecord {
recordbuf: BytesMut,
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
lsn: Lsn,
startlsn: Lsn, // LSN where this record starts
contlen: u32,
padlen: u32,
inputbuf: BytesMut,
/// buffer used to reassemble records that cross page boundaries.
recordbuf: BytesMut,
state: State,
}
#[derive(Error, Debug, Clone)]
@@ -48,13 +51,8 @@ impl WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
startlsn: Lsn(0),
contlen: 0,
padlen: 0,
inputbuf: BytesMut::new(),
recordbuf: BytesMut::new(),
state: State::WaitingForRecord,
}
}
@@ -67,6 +65,58 @@ impl WalStreamDecoder {
self.inputbuf.extend_from_slice(buf);
}
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
let validate_impl = || {
if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
return Err(format!(
"invalid xlog page header: xlp_magic={}, expected {}",
hdr.xlp_magic, XLOG_PAGE_MAGIC
));
}
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(format!(
"invalid xlog page header: xlp_pageaddr={}, expected {}",
hdr.xlp_pageaddr, self.lsn
));
}
match self.state {
State::WaitingForRecord => {
if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD != 0 {
return Err(
"invalid xlog page header: unexpected XLP_FIRST_IS_CONTRECORD".into(),
);
}
if hdr.xlp_rem_len != 0 {
return Err(format!(
"invalid xlog page header: xlp_rem_len={}, but it's not a contrecord",
hdr.xlp_rem_len
));
}
}
State::ReassemblingRecord { contlen, .. } => {
if hdr.xlp_info & XLP_FIRST_IS_CONTRECORD == 0 {
return Err(
"invalid xlog page header: XLP_FIRST_IS_CONTRECORD expected, not found"
.into(),
);
}
if hdr.xlp_rem_len != contlen.get() {
return Err(format!(
"invalid xlog page header: xlp_rem_len={}, expected {}",
hdr.xlp_rem_len,
contlen.get()
));
}
}
State::SkippingEverything { .. } => {
panic!("Should not be validating page header in the SkippingEverything state");
}
};
Ok(())
};
validate_impl().map_err(|msg| WalDecodeError { msg, lsn: self.lsn })
}
/// Attempt to decode another WAL record from the input that has been fed to the
/// decoder so far.
///
@@ -76,128 +126,121 @@ impl WalStreamDecoder {
/// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
///
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
let recordbuf;
// Run state machine that validates page headers, and reassembles records
// that cross page boundaries.
loop {
// parse and verify page boundaries as we go
if self.padlen > 0 {
// We should first skip padding, as we may have to skip some page headers if we're processing the XLOG_SWITCH record.
if self.inputbuf.remaining() < self.padlen as usize {
return Ok(None);
}
// However, we may have to skip some page headers if we're processing the XLOG_SWITCH record or skipping padding for whatever reason.
match self.state {
State::WaitingForRecord | State::ReassemblingRecord { .. } => {
if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
// parse long header
// skip padding
self.inputbuf.advance(self.padlen as usize);
self.lsn += self.padlen as u64;
self.padlen = 0;
} else if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
// parse long header
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
return Ok(None);
}
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
return Ok(None);
}
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
|e| WalDecodeError {
msg: format!("long header deserialization failed {}", e),
lsn: self.lsn,
},
)?;
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("long header deserialization failed {}", e),
lsn: self.lsn,
self.validate_page_header(&hdr.std)?;
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
return Ok(None);
}
let hdr =
XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("header deserialization failed {}", e),
lsn: self.lsn,
}
})?;
self.validate_page_header(&hdr)?;
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
}
})?;
if hdr.std.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog segment header".into(),
lsn: self.lsn,
});
}
// TODO: verify the remaining fields in the header
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
continue;
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
return Ok(None);
}
let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("header deserialization failed {}", e),
lsn: self.lsn,
State::SkippingEverything { .. } => {}
}
match &mut self.state {
State::WaitingForRecord => {
// need to have at least the xl_tot_len field
if self.inputbuf.remaining() < 4 {
return Ok(None);
}
})?;
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog page header".into(),
lsn: self.lsn,
});
// peek xl_tot_len at the beginning of the record.
// FIXME: assumes little-endian
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {}", xl_tot_len),
lsn: self.lsn,
});
}
// Fast path for the common case that the whole record fits on the page.
let pageleft = self.lsn.remaining_in_block() as u32;
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
self.lsn += xl_tot_len as u64;
let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
return Ok(Some(self.complete_record(recordbuf)?));
} else {
// Need to assemble the record from pieces. Remember the size of the
// record, and loop back. On next iteration, we will reach the 'else'
// branch below, and copy the part of the record that was on this page
// to 'recordbuf'. Subsequent iterations will skip page headers, and
// append the continuations from the next pages to 'recordbuf'.
self.state = State::ReassemblingRecord {
recordbuf: BytesMut::with_capacity(xl_tot_len as usize),
contlen: NonZeroU32::new(xl_tot_len).unwrap(),
}
}
}
// TODO: verify the remaining fields in the header
State::ReassemblingRecord { recordbuf, contlen } => {
// we're continuing a record, possibly from previous page.
let pageleft = self.lsn.remaining_in_block() as u32;
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
continue;
} else if self.contlen == 0 {
assert!(self.recordbuf.is_empty());
// read the rest of the record, or as much as fits on this page.
let n = min(contlen.get(), pageleft) as usize;
// need to have at least the xl_tot_len field
if self.inputbuf.remaining() < 4 {
return Ok(None);
if self.inputbuf.remaining() < n {
return Ok(None);
}
recordbuf.put(self.inputbuf.split_to(n));
self.lsn += n as u64;
*contlen = match NonZeroU32::new(contlen.get() - n as u32) {
Some(x) => x,
None => {
// The record is now complete.
let recordbuf = std::mem::replace(recordbuf, BytesMut::new()).freeze();
return Ok(Some(self.complete_record(recordbuf)?));
}
}
}
// peek xl_tot_len at the beginning of the record.
// FIXME: assumes little-endian
self.startlsn = self.lsn;
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {}", xl_tot_len),
lsn: self.lsn,
});
State::SkippingEverything { skip_until_lsn } => {
assert!(*skip_until_lsn >= self.lsn);
let n = skip_until_lsn.0 - self.lsn.0;
if self.inputbuf.remaining() < n as usize {
return Ok(None);
}
self.inputbuf.advance(n as usize);
self.lsn += n;
self.state = State::WaitingForRecord;
}
// Fast path for the common case that the whole record fits on the page.
let pageleft = self.lsn.remaining_in_block() as u32;
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
// Take the record from the 'inputbuf', and validate it.
recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
self.lsn += xl_tot_len as u64;
break;
} else {
// Need to assemble the record from pieces. Remember the size of the
// record, and loop back. On next iteration, we will reach the 'else'
// branch below, and copy the part of the record that was on this page
// to 'recordbuf'. Subsequent iterations will skip page headers, and
// append the continuations from the next pages to 'recordbuf'.
self.recordbuf.reserve(xl_tot_len as usize);
self.contlen = xl_tot_len;
continue;
}
} else {
// we're continuing a record, possibly from previous page.
let pageleft = self.lsn.remaining_in_block() as u32;
// read the rest of the record, or as much as fits on this page.
let n = min(self.contlen, pageleft) as usize;
if self.inputbuf.remaining() < n {
return Ok(None);
}
self.recordbuf.put(self.inputbuf.split_to(n));
self.lsn += n as u64;
self.contlen -= n as u32;
if self.contlen == 0 {
// The record is now complete.
recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()).freeze();
break;
}
continue;
}
}
}
fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
// We now have a record in the 'recordbuf' local variable.
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
@@ -219,18 +262,20 @@ impl WalStreamDecoder {
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
if xlogrec.is_xlog_switch_record() {
let next_lsn = if xlogrec.is_xlog_switch_record() {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32;
self.lsn + self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64)
} else {
// Pad to an 8-byte boundary
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
self.lsn.align()
};
self.state = State::SkippingEverything {
skip_until_lsn: next_lsn,
};
// We should return LSN of the next record, not the last byte of this record or
// the byte immediately after. Note that this handles both XLOG_SWITCH and usual
// records, the former "spans" until the next WAL segment (see test_xlog_switch).
let result = (self.lsn + self.padlen as u64, recordbuf);
Ok(Some(result))
Ok((next_lsn, recordbuf))
}
}

View File

@@ -7,31 +7,33 @@
// have been named the same as the corresponding PostgreSQL functions instead.
//
use crate::pg_constants;
use crate::CheckPoint;
use crate::FullTransactionId;
use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;
use crc32c::crc32c_append;
use super::bindings::{
CheckPoint, FullTransactionId, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord,
XLOG_PAGE_MAGIC,
};
use super::pg_constants;
use super::pg_constants::WAL_SEGMENT_SIZE;
use crate::v14::waldecoder::WalStreamDecoder;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::pg_constants::WAL_SEGMENT_SIZE;
use anyhow::{bail, ensure};
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
use crc32c::*;
use log::*;
use std::cmp::max;
use std::cmp::min;
use std::fs::{self, File};
use serde::Serialize;
use std::fs::File;
use std::io::prelude::*;
use std::io::ErrorKind;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
use utils::const_assert;
use utils::lsn::Lsn;
pub const XLOG_FNAME_LEN: usize = 24;
@@ -47,9 +49,6 @@ pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
// PG timeline is always 1, changing it doesn't have useful meaning in Zenith.
pub const PG_TLI: u32 = 1;
pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = i64;
@@ -80,12 +79,12 @@ pub fn XLogSegNoOffsetToRecPtr(
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
return format!(
format!(
"{:>08X}{:>08X}{:>08X}",
tli,
logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
);
)
}
#[allow(non_snake_case)]
@@ -140,335 +139,93 @@ pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
}
}
/// Return offset of the last valid record in the segment segno, starting
/// looking at start_offset. Returns start_offset if no records found.
fn find_end_of_wal_segment(
data_dir: &Path,
segno: XLogSegNo,
tli: TimeLineID,
wal_seg_size: usize,
start_offset: usize, // start reading at this point
) -> anyhow::Result<u32> {
// step back to the beginning of the page to read it in...
let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ;
let mut skipping_first_contrecord: bool = false;
let mut contlen: usize = 0;
let mut xl_crc: u32 = 0;
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut buf = [0u8; XLOG_BLCKSZ];
let file_name = XLogFileName(tli, segno, wal_seg_size);
let mut last_valid_rec_pos: usize = start_offset; // assume at given start_offset begins new record
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
file.seek(SeekFrom::Start(offs as u64))?;
// xl_crc is the last field in XLogRecord, will not be read into rec_hdr
const_assert!(XLOG_RECORD_CRC_OFFS + 4 == XLOG_SIZE_OF_XLOG_RECORD);
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
trace!("find_end_of_wal_segment(data_dir={}, segno={}, tli={}, wal_seg_size={}, start_offset=0x{:x})", data_dir.display(), segno, tli, wal_seg_size, start_offset);
while offs < wal_seg_size {
// we are at the beginning of the page; read it in
if offs % XLOG_BLCKSZ == 0 {
trace!("offs=0x{:x}: new page", offs);
let bytes_read = file.read(&mut buf)?;
if bytes_read != buf.len() {
bail!(
"failed to read {} bytes from {} at {}",
XLOG_BLCKSZ,
file_name,
offs
);
}
let xlp_magic = LittleEndian::read_u16(&buf[0..2]);
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
trace!(
" xlp_magic=0x{:x}, xlp_info=0x{:x}, xlp_rem_len={}",
xlp_magic,
xlp_info,
xlp_rem_len
);
// this is expected in current usage when valid WAL starts after page header
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
trace!(
" invalid WAL file {}.partial magic {} at {:?}",
file_name,
xlp_magic,
Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
);
}
if offs == 0 {
offs += XLOG_SIZE_OF_XLOG_LONG_PHD;
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
trace!(" first record is contrecord");
skipping_first_contrecord = true;
contlen = xlp_rem_len as usize;
if offs < start_offset {
// Pre-condition failed: the beginning of the segment is unexpectedly corrupted.
ensure!(start_offset - offs >= contlen,
"start_offset is in the middle of the first record (which happens to be a contrecord), \
expected to be on a record boundary. Is beginning of the segment corrupted?");
contlen = 0;
// keep skipping_first_contrecord to avoid counting the contrecord as valid, we did not check it.
}
} else {
trace!(" first record is not contrecord");
}
} else {
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
}
// ... and step forward again if asked
trace!(" skipped header to 0x{:x}", offs);
offs = max(offs, start_offset);
// beginning of the next record
} else if contlen == 0 {
let page_offs = offs % XLOG_BLCKSZ;
let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize;
trace!("offs=0x{:x}: new record, xl_tot_len={}", offs, xl_tot_len);
if xl_tot_len == 0 {
info!(
"find_end_of_wal_segment reached zeros at {:?}, last records ends at {:?}",
Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
Lsn(XLogSegNoOffsetToRecPtr(
segno,
last_valid_rec_pos as u32,
wal_seg_size
))
);
break; // zeros, reached the end
}
if skipping_first_contrecord {
skipping_first_contrecord = false;
trace!(" first contrecord has been just completed");
} else {
trace!(
" updating last_valid_rec_pos: 0x{:x} --> 0x{:x}",
last_valid_rec_pos,
offs
);
last_valid_rec_pos = offs;
}
offs += 4;
rec_offs = 4;
contlen = xl_tot_len - 4;
trace!(
" reading rec_hdr[0..4] <-- [0x{:x}; 0x{:x})",
page_offs,
page_offs + 4
);
rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
} else {
// we're continuing a record, possibly from previous page.
let page_offs = offs % XLOG_BLCKSZ;
let pageleft = XLOG_BLCKSZ - page_offs;
// read the rest of the record, or as much as fits on this page.
let n = min(contlen, pageleft);
trace!(
"offs=0x{:x}, record continuation, pageleft={}, contlen={}",
offs,
pageleft,
contlen
);
// fill rec_hdr header up to (but not including) xl_crc field
trace!(
" rec_offs={}, XLOG_RECORD_CRC_OFFS={}, XLOG_SIZE_OF_XLOG_RECORD={}",
rec_offs,
XLOG_RECORD_CRC_OFFS,
XLOG_SIZE_OF_XLOG_RECORD
);
if rec_offs < XLOG_RECORD_CRC_OFFS {
let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
trace!(
" reading rec_hdr[{}..{}] <-- [0x{:x}; 0x{:x})",
rec_offs,
rec_offs + len,
page_offs,
page_offs + len
);
rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
}
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
// All records are aligned on 8-byte boundary, so their 8-byte frames
// cannot be split between pages. As xl_crc is the last field,
// its content is always on the same page.
const_assert!(XLOG_RECORD_CRC_OFFS % 8 == 4);
// We should always start reading aligned records even in incorrect WALs so if
// the condition is false it is likely a bug. However, it is localized somewhere
// in this function, hence we do not crash and just report failure instead.
ensure!(crc_offs % 8 == 4, "Record is not aligned properly (bug?)");
xl_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
trace!(
" reading xl_crc: [0x{:x}; 0x{:x}) = 0x{:x}",
crc_offs,
crc_offs + 4,
xl_crc
);
crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
trace!(
" initializing crc: [0x{:x}; 0x{:x}); crc = 0x{:x}",
crc_offs + 4,
page_offs + n,
crc
);
} else if rec_offs > XLOG_RECORD_CRC_OFFS {
// As all records are 8-byte aligned, the header is already fully read and `crc` is initialized in the branch above.
ensure!(rec_offs >= XLOG_SIZE_OF_XLOG_RECORD);
let old_crc = crc;
crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
trace!(
" appending to crc: [0x{:x}; 0x{:x}); 0x{:x} --> 0x{:x}",
page_offs,
page_offs + n,
old_crc,
crc
);
} else {
// Correct because of the way conditions are written above.
assert!(rec_offs + n < XLOG_SIZE_OF_XLOG_RECORD);
// If `skipping_first_contrecord == true`, we may be reading from a middle of a record
// which started in the previous segment. Hence there is no point in validating the header.
if !skipping_first_contrecord && rec_offs + n > XLOG_RECORD_CRC_OFFS {
info!(
"Curiously corrupted WAL: a record stops inside the header; \
offs=0x{:x}, record continuation, pageleft={}, contlen={}",
offs, pageleft, contlen
);
break;
}
// Do nothing: we are still reading the header. It's accounted in CRC in the end of the record.
}
rec_offs += n;
offs += n;
contlen -= n;
if contlen == 0 {
trace!(" record completed at 0x{:x}", offs);
crc = crc32c_append(crc, &rec_hdr);
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
trace!(
" padded offs to 0x{:x}, crc is {:x}, expected crc is {:x}",
offs,
crc,
xl_crc
);
if skipping_first_contrecord {
// do nothing, the flag will go down on next iteration when we're reading new record
trace!(" first conrecord has been just completed");
} else if crc == xl_crc {
// record is valid, advance the result to its end (with
// alignment to the next record taken into account)
trace!(
" updating last_valid_rec_pos: 0x{:x} --> 0x{:x}",
last_valid_rec_pos,
offs
);
last_valid_rec_pos = offs;
} else {
info!(
"CRC mismatch {} vs {} at {}",
crc, xl_crc, last_valid_rec_pos
);
break;
}
}
}
}
trace!("last_valid_rec_pos=0x{:x}", last_valid_rec_pos);
Ok(last_valid_rec_pos as u32)
}
///
/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL.
/// If precise, returns end LSN (next insertion point, basically);
/// otherwise, start of the last segment.
/// Returns (0, 0) if there is no WAL.
///
// Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
// start_lsn must point to some previously known record boundary (beginning of
// the next record). If no valid record after is found, start_lsn is returned
// back.
pub fn find_end_of_wal(
data_dir: &Path,
wal_seg_size: usize,
precise: bool,
start_lsn: Lsn, // start reading WAL at this point or later
) -> anyhow::Result<(XLogRecPtr, TimeLineID)> {
let mut high_segno: XLogSegNo = 0;
let mut high_tli: TimeLineID = 0;
let mut high_ispartial = false;
start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
) -> anyhow::Result<Lsn> {
let mut result = start_lsn;
let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ];
let mut decoder = WalStreamDecoder::new(start_lsn);
for entry in fs::read_dir(data_dir).unwrap().flatten() {
let ispartial: bool;
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
/*
* Check if the filename looks like an xlog file, or a .partial file.
*/
if IsXLogFileName(fname) {
ispartial = false;
} else if IsPartialXLogFileName(fname) {
ispartial = true;
} else {
continue;
}
let (segno, tli) = XLogFromFileName(fname, wal_seg_size);
if !ispartial && entry.metadata().unwrap().len() != wal_seg_size as u64 {
continue;
}
if segno > high_segno
|| (segno == high_segno && tli > high_tli)
|| (segno == high_segno && tli == high_tli && high_ispartial && !ispartial)
{
high_segno = segno;
high_tli = tli;
high_ispartial = ispartial;
}
}
if high_segno > 0 {
let mut high_offs = 0;
/*
* Move the starting pointer to the start of the next segment, if the
* highest one we saw was completed.
*/
if !high_ispartial {
high_segno += 1;
} else if precise {
/* otherwise locate last record in last partial segment */
if start_lsn.segment_number(wal_seg_size) > high_segno {
bail!(
"provided start_lsn {:?} is beyond highest segno {:?} available",
start_lsn,
high_segno,
// loop over segments
loop {
let segno = curr_lsn.segment_number(wal_seg_size);
let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let seg_file_path = data_dir.join(seg_file_name);
match open_wal_segment(&seg_file_path)? {
None => {
// no more segments
info!(
"find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
result, seg_file_path
);
return Ok(result);
}
Some(mut segment) => {
let seg_offs = curr_lsn.segment_offset(wal_seg_size);
segment.seek(SeekFrom::Start(seg_offs as u64))?;
// loop inside segment
loop {
let bytes_read = segment.read(&mut buf)?;
if bytes_read == 0 {
break; // EOF
}
curr_lsn += bytes_read as u64;
decoder.feed_bytes(&buf[0..bytes_read]);
// advance result past all completely read records
loop {
match decoder.poll_decode() {
Ok(Some(record)) => result = record.0,
Err(e) => {
info!(
"find_end_of_wal reached end at {:?}, decode error: {:?}",
result, e
);
return Ok(result);
}
Ok(None) => break, // need more data
}
}
}
}
let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno {
start_lsn.segment_offset(wal_seg_size)
} else {
0
};
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
start_offset,
)?;
}
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
return Ok((high_ptr, high_tli));
}
Ok((0, 0))
}
// Open .partial or full WAL segment file, if present.
fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result<Option<File>> {
let mut partial_path = seg_file_path.to_owned();
partial_path.set_extension("partial");
match File::open(partial_path) {
Ok(file) => Ok(Some(file)),
Err(e) => match e.kind() {
ErrorKind::NotFound => {
// .partial not found, try full
match File::open(seg_file_path) {
Ok(file) => Ok(Some(file)),
Err(e) => match e.kind() {
ErrorKind::NotFound => Ok(None),
_ => Err(e.into()),
},
}
}
_ => Err(e.into()),
},
}
}
pub fn main() {
let mut data_dir = PathBuf::new();
data_dir.push(".");
let (wal_end, tli) = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, true, Lsn(0)).unwrap();
println!(
"wal_end={:>08X}{:>08X}, tli={}",
(wal_end >> 32) as u32,
wal_end as u32,
tli
);
let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap();
println!("wal_end={:?}", wal_end);
}
impl XLogRecord {
@@ -588,11 +345,93 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali
Ok(seg_buf.freeze())
}
#[repr(C)]
#[derive(Serialize)]
struct XlLogicalMessage {
db_id: Oid,
transactional: uint32, // bool, takes 4 bytes due to alignment in C structures
prefix_size: uint64,
message_size: uint64,
}
impl XlLogicalMessage {
pub fn encode(&self) -> Bytes {
use utils::bin_ser::LeSer;
self.ser().unwrap().into()
}
}
/// Create new WAL record for non-transactional logical message.
/// Used for creating artificial WAL for tests, as LogicalMessage
/// record is basically no-op.
///
/// NOTE: This leaves the xl_prev field zero. The safekeeper and
/// pageserver tolerate that, but PostgreSQL does not.
pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1);
prefix_bytes.write_all(prefix.as_bytes()).unwrap();
prefix_bytes.push(0);
let message_bytes = message.as_bytes();
let logical_message = XlLogicalMessage {
db_id: 0,
transactional: 0,
prefix_size: prefix_bytes.len() as u64,
message_size: message_bytes.len() as u64,
};
let mainrdata = logical_message.encode();
let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len();
// only short mainrdata is supported for now
assert!(mainrdata_len <= 255);
let mainrdata_len = mainrdata_len as u8;
let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len];
data.extend_from_slice(&mainrdata);
data.extend_from_slice(&prefix_bytes);
data.extend_from_slice(message_bytes);
let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len();
let mut header = XLogRecord {
xl_tot_len: total_len as u32,
xl_xid: 0,
xl_prev: 0,
xl_info: 0,
xl_rmid: 21,
__bindgen_padding_0: [0u8; 2usize],
xl_crc: 0, // crc will be calculated later
};
let header_bytes = header.encode().expect("failed to encode header");
let crc = crc32c_append(0, &data);
let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
let mut wal: Vec<u8> = Vec::new();
wal.extend_from_slice(&header.encode().expect("failed to encode header"));
wal.extend_from_slice(&data);
// WAL start position must be aligned at 8 bytes,
// this will add padding for the next WAL record.
const PADDING: usize = 8;
let padding_rem = wal.len() % PADDING;
if padding_rem != 0 {
wal.resize(wal.len() + PADDING - padding_rem, 0);
}
wal
}
#[cfg(test)]
mod tests {
use super::*;
use regex::Regex;
use std::cmp::min;
use std::fs;
use std::{env, str::FromStr};
use utils::const_assert;
fn init_logging() {
let _ = env_logger::Builder::from_env(
@@ -603,10 +442,7 @@ mod tests {
.try_init();
}
fn test_end_of_wal<C: wal_craft::Crafter>(
test_name: &str,
expected_end_of_wal_non_partial: Lsn,
) {
fn test_end_of_wal<C: wal_craft::Crafter>(test_name: &str) {
use wal_craft::*;
// Craft some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
@@ -627,7 +463,7 @@ mod tests {
.iter()
.map(|&lsn| u64::from(lsn).into())
.collect();
let expected_end_of_wal_partial: Lsn = u64::from(expected_end_of_wal_partial).into();
let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into();
srv.kill();
// Check find_end_of_wal on the initial WAL
@@ -639,10 +475,10 @@ mod tests {
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal_partial);
for start_lsn in std::iter::once(Lsn(0))
.chain(intermediate_lsns)
.chain(std::iter::once(expected_end_of_wal_partial))
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal);
for start_lsn in intermediate_lsns
.iter()
.chain(std::iter::once(&expected_end_of_wal))
{
// Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
// We assume that `start_lsn` is non-decreasing.
@@ -657,7 +493,7 @@ mod tests {
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(start_lsn) {
if seg_start_lsn > u64::from(*start_lsn) {
continue;
}
let mut f = File::options().write(true).open(file.path()).unwrap();
@@ -665,18 +501,12 @@ mod tests {
f.write_all(
&ZEROS[0..min(
WAL_SEGMENT_SIZE,
(u64::from(start_lsn) - seg_start_lsn) as usize,
(u64::from(*start_lsn) - seg_start_lsn) as usize,
)],
)
.unwrap();
}
check_end_of_wal(
&cfg,
&last_segment,
start_lsn,
expected_end_of_wal_non_partial,
expected_end_of_wal_partial,
);
check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
}
}
@@ -713,18 +543,15 @@ mod tests {
cfg: &wal_craft::Conf,
last_segment: &str,
start_lsn: Lsn,
expected_end_of_wal_non_partial: Lsn,
expected_end_of_wal_partial: Lsn,
expected_end_of_wal: Lsn,
) {
// Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
let (wal_end, tli) =
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
let wal_end = Lsn(wal_end);
info!(
"find_end_of_wal returned (wal_end={}, tli={}) with non-partial WAL segment",
wal_end, tli
);
assert_eq!(wal_end, expected_end_of_wal_non_partial);
// let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
// info!(
// "find_end_of_wal returned wal_end={} with non-partial WAL segment",
// wal_end
// );
// assert_eq!(wal_end, expected_end_of_wal_non_partial);
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
@@ -732,14 +559,12 @@ mod tests {
cfg.wal_dir().join(format!("{}.partial", last_segment)),
)
.unwrap();
let (wal_end, tli) =
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
let wal_end = Lsn(wal_end);
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
info!(
"find_end_of_wal returned (wal_end={}, tli={}) with partial WAL segment",
wal_end, tli
"find_end_of_wal returned wal_end={} with partial WAL segment",
wal_end
);
assert_eq!(wal_end, expected_end_of_wal_partial);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir().join(last_segment),
@@ -752,10 +577,7 @@ mod tests {
#[test]
pub fn test_find_end_of_wal_simple() {
init_logging();
test_end_of_wal::<wal_craft::Simple>(
"test_find_end_of_wal_simple",
"0/2000000".parse::<Lsn>().unwrap(),
);
test_end_of_wal::<wal_craft::Simple>("test_find_end_of_wal_simple");
}
#[test]
@@ -763,17 +585,14 @@ mod tests {
init_logging();
test_end_of_wal::<wal_craft::WalRecordCrossingSegmentFollowedBySmallOne>(
"test_find_end_of_wal_crossing_segment_followed_by_small_one",
"0/3000000".parse::<Lsn>().unwrap(),
);
}
#[test]
#[ignore = "not yet fixed, needs correct parsing of pre-last segments"] // TODO
pub fn test_find_end_of_wal_last_crossing_segment() {
init_logging();
test_end_of_wal::<wal_craft::LastWalRecordCrossingSegment>(
"test_find_end_of_wal_last_crossing_segment",
"0/3000000".parse::<Lsn>().unwrap(),
);
}
@@ -806,4 +625,15 @@ mod tests {
checkpoint.update_next_xid(1024);
assert_eq!(checkpoint.nextXid.value, 2048);
}
#[test]
pub fn test_encode_logical_message() {
let expected = [
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);
}
}

View File

@@ -10,7 +10,7 @@ anyhow = "1.0"
clap = "3.0"
env_logger = "0.9"
log = "0.4"
once_cell = "1.8.0"
once_cell = "1.13.0"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres_ffi = { path = "../" }
tempfile = "3.2"

View File

@@ -4,8 +4,8 @@ use log::*;
use once_cell::sync::Lazy;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE;
use postgres_ffi::xlog_utils::{
use postgres_ffi::v14::pg_constants::WAL_SEGMENT_SIZE;
use postgres_ffi::v14::xlog_utils::{
XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::cmp::Ordering;

View File

@@ -7,7 +7,7 @@ edition = "2021"
anyhow = { version = "1.0", features = ["backtrace"] }
async-trait = "0.1"
metrics = { version = "0.1", path = "../metrics" }
once_cell = "1.8.0"
once_cell = "1.13.0"
rusoto_core = "0.48"
rusoto_s3 = "0.48"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -66,6 +66,9 @@ pub trait RemoteStorage: Send + Sync {
async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
/// Lists all top level subdirectories for a given prefix
/// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
/// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
/// so this method doesnt need to.
async fn list_prefixes(
&self,
prefix: Option<Self::RemoteObjectId>,

View File

@@ -116,7 +116,7 @@ impl RemoteStorage for LocalFs {
prefix: Option<Self::RemoteObjectId>,
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
let path = match prefix {
Some(prefix) => Cow::Owned(self.storage_root.join(prefix)),
Some(prefix) => Cow::Owned(prefix),
None => Cow::Borrowed(&self.storage_root),
};
get_all_files(path.as_ref(), false).await

View File

@@ -171,17 +171,25 @@ impl S3Bucket {
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
// session token is used when authorizing through sso
// which is typically the case when testing locally on developer machine
let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
let client = if access_key_id.is_none() && secret_access_key.is_none() {
debug!("Using IAM-based AWS access");
S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region)
} else {
debug!("Using credentials-based AWS access");
debug!(
"Using credentials-based AWS access. Session token is set: {}",
session_token.is_some()
);
S3Client::new_with(
request_dispatcher,
StaticProvider::new_minimal(
StaticProvider::new(
access_key_id.unwrap_or_default(),
secret_access_key.unwrap_or_default(),
session_token,
None,
),
region,
)
@@ -304,32 +312,24 @@ impl RemoteStorage for S3Bucket {
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it wont include empty "directories"
async fn list_prefixes(
&self,
prefix: Option<Self::RemoteObjectId>,
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
let list_prefix = match prefix {
Some(prefix) => {
let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default();
// if there is no trailing / in default prefix and
// supplied prefix does not start with "/" insert it
if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR)
|| prefix.0.starts_with(S3_PREFIX_SEPARATOR))
{
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
}
prefix_in_bucket.push_str(&prefix.0);
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| p.0)
.or_else(|| self.prefix_in_bucket.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) {
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
if !p.ends_with(S3_PREFIX_SEPARATOR) {
p.push(S3_PREFIX_SEPARATOR);
}
Some(prefix_in_bucket)
}
None => self.prefix_in_bucket.clone(),
};
p
});
let mut document_keys = Vec::new();

View File

@@ -8,7 +8,6 @@ anyhow = "1.0"
bincode = "1.3"
bytes = "1.0.1"
hyper = { version = "0.14.7", features = ["full"] }
lazy_static = "1.4.0"
pin-project-lite = "0.2.7"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
@@ -28,6 +27,8 @@ rustls = "0.20.2"
rustls-split = "0.3.0"
git-version = "0.3.5"
serde_with = "1.12.0"
once_cell = "1.13.0"
metrics = { path = "../metrics" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -265,7 +265,7 @@ mod tests {
use serde::{Deserialize, Serialize};
use std::io::Cursor;
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShortStruct {
a: u8,
b: u32,
@@ -286,7 +286,7 @@ mod tests {
const SHORT2_ENC_LE: &[u8] = &[8, 0, 0, 3, 7];
const SHORT2_ENC_LE_TRAILING: &[u8] = &[8, 0, 0, 3, 7, 0xff, 0xff, 0xff];
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LongMsg {
pub tag: u8,
pub blockpos: u32,

View File

@@ -0,0 +1,21 @@
use std::path::PathBuf;
use std::{os::unix::prelude::CommandExt, process::Command};
use std::fs::File;
pub trait NeonCommandExtensions: CommandExt {
fn capture_to_files(&mut self, path: PathBuf, name: &str) -> &mut Command;
}
impl NeonCommandExtensions for Command {
fn capture_to_files(&mut self, path: PathBuf, name: &str) -> &mut Command {
let out_file = File::create(path.join(format!("{}.out", name)))
.expect("can't make file");
let err_file = File::create(path.join(format!("{}.out", name)))
.expect("can't make file");
// TODO touch files?
self.stdout(out_file).stderr(err_file)
}
}

View File

@@ -4,8 +4,8 @@ use crate::zid::ZTenantId;
use anyhow::anyhow;
use hyper::header::AUTHORIZATION;
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
use lazy_static::lazy_static;
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
@@ -16,13 +16,13 @@ use std::net::TcpListener;
use super::error::ApiError;
lazy_static! {
static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!(
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
"Number of metric requests made"
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
info!("{} {} {}", info.method(), info.uri().path(), res.status(),);

View File

@@ -10,12 +10,10 @@ pub fn get_request_param<'a>(
) -> Result<&'a str, ApiError> {
match request.param(param_name) {
Some(arg) => Ok(arg),
None => {
return Err(ApiError::BadRequest(format!(
"no {} specified in path param",
param_name
)))
}
None => Err(ApiError::BadRequest(format!(
"no {} specified in path param",
param_name
))),
}
}

View File

@@ -54,6 +54,9 @@ pub mod nonblock;
// Default signal handling
pub mod signals;
// Helpers for running commands
pub mod command_extensions;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -18,7 +18,7 @@ pub const XLOG_BLCKSZ: u32 = 8192;
pub struct Lsn(pub u64);
/// We tried to parse an LSN from a string, but failed
#[derive(Debug, PartialEq, thiserror::Error)]
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("LsnParseError")]
pub struct LsnParseError;

View File

@@ -50,7 +50,7 @@ pub trait Handler {
/// PostgresBackend protocol state.
/// XXX: The order of the constructors matters.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum ProtoState {
Initialization,
Encrypted,

View File

@@ -930,7 +930,7 @@ impl<'a> BeMessage<'a> {
// Neon extension of postgres replication protocol
// See NEON_STATUS_UPDATE_TAG_BYTE
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicationFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,

View File

@@ -9,7 +9,7 @@ use std::sync::Mutex;
use std::time::Duration;
/// An error happened while waiting for a number
#[derive(Debug, PartialEq, thiserror::Error)]
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("SeqWaitError")]
pub enum SeqWaitError {
/// The wait timeout was reached

View File

@@ -4,7 +4,7 @@ use serde::Deserialize;
use std::io::Read;
use utils::bin_ser::LeSer;
#[derive(Debug, PartialEq, Deserialize)]
#[derive(Debug, PartialEq, Eq, Deserialize)]
pub struct HeaderData {
magic: u16,
info: u16,

View File

@@ -7,7 +7,7 @@ use std::{
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use utils::postgres_backend::{AuthType, Handler, PostgresBackend};
@@ -19,18 +19,20 @@ fn make_tcp_pair() -> (TcpStream, TcpStream) {
(server_stream, client_stream)
}
lazy_static! {
static ref KEY: rustls::PrivateKey = {
let mut cursor = Cursor::new(include_bytes!("key.pem"));
rustls::PrivateKey(rustls_pemfile::rsa_private_keys(&mut cursor).unwrap()[0].clone())
};
static ref CERT: rustls::Certificate = {
let mut cursor = Cursor::new(include_bytes!("cert.pem"));
rustls::Certificate(rustls_pemfile::certs(&mut cursor).unwrap()[0].clone())
};
}
static KEY: Lazy<rustls::PrivateKey> = Lazy::new(|| {
let mut cursor = Cursor::new(include_bytes!("key.pem"));
rustls::PrivateKey(rustls_pemfile::rsa_private_keys(&mut cursor).unwrap()[0].clone())
});
static CERT: Lazy<rustls::Certificate> = Lazy::new(|| {
let mut cursor = Cursor::new(include_bytes!("cert.pem"));
rustls::Certificate(rustls_pemfile::certs(&mut cursor).unwrap()[0].clone())
});
#[test]
// [false-positive](https://github.com/rust-lang/rust-clippy/issues/9274),
// we resize the vector so doing some modifications after all
#[allow(clippy::read_zero_byte_vec)]
fn ssl() {
let (mut client_sock, server_sock) = make_tcp_pair();

View File

@@ -501,10 +501,10 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
// default_tenantid was generated by the `env.init()` call above
let initial_tenant_id = env.default_tenant_id.unwrap();
// Call 'pageserver init'.
// Initialize pageserver, create initial tenant and timeline.
let pageserver = PageServerNode::from_env(&env);
let initial_timeline_id = pageserver
.init(
.initialize(
Some(initial_tenant_id),
initial_timeline_id_arg,
&pageserver_config_overrides(init_match),
@@ -551,25 +551,15 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.values_of("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver
.tenant_create(initial_tenant_id, tenant_conf)?
.ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?;
println!(
"tenant {} successfully created on the pageserver",
new_tenant_id
);
let new_tenant_id = pageserver.tenant_create(initial_tenant_id, tenant_conf)?;
println!("tenant {new_tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
let timeline = pageserver
.timeline_create(new_tenant_id, new_timeline_id, None, None)?
.context(format!(
"Failed to create initial timeline for tenant {new_tenant_id}"
))?;
let new_timeline_id = timeline.timeline_id;
let last_record_lsn = timeline
let timeline_info =
pageserver.timeline_create(new_tenant_id, new_timeline_id, None, None)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info
.local
.context(format!("Failed to get last record LSN: no local timeline info for timeline {new_timeline_id}"))?
.last_record_lsn;
@@ -616,20 +606,18 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_branch_name = create_match
.value_of("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
let timeline = pageserver
.timeline_create(tenant_id, None, None, None)?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id;
let timeline_info = pageserver.timeline_create(tenant_id, None, None, None)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline
let last_record_lsn = timeline_info
.local
.expect("no local timeline info")
.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {} for tenant: {}",
timeline.timeline_id, last_record_lsn, tenant_id,
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
timeline_info.timeline_id
);
}
Some(("import", import_match)) => {
@@ -680,10 +668,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let ancestor_timeline_id = env
.get_branch_timeline_id(ancestor_branch_name, tenant_id)
.ok_or_else(|| {
anyhow!(
"Found no timeline id for branch name '{}'",
ancestor_branch_name
)
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
let start_lsn = branch_match
@@ -691,12 +676,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(Lsn::from_str)
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline = pageserver
.timeline_create(tenant_id, None, start_lsn, Some(ancestor_timeline_id))?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id;
let timeline_info = pageserver.timeline_create(
tenant_id,
None,
start_lsn,
Some(ancestor_timeline_id),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline
let last_record_lsn = timeline_info
.local
.expect("no local timeline info")
.last_record_lsn;
@@ -704,11 +692,11 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'",
timeline.timeline_id, last_record_lsn, tenant_id, ancestor_branch_name,
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
timeline_info.timeline_id
);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
None => bail!("no tenant subcommand provided"),
}
@@ -884,7 +872,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
match sub_match.subcommand() {
Some(("start", start_match)) => {
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
@@ -906,10 +894,19 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("status", _)) => match PageServerNode::from_env(env).check_status() {
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
exit(1);
}
},
Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
None => bail!("no pageserver subcommand provided"),
}

View File

@@ -21,7 +21,6 @@ futures = "0.3.13"
hex = "0.4.3"
hyper = "0.14"
itertools = "0.10.3"
lazy_static = "1.4.0"
clap = "3.0"
daemonize = "0.4.1"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
@@ -48,7 +47,7 @@ tracing = "0.1.27"
signal-hook = "0.3.10"
url = "2"
nix = "0.23"
once_cell = "1.8.0"
once_cell = "1.13.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
git-version = "0.3.5"

View File

@@ -24,8 +24,13 @@ use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::DatadirTimeline;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName};
use postgres_ffi::v14::{CheckPoint, ControlFileData};
use postgres_ffi::TransactionId;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE};
use utils::lsn::Lsn;
/// This is short-living object only for the time of tarball creation,
@@ -200,7 +205,7 @@ where
}
// Add a file for each chunk of blocks (aka segment)
let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
let chunks = (0..nblocks).chunks(RELSEG_SIZE as usize);
for (seg, blocks) in chunks.into_iter().enumerate() {
let mut segment_data: Vec<u8> = vec![];
for blknum in blocks {
@@ -220,23 +225,19 @@ where
fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks = self.timeline.get_slru_segment_size(slru, segno, self.lsn)?;
let mut slru_buf: Vec<u8> =
Vec::with_capacity(nblocks as usize * pg_constants::BLCKSZ as usize);
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
for blknum in 0..nblocks {
let img = self
.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)?;
if slru == SlruKind::Clog {
ensure!(
img.len() == pg_constants::BLCKSZ as usize
|| img.len() == pg_constants::BLCKSZ as usize + 8
);
ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
} else {
ensure!(img.len() == pg_constants::BLCKSZ as usize);
ensure!(img.len() == BLCKSZ as usize);
}
slru_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]);
slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
}
let segname = format!("{}/{:>04X}", slru.to_str(), segno);

View File

@@ -1,6 +1,6 @@
//! Main entry point for the Page Server executable.
use std::{env, path::Path, str::FromStr};
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use tracing::*;
use anyhow::{bail, Context, Result};
@@ -13,7 +13,7 @@ use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
timelines, virtual_file, LOG_FILE_NAME,
virtual_file, LOG_FILE_NAME,
};
use utils::{
auth::JwtAuth,
@@ -24,7 +24,6 @@ use utils::{
shutdown::exit_now,
signals::{self, Signal},
tcp_listener,
zid::{ZTenantId, ZTimelineId},
};
project_git_version!(GIT_VERSION);
@@ -42,6 +41,7 @@ fn main() -> anyhow::Result<()> {
.about("Materializes WAL stream to pages and serves them to the postgres")
.version(&*version())
.arg(
Arg::new("daemonize")
.short('d')
.long("daemonize")
@@ -52,7 +52,7 @@ fn main() -> anyhow::Result<()> {
Arg::new("init")
.long("init")
.takes_value(false)
.help("Initialize pageserver service: creates an initial config, tenant and timeline, if specified"),
.help("Initialize pageserver with all given config overrides"),
)
.arg(
Arg::new("workdir")
@@ -61,20 +61,6 @@ fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("Working directory for the pageserver"),
)
.arg(
Arg::new("create-tenant")
.long("create-tenant")
.takes_value(true)
.help("Create tenant during init")
.requires("init"),
)
.arg(
Arg::new("initial-timeline-id")
.long("initial-timeline-id")
.takes_value(true)
.help("Use a specific timeline id during init and tenant creation")
.requires("create-tenant"),
)
// See `settings.md` for more details on the extra configuration patameters pageserver can process
.arg(
Arg::new("config-override")
@@ -85,6 +71,9 @@ fn main() -> anyhow::Result<()> {
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
)
.arg(Arg::new("update-config").long("update-config").takes_value(false).help(
"Update the config file when started",
))
.arg(
Arg::new("enabled-features")
.long("enabled-features")
@@ -110,18 +99,6 @@ fn main() -> anyhow::Result<()> {
.with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;
let cfg_file_path = workdir.join("pageserver.toml");
let init = arg_matches.is_present("init");
let create_tenant = arg_matches
.value_of("create-tenant")
.map(ZTenantId::from_str)
.transpose()
.context("Failed to parse tenant id from the arguments")?;
let initial_timeline_id = arg_matches
.value_of("initial-timeline-id")
.map(ZTimelineId::from_str)
.transpose()
.context("Failed to parse timeline id from the arguments")?;
// Set CWD to workdir for non-daemon modes
env::set_current_dir(&workdir).with_context(|| {
format!(
@@ -131,30 +108,86 @@ fn main() -> anyhow::Result<()> {
})?;
let daemonize = arg_matches.is_present("daemonize");
if init && daemonize {
bail!("--daemonize cannot be used with --init")
}
let mut toml = if init {
// We're initializing the repo, so there's no config file yet
DEFAULT_CONFIG_FILE
.parse::<toml_edit::Document>()
.context("could not parse built-in config file")?
} else {
// Supplement the CLI arguments with the config file
let cfg_file_contents = std::fs::read_to_string(&cfg_file_path)
.with_context(|| format!("No pageserver config at '{}'", cfg_file_path.display()))?;
cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| {
format!(
"Failed to read '{}' as pageserver config",
cfg_file_path.display()
)
})?
let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
ControlFlow::Continue(conf) => conf,
ControlFlow::Break(()) => {
info!("Pageserver config init successful");
return Ok(());
}
};
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
utils::crashsafe_dir::create_dir_all(conf.tenants_path()).with_context(|| {
format!(
"Failed to create tenants root dir at '{}'",
tenants_path.display()
)
})?;
}
// Initialize up failpoints support
let scenario = FailScenario::setup();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
}
fn initialize_config(
cfg_file_path: &Path,
arg_matches: clap::ArgMatches,
workdir: &Path,
) -> anyhow::Result<ControlFlow<(), &'static PageServerConf>> {
let init = arg_matches.is_present("init");
let update_config = init || arg_matches.is_present("update-config");
let (mut toml, config_file_exists) = if cfg_file_path.is_file() {
if init {
anyhow::bail!(
"Config file '{}' already exists, cannot init it, use --update-config to update it",
cfg_file_path.display()
);
}
// Supplement the CLI arguments with the config file
let cfg_file_contents = std::fs::read_to_string(&cfg_file_path).with_context(|| {
format!(
"Failed to read pageserver config at '{}'",
cfg_file_path.display()
)
})?;
(
cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| {
format!(
"Failed to parse '{}' as pageserver config",
cfg_file_path.display()
)
})?,
true,
)
} else if cfg_file_path.exists() {
anyhow::bail!(
"Config file '{}' exists but is not a regular file",
cfg_file_path.display()
);
} else {
// We're initializing the repo, so there's no config file yet
(
DEFAULT_CONFIG_FILE
.parse::<toml_edit::Document>()
.context("could not parse built-in config file")?,
false,
)
};
// Process any extra options given with -c
if let Some(values) = arg_matches.values_of("config-override") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
@@ -165,49 +198,38 @@ fn main() -> anyhow::Result<()> {
})?;
for (key, item) in doc.iter() {
if key == "id" {
anyhow::ensure!(
init,
"node id can only be set during pageserver init and cannot be overridden"
);
if config_file_exists && update_config && key == "id" && toml.contains_key(key) {
anyhow::bail!("Pageserver config file exists at '{}' and has node id already, it cannot be overridden", cfg_file_path.display());
}
toml.insert(key, item.clone());
}
}
}
trace!("Resulting toml: {}", toml);
let conf = PageServerConf::parse_and_validate(&toml, &workdir)
debug!("Resulting toml: {toml}");
let conf = PageServerConf::parse_and_validate(&toml, workdir)
.context("Failed to parse pageserver configuration")?;
// The configuration is all set up now. Turn it into a 'static
// that can be freely stored in structs and passed across threads
// as a ref.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
if update_config {
info!("Writing pageserver config to '{}'", cfg_file_path.display());
// Initialize up failpoints support
let scenario = FailScenario::setup();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
// Create repo and exit if init was requested
if init {
timelines::init_pageserver(conf, create_tenant, initial_timeline_id)
.context("Failed to init pageserver")?;
// write the config file
std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| {
format!(
"Failed to initialize pageserver config at '{}'",
"Failed to write pageserver config to '{}'",
cfg_file_path.display()
)
})?;
} else {
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
info!(
"Config successfully written to '{}'",
cfg_file_path.display()
)
}
scenario.teardown();
Ok(())
Ok(if init {
ControlFlow::Break(())
} else {
ControlFlow::Continue(Box::leak(Box::new(conf)))
})
}
fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {

View File

@@ -59,6 +59,7 @@ pub mod defaults {
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
@@ -452,6 +453,13 @@ impl PageServerConf {
Some(parse_toml_u64("checkpoint_distance", checkpoint_distance)?);
}
if let Some(checkpoint_timeout) = item.get("checkpoint_timeout") {
t_conf.checkpoint_timeout = Some(parse_toml_duration(
"checkpoint_timeout",
checkpoint_timeout,
)?);
}
if let Some(compaction_target_size) = item.get("compaction_target_size") {
t_conf.compaction_target_size = Some(parse_toml_u64(
"compaction_target_size",

View File

@@ -32,6 +32,7 @@ pub struct TenantCreateRequest {
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<ZTenantId>,
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
@@ -70,6 +71,7 @@ pub struct TenantConfigRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
@@ -87,6 +89,7 @@ impl TenantConfigRequest {
TenantConfigRequest {
tenant_id,
checkpoint_distance: None,
checkpoint_timeout: None,
compaction_target_size: None,
compaction_period: None,
compaction_threshold: None,

View File

@@ -560,6 +560,8 @@ components:
type: string
checkpoint_distance:
type: integer
checkpoint_timeout:
type: string
compaction_period:
type: string
compaction_threshold:
@@ -578,6 +580,8 @@ components:
type: string
checkpoint_distance:
type: integer
checkpoint_timeout:
type: string
compaction_period:
type: string
compaction_threshold:

View File

@@ -11,14 +11,13 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::layered_repository::metadata::TimelineMetadata;
use crate::layered_repository::{metadata::TimelineMetadata, LayeredTimeline};
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{LocalTimelineState, RepositoryTimeline};
use crate::repository::{Repository, Timeline};
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant_config::TenantConfOpt;
use crate::TimelineImpl;
use crate::{config::PageServerConf, tenant_mgr, timelines};
use utils::{
auth::JwtAuth,
@@ -86,7 +85,7 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
// Helper functions to construct a LocalTimelineInfo struct for a timeline
fn local_timeline_info_from_loaded_timeline(
timeline: &TimelineImpl,
timeline: &LayeredTimeline,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
@@ -161,13 +160,13 @@ fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> Lo
}
fn local_timeline_info_from_repo_timeline(
repo_timeline: &RepositoryTimeline<TimelineImpl>,
repo_timeline: &RepositoryTimeline<LayeredTimeline>,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
match repo_timeline {
RepositoryTimeline::Loaded(timeline) => local_timeline_info_from_loaded_timeline(
&*timeline,
timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
),
@@ -623,6 +622,11 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout =
Some(humantime::parse_duration(&checkpoint_timeout).map_err(ApiError::from_err)?);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
@@ -683,6 +687,10 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout =
Some(humantime::parse_duration(&checkpoint_timeout).map_err(ApiError::from_err)?);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;

View File

@@ -15,13 +15,24 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::v14::relfile_utils::*;
use postgres_ffi::v14::waldecoder::*;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::Oid;
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
// Read control file to extract the LSN
let controlfile_path = path.join("global").join("pg_control");
let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
let lsn = controlfile.checkPoint;
Ok(Lsn(lsn))
}
///
/// Import all relation data pages from local disk into the repository.
///
@@ -37,7 +48,7 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
// Import all but pg_wal
@@ -56,12 +67,12 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
pg_control = Some(control_file);
}
modification.flush(lsn)?;
modification.flush()?;
}
}
// We're done importing all the data files.
modification.commit(lsn)?;
modification.commit()?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -110,8 +121,8 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
let mut buf: [u8; 8192] = [0u8; 8192];
ensure!(len % pg_constants::BLCKSZ as usize == 0);
let nblocks = len / pg_constants::BLCKSZ as usize;
ensure!(len % BLCKSZ as usize == 0);
let nblocks = len / BLCKSZ as usize;
let rel = RelTag {
spcnode: spcoid,
@@ -120,7 +131,7 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
forknum,
};
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
// Call put_rel_creation for every segment of the relation,
// because there is no guarantee about the order in which we are processing segments.
@@ -144,8 +155,7 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
let relative_blknum =
blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
break;
}
@@ -184,8 +194,8 @@ fn import_slru<T: DatadirTimeline, Reader: Read>(
.to_string_lossy();
let segno = u32::from_str_radix(filename, 16)?;
ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / pg_constants::BLCKSZ as usize;
ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / BLCKSZ as usize;
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
@@ -267,7 +277,7 @@ fn import_wal<T: DatadirTimeline>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
@@ -301,7 +311,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -319,7 +329,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush(base_lsn)?;
modification.flush()?;
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -333,7 +343,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit(base_lsn)?;
modification.commit()?;
Ok(())
}
@@ -385,7 +395,7 @@ pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification();
let mut modification = tline.begin_modification(end_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {

View File

@@ -1,5 +1,5 @@
use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::pg_constants;
use postgres_ffi::BLCKSZ;
use std::ops::Range;
///
@@ -19,7 +19,7 @@ impl KeySpace {
///
pub fn partition(&self, target_size: u64) -> KeyPartitioning {
// Assume that each value is 8k in size.
let target_nblocks = (target_size / pg_constants::BLCKSZ as u64) as usize;
let target_nblocks = (target_size / BLCKSZ as u64) as usize;
let mut parts = Vec::new();
let mut current_part = Vec::new();

View File

@@ -5,7 +5,7 @@
//! get/put call, walking back the timeline branching history as needed.
//!
//! The files are stored in the .neon/tenants/<tenantid>/timelines/<timelineid>
//! directory. See layered_repository/README for how the files are managed.
//! directory. See docs/pageserver-storage.md for how the files are managed.
//! In addition to the layer files, there is a metadata file in the same
//! directory that contains information about the timeline, in particular its
//! parent timeline, and the last LSN that has been written to disk.
@@ -59,7 +59,9 @@ mod storage_layer;
mod timeline;
use storage_layer::Layer;
use timeline::{LayeredTimeline, LayeredTimelineEntry};
use timeline::LayeredTimelineEntry;
pub use timeline::LayeredTimeline;
// re-export this function so that page_cache.rs can use it.
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -433,6 +435,13 @@ impl LayeredRepository {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
pub fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf

View File

@@ -5,7 +5,7 @@
use crate::page_cache;
use crate::page_cache::{ReadBufResult, PAGE_SZ};
use bytes::Bytes;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
use std::sync::atomic::AtomicU64;
@@ -117,9 +117,7 @@ where
}
}
lazy_static! {
static ref NEXT_ID: AtomicU64 = AtomicU64::new(1);
}
static NEXT_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(1));
/// An adapter for reading a (virtual) file using the page cache.
///

View File

@@ -209,7 +209,7 @@ where
reader: R,
}
#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VisitDirection {
Forwards,
Backwards,

View File

@@ -8,7 +8,7 @@ use crate::page_cache;
use crate::page_cache::PAGE_SZ;
use crate::page_cache::{ReadBufResult, WriteBufResult};
use crate::virtual_file::VirtualFile;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::OpenOptions;
@@ -21,15 +21,15 @@ use utils::zid::{ZTenantId, ZTimelineId};
use std::os::unix::fs::FileExt;
lazy_static! {
///
/// This is the global cache of file descriptors (File objects).
///
static ref EPHEMERAL_FILES: RwLock<EphemeralFiles> = RwLock::new(EphemeralFiles {
///
/// This is the global cache of file descriptors (File objects).
///
static EPHEMERAL_FILES: Lazy<RwLock<EphemeralFiles>> = Lazy::new(|| {
RwLock::new(EphemeralFiles {
next_file_id: 1,
files: HashMap::new(),
});
}
})
});
pub struct EphemeralFiles {
next_file_id: u64,

View File

@@ -15,19 +15,18 @@ use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
use crate::repository::Key;
use anyhow::Result;
use lazy_static::lazy_static;
use metrics::{register_int_gauge, IntGauge};
use once_cell::sync::Lazy;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
lazy_static! {
static ref NUM_ONDISK_LAYERS: IntGauge =
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric");
}
static NUM_ONDISK_LAYERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric")
});
///
/// LayerMap tracks what layers exist on a timeline.

View File

@@ -4,11 +4,12 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use lazy_static::lazy_static;
use metrics::core::{AtomicU64, GenericCounter};
use once_cell::sync::Lazy;
use tracing::*;
use std::cmp::{max, min, Ordering};
use std::collections::HashSet;
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::Write;
@@ -16,7 +17,7 @@ use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};
use metrics::{
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
@@ -38,11 +39,13 @@ use crate::layered_repository::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use crate::DatadirTimeline;
use postgres_ffi::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use utils::{
lsn::{AtomicLsn, Lsn, RecordLsn},
seqwait::SeqWait,
@@ -58,76 +61,102 @@ use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::{page_cache, storage_sync};
/// Prometheus histogram buckets (in seconds) that capture the majority of
/// latencies in the microsecond range but also extend far enough up to distinguish
/// "bad" from "really bad".
fn get_buckets_for_critical_operations() -> Vec<f64> {
let buckets_per_digit = 5;
let min_exponent = -6;
let max_exponent = 2;
let mut buckets = vec![];
// Compute 10^(exp / buckets_per_digit) instead of 10^(1/buckets_per_digit)^exp
// because it's more numerically stable and doesn't result in numbers like 9.999999
for exp in (min_exponent * buckets_per_digit)..=(max_exponent * buckets_per_digit) {
buckets.push(10_f64.powf(exp as f64 / buckets_per_digit as f64))
}
buckets
}
// Metrics collected on operations on the storage repository.
lazy_static! {
pub static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
pub static STORAGE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_storage_operations_seconds",
"Time spent on storage operations",
&["operation", "tenant_id", "timeline_id"]
&["operation", "tenant_id", "timeline_id"],
get_buckets_for_critical_operations(),
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref RECONSTRUCT_TIME: HistogramVec = register_histogram_vec!(
static RECONSTRUCT_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value",
&["tenant_id", "timeline_id"]
&["tenant_id", "timeline_id"],
get_buckets_for_critical_operations(),
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
lazy_static! {
static ref MATERIALIZED_PAGE_CACHE_HIT: IntCounterVec = register_int_counter_vec!(
static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
static ref WAIT_LSN_TIME: HistogramVec = register_histogram_vec!(
.expect("failed to define a metric")
});
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_wait_lsn_seconds",
"Time spent waiting for WAL to arrive",
&["tenant_id", "timeline_id"]
&["tenant_id", "timeline_id"],
get_buckets_for_critical_operations(),
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
lazy_static! {
static ref LAST_RECORD_LSN: IntGaugeVec = register_int_gauge_vec!(
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
"Last record LSN grouped by timeline",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
// Metrics for determining timeline's physical size.
// A layered timeline's physical is defined as the total size of
// (delta/image) layer files on disk.
lazy_static! {
static ref CURRENT_PHYSICAL_SIZE: UIntGaugeVec = register_uint_gauge_vec!(
static CURRENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_current_physical_size",
"Current physical size grouped by timeline",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
// or in testing they estimate how much we would upload if we did.
lazy_static! {
static ref NUM_PERSISTENT_FILES_CREATED: IntCounter = register_int_counter!(
static NUM_PERSISTENT_FILES_CREATED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_created_persistent_files_total",
"Number of files created that are meant to be uploaded to cloud storage",
)
.expect("failed to define a metric");
static ref PERSISTENT_BYTES_WRITTEN: IntCounter = register_int_counter!(
.expect("failed to define a metric")
});
static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_written_persistent_bytes_total",
"Total bytes written that are meant to be uploaded to cloud storage",
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
#[derive(Clone)]
pub enum LayeredTimelineEntry {
@@ -195,6 +224,70 @@ impl From<LayeredTimelineEntry> for RepositoryTimeline<LayeredTimeline> {
}
}
struct TimelineMetrics {
pub reconstruct_time_histo: Histogram,
pub materialized_page_cache_hit_counter: GenericCounter<AtomicU64>,
pub flush_time_histo: Histogram,
pub compact_time_histo: Histogram,
pub create_images_time_histo: Histogram,
pub init_logical_size_histo: Histogram,
pub load_layer_map_histo: Histogram,
pub last_record_gauge: IntGauge,
pub wait_lsn_time_histo: Histogram,
pub current_physical_size_gauge: UIntGauge,
}
impl TimelineMetrics {
fn new(tenant_id: &ZTenantId, timeline_id: &ZTimelineId) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let flush_time_histo = STORAGE_TIME
.get_metric_with_label_values(&["layer flush", &tenant_id, &timeline_id])
.unwrap();
let compact_time_histo = STORAGE_TIME
.get_metric_with_label_values(&["compact", &tenant_id, &timeline_id])
.unwrap();
let create_images_time_histo = STORAGE_TIME
.get_metric_with_label_values(&["create images", &tenant_id, &timeline_id])
.unwrap();
let init_logical_size_histo = STORAGE_TIME
.get_metric_with_label_values(&["init logical size", &tenant_id, &timeline_id])
.unwrap();
let load_layer_map_histo = STORAGE_TIME
.get_metric_with_label_values(&["load layer map", &tenant_id, &timeline_id])
.unwrap();
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let wait_lsn_time_histo = WAIT_LSN_TIME
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
TimelineMetrics {
reconstruct_time_histo,
materialized_page_cache_hit_counter,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
init_logical_size_histo,
load_layer_map_histo,
last_record_gauge,
wait_lsn_time_histo,
current_physical_size_gauge,
}
}
}
pub struct LayeredTimeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -205,6 +298,8 @@ pub struct LayeredTimeline {
pub layers: RwLock<LayerMap>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
// WAL redo manager
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
@@ -239,14 +334,7 @@ pub struct LayeredTimeline {
ancestor_lsn: Lsn,
// Metrics
reconstruct_time_histo: Histogram,
materialized_page_cache_hit_counter: IntCounter,
flush_time_histo: Histogram,
compact_time_histo: Histogram,
create_images_time_histo: Histogram,
last_record_gauge: IntGauge,
wait_lsn_time_histo: Histogram,
current_physical_size_gauge: UIntGauge,
metrics: TimelineMetrics,
/// If `true`, will backup its files that appear after each checkpointing to the remote storage.
upload_layers: AtomicBool,
@@ -295,6 +383,9 @@ pub struct LayeredTimeline {
/// or None if WAL receiver has not received anything for this timeline
/// yet.
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
/// Relation size cache
rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
}
pub struct WalReceiverInfo {
@@ -306,7 +397,42 @@ pub struct WalReceiverInfo {
/// Inherit all the functions from DatadirTimeline, to provide the
/// functionality to store PostgreSQL relations, SLRUs, etc. in a
/// LayeredTimeline.
impl DatadirTimeline for LayeredTimeline {}
impl DatadirTimeline for LayeredTimeline {
fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
return Some(*nblocks);
}
}
None
}
fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
match rel_size_cache.entry(tag) {
Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
}
}
}
fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
rel_size_cache.insert(tag, (lsn, nblocks));
}
fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
rel_size_cache.remove(tag);
}
}
///
/// Information about how much history needs to be retained, needed by
@@ -358,7 +484,7 @@ impl Timeline for LayeredTimeline {
"wait_lsn called by WAL receiver thread"
);
self.wait_lsn_time_histo.observe_closure_duration(
self.metrics.wait_lsn_time_histo.observe_closure_duration(
|| self.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.with_context(|| {
@@ -377,8 +503,6 @@ impl Timeline for LayeredTimeline {
/// Look up the value with the given a key
fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes> {
debug_assert!(lsn <= self.get_last_record_lsn());
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
@@ -402,7 +526,8 @@ impl Timeline for LayeredTimeline {
self.get_reconstruct_data(key, lsn, &mut reconstruct_state)?;
self.reconstruct_time_histo
self.metrics
.reconstruct_time_histo
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
}
@@ -464,7 +589,7 @@ impl Timeline for LayeredTimeline {
}
fn get_physical_size(&self) -> u64 {
self.current_physical_size_gauge.get()
self.metrics.current_physical_size_gauge.get()
}
fn get_physical_size_non_incremental(&self) -> anyhow::Result<u64> {
@@ -496,6 +621,13 @@ impl LayeredTimeline {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
@@ -531,43 +663,6 @@ impl LayeredTimeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
) -> LayeredTimeline {
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let flush_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"layer flush",
&tenant_id.to_string(),
&timeline_id.to_string(),
])
.unwrap();
let compact_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"compact",
&tenant_id.to_string(),
&timeline_id.to_string(),
])
.unwrap();
let create_images_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"create images",
&tenant_id.to_string(),
&timeline_id.to_string(),
])
.unwrap();
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let wait_lsn_time_histo = WAIT_LSN_TIME
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let mut result = LayeredTimeline {
conf,
tenant_conf,
@@ -585,18 +680,12 @@ impl LayeredTimeline {
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn().0),
last_freeze_at: AtomicLsn::new(metadata.disk_consistent_lsn().0),
last_freeze_ts: RwLock::new(Instant::now()),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),
reconstruct_time_histo,
materialized_page_cache_hit_counter,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
last_record_gauge,
wait_lsn_time_histo,
current_physical_size_gauge,
metrics: TimelineMetrics::new(&tenant_id, &timeline_id),
upload_layers: AtomicBool::new(upload_layers),
@@ -618,6 +707,7 @@ impl LayeredTimeline {
repartition_threshold: 0,
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -631,6 +721,8 @@ impl LayeredTimeline {
let mut layers = self.layers.write().unwrap();
let mut num_layers = 0;
let timer = self.metrics.load_layer_map_histo.start_timer();
// Scan timeline directory and create ImageFileName and DeltaFilename
// structs representing all files on disk
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -702,7 +794,11 @@ impl LayeredTimeline {
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
);
self.current_physical_size_gauge.set(total_physical_size);
self.metrics
.current_physical_size_gauge
.set(total_physical_size);
timer.stop_and_record();
Ok(())
}
@@ -733,12 +829,16 @@ impl LayeredTimeline {
}
}
let timer = self.metrics.init_logical_size_histo.start_timer();
// Have to calculate it the hard way
let last_lsn = self.get_last_record_lsn();
let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?;
self.current_logical_size
.store(logical_size as isize, AtomicOrdering::SeqCst);
debug!("calculated logical size the hard way: {}", logical_size);
timer.stop_and_record();
Ok(())
}
@@ -803,7 +903,7 @@ impl LayeredTimeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.materialized_page_cache_hit_counter.inc_by(1);
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
}
if prev_lsn <= cont_lsn {
@@ -999,7 +1099,7 @@ impl LayeredTimeline {
fn finish_write(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
self.last_record_gauge.set(new_lsn.0 as i64);
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
}
@@ -1029,8 +1129,11 @@ impl LayeredTimeline {
}
///
/// Check if more than 'checkpoint_distance' of WAL has been accumulated
/// in the in-memory layer, and initiate flushing it if so.
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
///
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
@@ -1038,21 +1141,27 @@ impl LayeredTimeline {
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let distance = last_lsn.widening_sub(self.last_freeze_at.load());
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}",
distance, open_layer_size
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a thread to flush the frozen layer to disk, unless
// a thread was already running. (If the thread was running
@@ -1094,7 +1203,7 @@ impl LayeredTimeline {
}
};
let timer = self.flush_time_histo.start_timer();
let timer = self.metrics.flush_time_histo.start_timer();
loop {
let layers = self.layers.read().unwrap();
@@ -1265,7 +1374,7 @@ impl LayeredTimeline {
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
self.current_physical_size_gauge.add(sz);
self.metrics.current_physical_size_gauge.add(sz);
// update metrics
NUM_PERSISTENT_FILES_CREATED.inc_by(1);
PERSISTENT_BYTES_WRITTEN.inc_by(sz);
@@ -1334,7 +1443,7 @@ impl LayeredTimeline {
}
// 3. Compact
let timer = self.compact_time_histo.start_timer();
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
@@ -1410,7 +1519,7 @@ impl LayeredTimeline {
lsn: Lsn,
force: bool,
) -> Result<HashSet<PathBuf>> {
let timer = self.create_images_time_histo.start_timer();
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
let mut layer_paths_to_upload = HashSet::new();
for partition in partitioning.parts.iter() {
@@ -1454,7 +1563,8 @@ impl LayeredTimeline {
let mut layers = self.layers.write().unwrap();
for l in image_layers {
self.current_physical_size_gauge
self.metrics
.current_physical_size_gauge
.add(l.path().metadata()?.len());
layers.insert_historic(Arc::new(l));
}
@@ -1704,7 +1814,8 @@ impl LayeredTimeline {
let new_delta_path = l.path();
// update the timeline's physical size
self.current_physical_size_gauge
self.metrics
.current_physical_size_gauge
.add(new_delta_path.metadata()?.len());
new_layer_paths.insert(new_delta_path);
@@ -1717,7 +1828,9 @@ impl LayeredTimeline {
drop(all_keys_iter);
for l in deltas_to_compact {
if let Some(path) = l.local_path() {
self.current_physical_size_gauge.sub(path.metadata()?.len());
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_do_delete.insert(path);
}
l.delete()?;
@@ -1974,7 +2087,9 @@ impl LayeredTimeline {
let mut layer_paths_to_delete = HashSet::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
if let Some(path) = doomed_layer.local_path() {
self.current_physical_size_gauge.sub(path.metadata()?.len());
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_to_delete.insert(path);
}
doomed_layer.delete()?;

View File

@@ -22,13 +22,12 @@ pub mod walreceiver;
pub mod walrecord;
pub mod walredo;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use tracing::info;
use crate::thread_mgr::ThreadKind;
use metrics::{register_int_gauge_vec, IntGaugeVec};
use layered_repository::LayeredRepository;
use pgdatadir_mapping::DatadirTimeline;
/// Current storage format version
@@ -42,14 +41,14 @@ pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
lazy_static! {
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
"Number of live network connections",
&["pageserver_connection_kind"]
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
pub const LOG_FILE_NAME: &str = "pageserver.log";
@@ -62,9 +61,6 @@ pub enum CheckpointConfig {
Forced,
}
pub type RepositoryImpl = LayeredRepository;
pub type TimelineImpl = <LayeredRepository as repository::Repository>::Timeline;
pub fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint thread. This prevents new connections from
// being accepted.
@@ -93,3 +89,56 @@ pub fn shutdown_pageserver(exit_code: i32) {
info!("Shut down successfully completed");
std::process::exit(exit_code);
}
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
info!(
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
);
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
}
}
fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else {
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
}
}
#[cfg(test)]
mod backoff_defaults_tests {
use super::*;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;
for i in 0..10_000 {
let new_backoff_value = exponential_backoff_duration_seconds(
i,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
old_backoff_value <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}
}
assert_eq!(
current_backoff_value.expect("Should have produced backoff values to compare"),
DEFAULT_MAX_BACKOFF_SECONDS,
"Given big enough of retries, backoff should reach its allowed max value"
);
}
}

View File

@@ -83,7 +83,7 @@ pub fn get() -> &'static PageCache {
}
}
pub const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize;
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;
///

View File

@@ -11,7 +11,7 @@
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use regex::Regex;
use std::io::{self, Read};
use std::net::TcpListener;
@@ -40,9 +40,10 @@ use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::CheckpointConfig;
use metrics::{register_histogram_vec, HistogramVec};
use postgres_ffi::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::pg_constants;
use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
@@ -434,15 +435,15 @@ const TIME_BUCKETS: &[f64] = &[
0.1, // 1/10 s
];
lazy_static! {
static ref SMGR_QUERY_TIME: HistogramVec = register_histogram_vec!(
static SMGR_QUERY_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_smgr_query_seconds",
"Time spent on smgr query handling",
&["smgr_query_type", "tenant_id", "timeline_id"],
TIME_BUCKETS.into()
)
.expect("failed to define a metric");
}
.expect("failed to define a metric")
});
impl PageServerHandler {
pub fn new(conf: &'static PageServerConf, auth: Option<Arc<JwtAuth>>) -> Self {
@@ -725,10 +726,9 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let total_blocks =
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let total_blocks = timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64;
let db_size = total_blocks as i64 * BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
db_size,
@@ -1044,6 +1044,7 @@ impl postgres_backend::Handler for PageServerHandler {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
RowDescriptor::int8_col(b"compaction_target_size"),
RowDescriptor::int8_col(b"compaction_period"),
RowDescriptor::int8_col(b"compaction_threshold"),
@@ -1054,6 +1055,12 @@ impl postgres_backend::Handler for PageServerHandler {
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(repo.get_checkpoint_distance().to_string().as_bytes()),
Some(
repo.get_checkpoint_timeout()
.as_secs()
.to_string()
.as_bytes(),
),
Some(repo.get_compaction_target_size().to_string().as_bytes()),
Some(
repo.get_compaction_period()

View File

@@ -13,8 +13,10 @@ use crate::repository::*;
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
use postgres_ffi::xlog_utils::TimestampTz;
use postgres_ffi::{pg_constants, Oid, TransactionId};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::TimestampTz;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::ops::Range;
@@ -56,13 +58,16 @@ pub trait DatadirTimeline: Timeline {
/// This provides a transaction-like interface to perform a bunch
/// of modifications atomically.
///
/// To ingest a WAL record, call begin_modification() to get a
/// To ingest a WAL record, call begin_modification(lsn) to get a
/// DatadirModification object. Use the functions in the object to
/// modify the repository state, updating all the pages and metadata
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
/// that the WAL record affects. When you're done, call commit() to
/// commit the changes.
///
/// Calling commit(lsn) will flush all the changes and reset the state,
/// Lsn stored in modification is advanced by `ingest_record` and
/// is used by `commit()` to update `last_record_lsn`.
///
/// Calling commit() will flush all the changes and reset the state,
/// so the `DatadirModification` struct can be reused to perform the next modification.
///
/// Note that any pending modifications you make through the
@@ -70,7 +75,7 @@ pub trait DatadirTimeline: Timeline {
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
fn begin_modification(&self) -> DatadirModification<Self>
fn begin_modification(&self, lsn: Lsn) -> DatadirModification<Self>
where
Self: Sized,
{
@@ -79,6 +84,7 @@ pub trait DatadirTimeline: Timeline {
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
lsn,
}
}
@@ -120,6 +126,10 @@ pub trait DatadirTimeline: Timeline {
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
ensure!(tag.relnode != 0, "invalid relnode");
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
return Ok(nblocks);
}
if (tag.forknum == pg_constants::FSM_FORKNUM
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, lsn)?
@@ -133,13 +143,21 @@ pub trait DatadirTimeline: Timeline {
let key = rel_size_to_key(tag);
let mut buf = self.get(key, lsn)?;
Ok(buf.get_u32_le())
let nblocks = buf.get_u32_le();
// Update relation size cache
self.update_cached_rel_size(tag, lsn, nblocks);
Ok(nblocks)
}
/// Does relation exist?
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
ensure!(tag.relnode != 0, "invalid relnode");
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
return Ok(true);
}
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = self.get(key, lsn)?;
@@ -281,9 +299,9 @@ pub trait DatadirTimeline: Timeline {
let clog_page =
self.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)?;
if clog_page.len() == pg_constants::BLCKSZ as usize + 8 {
if clog_page.len() == BLCKSZ as usize + 8 {
let mut timestamp_bytes = [0u8; 8];
timestamp_bytes.copy_from_slice(&clog_page[pg_constants::BLCKSZ as usize..]);
timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
if timestamp >= search_timestamp {
@@ -366,7 +384,7 @@ pub trait DatadirTimeline: Timeline {
total_size += relsize as usize;
}
}
Ok(total_size * pg_constants::BLCKSZ as usize)
Ok(total_size * BLCKSZ as usize)
}
///
@@ -445,6 +463,18 @@ pub trait DatadirTimeline: Timeline {
Ok(result.to_keyspace())
}
/// Get cached size of relation if it not updated after specified LSN
fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber>;
/// Update cached relation size if there is no more recent update
fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber);
/// Store cached relation size
fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber);
/// Remove cached relation size
fn remove_cached_rel_size(&self, tag: &RelTag);
}
/// DatadirModification represents an operation to ingest an atomic set of
@@ -457,6 +487,9 @@ pub struct DatadirModification<'a, T: DatadirTimeline> {
/// in the state in 'tline' yet.
pub tline: &'a T,
/// Lsn assigned by begin_modification
pub lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
@@ -666,26 +699,36 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
self.pending_nblocks += nblocks as isize;
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
// caller.
Ok(())
}
/// Truncate relation
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
ensure!(rel.relnode != 0, "invalid relnode");
let size_key = rel_size_to_key(rel);
let last_lsn = self.tline.get_last_record_lsn();
if self.tline.get_rel_exists(rel, last_lsn)? {
let size_key = rel_size_to_key(rel);
// Fetch the old size first
let old_size = self.get(size_key)?.get_u32_le();
// Fetch the old size first
let old_size = self.get(size_key)?.get_u32_le();
// Update the entry with the new size.
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
// Update the entry with the new size.
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Update logical database size.
self.pending_nblocks -= old_size as isize - nblocks as isize;
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Update logical database size.
self.pending_nblocks -= old_size as isize - nblocks as isize;
}
Ok(())
}
@@ -703,6 +746,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
self.pending_nblocks += nblocks as isize - old_size as isize;
}
Ok(())
@@ -728,6 +774,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
let old_size = self.get(size_key)?.get_u32_le();
self.pending_nblocks -= old_size as isize;
// Remove enty from relation size cache
self.tline.remove_cached_rel_size(&rel);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel));
@@ -842,7 +891,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self, lsn: Lsn) -> Result<()> {
pub fn flush(&mut self) -> Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -856,7 +905,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
let mut result: Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, lsn, value);
result = writer.put(key, self.lsn, value);
false
} else {
true
@@ -865,7 +914,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
result?;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize);
self.pending_nblocks = 0;
}
@@ -877,9 +926,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
pub fn commit(&mut self) -> Result<()> {
let writer = self.tline.writer();
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
@@ -893,7 +942,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
writer.finish_write(lsn);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize);
}
Ok(())
@@ -919,8 +968,8 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
bail!("unexpected pending WAL record");
}
} else {
let last_lsn = self.tline.get_last_record_lsn();
self.tline.get(key, last_lsn)
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn)
}
}
@@ -967,7 +1016,7 @@ struct SlruSegmentDirectory {
segments: HashSet<u32>,
}
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; pg_constants::BLCKSZ as usize]);
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// Layout of the Key address space
//
@@ -1324,9 +1373,9 @@ pub fn create_test_timeline<R: Repository>(
timeline_id: utils::zid::ZTimelineId,
) -> Result<std::sync::Arc<R::Timeline>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let mut m = tline.begin_modification();
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit(Lsn(8))?;
m.commit()?;
Ok(tline)
}

View File

@@ -2,8 +2,9 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::{pg_constants, Oid};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::relfile_utils::forknumber_to_name;
use postgres_ffi::Oid;
///
/// Relation data file segment id throughout the Postgres cluster.

View File

@@ -408,11 +408,10 @@ pub trait TimelineWriter<'a> {
#[cfg(test)]
pub mod repo_harness {
use bytes::BytesMut;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fs, path::PathBuf};
use crate::RepositoryImpl;
use crate::{
config::PageServerConf,
layered_repository::LayeredRepository,
@@ -439,14 +438,13 @@ pub mod repo_harness {
buf.freeze()
}
lazy_static! {
static ref LOCK: RwLock<()> = RwLock::new(());
}
static LOCK: Lazy<RwLock<()>> = Lazy::new(|| RwLock::new(()));
impl From<TenantConf> for TenantConfOpt {
fn from(tenant_conf: TenantConf) -> Self {
Self {
checkpoint_distance: Some(tenant_conf.checkpoint_distance),
checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
compaction_target_size: Some(tenant_conf.compaction_target_size),
compaction_period: Some(tenant_conf.compaction_period),
compaction_threshold: Some(tenant_conf.compaction_threshold),
@@ -509,11 +507,11 @@ pub mod repo_harness {
})
}
pub fn load(&self) -> RepositoryImpl {
pub fn load(&self) -> LayeredRepository {
self.try_load().expect("failed to load test repo")
}
pub fn try_load(&self) -> Result<RepositoryImpl> {
pub fn try_load(&self) -> Result<LayeredRepository> {
let walredo_mgr = Arc::new(TestRedoManager);
let repo = LayeredRepository::new(
@@ -589,11 +587,10 @@ mod tests {
//use std::sync::Arc;
use bytes::BytesMut;
use hex_literal::hex;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
lazy_static! {
static ref TEST_KEY: Key = Key::from_slice(&hex!("112222222233333333444444445500000001"));
}
static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
#[test]
fn test_basic() -> Result<()> {

View File

@@ -155,8 +155,7 @@ use std::{
use anyhow::{anyhow, bail, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::{
fs,
@@ -173,6 +172,7 @@ use self::{
};
use crate::{
config::PageServerConf,
exponential_backoff,
layered_repository::{
ephemeral_file::is_ephemeral_file,
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
@@ -184,8 +184,8 @@ use crate::{
};
use metrics::{
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
HistogramVec, IntCounter, IntCounterVec, IntGauge,
register_histogram_vec, register_int_counter_vec, register_int_gauge, HistogramVec,
IntCounterVec, IntGauge,
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -193,32 +193,33 @@ use self::download::download_index_parts;
pub use self::download::gather_tenant_timelines_index_parts;
pub use self::download::TEMP_DOWNLOAD_EXTENSION;
lazy_static! {
static ref REMAINING_SYNC_ITEMS: IntGauge = register_int_gauge!(
static REMAINING_SYNC_ITEMS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_remote_storage_remaining_sync_items",
"Number of storage sync items left in the queue"
)
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref FATAL_TASK_FAILURES: IntCounter = register_int_counter!(
"pageserver_remote_storage_fatal_task_failures_total",
"Number of critically failed tasks"
)
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
.expect("failed to register pageserver remote storage remaining sync items int gauge")
});
static IMAGE_SYNC_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_storage_image_sync_seconds",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)",
&["tenant_id", "timeline_id", "operation_kind", "status"],
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0]
)
.expect("failed to register pageserver image sync time histogram vec");
static ref REMOTE_INDEX_UPLOAD: IntCounterVec = register_int_counter_vec!(
.expect("failed to register pageserver image sync time histogram vec")
});
static REMOTE_INDEX_UPLOAD: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_storage_remote_index_uploads_total",
"Number of remote index uploads",
&["tenant_id", "timeline_id"],
)
.expect("failed to register pageserver remote index upload vec");
}
.expect("failed to register pageserver remote index upload vec")
});
static SYNC_QUEUE: OnceCell<SyncQueue> = OnceCell::new();
@@ -969,14 +970,19 @@ fn storage_sync_loop<P, S>(
}
}
// needed to check whether the download happened
// more informative than just a bool
#[derive(Debug)]
enum DownloadMarker {
enum DownloadStatus {
Downloaded,
Nothing,
}
#[derive(Debug)]
enum UploadStatus {
Uploaded,
Failed(anyhow::Error),
Nothing,
}
async fn process_batches<P, S>(
conf: &'static PageServerConf,
max_sync_errors: NonZeroU32,
@@ -1016,7 +1022,7 @@ where
"Finished storage sync task for sync id {sync_id} download marker {:?}",
download_marker
);
if matches!(download_marker, DownloadMarker::Downloaded) {
if matches!(download_marker, DownloadStatus::Downloaded) {
downloaded_timelines.insert(sync_id.tenant_id);
}
}
@@ -1030,7 +1036,7 @@ async fn process_sync_task_batch<P, S>(
max_sync_errors: NonZeroU32,
sync_id: ZTenantTimelineId,
batch: SyncTaskBatch,
) -> DownloadMarker
) -> DownloadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -1047,66 +1053,71 @@ where
// When operating in a system without tasks failing over the error threshold,
// current batching and task processing systems aim to update the layer set and metadata files (remote and local),
// without "losing" such layer files.
let (upload_result, status_update) = tokio::join!(
let (upload_status, download_status) = tokio::join!(
async {
if let Some(upload_data) = upload_data {
match validate_task_retries(upload_data, max_sync_errors)
let upload_retries = upload_data.retries;
match validate_task_retries(upload_retries, max_sync_errors)
.instrument(info_span!("retries_validation"))
.await
{
ControlFlow::Continue(new_upload_data) => {
ControlFlow::Continue(()) => {
upload_timeline_data(
conf,
(storage.as_ref(), &index, sync_queue),
current_remote_timeline.as_ref(),
sync_id,
new_upload_data,
upload_data,
sync_start,
"upload",
)
.await;
return Some(());
}
ControlFlow::Break(failed_upload_data) => {
if let Err(e) = update_remote_data(
conf,
storage.as_ref(),
&index,
sync_id,
RemoteDataUpdate::Upload {
uploaded_data: failed_upload_data.data,
upload_failed: true,
},
)
.await
{
error!("Failed to update remote timeline {sync_id}: {e:?}");
}
}
ControlFlow::Break(()) => match update_remote_data(
conf,
storage.as_ref(),
&index,
sync_id,
RemoteDataUpdate::Upload {
uploaded_data: upload_data.data,
upload_failed: true,
},
)
.await
{
Ok(()) => UploadStatus::Failed(anyhow::anyhow!(
"Aborted after retries validation, current retries: {upload_retries}, max retries allowed: {max_sync_errors}"
)),
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
UploadStatus::Failed(e)
}
},
}
} else {
UploadStatus::Nothing
}
None
}
.instrument(info_span!("upload_timeline_data")),
async {
if let Some(download_data) = download_data {
match validate_task_retries(download_data, max_sync_errors)
match validate_task_retries(download_data.retries, max_sync_errors)
.instrument(info_span!("retries_validation"))
.await
{
ControlFlow::Continue(new_download_data) => {
ControlFlow::Continue(()) => {
return download_timeline_data(
conf,
(storage.as_ref(), &index, sync_queue),
current_remote_timeline.as_ref(),
sync_id,
new_download_data,
download_data,
sync_start,
"download",
)
.await;
}
ControlFlow::Break(_) => {
ControlFlow::Break(()) => {
index
.write()
.await
@@ -1115,51 +1126,53 @@ where
}
}
}
DownloadMarker::Nothing
DownloadStatus::Nothing
}
.instrument(info_span!("download_timeline_data")),
);
if let Some(mut delete_data) = batch.delete {
if upload_result.is_some() {
match validate_task_retries(delete_data, max_sync_errors)
.instrument(info_span!("retries_validation"))
.await
{
ControlFlow::Continue(new_delete_data) => {
delete_timeline_data(
conf,
(storage.as_ref(), &index, sync_queue),
sync_id,
new_delete_data,
sync_start,
"delete",
)
.instrument(info_span!("delete_timeline_data"))
.await;
}
ControlFlow::Break(failed_delete_data) => {
if let Err(e) = update_remote_data(
conf,
storage.as_ref(),
&index,
sync_id,
RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
)
if let Some(delete_data) = batch.delete {
match upload_status {
UploadStatus::Uploaded | UploadStatus::Nothing => {
match validate_task_retries(delete_data.retries, max_sync_errors)
.instrument(info_span!("retries_validation"))
.await
{
error!("Failed to update remote timeline {sync_id}: {e:?}");
{
ControlFlow::Continue(()) => {
delete_timeline_data(
conf,
(storage.as_ref(), &index, sync_queue),
sync_id,
delete_data,
sync_start,
"delete",
)
.instrument(info_span!("delete_timeline_data"))
.await;
}
ControlFlow::Break(()) => {
if let Err(e) = update_remote_data(
conf,
storage.as_ref(),
&index,
sync_id,
RemoteDataUpdate::Delete(&delete_data.data.deleted_layers),
)
.await
{
error!("Failed to update remote timeline {sync_id}: {e:?}");
}
}
}
}
} else {
delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
warn!("Skipping delete task due to failed upload tasks, reenqueuing");
UploadStatus::Failed(e) => {
warn!("Skipping delete task due to failed upload tasks, reenqueuing. Upload data: {:?}, delete data: {delete_data:?}. Upload failure: {e:#}", batch.upload);
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
}
}
}
status_update
download_status
}
async fn download_timeline_data<P, S>(
@@ -1170,7 +1183,7 @@ async fn download_timeline_data<P, S>(
new_download_data: SyncData<LayersDownload>,
sync_start: Instant,
task_name: &str,
) -> DownloadMarker
) -> DownloadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -1199,7 +1212,7 @@ where
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
return DownloadMarker::Downloaded;
return DownloadStatus::Downloaded;
}
Err(e) => {
error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
@@ -1215,7 +1228,7 @@ where
}
}
DownloadMarker::Nothing
DownloadStatus::Nothing
}
async fn update_local_metadata(
@@ -1338,7 +1351,8 @@ async fn upload_timeline_data<P, S>(
new_upload_data: SyncData<LayersUpload>,
sync_start: Instant,
task_name: &str,
) where
) -> UploadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
@@ -1351,9 +1365,9 @@ async fn upload_timeline_data<P, S>(
)
.await
{
UploadedTimeline::FailedAndRescheduled => {
UploadedTimeline::FailedAndRescheduled(e) => {
register_sync_status(sync_id, sync_start, task_name, Some(false));
return;
return UploadStatus::Failed(e);
}
UploadedTimeline::Successful(upload_data) => upload_data,
};
@@ -1372,12 +1386,14 @@ async fn upload_timeline_data<P, S>(
{
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
UploadStatus::Uploaded
}
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_id, sync_start, task_name, Some(false));
UploadStatus::Failed(e)
}
}
}
@@ -1480,25 +1496,17 @@ where
.context("Failed to upload new index part")
}
async fn validate_task_retries<T>(
sync_data: SyncData<T>,
async fn validate_task_retries(
current_attempt: u32,
max_sync_errors: NonZeroU32,
) -> ControlFlow<SyncData<T>, SyncData<T>> {
let current_attempt = sync_data.retries;
) -> ControlFlow<(), ()> {
let max_sync_errors = max_sync_errors.get();
if current_attempt >= max_sync_errors {
error!(
"Aborting task that failed {current_attempt} times, exceeding retries threshold of {max_sync_errors}",
);
return ControlFlow::Break(sync_data);
return ControlFlow::Break(());
}
if current_attempt > 0 {
let seconds_to_wait = 2.0_f64.powf(current_attempt as f64 - 1.0).min(30.0);
info!("Waiting {seconds_to_wait} seconds before starting the task");
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
}
ControlFlow::Continue(sync_data)
exponential_backoff(current_attempt, 1.0, 30.0).await;
ControlFlow::Continue(())
}
fn schedule_first_sync_tasks(

View File

@@ -95,6 +95,8 @@ where
debug!("Reenqueuing failed delete task for timeline {sync_id}");
delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
} else {
info!("Successfully deleted all layers");
}
errored
}

View File

@@ -130,6 +130,7 @@ where
tenant_path.display()
)
})?;
let timelines = storage
.list_prefixes(Some(tenant_storage_path))
.await
@@ -140,6 +141,13 @@ where
)
})?;
if timelines.is_empty() {
anyhow::bail!(
"no timelines found on the remote storage for tenant {}",
tenant_id
)
}
let mut sync_ids = HashSet::new();
for timeline_remote_storage_key in timelines {

View File

@@ -4,7 +4,7 @@ use std::{fmt::Debug, path::PathBuf};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use remote_storage::RemoteStorage;
use tokio::fs;
use tracing::{debug, error, info, warn};
@@ -20,14 +20,14 @@ use crate::{
};
use metrics::{register_int_counter_vec, IntCounterVec};
lazy_static! {
static ref NO_LAYERS_UPLOAD: IntCounterVec = register_int_counter_vec!(
static NO_LAYERS_UPLOAD: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_storage_no_layers_uploads_total",
"Number of skipped uploads due to no layers",
&["tenant_id", "timeline_id"],
)
.expect("failed to register pageserver no layers upload vec");
}
.expect("failed to register pageserver no layers upload vec")
});
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<P, S>(
@@ -75,7 +75,7 @@ where
#[derive(Debug)]
pub(super) enum UploadedTimeline {
/// Upload failed due to some error, the upload task is rescheduled for another retry.
FailedAndRescheduled,
FailedAndRescheduled(anyhow::Error),
/// No issues happened during the upload, all task files were put into the remote storage.
Successful(SyncData<LayersUpload>),
}
@@ -179,7 +179,7 @@ where
})
.collect::<FuturesUnordered<_>>();
let mut errors_happened = false;
let mut errors = Vec::new();
while let Some(upload_result) = upload_tasks.next().await {
match upload_result {
Ok(uploaded_path) => {
@@ -188,13 +188,13 @@ where
}
Err(e) => match e {
UploadError::Other(e) => {
errors_happened = true;
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
errors.push(format!("{e:#}"));
}
UploadError::MissingLocalFile(source_path, e) => {
if source_path.exists() {
errors_happened = true;
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
errors.push(format!("{e:#}"));
} else {
// We have run the upload sync task, but the file we wanted to upload is gone.
// This is "fine" due the asynchronous nature of the sync loop: it only reacts to events and might need to
@@ -217,14 +217,17 @@ where
}
}
if errors_happened {
if errors.is_empty() {
info!("Successfully uploaded all layers");
UploadedTimeline::Successful(upload_data)
} else {
debug!("Reenqueuing failed upload task for timeline {sync_id}");
upload_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(upload_data));
UploadedTimeline::FailedAndRescheduled
} else {
info!("Successfully uploaded all layers");
UploadedTimeline::Successful(upload_data)
UploadedTimeline::FailedAndRescheduled(anyhow::anyhow!(
"Errors appeared during layer uploads: {:?}",
errors
))
}
}

View File

@@ -23,6 +23,7 @@ pub mod defaults {
// which is good for now to trigger bugs.
// This parameter actually determines L0 layer file size.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
@@ -36,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
}
@@ -48,6 +49,9 @@ pub struct TenantConf {
// page server crashes.
// This parameter actually determines L0 layer file size.
pub checkpoint_distance: u64,
// Inmemory layer is also flushed at least once in checkpoint_timeout to
// eventually upload WAL after activity is stopped.
pub checkpoint_timeout: Duration,
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
@@ -90,6 +94,7 @@ pub struct TenantConf {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TenantConfOpt {
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<Duration>,
pub compaction_target_size: Option<u64>,
#[serde(with = "humantime_serde")]
pub compaction_period: Option<Duration>,
@@ -113,6 +118,9 @@ impl TenantConfOpt {
checkpoint_distance: self
.checkpoint_distance
.unwrap_or(global_conf.checkpoint_distance),
checkpoint_timeout: self
.checkpoint_timeout
.unwrap_or(global_conf.checkpoint_timeout),
compaction_target_size: self
.compaction_target_size
.unwrap_or(global_conf.compaction_target_size),
@@ -142,6 +150,9 @@ impl TenantConfOpt {
if let Some(checkpoint_distance) = other.checkpoint_distance {
self.checkpoint_distance = Some(checkpoint_distance);
}
if let Some(checkpoint_timeout) = other.checkpoint_timeout {
self.checkpoint_timeout = Some(checkpoint_timeout);
}
if let Some(compaction_target_size) = other.compaction_target_size {
self.compaction_target_size = Some(compaction_target_size);
}
@@ -181,6 +192,8 @@ impl TenantConf {
TenantConf {
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_timeout: humantime::parse_duration(DEFAULT_CHECKPOINT_TIMEOUT)
.expect("cannot parse default checkpoint timeout"),
compaction_target_size: DEFAULT_COMPACTION_TARGET_SIZE,
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
@@ -212,6 +225,7 @@ impl TenantConf {
pub fn dummy_conf() -> Self {
TenantConf {
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_timeout: Duration::from_secs(600),
compaction_target_size: 4 * 1024 * 1024,
compaction_period: Duration::from_secs(10),
compaction_threshold: defaults::DEFAULT_COMPACTION_THRESHOLD,

Some files were not shown because too many files have changed in this diff Show More